
How to understand TTL and DLX of Java RabbitMQ

Updated: 2025-03-28. From: SLTechnology News&Howtos (shulou), Development


This article explains in detail how TTL and DLX work in RabbitMQ when used from Java.

TTL

1. Overview of RabbitMQ TTL

TTL in RabbitMQ stands for Time-To-Live and is the validity period of a message. If a message stays in the queue unconsumed for longer than its TTL, it becomes a dead message and can no longer be consumed. There are two ways to set a TTL:

The first is to set it in the queue's properties when declaring the queue, so that all messages in the queue have the same validity period.

The second is to set it in the message's properties when sending the message, so that each message can have a different TTL.

If both are set, the smaller value applies. The two differ in when an expired message is removed. With a validity period set on the queue, a message is deleted as soon as it expires. With a validity period set on the message, an expired message is not deleted immediately, because expiry is only checked when the message is about to be delivered to a consumer.

The reason is easy to see. In the first case every message in the queue has the same validity period, so the oldest message sits at the head of the queue and is also the first to expire; RabbitMQ only needs to check the head of the queue periodically for expired messages. In the second case each message can expire at a different time, so finding the expired ones would require traversing the entire queue, which is inefficient and impractical once the queue holds many messages. Instead, the check is deferred until the message is about to be delivered. Deletion is less timely, but the behavior seen by consumers is unaffected; in effect, space is traded for time.
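The two rules above (the smaller TTL applies, and expired messages are only dropped from the head of the queue) can be sketched in plain Java. This is a simplified model rather than RabbitMQ's implementation; the class HeadExpirySketch, the Msg type, and both method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class HeadExpirySketch {

    /** A queued message together with the absolute time (ms) at which it expires. */
    static final class Msg {
        final String body;
        final long expiresAtMillis;

        Msg(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    /** Returns the smaller of the two TTLs; null means "not set". */
    static Long effectiveTtlMillis(Long queueTtl, Long messageTtl) {
        if (queueTtl == null) {
            return messageTtl;
        }
        if (messageTtl == null) {
            return queueTtl;
        }
        return Math.min(queueTtl, messageTtl);
    }

    /** Removes expired messages from the head only and returns how many were removed. */
    static int expireFromHead(Deque<Msg> queue, long nowMillis) {
        int removed = 0;
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMillis <= nowMillis) {
            queue.pollFirst();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<Msg> queue = new ArrayDeque<>();
        // A long-lived message sits in front of a short-lived one.
        queue.addLast(new Msg("long-lived", 100_000L));
        queue.addLast(new Msg("short-lived", 1_000L));

        // At t = 5 s the short-lived message has expired, but it is not at the head.
        int removed = expireFromHead(queue, 5_000L);
        System.out.println("removed=" + removed + ", still queued=" + queue.size());
    }
}
```

In this model the short-lived message stays in the queue until the message in front of it leaves, which is the cost of not scanning the whole queue.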

If no TTL is set, the message never expires (this is the default). If the TTL is set to 0, the message is discarded immediately unless it can be delivered to a consumer right away. This can partially replace the immediate parameter that RabbitMQ supported before version 3.0. The replacement is only partial because, when delivery failed with immediate set, the message was returned to the publisher through basic.return; that part can be implemented with a dead-letter queue.

2. Set the message validity period

2.1. Set the validity period through the queue

Recall the queue declaration method used earlier, queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments). Its last parameter sets the queue's properties; the message validity period is the property named x-message-ttl, in milliseconds. If you are not sure which queue properties exist, check the form for adding a queue in the web console.

The specific code is as follows:

// Set the validity period of all messages in the queue, in milliseconds
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 5000); // 5 seconds
channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);

In the queue list of the web console, D means the queue is durable and TTL means a message validity period is set on it.

A few seconds after a message is published, it is no longer in the queue.

You can also set it from the RabbitMQ command line:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

You can also set it through the HTTP API:

$ curl -i -u guest:guest -H "content-type:application/json" -XPUT -d '{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl":60000}}' http://ip:15672/api/queues/{vhost}/{queuename}

2.2. Set the validity period when sending a message

When sending a message, the basicPublish method accepts a properties parameter. The validity period of the message is set through its expiration property, a string holding the number of milliseconds. The code is as follows:

Builder bd = new AMQP.BasicProperties().builder();
bd.deliveryMode(2); // persistent
bd.expiration("100000"); // message validity period of 100 seconds
BasicProperties pros = bd.build();
String message = "Test ttl message";
channel.basicPublish(EXCHANGE_NAME, "error", true, false, pros, message.getBytes());

You can also set it through the HTTP API:

$ curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}' http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish
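Because the expiration property is a string of milliseconds rather than a number, a small conversion helper can avoid unit mistakes. The sketch below is only an illustration; the class ExpirationProperty and its method are hypothetical names and are not part of the RabbitMQ client.

```java
import java.time.Duration;

final class ExpirationProperty {

    /** Formats a TTL as the millisecond string expected by the "expiration" property. */
    static String fromDuration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        return Long.toString(ttl.toMillis());
    }

    public static void main(String[] args) {
        // 100 seconds, the same validity period as in the publishing example.
        System.out.println(fromDuration(Duration.ofSeconds(100))); // prints 100000
    }
}
```

The returned value could then be passed to the builder's expiration method in place of a hand-written literal.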

The complete code that sets the validity period both through the queue and through the message property at publish time is as follows. After running it, observe the console: when both are set, the smaller validity period applies. Project GitHub address: https://github.com/RookieMember/RabbitMQ-Learning.git.

package cn.wkp.rabbitmq.newest.ttl;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import cn.wkp.rabbitmq.util.ConnectionUtil;

/**
 * @ClassName: Send
 * @Description: message validity period
 * @author wkg
 * @date: September 1, 2021, 11:28:22
 */
public class Send {

    private final static String EXCHANGE_NAME = "ttl_exchange";
    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // Get the connection to the MQ server
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel from the connection
        Channel channel = connection.createChannel();

        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

        /*
         * 1: set the validity period through the queue
         * 2: set the validity period through the message property
         * If both are set, the smaller value applies.
         */
        // Declare the queue
        Map<String, Object> arguments = new HashMap<String, Object>();
        // Set the validity period of all messages in the queue, in milliseconds
        arguments.put("x-message-ttl", 5000); // 5 seconds
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        Builder bd = new AMQP.BasicProperties().builder();
        bd.deliveryMode(2); // persistent
        bd.expiration("100000"); // message validity period of 100 seconds
        BasicProperties pros = bd.build();
        String message = "Test ttl message";
        channel.basicPublish(EXCHANGE_NAME, "error", true, false, pros, message.getBytes());
        System.out.println("Sent message: " + message);

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

3. Set queue validity period (not commonly used, only for understanding)

When adding a queue in the web console above, we also see an x-expires parameter. It makes the queue expire and be deleted automatically if it is "unused" for a specified period of time. Unused means that, within the expiration period, the queue has no consumers, has not been redeclared, and has not had basic.get called on it. This can be used, for example, for RPC-style reply queues, where many queues may be created but never used.

The server ensures that the queue is deleted after the expiration time, but does not guarantee how timely the deletion will be. After the server restarts, the timeout of the persisted queue will be recalculated. The x-expires parameter value is in milliseconds, obeys the same constraints as x-message-ttl, and cannot be set to 0. Therefore, if this parameter is set to 1000, it means that the queue will be deleted if it is not used within 1 second.

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 18000); // queue validity period of 18 seconds
channel.queueDeclare("myqueue", false, false, false, args);

DLX

1. What is RabbitMQ's DLX?

DLX is the abbreviation of Dead-Letter-Exchange, that is, a dead-letter exchange.

Its job is to receive dead-letter messages. So what is a dead-letter message? An ordinary message becomes a dead-letter message in the following situations:

Message is rejected (Basic.Reject/Basic.Nack), and the requeue parameter is set to false

Message expires

The queue reaches the maximum length

When a message becomes a dead-letter message in a queue, it can be sent on to another exchange. That exchange is the DLX, and a queue bound to the DLX is called a dead-letter queue. As soon as a dead letter appears in the original queue, RabbitMQ automatically republishes the message to the configured DLX, which routes it to the dead-letter queue bound to it. The messages in that queue can then be consumed and processed. Used together with a message TTL of 0, this makes up for the removed immediate parameter.
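The three dead-letter conditions listed above can be summarized as a small decision function. This is a sketch under stated assumptions: the class DeadLetterRules and its parameters are hypothetical, and the string labels mirror the reason values that RabbitMQ is understood to record in a dead-lettered message's x-death header, which you should verify against your broker version.

```java
import java.util.Optional;

final class DeadLetterRules {

    /** Why a message was dead-lettered; labels are assumed to match the x-death "reason" values. */
    enum Reason {
        REJECTED("rejected"),
        EXPIRED("expired"),
        MAXLEN("maxlen");

        final String label;

        Reason(String label) {
            this.label = label;
        }
    }

    /**
     * Decides whether a message becomes a dead letter.
     *
     * @param rejected  the consumer sent basic.reject or basic.nack
     * @param requeue   the requeue flag sent with the reject/nack
     * @param expired   the message TTL has elapsed
     * @param overLimit the queue exceeded its maximum length
     */
    static Optional<Reason> reasonFor(boolean rejected, boolean requeue,
                                      boolean expired, boolean overLimit) {
        if (rejected && !requeue) {
            return Optional.of(Reason.REJECTED);
        }
        if (expired) {
            return Optional.of(Reason.EXPIRED);
        }
        if (overLimit) {
            return Optional.of(Reason.MAXLEN);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A rejected message with requeue=true goes back to the queue, not to the DLX.
        System.out.println(reasonFor(true, true, false, false).isPresent());
        // A rejected message with requeue=false is dead-lettered.
        System.out.println(reasonFor(true, false, false, false).get().label);
    }
}
```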

2. What is the use of DLX

Messages that are not consumed normally and are rejected with requeue set to false enter the dead-letter queue, so we can consume and monitor the messages in the dead-letter queue to observe and analyze problems in the system. Combined with TTL, DLX also plays an important role in implementing delay queues, which are widely used. For example, an order that has not been paid within a set time after it was placed is closed automatically. As another example, anyone who has integrated a third-party payment system knows that the order request carries a notify_url for receiving the payment result. If we do not acknowledge the notification, the third party keeps calling the notify_url at intervals and stops after a certain number of attempts; a typical notification schedule is 0 seconds, 5 seconds, 30 seconds, 5 minutes, 30 minutes, 1 hour, 6 hours, 12 hours. A third example is household appliances that switch off on a timer. All of these scenarios can be implemented with delay queues.
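As a rough illustration of the notification schedule mentioned above, the sketch below maps a retry attempt to the delay a delay queue would need to apply. It assumes that each listed value is the wait before the corresponding attempt; the class NotifyRetrySchedule and its method are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

final class NotifyRetrySchedule {

    // Delays from the example schedule, converted to milliseconds.
    private static final long[] DELAYS_MILLIS = {
        0L,
        TimeUnit.SECONDS.toMillis(5),
        TimeUnit.SECONDS.toMillis(30),
        TimeUnit.MINUTES.toMillis(5),
        TimeUnit.MINUTES.toMillis(30),
        TimeUnit.HOURS.toMillis(1),
        TimeUnit.HOURS.toMillis(6),
        TimeUnit.HOURS.toMillis(12)
    };

    /** Returns the delay in ms before the given 0-based attempt, or -1 if the attempt is out of range. */
    static long delayMillis(int attempt) {
        if (attempt < 0 || attempt >= DELAYS_MILLIS.length) {
            return -1L;
        }
        return DELAYS_MILLIS[attempt];
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= DELAYS_MILLIS.length; attempt++) {
            System.out.println("attempt " + attempt + " -> delay " + delayMillis(attempt) + " ms");
        }
    }
}
```

Each delay could serve as the TTL of a message that is dead-lettered into the queue the notifier consumes from. Since expiry set per message is only checked when the message is about to be delivered, using one delay queue per delay level (with the TTL set on the queue) is one way to keep the delays accurate.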

3. How to use DLX

When adding a queue in the web console, we see two DLX-related parameters: x-dead-letter-exchange and x-dead-letter-routing-key. x-dead-letter-exchange sets the DLX of the queue; x-dead-letter-routing-key sets the routing key that a dead-letter message carries when it enters the DLX. The routing key is optional. If it is not set, the message's original routing key is used.
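The fallback rule for the routing key can be written as a one-line lookup. This is a sketch of the rule only, not broker code; the class DeadLetterRouting and its method name are hypothetical, while the argument name x-dead-letter-routing-key comes from the text above.

```java
import java.util.HashMap;
import java.util.Map;

final class DeadLetterRouting {

    static final String DLX_ROUTING_KEY_ARG = "x-dead-letter-routing-key";

    /** Returns the routing key a dead letter carries into the DLX. */
    static String deadLetterRoutingKey(Map<String, ?> queueArguments, String originalRoutingKey) {
        Object configured = queueArguments.get(DLX_ROUTING_KEY_ARG);
        // Fall back to the message's original routing key when none is configured.
        return configured != null ? configured.toString() : originalRoutingKey;
    }

    public static void main(String[] args) {
        Map<String, Object> withKey = new HashMap<>();
        withKey.put("x-dead-letter-exchange", "dlx_exchange");
        withKey.put(DLX_ROUTING_KEY_ARG, "dlx.test");

        Map<String, Object> withoutKey = new HashMap<>();
        withoutKey.put("x-dead-letter-exchange", "dlx_exchange");

        System.out.println(deadLetterRoutingKey(withKey, "error"));    // prints dlx.test
        System.out.println(deadLetterRoutingKey(withoutKey, "error")); // prints error
    }
}
```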

The client can set the x-dead-letter-exchange parameter when declaring the queue with channel.queueDeclare, as shown in the following code:

channel.exchangeDeclare("dlx_exchange", "direct"); // create the DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // set the DLX
args.put("x-dead-letter-routing-key", "dlx-routing-key"); // set the routing key for the DLX (optional)
// Attach the DLX to queue myqueue
channel.queueDeclare("myqueue", false, false, false, args);

The above may be abstract. Let's use a concrete example to demonstrate the specific use of DLX:

package cn.wkp.rabbitmq.newest.dlx;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class SendDLX {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // Declare an exchange to use as the dead-letter exchange
        channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
        // Declare a queue to use as the dead-letter queue
        channel.queueDeclare("dlx_queue", true, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");

        channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
        Map<String, Object> arguments = new HashMap<String, Object>();
        // Message validity period of 5 seconds; once expired, the message
        // becomes a dead-letter message and enters the DLX
        arguments.put("x-message-ttl", 5000);
        arguments.put("x-dead-letter-exchange", "dlx_exchange"); // set the DLX
        arguments.put("x-dead-letter-routing-key", "dlx.test"); // set the routing key for the DLX (optional)
        // Attach the DLX to queue normal_queue
        channel.queueDeclare("normal_queue", true, false, false, arguments);
        channel.queueBind("normal_queue", "normal_exchange", "");

        channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("Test Dead message").getBytes());
        System.out.println("time to send message: " + ConnectionUtil.formatDate(new Date()));

        channel.close();
        connection.close();
    }
}

The above is the sender's code. After running it, the web console shows the following:

The dead-letter queue dlx_queue is bound to the dead-letter exchange dlx_exchange (topic type) with the routing key "dlx.*".

The queue normal_queue is bound to the exchange normal_exchange (fanout type).

In the queues view, the DLX and DLK tags on normal_queue show that a dead-letter exchange and a dead-letter routing key are set for it, and we can see that the message has been routed to the dead-letter queue. The whole process is as follows:

The message is sent to the exchange normal_exchange and then routed to the queue normal_queue.

Because the queue normal_queue has no consumer, the message becomes a dead-letter message when it expires.

The dead-letter message enters the dead-letter exchange dlx_exchange carrying the configured x-dead-letter-routing-key=dlx.test.

The binding key between dlx_exchange and dlx_queue is "dlx.*". The routing key dlx.test of the dead-letter message matches this pattern, so the message is routed to dlx_queue.
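The last step relies on topic-exchange matching, where "*" stands for exactly one dot-separated word and "#" for zero or more words. A minimal matcher, written here only to illustrate why dlx.test matches "dlx.*", might look like the sketch below; TopicMatcher is a hypothetical class and not the broker's routing code.

```java
final class TopicMatcher {

    /** Returns true if the routing key matches the topic binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words; try every possibility.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' consumes exactly one word; otherwise the words must be equal.
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("dlx.*", "dlx.test")); // prints true
        System.out.println(matches("dlx.*", "dlx.a.b"));  // prints false
    }
}
```

With this rule, a dead letter whose routing key had two words after "dlx" would not reach dlx_queue unless the binding used "dlx.#" instead.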

Next we add a consumer to the dead-letter queue to measure when the dead-letter message enters the DLX. Delete the earlier dead-letter message first.

package cn.wkp.rabbitmq.newest.dlx;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class RecvDLX {

    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
        channel.queueDeclare("dlx_queue", true, false, false, null);
        channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");

        // Do not deliver a new message to this consumer until it has
        // acknowledged the message it is currently holding
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("consumer received message: " + new String(body) + ", current time: "
                        + ConnectionUtil.formatDate(new Date()));
                // The consumer sends the ack manually
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        System.out.println("consuming messages in dead letter queue ====");
        // Listen on the queue with manual acknowledgement
        channel.basicConsume("dlx_queue", false, consumer);
    }
}

The result is as follows (run the dead-letter queue consumer first, then the producer). The message is consumed by the dead-letter queue's consumer 10 milliseconds after it expires, which shows that a message is sent to the DLX immediately after it becomes a dead letter.

Consume messages in the dead letter queue = =

The consumer received the message: test the dead letter message, current time: 2021-09-24 16-30-15-15-40

Time to send message: 2021-09-24 17 purl 5700 purl 730

