What are the two ways to implement a delay queue in RabbitMQ?

2025-02-14 Update From: SLTechnology News&Howtos

This article covers the two ways to implement a delay queue in RabbitMQ, with a worked example for each.

There are all kinds of scheduled tasks. Common ones, such as log backup, may run at 3 a.m. every day; fixed-time tasks like this are easily expressed with a cron expression. Others are special, like the time bomb in a movie that explodes 3 minutes after it is armed. Such tasks are hard to describe with cron because the start time is not known in advance. Similar requirements come up in development, for example:

In e-commerce projects, after an order is placed the customer usually has to pay within 20 or 30 minutes; otherwise the order enters the exception-handling logic and is cancelled. Moving the order into that exception-handling logic can be implemented with a delay queue.

I bought a smart casserole that can cook porridge. Before leaving for work I put all the ingredients in the pot and set the time at which cooking should start, so that the porridge is ready when I get home. The cooking instruction can be seen as a delayed task: it is placed in a delay queue and executed when the time comes.

After a meeting is booked successfully, the company's meeting reservation system notifies everyone who booked it half an hour before the meeting starts.

If a security work order has not been handled for more than 24 hours, an enterprise WeChat group is created automatically to remind the people responsible.

After a user orders takeout, the delivery rider is reminded that the order is about to time out when 10 minutes remain before the deadline.

...

We need delay queues in many scenarios.

This article uses RabbitMQ as an example to show how to work with delay queues.

Overall, there are two ways to implement delayed tasks on RabbitMQ:

Use RabbitMQ's message expiration (TTL) together with its dead-letter queue mechanism.

Use RabbitMQ's rabbitmq_delayed_message_exchange plugin, which is the simpler of the two.

Let's look at the two approaches in turn.

1. Using the plugin

1.1 Installing the plugin

First, download the rabbitmq_delayed_message_exchange plugin. It is an open source project on GitHub and can be downloaded directly:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

Choose the version that matches your RabbitMQ installation. I use 3.9.0 here, the latest version at the time of writing.

After the download is complete, run the following command on the command line to copy the downloaded file into the Docker container:

    docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez some-rabbit:/plugins

The first argument is the file path on the host, and the second is the destination inside the container.

Next, run the following command to enter the RabbitMQ container:

    docker exec -it some-rabbit /bin/bash

Inside the container, run the following command to enable the plugin:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Once the plugin is enabled, you can list all plugins with the following command to check that the one we just installed is present:

    rabbitmq-plugins list

[Figure: complete terminal output of the commands above]

Once the configuration is complete, run the exit command to leave the RabbitMQ container, and start coding.

1.2 Sending and receiving messages

Next, let's start sending and receiving messages.

First, create a Spring Boot project with the Web and RabbitMQ dependencies.

After the project is created, configure the basic RabbitMQ connection information in application.properties, as follows:

    spring.rabbitmq.host=localhost
    spring.rabbitmq.password=guest
    spring.rabbitmq.username=guest
    spring.rabbitmq.virtual-host=/

Next, provide a configuration class for RabbitMQ:

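    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitConfig {
        public static final String QUEUE_NAME = "javaboy_delay_queue";
        public static final String EXCHANGE_NAME = "javaboy_delay_exchange";
        public static final String EXCHANGE_TYPE = "x-delayed-message";

        @Bean
        Queue queue() {
            // durable, not exclusive, not auto-deleted
            return new Queue(QUEUE_NAME, true, false, false);
        }

        @Bean
        CustomExchange customExchange() {
            Map<String, Object> args = new HashMap<>();
            // how the exchange routes messages once their delay has elapsed
            args.put("x-delayed-type", "direct");
            return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, args);
        }

        @Bean
        Binding binding() {
            return BindingBuilder.bind(queue())
                    .to(customExchange())
                    .with(QUEUE_NAME)
                    .noargs();
        }
    }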

The main difference here is how the exchange is defined, so pay attention to that part.

The exchange we use is CustomExchange, which is provided by Spring. Its constructor takes five parameters:

Exchange name.

Exchange type; this value is fixed (x-delayed-message).

Whether the exchange is durable.

Whether the exchange is deleted automatically when no queue is bound to it.

Other arguments.

The last parameter, args, specifies the exchange's routing type: the familiar direct, fanout, topic and headers. The exchange uses that type to route each message once its delay has elapsed.
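To make the role of the args parameter concrete, here is a minimal sketch that builds the arguments map for a given routing type. The class DelayedExchangeArgs and the method forType are hypothetical names invented for this example; they are not part of Spring AMQP or the plugin. Restricting the value to the four types named above is also an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Spring AMQP or the plugin.
class DelayedExchangeArgs {
    // The four routing types named in the text above (an assumption of this sketch).
    private static final Set<String> TYPES = Set.of("direct", "fanout", "topic", "headers");

    // Builds the map that would be passed as the fifth CustomExchange parameter.
    static Map<String, Object> forType(String type) {
        if (!TYPES.contains(type)) {
            throw new IllegalArgumentException("unsupported x-delayed-type: " + type);
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", type);
        return args;
    }

    public static void main(String[] args) {
        // A delayed exchange that routes like a topic exchange after the delay.
        System.out.println(forType("topic"));
    }
}
```

The returned map could then replace the hand-built args map in customExchange(), for example to switch from direct to topic routing.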

Next, let's create a message consumer:

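    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MsgReceiver {
        private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);

        // Listens on the queue bound to the delayed exchange
        @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
        public void handleMsg(String msg) {
            logger.info("handleMsg, {}", msg);
        }
    }

The consumer simply logs the content of each message.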

Next, write a unit test method to send the message:

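    import java.io.UnsupportedEncodingException;
    import java.util.Date;

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class MqDelayedMsgDemoApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;

        @Test
        void contextLoads() throws UnsupportedEncodingException {
            Message msg = MessageBuilder
                    .withBody(("hello Jiangnan Rain" + new Date()).getBytes("UTF-8"))
                    // delay in milliseconds
                    .setHeader("x-delay", 3000)
                    .build();
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg);
        }
    }

The delay is set in the x-delay message header, in milliseconds; 3000 here means 3 seconds.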
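Because x-delay is expressed in milliseconds, longer delays such as the 30-minute payment window mentioned earlier are easy to get wrong by hand. The sketch below converts a java.time.Duration into a header value. The class DelayHeader, the method toDelayMillis, and the constant MAX_DELAY_MILLIS are hypothetical names; the upper bound of (2^32)-1 ms is an assumption taken from my reading of the plugin's README, so check it against the version you install.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of Spring AMQP or the plugin.
class DelayHeader {
    // Assumed upper bound on the delay, (2^32)-1 milliseconds; verify against the plugin docs.
    static final long MAX_DELAY_MILLIS = (1L << 32) - 1;

    // Converts a Duration into a millisecond value suitable for the x-delay header.
    static long toDelayMillis(Duration delay) {
        long millis = delay.toMillis();
        if (millis < 0 || millis > MAX_DELAY_MILLIS) {
            throw new IllegalArgumentException("delay out of range: " + millis + " ms");
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(3)));  // prints 3000
        System.out.println(toDelayMillis(Duration.ofMinutes(30))); // prints 1800000
    }
}
```

In the test above, the result could be passed as the second argument of setHeader("x-delay", ...) in place of the literal 3000.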

Now start the Spring Boot project, then run the unit test to send a message. The consumer prints the message to the console log.

You can see from the log that the message was delayed as expected.

2. Implementing a delay queue with DLX

2.1 The idea behind the implementation

The idea behind this implementation is also very simple: DLX (dead-letter exchange) + TTL (message time-to-live).

We can treat the dead-letter queue as the delay queue.

Specifically, it goes like this:

If a message needs to be delayed for 30 minutes, we set its TTL to 30 minutes, configure a dead-letter exchange and a dead-letter routing key for the queue it is sent to, and attach no consumer to that queue. After 30 minutes the message has not been consumed, so it expires and is routed to the dead-letter queue. We do attach a consumer to the dead-letter queue, so the message is consumed as soon as it arrives there.

That is the whole idea behind this kind of delay queue. Simple, isn't it?

2.2 Example

Next, let's walk through a simple example that demonstrates the concrete implementation of the delay queue.

First, make sure a RabbitMQ instance is up and running.

Then create a Spring Boot project with the RabbitMQ dependency.

Then configure the basic RabbitMQ connection information in application.properties:

    spring.rabbitmq.host=localhost
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.port=5672
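The flow described above can be modelled in a few lines of plain Java. This is only an in-memory sketch with a simulated clock; it does not talk to RabbitMQ, and the class TtlDlxModel with its methods publish, tick and deadLetters are hypothetical names made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of the TTL + DLX idea; it does not connect to RabbitMQ.
class TtlDlxModel {
    // A message together with the (simulated) time it entered the normal queue.
    private static final class Entry {
        final String body;
        final long enqueuedAt;

        Entry(String body, long enqueuedAt) {
            this.body = body;
            this.enqueuedAt = enqueuedAt;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> normalQueue = new ArrayDeque<>();    // no consumer attached
    private final List<String> deadLetterQueue = new ArrayList<>(); // the queue we consume from

    TtlDlxModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Publishes a message to the normal queue at simulated time `now`.
    void publish(String body, long now) {
        normalQueue.addLast(new Entry(body, now));
    }

    // Moves every message whose TTL has elapsed to the dead-letter queue.
    void tick(long now) {
        while (!normalQueue.isEmpty() && now - normalQueue.peekFirst().enqueuedAt >= ttlMillis) {
            deadLetterQueue.add(normalQueue.pollFirst().body);
        }
    }

    List<String> deadLetters() {
        return deadLetterQueue;
    }

    public static void main(String[] args) {
        TtlDlxModel model = new TtlDlxModel(10_000);
        model.publish("hello javaboy!", 0);
        model.tick(9_999);
        System.out.println(model.deadLetters()); // prints []
        model.tick(10_000);
        System.out.println(model.deadLetters()); // prints [hello javaboy!]
    }
}
```

Since every message in the queue shares the same TTL, checking only the head of the queue is enough in this model: messages expire in the order they were published.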

Next, let's configure two message queues: a normal queue and a dead letter queue:

    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class QueueConfig {
        public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
        public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
        public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key";
        public static final String DLX_QUEUE_NAME = "dlx_queue_name";
        public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
        public static final String DLX_ROUTING_KEY = "dlx_routing_key";

        /**
         * Dead-letter queue
         */
        @Bean
        Queue dlxQueue() {
            return new Queue(DLX_QUEUE_NAME, true, false, false);
        }

        /**
         * Dead-letter exchange
         */
        @Bean
        DirectExchange dlxExchange() {
            return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
        }

        /**
         * Binds the dead-letter queue to the dead-letter exchange
         */
        @Bean
        Binding dlxBinding() {
            return BindingBuilder.bind(dlxQueue())
                    .to(dlxExchange())
                    .with(DLX_ROUTING_KEY);
        }

        /**
         * Normal message queue
         */
        @Bean
        Queue javaboyQueue() {
            Map<String, Object> args = new HashMap<>();
            // set the message expiration time (milliseconds; 10 seconds here)
            args.put("x-message-ttl", 1000 * 10);
            // set the dead-letter exchange
            args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
            // set the dead-letter routing key
            args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            return new Queue(JAVABOY_QUEUE_NAME, true, false, false, args);
        }

        /**
         * Normal exchange
         */
        @Bean
        DirectExchange javaboyExchange() {
            return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
        }

        /**
         * Binds the normal queue to its exchange
         */
        @Bean
        Binding javaboyBinding() {
            return BindingBuilder.bind(javaboyQueue())
                    .to(javaboyExchange())
                    .with(JAVABOY_ROUTING_KEY);
        }
    }

Although this configuration code is a little long, the principle is simple.

The configuration falls into two groups: the first sets up the dead-letter queue and the second sets up the normal queue. Each group consists of a queue, an exchange and a Binding.

When configuring the normal queue, specify its dead-letter exchange and dead-letter routing key.

The message expiration time configured on the queue is in milliseconds.
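Since the expiration time is in milliseconds, it can help to derive the queue arguments from a Duration rather than a bare number. The following is a minimal sketch under that assumption; the class DelayQueueArgs and the method build are hypothetical names for this example, and only the three argument keys come from the configuration above.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Spring AMQP.
class DelayQueueArgs {
    // Builds the arguments for a queue whose expired messages go to the given dead-letter exchange.
    static Map<String, Object> build(Duration ttl, String dlxExchange, String dlxRoutingKey) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("ttl must not be negative");
        }
        Map<String, Object> args = new HashMap<>();
        // x-message-ttl is in milliseconds; toIntExact throws if the value does not fit in an int
        args.put("x-message-ttl", Math.toIntExact(ttl.toMillis()));
        args.put("x-dead-letter-exchange", dlxExchange);
        args.put("x-dead-letter-routing-key", dlxRoutingKey);
        return args;
    }

    public static void main(String[] args) {
        // Same values as the configuration above: a 10-second TTL.
        System.out.println(build(Duration.ofSeconds(10), "dlx_exchange_name", "dlx_routing_key"));
    }
}
```

The resulting map could be passed as the last argument of the Queue constructor in javaboyQueue().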

Next, we configure a consumer for the dead letter queue, as follows:

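    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class DlxConsumer {
        private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

        // Consumes from the dead-letter queue, which acts as the delay queue
        @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
        public void handle(String msg) {
            logger.info(msg);
        }
    }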

Print out the message when you receive it.

That's it.

Start the project.

Finally, we send a message in the unit test:

    import java.util.Date;

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class DelayQueueApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;

        @Test
        void contextLoads() {
            // print the send time so it can be compared with the consumer's log
            System.out.println(new Date());
            rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME,
                    QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!");
        }
    }

There is nothing special here: we send an ordinary message, and 10 seconds later the consumer of the dead-letter queue prints it.

Those are the two ways to implement a delay queue in RabbitMQ. If you have a similar requirement, the analysis above should help.
