2025-01-20 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article presents case studies of Redis and queues. The editor found it very practical and is sharing it here as a reference. Let's take a look.
Summary
Redis can be used not only as a cache server but also as a message queue. Its list type is a natural fit for a message queue.
Because a Redis list is implemented as a doubly linked list that keeps references to its head and tail nodes, inserting elements at either end of the list is very fast.
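The queue behaviour this gives can be illustrated without a Redis server. The following is a minimal in-memory sketch, assuming that java.util.LinkedList (itself a doubly linked list) stands in for a Redis list; the class name ListQueueSketch and its methods are hypothetical and only mimic the semantics of the lpush and rpop commands.

```java
import java.util.Deque;
import java.util.LinkedList;

// In-memory stand-in for a Redis list (an assumption for illustration,
// not the Redis implementation).
class ListQueueSketch {
    // LinkedList is a doubly linked list: O(1) insert/remove at both ends.
    private final Deque<String> items = new LinkedList<>();

    // Like LPUSH: insert at the head and return the new length of the list.
    long lpush(String value) {
        items.addFirst(value);
        return items.size();
    }

    // Like RPOP: remove from the tail, or return null when the list is empty.
    String rpop() {
        return items.pollLast();
    }

    public static void main(String[] args) {
        ListQueueSketch queue = new ListQueueSketch();
        queue.lpush("a");
        queue.lpush("b");
        queue.lpush("c");
        // Pushing at the head and popping at the tail yields FIFO order.
        System.out.println(queue.rpop() + ", " + queue.rpop() + ", " + queue.rpop()); // prints "a, b, c"
    }
}
```

Pushing on one end and popping on the other is what turns the list into a first-in, first-out queue; using the same end for both would give a stack instead.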
Common queue implementation
So you can implement a message queue directly with a Redis list, using just two commands: lpush and rpop, or rpush and lpop. A simple example follows:
Message storing side (message producer):
package org.yamikaze.redis.messsage.queue;

import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

/**
 * Message producer.
 * @author yamikaze
 */
public class Producer extends Thread {

    public static final String MESSAGE_KEY = "message:queue";
    private Jedis jedis;
    private String producerName;
    private volatile int count;

    public Producer(String name) {
        this.producerName = name;
        init();
    }

    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void putMessage(String message) {
        // lpush returns the length of the list after the push
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": number of messages currently pending: " + size);
        count++;
    }

    public int getCount() {
        return count;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // StringUtils is a helper class in the same package
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            // interrupted: stop producing
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer("myProducer");
        producer.start();
        for (;;) {
            System.out.println("main: number of stored messages: " + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}
Message processing side (message consumer):
package org.yamikaze.redis.messsage.queue;

import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;

/**
 * Message consumer.
 * @author yamikaze
 */
public class Customer extends Thread {

    private String customerName;
    private volatile int count;
    private Jedis jedis;

    public Customer(String name) {
        this.customerName = name;
        init();
    }

    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void processMessage() {
        // rpop returns null when the list is empty
        String message = jedis.rpop(Producer.MESSAGE_KEY);
        if (message != null) {
            count++;
            handle(message);
        }
    }

    public void handle(String message) {
        System.out.println(customerName + " is processing a message, content: " + message + ", message no. " + count);
    }

    @Override
    public void run() {
        while (true) {
            processMessage();
        }
    }

    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}
This looks fine, but the message consumer in the example above has a problem: it must call rpop continuously to check whether the list holds a message waiting to be processed. Each call is a request to the server, and most of them return nothing, which is wasteful. You might use Thread.sleep() or a similar method so that the consumer thread polls only at regular intervals, but that approach has two problems:
1) If the producer is faster than the consumer, the message queue keeps growing and over time takes up a lot of memory.
2) If the sleep time is too long, time-sensitive messages are not handled promptly; if it is too short, the frequent requests still cause a lot of overhead.
So you can use the brpop command instead. It returns as soon as an element is available; otherwise it blocks until the timeout expires and then returns null. The consumer's processMessage can be changed to this:
// requires: import java.util.List;
public void processMessage() {
    /*
     * brpop accepts multiple lists (queues) and treats their order as a
     * priority: here MESSAGE_KEY has a higher priority than testKey.
     * If both lists contain elements, the element from the higher-priority
     * list, MESSAGE_KEY, is returned first.
     * A timeout of 0 means wait without limit, so the call blocks here.
     */
    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");
    if (messages != null && messages.size() != 0) {
        // Because the command can listen on several keys, it returns a list
        // of two items: 1) the list name, 2) the data
        String keyName = messages.get(0);
        // Handle the message only if it came from MESSAGE_KEY
        if (Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }
    }
    System.out.println("==");
}
Now run Customer and clear the console: the program produces no output because it is blocked in brpop. Then open a Redis client and enter the command client list; it shows that there are currently two connections.
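The blocking and priority behaviour of brpop described above can be tried without a server. What follows is a rough in-memory sketch under stated assumptions: the class BlockingPopSketch and its methods are hypothetical, it uses Java's built-in monitor (wait/notifyAll) rather than anything Redis does internally, and its timeout is in milliseconds for convenience, whereas the real command takes seconds.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of LPUSH/BRPOP semantics, for illustration only.
class BlockingPopSketch {
    private final Map<String, Deque<String>> lists = new HashMap<>();

    // Like LPUSH: insert at the head, then wake any consumer blocked in brpop.
    synchronized void lpush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
        notifyAll();
    }

    // Returns [key, value] from the first non-empty list in argument order,
    // or null if nothing arrives in time. A timeout of 0 waits without limit.
    synchronized List<String> brpop(long timeoutMillis, String... keys) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            for (String key : keys) { // earlier keys have higher priority
                Deque<String> list = lists.get(key);
                if (list != null && !list.isEmpty()) {
                    return Arrays.asList(key, list.pollLast());
                }
            }
            try {
                if (timeoutMillis == 0) {
                    wait(); // block until a producer pushes something
                } else {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return null; // timed out with all lists empty
                    }
                    wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        BlockingPopSketch store = new BlockingPopSketch();
        store.lpush("testKey", "low priority");
        store.lpush("message:queue", "high priority");
        // message:queue is listed first, so its element is returned first.
        System.out.println(store.brpop(0, "message:queue", "testKey"));
        System.out.println(store.brpop(0, "message:queue", "testKey"));
        // Both lists are now empty: this call waits about 100 ms, then gives up.
        System.out.println(store.brpop(100, "message:queue", "testKey"));
    }
}
```

The consumer thread sleeps inside brpop until a producer pushes an element, so it issues no requests while the queue is empty, which is the saving over the rpop polling loop.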
A queue that is produced once and consumed multiple times
In addition to message queues, Redis provides a set of commands that support the publish/subscribe pattern. With Redis pub/sub, a message produced once can be consumed by multiple consumers.
1) publish
The PUBLISH command publishes a message, in the format PUBLISH channel message
The return value is the number of subscribers that received the message.
2) subscribe
The SUBSCRIBE command receives messages, in the format SUBSCRIBE channel
After the SUBSCRIBE command the client enters subscription mode, but it does not receive the message published earlier, because a subscriber only receives messages published after it subscribed. In this mode the client sees only the replies pushed to it. There are three types of reply:
1. subscribe: the second value is the channel that was subscribed to, and the third value is the number of channels the client is currently subscribed to.
2. message: the second value is the channel the message was published to, and the third value is the message itself.
3. unsubscribe: the second value is the channel that was unsubscribed from, and the third value is the number of channels the client is still subscribed to.
You can use the UNSUBSCRIBE command to unsubscribe. Without arguments, it unsubscribes from all channels subscribed with the SUBSCRIBE command.
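The two properties described above, that PUBLISH reports how many subscribers received the message and that a subscriber sees only messages published after it subscribed, can be sketched in plain Java. This is a simplified in-memory model, not how Redis or Jedis implements pub/sub; the class PubSubSketch and its method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of channel-based publish/subscribe.
class PubSubSketch {
    private final Map<String, List<Consumer<String>>> channels = new HashMap<>();

    // Like SUBSCRIBE: register a listener on one channel.
    void subscribe(String channel, Consumer<String> listener) {
        channels.computeIfAbsent(channel, c -> new ArrayList<>()).add(listener);
    }

    // Like UNSUBSCRIBE with a channel argument.
    void unsubscribe(String channel, Consumer<String> listener) {
        List<Consumer<String>> listeners = channels.get(channel);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }

    // Like PUBLISH: deliver to current subscribers only and return how many
    // received the message. Nothing is stored for later subscribers.
    int publish(String channel, String message) {
        List<Consumer<String>> listeners = channels.get(channel);
        if (listeners == null) {
            return 0;
        }
        List<Consumer<String>> snapshot = new ArrayList<>(listeners);
        for (Consumer<String> listener : snapshot) {
            listener.accept(message);
        }
        return snapshot.size();
    }

    public static void main(String[] args) {
        PubSubSketch broker = new PubSubSketch();
        List<String> inbox = new ArrayList<>();
        int before = broker.publish("channel.1", "hi");   // nobody is listening yet
        broker.subscribe("channel.1", inbox::add);
        int after = broker.publish("channel.1", "hello"); // one subscriber receives it
        System.out.println(before + " " + after + " " + inbox); // prints "0 1 [hello]"
    }
}
```

Because nothing is buffered, the message "hi" is simply lost. This is the main difference from the list-based queue, where a message waits in the list until some consumer pops it.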
Redis also supports wildcard-based subscriptions through the PSUBSCRIBE (pattern subscribe) command.
If a message is then published again, the publish command returns 2 and the subscriber receives the message twice. This is because PSUBSCRIBE subscriptions can overlap with existing ones: the same channel can be matched more than once, and each match delivers its own copy of the message. Channels subscribed with PSUBSCRIBE must be unsubscribed with PUNSUBSCRIBE; PUNSUBSCRIBE cannot cancel a subscription made with SUBSCRIBE, and likewise UNSUBSCRIBE cannot cancel one made with PSUBSCRIBE. In addition, PUNSUBSCRIBE does not expand wildcards; it compares the pattern string literally.
For example, PUNSUBSCRIBE * does not match channel.*, so to unsubscribe from channel.* you must write PUNSUBSCRIBE channel.*.
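The duplicate delivery and the literal matching of PUNSUBSCRIBE can both be seen in a small model. The sketch below is an assumption-laden illustration rather than Redis's algorithm: PatternSubSketch is a hypothetical class for a single client, the two overlapping patterns channel.? and channel.?* are chosen only for the demo, and the glob matcher handles just * and ? (Redis patterns also support bracket expressions and escapes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical model of one client's pattern subscriptions.
class PatternSubSketch {
    private final List<String> patterns = new ArrayList<>();
    final List<String> received = new ArrayList<>();

    // Like PSUBSCRIBE: remember the pattern string itself.
    void psubscribe(String pattern) {
        if (!patterns.contains(pattern)) {
            patterns.add(pattern);
        }
    }

    // Like PUNSUBSCRIBE: the argument is compared literally, not expanded.
    void punsubscribe(String pattern) {
        patterns.remove(pattern);
    }

    // Delivers one copy per matching pattern and returns the number of deliveries.
    int publish(String channel, String message) {
        int deliveries = 0;
        for (String pattern : patterns) {
            if (globMatches(pattern, channel)) {
                received.add(pattern + " -> " + message);
                deliveries++;
            }
        }
        return deliveries;
    }

    // Simplified glob: '*' matches any run of characters, '?' exactly one.
    static boolean globMatches(String glob, String text) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), text);
    }

    public static void main(String[] args) {
        PatternSubSketch client = new PatternSubSketch();
        client.psubscribe("channel.?");
        client.psubscribe("channel.?*");
        System.out.println(client.publish("channel.1", "hi")); // prints 2: both patterns match
        client.punsubscribe("*"); // literal "*" was never subscribed, so nothing changes
        System.out.println(client.publish("channel.1", "hi")); // prints 2
        client.punsubscribe("channel.?*");
        System.out.println(client.publish("channel.1", "hi")); // prints 1
    }
}
```

Storing and removing the pattern string itself, instead of the set of channels it matches, is what makes the unsubscribe literal.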
The code example is as follows:
package org.yamikaze.redis.messsage.subscribe;

import org.yamikaze.redis.messsage.queue.StringUtils;
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;

/**
 * Message publisher.
 * @author yamikaze
 */
public class Publisher {

    public static final String CHANNEL_KEY = "channel:message";
    private Jedis jedis;

    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void publishMessage(String message) {
        // Ignore null or blank messages
        if (StringUtils.isBlank(message)) {
            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    }

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}
The publisher simply sends one message.
Message subscriber:
package org.yamikaze.redis.messsage.subscribe;

import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.util.concurrent.TimeUnit;

/**
 * Message subscriber client.
 * @author yamikaze
 */
public class SubscribeClient {

    private Jedis jedis;
    private static final String EXIT_COMMAND = "exit";

    public SubscribeClient() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void subscribe(String... channel) {
        if (channel == null || channel.length ...

[The source is truncated at this point. The rest of SubscribeClient and the beginning of the next example, in which several consumer threads take delayed tasks from the sorted set "orderId", are missing. The surviving part of that example follows.]

            ... >= score) {
                String element = tuple.getElement();
                // zrem returns the number of members removed, so only the
                // consumer that actually removes the element handles it
                Long orderId = jedis.zrem("orderId", element);
                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                            + ": redis consumed a task: the consumed order OrderId is " + element);
                }
            }
    ...

    static class DelayMessage implements Runnable {
        @Override
        public void run() {
            try {
                // wait on the latch cdl before consuming
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();
        for (int i = 0; i < 10; i++) {
            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}
The results are as follows:
Thank you for reading! This is the end of the case studies on Redis and queues. I hope the content above is of some help to you.