In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article introduces the knowledge of "how to achieve delay queue in RabbitMQ". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
What is a delay queue?
Delay queue: as its name implies, it is a queue used for message delay consumption. But it is also an ordinary queue, so it has the characteristics of an ordinary queue, in contrast, the characteristic of delay is its biggest feature. The so-called delay is to delay how long the message we need will be consumed. Ordinary queues are consumed instantly, while delayed queues are consumed according to the delay time and how long it will take.
Delay queue usage scenario
If the order is not paid within ten minutes, it will be cancelled automatically.
Regular push of member renewal
After successful registration, users will be reminded by SMS if they do not log in within three days.
After a scheduled meeting, participants need to be notified ten minutes in advance of the scheduled time to attend the meeting.
Coupon expiration reminder
The core application content is basically based on the need to set the expiration time.
How to implement delay queue in RabbitMQ
Mode 1, through the advanced features of RabbitMQ TTL and cooperate with the dead letter queue
Mode 2. Install the rabbitmq_delayed_message_exchange plug-in
Advanced feature TTL in RabbitMQ
What is TTL? TTL is a property of a message or queue in RabbitMQ, indicating the maximum survival time of a message or all messages in the queue, in milliseconds. Why should the delay queue introduce it? TTL is a message expiration policy. After the message has survived in the queue for a specified period of time, the message will be discarded directly by changing the queue. There is no good delay queue in RabbitMQ. We can use the advanced feature of TTL, and then cooperate with the dead letter queue to achieve the function of delay queue.
So, how do you set this TTL value? There are two ways. The first is to set the "x-message-ttl" property of the queue when creating the queue, as follows: method 1:
Map args = new HashMap (); args.put ("x-message-ttl", 6000); channel.queueDeclare (queueName, durable, exclusive, autoDelete, args)
In this way, the message is set to TTL, and once the message expires, it is discarded by the queue
Method 2:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder (); builder.expiration ("6000"); AMQP.BasicProperties properties = builder.build (); channel.basicPublish (exchangeName, routingKey, mandatory, properties, "msg body" .getBytes ())
In this way, even if the message expires, it is not necessarily discarded immediately, because whether the message expires is determined before it is about to be delivered to the consumer, and if there is a serious message backlog in the current queue, expired messages may survive for a long time.
In addition, it should be noted that if TTL is not set, the message will never expire, and if TTL is set to 0, the message will be discarded unless it can be delivered directly to the consumer at this time.
How to implement delay queue in RabbitMQ
Step 1: create a normal queue, specify the expiration time of the message, and specify the dead-letter switch and the dead-letter exchange queue to be delivered after the message expires.
Step 2: create a dead letter queue and a dead letter switch
RabbitMQ implements delay queue instance package com.example.demo;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException / * @ author echo * @ date 2021-01-14 14:35 * / public class TopicDealProductTest {/ * delay queue switch * / private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay"; / * * Dead letter queue switch * / private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead" / * * delay queue * / private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay"; / * * Dead letter queue * / private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead"; / * * delay queue ROUTING_KEY * / private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey" / * * delay queue ROUTING_KEY * / private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey"; private static final String IP_ADDRESS = "192.168.230.131"; private static final int PORT = 5672; public static void main (String [] args) throws IOException, TimeoutException, InterruptedException {Connection connection = createConnection (); / / create a channel Channel channel = connection.createChannel () SendMsg (channel); Thread.sleep (10000); closeConnection (connection, channel);} private static void sendMsg (Channel channel) throws IOException {/ / create delay queue and delay switch channel.exchangeDeclare (DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT); Map map = new HashMap (16) / / specify the dead letter switch map.put ("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD) on the delay switch; / / specify the dead letter queue routing-key map.put ("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY) on the delay switch. / / set a delay queue extension of 10s map.put ("x-message-ttl", 10000); / / create a delay queue channel.queueDeclare (DIRECT_QUEUE_DELAY, true, false, false, map); / / bind a delay queue channel.queueBind (DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY) on the delay switch / / create a dead-letter queue and channel.exchangeDeclare (DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null); / / create a dead-letter queue channel.queueDeclare (DIRECT_QUEUE_DEAD, true, false, false, null); / / bind a dead-letter queue channel.queueBind (DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY) on a dead-letter switch Channel.basicPublish (DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, null, "hello world" .getBytes ());} private static void closeConnection (Connection connection, Channel channel) throws IOException, TimeoutException {/ / close resource channel.close (); connection.close ();} private static Connection createConnection () throws IOException, TimeoutException {/ / create connection factory ConnectionFactory factory = new ConnectionFactory () / / set the link parameters of RabbitMQ factory.setHost (IP_ADDRESS); factory.setPort (PORT); factory.setUsername ("echo"); factory.setPassword ("123456"); / / establish a link with RabbitMQ return factory.newConnection ();}}
At this point, it is not difficult to find that we only make use of the feature of TTL to discard messages to the specified queue when they expire, and the dead letter queue is actually an ordinary queue.
After execution, let's take a look at the results. In Exchange, we created two switches and two queues, but there are differences between the two queues and the switches. Let's look at the picture.
We can see that the Features flags of the two queues are different.
TTL: the expiration time of the message in the queue
DLX: this queue is bound to a dead letter switch.
DLK: the ROUTING_KEY of the dead letter queue bound to this queue
Only after our execution is completed, we can see that the message is first delivered to the delay, and the messages in the queue are delivered to the dead queue after reaching the expiration time.
So we introduced TTL and setting AMQP.BasicProperties above, there are some differences between the two. The former is to set the expiration time of queue messages, and the second is to set the expiration time of each message. So what's the difference between them?
The difference between setting each message and setting TTL
In fact, the difference between the two ways is how to determine whether the message is going to be discarded. The queue set by TTL discards the message as soon as it reaches its expiration time. If it is the latter, there may be a lot of messages in our queue, and then the expiration time of each message is different. At this time, if many messages with no set expiration time are blocked at the exit of the queue and are not consumed, the messages at the back of the queue will not be discarded if the expiration time is set in time. Only when the message with the expiration time is set to the position where the queue should be consumed will it be judged.
How to use AMQP.BasicProperties? Package com.example.demo;import com.rabbitmq.client.*;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;/** * @ author echo * @ date 2021-01-14 14:35 * / public class TopicDealProductTest {/ * * delay queue switch * / private static final String DIRECT_EXCHANGE_DELAY = "dir_exchange_delay" / * * Dead letter queue switch * / private static final String DIRECT_EXCHANGE_DEAD = "dir_exchange_dead"; / * * delay queue * / private static final String DIRECT_QUEUE_DELAY = "dir.queue.delay"; / * * Dead letter queue * / private static final String DIRECT_QUEUE_DEAD = "dir.queue.dead" / * * delay queue ROUTING_KEY * / private static final String DIRECT_DELAY_ROUTING_KEY = "delay.queue.routingKey"; / * delay queue ROUTING_KEY * / private static final String DIRECT_DEAD_ROUTING_KEY = "dead.queue.routingKey"; private static final String IP_ADDRESS = "192.168.230.131"; private static final int PORT = 5672 Public static void main (String [] args) throws IOException, TimeoutException, InterruptedException {Connection connection = createConnection (); / / create a channel Channel channel = connection.createChannel (); sendMsg (channel); Thread.sleep (10000); closeConnection (connection, channel) } private static void sendMsg (Channel channel) throws IOException {/ / create delay queue and delay switch channel.exchangeDeclare (DIRECT_EXCHANGE_DELAY, BuiltinExchangeType.DIRECT); Map map = new HashMap (16); / / specify dead letter switch map.put ("x-dead-letter-exchange", DIRECT_EXCHANGE_DEAD) on the delay switch Map.put ("x-dead-letter-routing-key", DIRECT_DEAD_ROUTING_KEY); / set the extended length of delay queue 10s// map.put ("x-message-ttl", 10000); / / create delay queue channel.queueDeclare (DIRECT_QUEUE_DELAY, true, false, false, map) / bind channel.queueBind (DIRECT_QUEUE_DELAY, DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY) to delay switch; / / create dead-letter queue and dead-letter switch channel.exchangeDeclare (DIRECT_EXCHANGE_DEAD, BuiltinExchangeType.TOPIC, true, false, null); / / create dead-letter queue channel.queueDeclare (DIRECT_QUEUE_DEAD, true, false, false, null) / / bind the dead letter queue channel.queueBind (DIRECT_QUEUE_DEAD, DIRECT_EXCHANGE_DEAD, DIRECT_DEAD_ROUTING_KEY) on the dead letter switch; AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder (); builder.expiration ("10000"); AMQP.BasicProperties properties = builder.build () Channel.basicPublish (DIRECT_EXCHANGE_DELAY, DIRECT_DELAY_ROUTING_KEY, false, properties, "hello world" .getBytes ());} private static void closeConnection (Connection connection, Channel channel) throws IOException, TimeoutException {/ / close resource channel.close (); connection.close ();} private static Connection createConnection () throws IOException, TimeoutException {/ / create connection factory ConnectionFactory factory = new ConnectionFactory () / / set the link parameters of RabbitMQ factory.setHost (IP_ADDRESS); factory.setPort (PORT); factory.setUsername ("echo"); factory.setPassword ("123456"); / / establish a link with RabbitMQ return factory.newConnection ();}}
After we have completed the operation, we can see that the effect is the same as the one we used before.
The difference between the two ways to set the expiration time is that one uniformly sets the expiration time, and the other specifies each expiration time. But it does not affect the implementation of our delay queue, so how do we choose?
How to choose the TTL setting method?
It is most reasonable to choose the usage scene according to the characteristics of the two ways. If we use it as a delay queue, if we want to apply the characteristics of the delay queue to the actual scene, and have a high demand for timeliness, we suggest that we choose the first method.
Summary
The implementation of delay queue is not difficult, the key is that we need to know one of his principles, to understand RabbitMQ, his TTL and his dead letter. After mastering these characteristics, we can make good use of the delay queue. The delay queue is also very helpful to our work, but RabbiTMQ doesn't have a native delay queue, and just because we implemented it in this way doesn't mean we have to choose it. In fact, there are many ways, such as the DelayQueu in Java, the time wheel of kafka and so on.
This is the end of "how to implement the delay queue in RabbitMQ". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.