In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/02 Report--
This article introduces the knowledge of "how to achieve a delay queue". In the operation of practical cases, many people will encounter such a dilemma. Next, let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
Definition of delay queue
First of all, I believe everyone is familiar with the data structure of queue, which is a first-in-first-out data structure. The elements in the ordinary queue are ordered, and the elements that enter the queue first will be preferentially taken out for consumption.
The biggest difference between the delay queue and the ordinary queue is reflected in its delay attribute. The elements of the ordinary queue are first-in, first-out, and processed according to the order of joining the queue, while the elements in the delay queue specify a delay time when they join the queue. Indicates that they want to be able to process after the specified time. In a sense, the structure of delay queue is not like a queue, but more like an ordered heap structure weighted by time.
Application scenario
The usage scenario I encountered when developing business requirements is like this. Users can subscribe to different Wechat or QQ template messages in Mini Program. Product students can create a message push plan on the management side of Mini Program, and push messages to all users who subscribe to template messages when the specified time node is reached.
If it is only a single Mini Program service, then perhaps a scheduled task, or even manual timing execution can be the most convenient and fastest to complete this requirement, but we hope to be able to abstract a message subscription module service for all businesses to use, then we need a general system solution, this time we need to use delay queue.
In addition to the typical requirements I have encountered above, the application scenarios of delay queues are actually very extensive, such as the following scenarios:
Hongmeng official Strategic Cooperation to build HarmonyOS Technology Community
If the new order is not paid within 15 minutes, it will be cancelled automatically.
The company's meeting reservation system will notify all users who have booked the meeting half an hour before the start of the meeting after the meeting is booked successfully.
If the safety work order is not processed for more than 24 hours, the WeChat group of enterprises will be automatically pulled to remind the relevant responsible persons.
After users place an order for takeout, they will be reminded that the takeout boy is about to time out when there are still 10 minutes left before the timeout.
For scenarios where the amount of data is small and the timeliness requirement is not so high, a relatively simple way is to poll the database, for example, to poll all the data in the database every second to deal with all the expired data. For example, if I were the developer of the company's internal meeting reservation system, I might adopt this scheme. Because the amount of data in the whole system must not be very large, and there is not much difference between 30 minutes in advance and 29 minutes in advance before the start of the meeting.
However, if the amount of data to be processed is relatively large, real-time requirements are relatively high, such as the automatic timeout of unpaid within 15 minutes of all new orders on Taobao every day, of an order of magnitude as high as one million or even tens of millions. At this time, if you dare to poll the database, you are afraid to be killed by your boss, or you will probably be killed by operation and maintenance classmates if you are not beaten to death by your boss.
In this scenario, we need to use our protagonist today, the delay queue. The delay queue is listed as we provide an efficient solution for dealing with a large number of messages that require delayed consumption. Without saying much, let's take a look at several common solutions for delayed queuing and their respective advantages and disadvantages.
Implementation scheme Redis ZSet
We know that Redis has an ordered set of data structures in ZSet,ZSet that each element has a corresponding Score,ZSet that all elements are sorted by their Score.
Then we can use Redis's ZSet to implement a delay queue through the following operations:
Hongmeng official Strategic Cooperation to build HarmonyOS Technology Community
Queue operation: ZADD KEY timestamp task, we will need to deal with the task, according to its need to delay the processing time to join the ZSet as Score. The time complexity of Redis's ZAdd is O (logN), and N is the number of elements in ZSet, so we can join the queue relatively efficiently.
Start a process (for example, every other second) to query the element with the smallest Score in ZSet through the ZREANGEBYSCORE method, the specific operation is: ZRANGEBYSCORE KEY-inf + inf limit 0 1 WITHSCORES. There are two cases of query results:
a. The score obtained by the query is less than or equal to the current timestamp, indicating that the task needs to be executed, and the task will be processed asynchronously.
b. The score of the query is greater than the current timestamp. Since the query operation just took out the element with the lowest score, it means that all the tasks in the ZSet have not yet reached the time to be executed, then continue the query after dormant for one second.
Similarly, the time complexity of ZRANGEBYSCORE operation is O (logN + M), where N is the number of elements in ZSet and M is the number of elements in query, so our regular query operation is also more efficient.
Here we carry a set of back-end architecture of Redis to implement delay queue from the Internet, which makes a series of optimizations on the original ZSet implementation of Redis to make the whole system more stable, robust, able to cope with high concurrency scenarios, and have better scalability, which is a good architecture design. The overall architecture diagram is as follows:
Its core design ideas are as follows:
Hongmeng official Strategic Cooperation to build HarmonyOS Technology Community
Routing delayed message tasks to different Redis Key via the hash algorithm has two major benefits:
a. It avoids the problem that when a KEY stores more delayed messages, the queue operation and query operation become slower (the time complexity of both operations is O (logN)).
b. The system has better horizontal scalability. When the amount of data surges, we can quickly expand the whole system by increasing the number of Redis Key to resist the growth of the amount of data.
Each Redis Key corresponds to the establishment of a processing process, called the Event process, which polls the Key through the ZRANGEBYSCORE method described in step 2 above to find out whether there are delayed messages to be processed.
All Event processes are only responsible for distributing messages, and the specific business logic is handled asynchronously through an additional message queue, and the benefits are obvious:
a. On the one hand, the Event process is only responsible for distributing messages, so its processing speed is very fast, and messages are less likely to pile up because of the complexity of business logic.
b. On the other hand, after using an additional message queue, the scalability of message processing will be better, and we can expand the message processing capacity of the whole system by increasing the number of consumer processes.
The Event process uses Zookeeper to select the main single process to deploy to avoid the accumulation of messages in Redis Key after the downtime of the Event process. Once the leader host of Zookeeper goes down, Zookeeper automatically selects a new leader host to process messages in Redis Key.
From the above discussion, we can see that the implementation of delay queue through Redis Zset is a more intuitive solution that can land quickly. And we can rely on the persistence of Redis itself to achieve persistence, using Redis clusters to support high concurrency and high availability is a good implementation of delay queues.
RabbitMQ
RabbitMQ itself does not directly provide support for delay queues. We rely on RabbitMQ's TTL and dead-letter queues to achieve the effect of delay queues. Let's first take a look at the dead-letter queue of RabbitMQ and the TTL function.
Dead letter queue
Dead letter queue is actually a message processing mechanism of RabbitMQ. When RabbmitMQ produces and consumes messages, the message will become "dead letter" when it encounters the following conditions:
Hongmeng official Strategic Cooperation to build HarmonyOS Technology Community
The message is rejected by basic.reject/ basic.nack and is not re-delivered to requeue=false
Message timed out and not consumed, that is, TTL expired
Message queue reaches maximum length
Once the message becomes a dead letter, it is re-delivered to the dead-letter switch (Dead-Letter-Exchange), and then the dead-letter switch is forwarded to the corresponding dead-letter queue according to the binding rules, and listening to the queue can cause the message to be consumed again.
Message lifetime TTL
TTL (Time-To-Live) is an advanced feature of RabbitMQ that represents the maximum lifetime of a message in milliseconds. If a message is not consumed within the time set by TTL, it becomes a dead letter and enters the dead letter queue we mentioned above.
There are two different ways to set the TTL attribute of a message. One way is to set the TTL expiration time of the entire queue directly when creating the queue. All messages entering the queue are set to a uniform expiration time. Once the message expires, it will be discarded and enter the dead letter queue. The reference code is as follows:
Map args = new HashMap (); args.put ("x-message-ttl", 6000); channel.queueDeclare (queueName, durable, exclusive, autoDelete, args)
This method is more suitable when the delay time of the delay queue is a fixed value.
Another way is to set it for a single message. The reference code is as follows. The expiration time of the message is set to 6 seconds:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder (); builder.expiration ("6000"); AMQP.BasicProperties properties = builder.build (); channel.basicPublish (exchangeName, routingKey, mandatory, properties, "msg content" .getBytes ())
If we need to set different delay times for different messages, the above TTL setting for the queue will not meet our needs, and we need to use this TTL setting for a single message.
It is important to note, however, that with the TTL set in this way, the message may not die on time, because RabbitMQ only checks whether the first message is out of date. For example, if the first message sets a 20s TTL and the second message sets a 10s TTL, RabbitMQ will wait until the first message expires before allowing the second message to expire.
The solution to this problem is also simple: you only need to install a plug-in for RabbitMQ:
Https://www.rabbitmq.com/community-plugins.html
After installing this plug-in, all messages will expire according to the set TTL.
Implementation of delay queue by RabbitMQ
Well, after introducing the dead letter queue and TTL of RabbitMQ, we are only one step away from implementing the delay queue.
Smart readers may have discovered that TTL is the time when messages in a delay queue are delayed. If we set TTL as its delay time and deliver it to the ordinary queue of RabbitMQ without consuming it, then after the time of TTL, the message will be automatically delivered to the dead letter queue. At this time, we use the consumer process to consume the messages in the dead letter queue in real time, thus achieving the effect of delay queue.
From the following figure, you can directly see the overall process of implementing a delay queue using RabbitMQ:
Using RabbitMQ to implement delay queues, we can make good use of some features of RabbitMQ, such as reliable message delivery, reliable message delivery, and dead letter queue to ensure that messages are consumed at least once and messages that are not properly processed will not be discarded. In addition, through the characteristics of RabbitMQ cluster, the problem of single point of failure can be well solved, and the delay queue will not be unavailable or messages will not be lost due to the failure of a single node.
TimeWheel
TimeWheel time wheel algorithm is an ingenious and efficient algorithm to implement delay queue, which is used in various frameworks such as Netty,Zookeeper,Kafka.
Time wheel
As shown in the figure above, the time wheel is a circular queue that stores delayed messages, and its underlying layer is implemented in an array, which can be traversed efficiently. Each element in this circular queue corresponds to a delayed task list, which is a two-way circular linked list, each item in which represents a delayed task that needs to be executed.
The time wheel has a dial pointer that represents the time currently referred to by the time wheel, which moves forward over time and handles the list of delayed tasks at the corresponding location.
Add a deferred task
Because the size of the time wheel is fixed and each element in the time wheel is a two-way circular linked list, we can add delayed tasks to the time wheel under the time complexity of O (1).
In the following figure, for example, we have a time wheel like this. When the dial pointer points to the current time 2, we need to add a new task with a delay of 3 seconds. We can quickly calculate that the position of the delayed task in the time wheel is 5 and add it to the end of the task list on position 5.
Multi-layer time wheel
Everything has been great so far, but careful students may have found that the size of the time wheel above is fixed, only 12 seconds. What should we do if we have a task that needs to be delayed by 200 seconds? Directly expand the size of the entire time wheel? This is obviously not desirable, because if we do so, we need to maintain a very large time wheel, memory is unacceptable, and when the underlying array is large, the addressing efficiency will be reduced, affecting performance.
For this reason, Kafka introduces the concept of multi-layer time wheel. In fact, the concept of multi-layer time wheel is very similar to the concept of hour hand, minute hand and second hand on our mechanical watch. when the current time cannot be expressed by second hand alone, it is expressed by minute hand combined with second hand. Similarly, when the expiration time of a task exceeds the time range represented by the current time wheel, an attempt is made to add it to the upper time wheel, as shown in the following figure:
The time range represented by the whole time wheel of the first layer is 0-12 seconds, and the time range represented by each grid of the second layer time wheel is 12 seconds, that is, 12 seconds. So the time range that the whole second layer time wheel can represent is 12-12-144 seconds, and the range of the third layer time wheel is 1728 seconds, the fourth layer is 20736 seconds, and so on.
For example, now we need to add a delay message with a delay of 200 seconds, and we find that it has exceeded the time range represented by the first time wheel, so we need to continue to look at the upper time round. Add it to the position of the second time round 200max 12 = 17, and then we find that 17 also exceeds the range of the second time round, so we need to move on to the upper level. Add it to the position of 17thumb 12 = 2 of the third tier time wheel.
The core process of adding deferred tasks and promoting time round scrolling in Kafka is as follows, where Bucket is the delayed task queue in the time round, and DelayQueue introduced by Kafka solves the problem of inefficient time round scrolling caused by the empty Bucket:
The delay queue implemented by the time wheel can support the efficient trigger of a large number of tasks. And in the implementation of Kafka's time wheel algorithm, DelayQueue is also introduced, which uses DelayQueue to push the time wheel roll, while the add and delete operations of delayed tasks are placed in the time round, this design greatly improves the execution efficiency of the whole delay queue.
Summary
Delay queue is widely used in our daily development. This paper introduces three different schemes to implement delay queue, each of which has its own characteristics. For example, the implementation scheme of Redis is the easiest to understand and can land quickly, but Redis is based on memory after all. Although there is a data persistence scheme, there is still the possibility of data loss. The implementation scheme of RabbitMQ, due to the characteristics of RabbitMQ itself, such as reliable message transmission, reliable message delivery and dead letter queue, can guarantee that the message is consumed at least once and the message that has not been processed correctly will not be discarded, so that the reliability of the message is guaranteed. Finally, the time wheel algorithm of Kafka is the most difficult to understand among the three implementation schemes, but it is also a very ingenious one.
This is the end of "how to implement a delay queue". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.