Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to implement message queue and delayed message queue in Redis

2025-04-05 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)05/31 Report--

This article will explain in detail how to implement message queuing and delayed message queuing in Redis. The editor thinks it is very practical, so I share it with you for reference. I hope you can get something after reading this article.

Several commands of list

Lpush (left push)

Stored on the left side of the queue

Rpush (right push)

Stored on the right side of the queue

Lpop (left pop)

Taken out from the left side of the queue

Rpop (right pop)

Taken out from the right side of the queue

The above four commands allow list to help us implement the queue or stack. The queue is first-in, first-out, and stack is first-in, first-out.

So the queue can be implemented using lpush + rpop or rpush + lpop

The implementation of the stack is lpush + lpop or rpush + rpop.

Use commands to demonstrate queues

Producers release messages

First of all, we use rpush to add five elements to a queue called notify-queue, that is, 12345, that is, to publish messages as producers.

Consumer consumption news

Since producers use rpush, consumers need to use lpop. You can take a look at the following figure. We continue to consume messages on notify-queue, and read them sequentially, from 1 to 5. Finally, there are no messages in the queue, and the pop-up is always empty.

Empty polling problem

When using lpop to consume messages above, we can see that every time we go to pop after message consumption, all we read is an empty message.

The above command is executed manually, but if the written code program keeps going to pop data (pulling data), it will cause empty polling (useless reading)

It not only increases the CPU consumption of the client, but also increases the QPS of redis, and it is also useless operations, which may cause other clients to respond slowly to redis access.

Solution A (hibernation)

Since empty polling will make the resource consumption of both the client and redis become higher, we can let the client sleep for 1s when it receives empty data, and then pull the data after 1s, which can reduce the consumption.

Thread.sleep (1000)

This scheme is also flawed, that is, the delay of message consumption increases. If there is only one consumer, the delay is 1s, that is, after empty polling, it happens to be dormant, but at this time there is news coming. You still have to wait until 1s to wake up before you can spend.

If there are multiple consumers, because the sleep time of each consumer is divergent, it will reduce some delay, but is there a better way to achieve almost zero delay?

Solution B (blocking reads)

In redis, there are actually two commands about fetching data from the queue, that is, blocking read.

Blpop (blocking left pop)

Brpop (blocking right pop)

Blocking read will go into hibernation when the queue has no data. Once a message comes, it will react immediately and read the data, so replacing lpop/rpop with blpop/brpop can solve the problem of message delay.

Continue to join the team with 3 attributes, 6, 7, 8

Use blpop to read the queue. The last parameter is the waiting time for blocking the read. If there is no message beyond this time, nil will be returned. You can continue to repeat the blpop operation at this time.

The problem of automatic disconnection of idle connections blocking reads

When the client uses blocking reading, if the blocking time is too long, the service will generally be treated as an idle connection, thus actively disconnecting it and reducing the resource consumption of useless connections. At this time, the client will throw an exception.

So note that when the client uses blocking reads, exceptions are caught so that they can be handled accordingly, such as retry.

Java client implements message queuing

The idea is the same as above, but from the command line client redis-cli to java language, one or more threads to publish rpush

One or more threads consume blpop, and the completed code is: https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue

Publisher

Subscriber

Implementation of delay queue

Delay queue means that after a period of time after the message is sent, it is consumed by the consumer, rather than after it is sent, the consumer can immediately read the

Zset can help us do this. First of all, zset can be sorted through score, and score can save a timestamp, so every time we publish a message, we use the current timestamp plus a delayed timestamp.

Then, when consumers retrieve the message, they intercept the data of zset to get the message that has satisfied the current time (that is, take the data whose score is less than or equal to the current timestamp, and the score is less than or equal to the current timestamp means that the message has expired. If it is greater than that, it will take a while to consume).

Key commands zadd (publisher), zrangebyscore (subscriber), zrem (delete after subscriber consumes data)

Command implementation

We used zadd to add four pieces of data, namely, 1, 2, 3 seconds (fake saying, this is actually just a score), and a kafka that can only be consumed after 10 seconds.

If we have already reached the third second, we take the data in zset that is greater than or equal to 1 second and less than or equal to 3 seconds, because the data in this interval is exactly what we can consume, and we can see that we have taken out three pieces of data that meet the conditions.

If you can only consume one data at a time, you can add a limit restriction. You can take out the first consumable data in the following figure, redis.

Also note that it is different from list's lpop/ and blpop (they pop up and automatically delete the data in the original queue)

Although the data is obtained, it will be read by others if it is not deleted by zrem, because it still exists in zset.

However, zrem may be deleted (consumed) by others, so the code still needs to judge whether the return value of zrem is greater than 0 and whether we are successful in preemption of this message, and then consume correctly after success.

Code implementation

Publisher

Subscriber

Test delay effect

Full code address: https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue

Optimization, implemented using lua

In the delay queue implemented above, there is a problem, that is, when you use zrem to determine whether to grab this data, you may not get it. If you continue to read it, you may not be able to grab it for several rounds, and resources will be wasted, so you can optimize it through lua scripts.

Let zrangebyscore and zrem become an atomized operation, which can avoid multi-threaded competition and waste of resources that can not be grabbed.

This is the end of the article on "how to implement message queuing and delayed message queuing in Redis". I hope the above content can be of some help to you, so that you can learn more knowledge. if you think the article is good, please share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report