
How to understand the Pub/Sub pattern of Redis

Updated 2025-04-09. From: SLTechnology News&Howtos


This article explains how to understand the Pub/Sub mode of Redis, with a detailed analysis and worked examples, in the hope of helping readers who are looking for a simple and practical approach.

Redis also supports the publish/subscribe (Pub/Sub) messaging mode, similar to middleware such as ActiveMQ. Subscribers can subscribe to the channels they are interested in, and publishers can send messages to specified channels. This decouples the sender of a message from its receivers. In addition, because clients can subscribe and unsubscribe dynamically, it also improves the flexibility and scalability of the system.
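The decoupling described above can be illustrated with a minimal in-memory sketch. It is not Redis and not Jedis; the class MiniBroker and its methods are hypothetical names introduced only for illustration. It shows the routing idea: publishers know only a channel name, and subscribers can come and go at any time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory broker: mimics the routing idea only, not Redis itself.
class MiniBroker {
    // channel name -> handlers currently subscribed to that channel
    private final Map<String, List<Consumer<String>>> channels = new ConcurrentHashMap<>();

    void subscribe(String channel, Consumer<String> handler) {
        channels.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(handler);
    }

    void unsubscribe(String channel, Consumer<String> handler) {
        List<Consumer<String>> handlers = channels.get(channel);
        if (handlers != null) {
            handlers.remove(handler);
        }
    }

    // Delivers the message to every current subscriber of the channel
    // and returns how many subscribers received it.
    int publish(String channel, String message) {
        List<Consumer<String>> handlers =
                channels.getOrDefault(channel, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        List<String> received = new ArrayList<>();
        Consumer<String> handler = received::add;

        broker.subscribe("china", handler);
        System.out.println("receivers: " + broker.publish("china", "China News"));
        broker.unsubscribe("china", handler);
        System.out.println("receivers after unsubscribe: " + broker.publish("china", "more news"));
        System.out.println("received: " + received);
    }
}
```

As in Redis, a message published to a channel with no subscribers is simply dropped in this sketch; nothing is stored for later delivery.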

It is assumed that there is an available Redis environment (either single node or cluster).

Using Pub/Sub in redis-cli

Pub/Sub on ordinary channels

First, use one client to subscribe to the channels.

Here redis-cli is used as the client to connect to Redis, and the SUBSCRIBE command is then issued. Its arguments indicate that two channels, china and hongkong, are subscribed to. The output of the "SUBSCRIBE china hongkong" command is six lines, which fall into two groups of three, each group being one message. Because subscriptions, unsubscriptions, and published messages are all delivered as messages, the first element of a message is the message type, which can be one of the following:

Subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.

Unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.

Message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.

-- from http://redis.io/topics/pubsub
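The three-element structure quoted above can be made concrete with a small sketch. It assumes a reply has already been decoded into a list of a type string, a channel name, and either a count or a payload; the class ReplyDescriber is a hypothetical helper, not part of any Redis client.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: turns an already-decoded Pub/Sub reply into a readable sentence.
class ReplyDescriber {
    static String describe(List<Object> reply) {
        String type = (String) reply.get(0);    // first element: message type
        Object second = reply.get(1);           // second element: channel name
        Object third = reply.get(2);            // third element: count or payload
        switch (type) {
            case "subscribe":
                return "subscribed to " + second + ", now on " + third + " channel(s)";
            case "unsubscribe":
                return "unsubscribed from " + second + ", now on " + third + " channel(s)";
            case "message":
                return "message on " + second + ": " + third;
            default:
                throw new IllegalArgumentException("Unknown message type: " + type);
        }
    }

    public static void main(String[] args) {
        // The two groups produced by SUBSCRIBE china hongkong, followed by one published message.
        System.out.println(describe(Arrays.<Object>asList("subscribe", "china", 1L)));
        System.out.println(describe(Arrays.<Object>asList("subscribe", "hongkong", 2L)));
        System.out.println(describe(Arrays.<Object>asList("message", "china", "China News")));
    }
}
```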

After this subscription command, messages sent to either of the two channels are received by the client. Note that once it has entered subscribe mode, the redis-cli client no longer responds to any other command:

A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.

The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT

-- from http://redis.io/topics/pubsub
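The rule quoted above can be written down as a simple check. This is only a sketch of how a client might guard its own commands, using the command list from the quotation; SubscribedModeGuard is a hypothetical name and does not correspond to any class in Jedis or redis-cli.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical guard: checks a command name against the list quoted from the documentation.
class SubscribedModeGuard {
    private static final Set<String> ALLOWED = new HashSet<>(Arrays.asList(
            "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "PING", "QUIT"));

    // Command names are case-insensitive, so normalise before the lookup.
    static boolean isAllowedWhileSubscribed(String command) {
        return ALLOWED.contains(command.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println("unsubscribe allowed: " + isAllowedWhileSubscribed("unsubscribe"));
        System.out.println("GET allowed: " + isAllowedWhileSubscribed("GET"));
    }
}
```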

The official documentation says that a subscribed client may use only the commands listed above. However, when I tried those commands in redis-cli while subscribed, there was no response at all. In other words, after subscribing to a channel with redis-cli, the client no longer responds to any command. The only way out is Ctrl+C, which does not unsubscribe but exits redis-cli and returns to the shell.

I found no explanation for this behaviour on the official website, although many people have asked about it online. The most reasonable explanation I found is:

On this page: http://redis.io/commands/subscribe applies only to those clients.

The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.

Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).

-- from http://stackoverflow.com/questions/17621371/redis-unsubscribe

In other words, the clients described in the official documentation do not include the redis-cli used here, so redis-cli can behave differently from other clients. (Let's set this problem aside for now; we will test it with Jedis later.)

Next, use another client to publish a message.

A new client uses the PUBLISH command to publish the message "China News" to the china channel. Now look at the subscriber side.

The message has been received. The first element of the received message is the type "message", the second is the channel name "china", and the third is the message content "China News", which matches the message structure described at the beginning.

Pub/Sub with wildcards

Redis also supports subscribing with wildcards. A client can subscribe to every channel whose name matches one or more patterns; the corresponding commands are PSUBSCRIBE and PUNSUBSCRIBE. Next, we use another redis-cli client to subscribe to the pattern "chi*".

As with the output of subscribe/unsubscribe, the first element is the message type "psubscribe", the second is the subscribed pattern "chi*", and the third is the number of patterns the client is currently subscribed to.

Then publish a message to the china channel; both subscribers should receive it.

The actual result matches the expectation. Note that subscriber 2, which subscribed through a wildcard, receives a message of type "pmessage":

Pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.

-- from http://redis.io/topics/pubsub

Here the second element is the matching pattern "chi*", the third is the actual channel name "china", and the fourth is the message content "China Daily".

Now publish another message to the chinnna channel. Only subscriber 2 receives it, because the name matches "chi*" but is not one of the channels subscriber 1 subscribed to.

Similarly, after entering subscription mode with PSUBSCRIBE, redis-cli no longer accepts any other command, and the only way to leave this mode is Ctrl+C.
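The wildcard behaviour in the examples above can be sketched with a small matcher. This is a simplified illustration that supports only "*" and "?"; Redis patterns are glob-style and have further features that are deliberately left out here. GlobMatcher is a hypothetical name.

```java
import java.util.regex.Pattern;

// Hypothetical, simplified glob matcher: handles only '*' and '?'.
class GlobMatcher {
    static boolean matches(String glob, String channel) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");                                // any run of characters
            } else if (c == '?') {
                regex.append('.');                                 // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));    // literal character
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(channel).matches();
    }

    public static void main(String[] args) {
        for (String channel : new String[] {"china", "chinnna", "hongkong"}) {
            System.out.println("chi* vs " + channel + ": " + matches("chi*", channel));
        }
    }
}
```

With the pattern "chi*", both china and chinnna match while hongkong does not, which is why subscriber 2 receives messages from the first two channels only.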

Using Jedis to implement Pub/Sub

Jedis is a Java implementation of the Redis client, which can also be found in http://redis.io/clients#java.

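Here, Maven is used to manage package dependencies. Since Log4j is used to output logs, the log4j jar is also declared as a dependency:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>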

The JedisPubSub abstract class in Jedis provides the ability to subscribe and unsubscribe. To handle the events related to subscribing to and unsubscribing from channels, we extend JedisPubSub and implement the relevant methods:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;

// Note: JedisPubSub is an abstract class.
public class Subscriber extends JedisPubSub {

    private static final Logger LOGGER = Logger.getLogger(Subscriber.class);

    @Override
    public void onMessage(String channel, String message) {
        LOGGER.info(String.format("Message. Channel: %s, Msg: %s", channel, message));
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // String.format is required here: log4j 1.x does not expand format arguments itself.
        LOGGER.info(String.format("PMessage. Pattern: %s, Channel: %s, Msg: %s", pattern, channel, message));
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        LOGGER.info("onSubscribe");
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        LOGGER.info("onUnsubscribe");
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        LOGGER.info("onPUnsubscribe");
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        LOGGER.info("onPSubscribe");
    }
}

With the subscriber in place, we also need a publisher:

package com.demo.redis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;

public class Publisher {

    private static final Logger LOGGER = Logger.getLogger(Publisher.class);

    private final Jedis publisherJedis;
    private final String channel;

    public Publisher(Jedis publisherJedis, String channel) {
        this.publisherJedis = publisherJedis;
        this.channel = channel;
    }

    /**
     * Keeps reading console input and publishing it to the channel;
     * stops publishing when "quit" is entered.
     */
    public void startPublish() {
        LOGGER.info("Type your message (quit for terminate)");
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String line = reader.readLine();
                // readLine() returns null at end of input; treat that like "quit".
                if (line != null && !"quit".equals(line)) {
                    publisherJedis.publish(channel, line);
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            LOGGER.error("IO failure while reading input", e);
        }
    }
}

For simplicity, the publisher receives input from the console, then publishes the entered message to the specified channel, and if quit is entered, stops publishing the message.

Then comes the main class:

package com.demo.redis;

import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Program {

    public static final String CHANNEL_NAME = "MyChannel";
    // My Redis here is a cluster; both 192.168.56.101 and 192.168.56.102 can be used.
    public static final String REDIS_HOST = "192.168.56.101";
    public static final int REDIS_PORT = 7000;

    private final static Logger LOGGER = Logger.getLogger(Program.class);
    private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
    private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0);

    public static void main(String[] args) throws Exception {
        final Jedis subscriberJedis = JEDIS_POOL.getResource();
        final Jedis publisherJedis = JEDIS_POOL.getResource();
        final Subscriber subscriber = new Subscriber();

        // Subscription thread: receives messages.
        new Thread(new Runnable() {
            public void run() {
                try {
                    LOGGER.info("Subscribing to \"MyChannel\". This thread will be blocked.");
                    // Subscribe to CHANNEL_NAME using subscriber; the thread then
                    // enters subscription mode and blocks.
                    subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                    // The code below runs only after unsubscribe() has been called.
                    LOGGER.info("Subscription ended.");
                } catch (Exception e) {
                    LOGGER.error("Subscribing failed.", e);
                }
            }
        }).start();

        // Main thread: publishes messages.
        new Publisher(publisherJedis, CHANNEL_NAME).startPublish();
        publisherJedis.close();

        // Unsubscribe
        subscriber.unsubscribe();
        subscriberJedis.close();
    }
}

The main class Program defines the channel name and the address and port used to connect to Redis, and uses JedisPool to obtain Jedis instances. Because the subscriber blocks its thread once it enters the subscription state, a new thread (new Thread()) is created as the subscription thread, and the main thread is used to publish messages. When the publisher (the new Publisher in the class) stops publishing (just enter quit in the console), the subscriber is unsubscribed with subscriber.unsubscribe(). At that point the subscription thread unblocks, prints the final log line, and exits.
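The threading arrangement described above can be sketched without Redis at all. The following is an illustration under stated assumptions: a BlockingQueue stands in for the network connection, a sentinel string stands in for the reply that ends the subscription, and BlockingSubscriberDemo and its methods are hypothetical names. It shows only why a separate thread is needed and how a call from the main thread releases the blocked one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a blocking subscriber; no Redis or Jedis involved.
class BlockingSubscriberDemo {
    // Sentinel that plays the role of the reply ending the subscription.
    private static final String UNSUBSCRIBED = "\u0000unsubscribed";

    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private final List<String> received = Collections.synchronizedList(new ArrayList<String>());

    // Blocks the calling thread until unsubscribe() is called from another thread.
    void subscribe() throws InterruptedException {
        while (true) {
            String message = incoming.take();
            if (UNSUBSCRIBED.equals(message)) {
                return;
            }
            received.add(message);
        }
    }

    void publish(String message) { incoming.add(message); }

    void unsubscribe() { incoming.add(UNSUBSCRIBED); }

    // Starts a subscription thread, publishes from the calling thread, then unsubscribes.
    static List<String> run(String... messages) {
        final BlockingSubscriberDemo demo = new BlockingSubscriberDemo();
        Thread subscriptionThread = new Thread(() -> {
            try {
                demo.subscribe();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        subscriptionThread.start();
        for (String message : messages) {
            demo.publish(message);
        }
        demo.unsubscribe();
        try {
            subscriptionThread.join();   // returns once subscribe() has unblocked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.received;
    }

    public static void main(String[] args) {
        System.out.println(run("hello", "world"));
    }
}
```

One difference from real Pub/Sub is worth keeping in mind: this queue buffers messages sent before the subscriber starts reading, whereas Redis delivers a message only to clients subscribed at the moment it is published.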

Before running the program, you also need a simple log4j configuration to observe the output:

log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n

Run Program and observe the output.

Once the subscriber has subscribed, the subscription thread blocks. The Publisher in the main thread reads the console input and publishes each message to MyChannel, and the subscriber subscribed to that channel receives the message and prints it.

A Brief Analysis of the Jedis Source Code

About using UNSUBSCRIBE

Earlier, with redis-cli, the UNSUBSCRIBE and PUNSUBSCRIBE commands could not be used once the subscriber had entered the listening state. In Jedis, however, while the subscription thread is blocked, you can unblock it by calling the subscriber's unsubscribe() method from the main thread. The Jedis source code shows that this method simply sends an UNSUBSCRIBE command to Redis.

Therefore, the use of the UNSUBSCRIBE command on the "client" is supported here.

About subscribers receiving messages

Before receiving messages, the client must subscribe to a channel. Once the subscription is made, Jedis runs a loop that blocks until the client's subscription count drops to zero.

The body of that loop mainly parses the Redis responses it receives. It determines the message type from the first element of the response, parses the remaining elements one by one, and finally calls back the method that corresponds to the message type, passing the parsed content as arguments. The relevant code is as follows:

final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
    subscribedChannels = ((Long) reply.get(2)).intValue();
    final byte[] bchannel = (byte[]) reply.get(1);
    final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
    // Call onSubscribe, which we implement in our Subscriber class.
    onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
    subscribedChannels = ((Long) reply.get(2)).intValue();
    final byte[] bchannel = (byte[]) reply.get(1);
    final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
    // Call onUnsubscribe, which we implement in our Subscriber class.
    onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
    final byte[] bchannel = (byte[]) reply.get(1);
    final byte[] bmesg = (byte[]) reply.get(2);
    final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
    final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
    // Call onMessage, which we implement in our Subscriber class.
    onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
    final byte[] bpattern = (byte[]) reply.get(1);
    final byte[] bchannel = (byte[]) reply.get(2);
    final byte[] bmesg = (byte[]) reply.get(3);
    final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
    final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
    final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
    // Call onPMessage, which we implement in our Subscriber class.
    onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
    subscribedChannels = ((Long) reply.get(2)).intValue();
    final byte[] bpattern = (byte[]) reply.get(1);
    final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
    // Call onPSubscribe, which we implement in our Subscriber class.
    onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
    subscribedChannels = ((Long) reply.get(2)).intValue();
    final byte[] bpattern = (byte[]) reply.get(1);
    final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
    // Call onPUnsubscribe, which we implement in our Subscriber class.
    onPUnsubscribe(strpattern, subscribedChannels);
} else {
    // Any other message type is not defined by Redis, so report an error directly.
    throw new JedisException("Unknown message type: " + firstObj);
}

This is why we implement these methods in Subscriber. (The methods are not abstract, so you can override only the ones you actually need.)
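The overall shape of the receive loop, which runs until the subscription count reaches zero and dispatches on the message type, can be sketched as follows. This is a simplified stand-in written for illustration, not the Jedis source: replies are supplied as already-decoded lists from an iterator instead of being read from a socket, only three message types are handled, and DispatchLoopSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: a simplified stand-in for the loop described above.
class DispatchLoopSketch {
    private final List<String> log = new ArrayList<>();
    private int subscribedChannels = 0;

    // Callbacks; here they only record what happened.
    void onSubscribe(String channel, int count) { log.add("subscribe " + channel + " " + count); }
    void onUnsubscribe(String channel, int count) { log.add("unsubscribe " + channel + " " + count); }
    void onMessage(String channel, String message) { log.add("message " + channel + " " + message); }

    // Processes replies until the subscription count drops to zero.
    List<String> process(Iterator<List<Object>> replies) {
        do {
            List<Object> reply = replies.next();
            String type = (String) reply.get(0);
            String channel = (String) reply.get(1);
            switch (type) {
                case "subscribe":
                    subscribedChannels = ((Long) reply.get(2)).intValue();
                    onSubscribe(channel, subscribedChannels);
                    break;
                case "unsubscribe":
                    subscribedChannels = ((Long) reply.get(2)).intValue();
                    onUnsubscribe(channel, subscribedChannels);
                    break;
                case "message":
                    onMessage(channel, (String) reply.get(2));
                    break;
                default:
                    throw new IllegalStateException("Unknown message type: " + type);
            }
        } while (subscribedChannels > 0);
        return log;
    }

    public static void main(String[] args) {
        List<List<Object>> replies = Arrays.asList(
                Arrays.<Object>asList("subscribe", "MyChannel", 1L),
                Arrays.<Object>asList("message", "MyChannel", "hello"),
                Arrays.<Object>asList("unsubscribe", "MyChannel", 0L));
        for (String entry : new DispatchLoopSketch().process(replies.iterator())) {
            System.out.println(entry);
        }
    }
}
```

Because the loop condition is checked after each reply, the unsubscribe reply that brings the count to zero is still dispatched to its callback before the loop ends, and nothing after it is read.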
