Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use message queuing in Redis

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces how to use message queue in Redis, which has a certain reference value. Interested friends can refer to it. I hope you can learn a lot after reading this article.

When it comes to message queuing middleware, we all think of RabbitMQ, RocketMQ and Kafka to implement asynchronous messaging for applications. These are professional message queuing middleware with more features than we can understand.

However, the use of these message middleware is complex, such as RabbitMQ, create Exchange before sending messages, create Queue, then bind Exchange and Queue through some rules, make routing-key when sending messages, and control header messages. This is just the producer, and the consumer has to repeat the above series of tedious steps before consuming the message.

Redis's message queue is not a professional message queue, it does not have many of the advanced features of message queuing, nor does it have an ack guarantee. If you have the ultimate pursuit of message reliability, please turn to professional MQ middleware.

Asynchronous message queuing

Problem 1: empty queue

For pop operations, when the message queue is empty, the client will fall into a dead loop of pop, resulting in a large number of life-wasting empty polling, causing the client CPU to pull up, while the QPS of Redis is also pulled up.

For blocking reads, the queue goes to sleep when there is no data, and wakes up as soon as the data arrives. The above problem is solved perfectly.

Problem 2: idle connection is disconnected

The scheme of blocking reads seems perfect, which leads to another problem: idle connections. If the thread is blocked all the time, the client connection to Redis becomes an idle connection. If the idle time is too long, the Redis server will actively disconnect to reduce the occupation of idle resources. At this point, blpop/brpop will throw an exception.

Therefore, we need to be careful when writing client (application) consumers, catch exceptions, and try again.

Application 1: delay queue

There are generally three strategies for handling lock failures in Redis's distributed locks:

An exception is thrown directly, and the front end reminds the user whether to continue the operation

Sleep, try again later.

Put the request in the delay queue and try again later

The delay queue in Redis can be realized by zset (ordered list) data structure. We serialize the message as a string as the value of zse, and the expiration processing time (delay time) of the message as score. Then the expiration time is obtained by polling the zset for processing, and the key is removed from the zset through zrem to represent the successful consumption, and then the task is processed.

The core code is as follows:

/ / production\ public void delay (T msg) {\ TaskItem task = new TaskItem ();\ task.id = UUID.randomUUID (). ToString (); / / assign a unique uuid\ task.msg = msg;\ String s = JSON.toJSONString (task); / / fastjson serialization\ jedis.zadd (queueKey, System.currentTimeMillis () + 5000, s) / / plug into the delay queue and try again after 5 seconds.\ / consume\ public void loop () {\ while (! Thread.interrupted ()) {\ / / zrangeByScore parameter 0. System.currentTimeMills () represents the data from redis to score in the range from 0 to the current time of the system. 0 inf means to take an extended incoming score as-inf from 0, and + inf represents the maximum and minimum values in the zset, respectively. \ if (values.isEmpty ()) {\ try {\ Thread.sleep (500); / the break will continue\}\ catch (InterruptedException e) {\ break;\}\ continue;\}\ String s = values.iterator () .next () / / consumption queue\ if (jedis.zrem (queueKey, s) > 0) {/ / captured. To take into account the multi-threaded lock scramble, only rem successfully represents the successful consumption of a message. \ TaskItem task = JSON.parseObject (s, TaskType); / / fastjson deserializes\ this.handleMsg (task.msg);\}

The above code is in multithreading for the situation where the same task is competed by multiple threads, although it is possible to process the task after zrem to avoid the situation that a task is consumed multiple times. But for those threads that get the task but do not consume it successfully, it is a waste of time to take a task. So consider optimizing this logic through lua scripting. Move zrangeByScore and zrem together to the server for atomic operation, and it can be solved perfectly.

Thank you for reading this article carefully. I hope the article "how to use message queue in Redis" shared by the editor will be helpful to you. At the same time, I also hope you will support us and pay attention to the industry information channel. More related knowledge is waiting for you to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report