How to implement a message delay queue based on RabbitMQ in SpringBoot

2025-02-23 Update From: SLTechnology News&Howtos

This article shows how to implement a message delay queue based on RabbitMQ in SpringBoot. It first explains the RabbitMQ features involved and then walks through a working example.

Delay queue usage scenarios

> Delay queues are useful in many business scenarios, typically for work that is not real-time, must run after a delay, or needs compensation by retrying.

Order timeout: in payment scenarios, an order that is not paid within 30 minutes or 1 hour of creation is cancelled automatically.

SMS or email notification: after a registration or an order, a message has to be sent by SMS or email 1 minute later or at a specific time. This work is not part of the main business flow, so it is usually sent asynchronously.

Retry: for example, when the first notification attempt fails with an exception, the notification is retried a few minutes later.

Implementing a delay queue with RabbitMQ

> RabbitMQ does not provide a delay queue directly, but one can be built from two of its features: TTL (Time-To-Live) and DLX (Dead-Letter-Exchange).

Time to live (Time-To-Live, TTL)

> In RabbitMQ, a TTL can be set on a queue and on an individual message. It is the maximum time a message may stay in the queue. When a message carries a TTL, or enters a queue that has a TTL, it becomes a dead letter (Dead Letter) once the TTL has elapsed. If both a message TTL and a queue TTL are configured, the smaller value applies.
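The "smaller value applies" rule can be checked with a few lines of plain Java. This is only an illustrative sketch: `EffectiveTtl` and `effectiveTtlMillis` are hypothetical names, not part of RabbitMQ or Spring AMQP, and `null` is assumed to mean "no TTL configured".

```java
import java.util.OptionalLong;

// Illustrative model only; not a RabbitMQ or Spring AMQP API.
class EffectiveTtl {

    /**
     * Returns the TTL that applies to a message, given an optional
     * queue-level TTL and an optional per-message TTL (null = not set).
     */
    static OptionalLong effectiveTtlMillis(Long queueTtlMillis, Long messageTtlMillis) {
        if (queueTtlMillis == null && messageTtlMillis == null) {
            return OptionalLong.empty(); // no TTL at all: the message never expires
        }
        if (queueTtlMillis == null) {
            return OptionalLong.of(messageTtlMillis);
        }
        if (messageTtlMillis == null) {
            return OptionalLong.of(queueTtlMillis);
        }
        // Both configured: the smaller value wins
        return OptionalLong.of(Math.min(queueTtlMillis, messageTtlMillis));
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtlMillis(60_000L, 5_000L).getAsLong()); // 5000
        System.out.println(effectiveTtlMillis(60_000L, null).getAsLong());   // 60000
    }
}
```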

Dead-letter exchange (Dead Letter Exchange, DLX)

> As described above, a message with a TTL, or a message in a queue with a TTL, eventually becomes a dead letter. When a message is dead-lettered in one queue, it can be republished to another exchange. That exchange is the DLX, and a queue bound to the DLX is called a dead-letter queue.

A message generally becomes a dead letter in one of the following cases:

The message is rejected by a consumer without being requeued.

The message expires.

The queue reaches its maximum length.

A delay queue can therefore be simulated by combining TTL and DLX. When a message in the queue times out and becomes a dead letter, it is republished to the configured exchange and routed to the queue that is actually consumed. Put simply, we create two queues: one that messages are sent to and that nobody consumes from, and one that receives the messages once they expire.
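The two-queue idea can be modelled in memory without a broker. The sketch below is an assumption-laden illustration, not RabbitMQ code: `DelayQueueModel`, `publish` and `expire` are hypothetical names, time is passed in explicitly so the behaviour is deterministic, and a single queue-level TTL is assumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of "TTL queue + dead-letter exchange"; no broker involved.
class DelayQueueModel {

    /** A message waiting in the delay queue together with its expiry time. */
    private static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Deque<Pending> delayQueue = new ArrayDeque<>(); // has no consumer
    private final List<String> workQueue = new ArrayList<>();     // what consumers read

    DelayQueueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Publish into the delay queue; nothing consumes from it directly. */
    void publish(String body, long nowMillis) {
        delayQueue.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    /** Move every expired message from the head of the delay queue to the work queue. */
    void expire(long nowMillis) {
        while (!delayQueue.isEmpty() && delayQueue.peekFirst().expiresAtMillis <= nowMillis) {
            workQueue.add(delayQueue.pollFirst().body);
        }
    }

    List<String> workQueue() {
        return workQueue;
    }

    public static void main(String[] args) {
        DelayQueueModel model = new DelayQueueModel(60_000);
        model.publish("notify-1", 0);
        model.expire(30_000);
        System.out.println(model.workQueue()); // []
        model.expire(60_000);
        System.out.println(model.workQueue()); // [notify-1]
    }
}
```

In the real setup the broker plays the role of `expire`: the consumer only ever listens on the work queue and sees each message roughly one TTL after it was published.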

Implementing a delay queue with SpringBoot and RabbitMQ

> The following walkthrough uses SpringBoot with RabbitMQ. The scenario: if an HTTP notification fails (the address is unavailable or the connection times out), the message is moved to the delay queue and resent after a specified time.

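0. Add the pom dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-http</artifactId>
        <version>4.5.16</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-json</artifactId>
        <version>4.5.16</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>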

1. Write the RabbitMQ configuration class (the key configuration): RabbitConfig.java

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * RabbitMQ configuration.
     *
     * @date July 17, 2019
     * @version V1.0
     * Revision record: oKong created this feature on July 17, 2019.
     */
    @Configuration
    public class RabbitConfig {

        @Autowired
        ConnectionFactory connectionFactory;

        /** Number of consumer threads for normal notifications; can be set fairly high */
        @Value("${http.notify.concurrency:50}")
        int concurrency;

        /** Number of consumer threads for delayed notifications; can be set lower */
        @Value("${http.notify.delay.concurrency:20}")
        int delayConcurrency;

        @Bean
        public RabbitAdmin rabbitAdmin() {
            return new RabbitAdmin(connectionFactory);
        }

        @Bean
        public DirectExchange httpMessageNotifyDirectExchange(RabbitAdmin rabbitAdmin) {
            // durable: whether the exchange is persisted
            // autoDelete: whether the exchange is deleted automatically when it is no longer in use
            DirectExchange directExchange =
                    new DirectExchange(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, true, false);
            directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
            return directExchange;
        }

        // Normal message queue
        @Bean
        public Queue httpMessageStartQueue(RabbitAdmin rabbitAdmin) {
            // Four parameters: name, durable = true, exclusive = false, autoDelete = false
            Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME, true, false, false);
            queue.setAdminsThatShouldDeclare(rabbitAdmin);
            return queue;
        }

        // Bind the queue to the exchange
        @Bean
        public Binding bindingStartQuene(RabbitAdmin rabbitAdmin,
                DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageStartQueue) {
            Binding binding = BindingBuilder.bind(httpMessageStartQueue)
                    .to(httpMessageNotifyDirectExchange)
                    .with(ApplicationConstant.HTTP_MESSAGE_START_RK);
            binding.setAdminsThatShouldDeclare(rabbitAdmin);
            return binding;
        }

        @Bean
        public Queue httpMessageOneQueue(RabbitAdmin rabbitAdmin) {
            Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME, true, false, false);
            queue.setAdminsThatShouldDeclare(rabbitAdmin);
            return queue;
        }

        @Bean
        public Binding bindingOneQuene(RabbitAdmin rabbitAdmin,
                DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageOneQueue) {
            Binding binding = BindingBuilder.bind(httpMessageOneQueue)
                    .to(httpMessageNotifyDirectExchange)
                    .with(ApplicationConstant.HTTP_MESSAGE_ONE_RK);
            binding.setAdminsThatShouldDeclare(rabbitAdmin);
            return binding;
        }

        // ---------- delay queue: start ----------
        @Bean
        public Queue httpDelayOneQueue() {
            // durable = true, exclusive = false, autoDelete = false
            return QueueBuilder.durable("http.message.dlx.one")
                    // The key part: when a message becomes a dead letter, it is republished to
                    // x-dead-letter-exchange with the routing key x-dead-letter-routing-key.
                    .withArgument("x-dead-letter-exchange", ApplicationConstant.HTTP_MESSAGE_EXCHANGE)
                    .withArgument("x-dead-letter-routing-key", ApplicationConstant.HTTP_MESSAGE_ONE_RK)
                    // Expires after 1 minute (in milliseconds), then dead-lettered and forwarded
                    .withArgument("x-message-ttl", 1 * 60 * 1000)
                    .build();
        }

        // Bind the delay queue to the exchange
        @Bean
        public Binding bindingDelayOneQuene(RabbitAdmin rabbitAdmin,
                DirectExchange httpMessageNotifyDirectExchange, Queue httpDelayOneQueue) {
            Binding binding = BindingBuilder.bind(httpDelayOneQueue)
                    .to(httpMessageNotifyDirectExchange)
                    .with("delay.one");
            binding.setAdminsThatShouldDeclare(rabbitAdmin);
            return binding;
        }
        // ---------- delay queue: end ----------

        // It is recommended to keep the normal queue and the delay-processing queue separate.
        // Listener container factory for normal notifications
        @Bean("notifyListenerContainer")
        public SimpleRabbitListenerContainerFactory httpNotifyListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual ack
            factory.setConnectionFactory(connectionFactory);
            factory.setPrefetchCount(1);
            factory.setConcurrentConsumers(concurrency);
            return factory;
        }

        // Listener container factory for delayed notifications
        @Bean("delayNotifyListenerContainer")
        public SimpleRabbitListenerContainerFactory httpDelayNotifyListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual ack
            factory.setConnectionFactory(connectionFactory);
            factory.setPrefetchCount(1);
            factory.setConcurrentConsumers(delayConcurrency);
            return factory;
        }
    }

ApplicationConstant.java

    import java.util.HashMap;
    import java.util.Map;

    public class ApplicationConstant {

        /** Exchange used for HTTP notifications */
        public static final String HTTP_MESSAGE_EXCHANGE = "http.message.exchange";

        /** Queue names and routing keys */
        public static final String HTTP_MESSAGE_START_QUEUE_NAME = "http.message.start";
        public static final String HTTP_MESSAGE_START_RK = "rk.start";
        public static final String HTTP_MESSAGE_ONE_QUEUE_NAME = "http.message.one";
        public static final String HTTP_MESSAGE_ONE_RK = "rk.one";

        /**
         * Maps each notification queue to the routing key of its delay queue, that is,
         * where a failed message from that queue is sent next. Entries can be added as
         * needed, or generated automatically according to some rule.
         */
        public static final Map<String, String> delayRefMap = new HashMap<String, String>() {
            private static final long serialVersionUID = -779823216035682493L;
            {
                put(HTTP_MESSAGE_START_QUEUE_NAME, "delay.one");
            }
        };
    }

In short: create a normal queue that receives the parameters of an HTTP notification request and performs the HTTP call. Also create a delay queue, set its x-dead-letter-exchange, x-dead-letter-routing-key and x-message-ttl arguments, and let it dead-letter expired messages to a normal queue. A map records, for each normal queue, the routing key of the delay queue to use when a notification fails. The delay levels are hard-coded here; they could also be configured dynamically or derived by a rule as needed.
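As one possible reading of "derived by a rule", the map could be built from an ordered list of retry levels instead of being written by hand. This is a sketch under assumptions: `DelayRouting` is a hypothetical helper, and `http.message.two` and `delay.two` are made-up names for a second retry level that the article does not define.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch: derive the "queue -> next delay routing key" map from ordered lists.
class DelayRouting {

    private final Map<String, String> nextDelayKey = new LinkedHashMap<>();

    /**
     * @param queues    consumed queues in retry order (first attempt first)
     * @param delayKeys delayKeys.get(i) is the routing key of the delay queue
     *                  that sits between queues.get(i) and queues.get(i + 1)
     */
    DelayRouting(List<String> queues, List<String> delayKeys) {
        if (delayKeys.size() != queues.size() - 1) {
            throw new IllegalArgumentException("need exactly one delay key between two queues");
        }
        for (int i = 0; i < delayKeys.size(); i++) {
            nextDelayKey.put(queues.get(i), delayKeys.get(i));
        }
    }

    /** Empty means the queue is the last retry level and has no further delay queue. */
    Optional<String> next(String queue) {
        return Optional.ofNullable(nextDelayKey.get(queue));
    }

    public static void main(String[] args) {
        DelayRouting routing = new DelayRouting(
                Arrays.asList("http.message.start", "http.message.one", "http.message.two"),
                Arrays.asList("delay.one", "delay.two"));
        System.out.println(routing.next("http.message.start")); // Optional[delay.one]
        System.out.println(routing.next("http.message.two"));   // Optional.empty
    }
}
```

Returning an empty `Optional` for the last level gives the caller one place to decide what happens when retries are exhausted, instead of comparing queue names by hand.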

2. Create a listener class that consumes the messages. @RabbitListener is used here (a SimpleMessageListenerContainer could be configured instead). One listener handles the normal queue and another handles the queue fed by the delay queue. Because failed notifications are usually rare, the two listeners use different container factories so they can be sized differently.

    import cn.hutool.json.JSONUtil;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    /**
     * Consumer for HTTP notifications.
     * HttpEntity is the project's own DTO (url, params, method); it is not shown in this article.
     *
     * @date July 17, 2019
     * @version V1.0
     * Revision record: oKong created this feature on July 17, 2019.
     */
    @Component
    @Slf4j
    public class HttpMessagerLister {

        @Autowired
        HttpMessagerService messagerService;

        @RabbitListener(id = "httpMessageNotifyConsumer",
                queues = {ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME},
                containerFactory = "notifyListenerContainer")
        public void httpMessageNotifyConsumer(Message message, Channel channel) throws Exception {
            doHandler(message, channel);
        }

        @RabbitListener(id = "httpDelayMessageNotifyConsumer",
                queues = {ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME},
                containerFactory = "delayNotifyListenerContainer")
        public void httpDelayMessageNotifyConsumer(Message message, Channel channel) throws Exception {
            doHandler(message, channel);
        }

        private void doHandler(Message message, Channel channel) throws Exception {
            String body = new String(message.getBody(), "utf-8");
            String queue = message.getMessageProperties().getConsumerQueue();
            log.info("received notification request: {}, queue name: {}", body, queue);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            boolean acked = false;
            try {
                // Convert the message body to the DTO
                HttpEntity httpNotifyDto = JSONUtil.toBean(body, HttpEntity.class);
                channel.basicAck(deliveryTag, false);
                acked = true;
                // Send the notification
                messagerService.notify(queue, httpNotifyDto);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                // Ack only if not done yet; acking the same delivery tag twice is a channel error
                if (!acked) {
                    channel.basicAck(deliveryTag, false);
                }
            }
        }
    }

HttpMessagerService.java: this class does the real message processing. It does not record anything beyond log output here. In real scenarios it is strongly recommended to persist each notification so it can be inspected later. A stored sending status also makes it possible to resend, on a schedule, notifications whose retries were exhausted.

    import cn.hutool.http.HttpUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class HttpMessagerService {

        @Autowired
        AmqpTemplate mqTemplate;

        public void notify(String queue, HttpEntity httpEntity) {
            // Initiate the request
            log.info("initiate http request: {}", httpEntity);
            try {
                // Normalize to upper case so that "post" and "POST" both match the case labels
                switch (httpEntity.getMethod().toUpperCase()) {
                    case "POST":
                        HttpUtil.post(httpEntity.getUrl(), httpEntity.getParams());
                        break;
                    case "GET":
                    default:
                        HttpUtil.get(httpEntity.getUrl(), httpEntity.getParams());
                }
            } catch (Exception e) {
                // On failure, put the message into the delay queue
                String nextRk = ApplicationConstant.delayRefMap.get(queue);
                if (ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME.equals(queue)) {
                    // Already the queue behind the last delay queue: the message could be stored
                    // in the database here and resent later by a scheduled job
                    log.warn("http notification has failed N times, enter the timing to initiate notification, url={}",
                            httpEntity.getUrl());
                } else {
                    log.warn("http resend notification: {}, notification queue rk: {}, original queue: {}",
                            httpEntity.getUrl(), nextRk, queue);
                    mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, nextRk,
                            cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
                }
            }
        }
    }
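The recommendation to persist notifications with a sending status could look roughly like the following. This is a hypothetical in-memory stand-in for a database table, not part of the original project: `NotifyLog`, its `Status` values and `pendingResend` are all assumed names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a notification log table.
class NotifyLog {

    enum Status { SENT, RETRYING, FAILED }

    /** One logged notification and its current sending status. */
    static final class Entry {
        final String url;
        Status status;

        Entry(String url, Status status) {
            this.url = url;
            this.status = status;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Record a notification attempt and return the entry so its status can be updated. */
    Entry record(String url, Status status) {
        Entry entry = new Entry(url, status);
        entries.add(entry);
        return entry;
    }

    /** URLs whose retries were exhausted; a scheduled job could resend these. */
    List<String> pendingResend() {
        List<String> urls = new ArrayList<>();
        for (Entry entry : entries) {
            if (entry.status == Status.FAILED) {
                urls.add(entry.url);
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        NotifyLog log = new NotifyLog();
        log.record("www.baidu.com", Status.SENT);
        Entry retry = log.record("www.baidu.com1", Status.RETRYING);
        retry.status = Status.FAILED; // last delay level failed as well
        System.out.println(log.pendingResend()); // [www.baidu.com1]
    }
}
```

In the service above, the branch that currently only logs "has failed N times" is where an entry would be marked as failed.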

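3. Create the controller layer (in real scenarios such as SpringCloud microservices, you would usually expose an API for other services to call)

    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiOperation;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;

    @Slf4j
    @RestController
    @Api(tags = "http test interface")
    public class HttpDemoController {

        @Autowired
        AmqpTemplate mqTemplate;

        @PostMapping("/send")
        @ApiOperation(value = "send", notes = "send http test")
        public String sendHttp(@RequestBody HttpEntity httpEntity) {
            // Publish the http request as an asynchronous message
            log.info("start http request, post asynchronous message: {}", httpEntity);
            mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE,
                    ApplicationConstant.HTTP_MESSAGE_START_RK,
                    cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
            return "sent successfully: url=" + httpEntity.getUrl();
        }
    }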

4. Add the RabbitMQ settings to the configuration file

    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    # Number of consumer threads for notifications; set it higher, since most notifications succeed
    http.notify.concurrency=150
    # Number of consumer threads for the delay queue; can be set lower
    http.notify.delay.concurrency=10

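5. Write the startup class.

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    @Slf4j
    public class DelayQueueApplication {

        public static void main(String[] args) throws Exception {
            SpringApplication.run(DelayQueueApplication.class, args);
            log.info("spring-boot-rabbitmq-delay-queue-chapter38 service starts!");
        }
    }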

6. Start the service and use swagger for a simple call test.

Normal notification:

    2019-07-20 23:52:23.792 INFO 65216 --- [nio-8080-exec-1] c.l.l.s.c.controller.HttpDemoController: start http request, post asynchronous message: HttpEntity(url=www.baidu.com, params={a=1}, method=get)
    2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.chapter38.mq.HttpMessagerLister: received notification request: {"method":"get","params":{"a":1},"url":"www.baidu.com"}, queue name: http.message.start
    2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.c.service.HttpMessagerService: initiate http request: HttpEntity(url=www.baidu.com, params={a=1}, method=get)

Failed notification: access an address that does not exist

    2019-07-20 23:53:14.699 INFO 65216 --- [nio-8080-exec-4] c.l.l.s.c.controller.HttpDemoController: start http request, post asynchronous message: HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
    2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.chapter38.mq.HttpMessagerLister: received notification request: {"method":"get","params":{"a":1},"url":"www.baidu.com1"}, queue name: http.message.start
    2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService: initiate http request: HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
    2019-07-20 23:53:14.706 WARN 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService: http resend notification: www.baidu.com1, notification queue rk: delay.one, original queue: http.message.start

In the RabbitMQ management console, you can see that the http.message.dlx.one queue holds a message waiting for delayed processing. After one minute it is forwarded to the http.message.one queue.

After a minute, you can see that the message is consumed again.

    2019-07-20 23:… c.l.l.s.chapter38.mq.HttpMessagerLister: received notification request: {"method":"get","params":{"a":1},"url":"www.baidu.com1"}, queue name: http.message.one
    2019-07-20 23:54:14.723 INFO 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService: initiate http request: HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
    2019-07-20 23:54:14.723 WARN 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService: http notification has failed N times, enter the timing to initiate notification, url=www.baidu.com1

Some best practices

> In production, the compensation or retry mechanism is rarely triggered. When it is, the cause is usually a problem in the third-party business system, so resends are best done during off-peak periods: stop the delay listener at peak times and let it consume off-peak. Different notification types can also be put into different delay queues to protect normal service. Below is a brief demonstration of stopping and starting a listener dynamically. The listener container is obtained from the RabbitListenerEndpointRegistry object and then stopped or started. Setting the id attribute of @RabbitListener lets you fetch a container directly; you can also fetch all containers and apply your own checks.

    @Autowired
    RabbitListenerEndpointRegistry registry;

    @GetMapping("/set")
    @ApiOperation(value = "set", notes = "set the status of the message listener")
    public String setSimpleMessageListenerContainer(String status) {
        if ("1".equals(status)) {
            registry.getListenerContainer("httpDelayMessageNotifyConsumer").start();
        } else {
            registry.getListenerContainer("httpDelayMessageNotifyConsumer").stop();
        }
        return status;
    }

This is only a simple demonstration. In a real system, a timer could check whether the current time falls in the peak period and set the listener state accordingly.
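The peak-period check itself is easy to isolate and test. The sketch below is an assumption, not code from the original project: `PeakWindow` is a hypothetical helper, and the 09:00 to 21:00 window is an arbitrary example value.

```java
import java.time.LocalTime;

// Hypothetical helper: decides whether the delay listener should run right now.
class PeakWindow {

    private final LocalTime start;
    private final LocalTime end;

    PeakWindow(LocalTime start, LocalTime end) {
        this.start = start;
        this.end = end;
    }

    /**
     * True when 'now' falls in [start, end). A window whose start is not before
     * its end is treated as crossing midnight (start == end covers the whole day).
     */
    boolean contains(LocalTime now) {
        if (start.isBefore(end)) {
            return !now.isBefore(start) && now.isBefore(end);
        }
        return !now.isBefore(start) || now.isBefore(end);
    }

    /** The delay listener should consume only outside the peak window. */
    boolean delayListenerShouldRun(LocalTime now) {
        return !contains(now);
    }

    public static void main(String[] args) {
        PeakWindow peak = new PeakWindow(LocalTime.of(9, 0), LocalTime.of(21, 0));
        System.out.println(peak.delayListenerShouldRun(LocalTime.of(10, 30))); // false
        System.out.println(peak.delayListenerShouldRun(LocalTime.of(23, 0)));  // true
    }
}
```

A scheduled task could call `delayListenerShouldRun(LocalTime.now())` every few minutes and, depending on the result, call `start()` or `stop()` on the container returned by `registry.getListenerContainer("httpDelayMessageNotifyConsumer")`.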

