Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to implement distributed deferred tasks based on rabbitmq delay plug-in

2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/02 Report--

In this article, the editor introduces in detail "how to achieve distributed deferred tasks based on the rabbitmq deferred plug-in". The content is detailed, the steps are clear, and the details are handled properly. I hope this article "how to achieve distributed deferred tasks based on the rabbitmq deferred plug-in" can help you solve your doubts.

I. usage scenarios of deferred tasks

1. The order was issued successfully and was not paid for 30 minutes. Payment timeout and automatic cancellation of order

2. the order is signed and received, and no evaluation is made 7 days after signing. The order is timed out and not evaluated, and the system is praised by default.

3. The order was issued successfully, the merchant did not receive the order for 5 minutes, and the order was cancelled.

4. Push SMS to remind you when delivery times out.

5. Three-day probationary period for members. After the expiration of three days, users will be notified on time that the trial product has expired.

.

For the scenes with long delay and low real-time performance, we can use the way of task scheduling to poll regularly. Such as: xxl-job.

Today, we will talk about the implementation of delay queues. There are many ways to implement delay queues, such as:

1. For example, RabbitMQ-based queue ttl+ dead-letter routing policy: by setting the timeout and unconsumed time of a queue, in conjunction with the dead-letter routing policy, when the arrival time is not consumed, the message will be routed to the specified queue.

two。 RabbitMQ-based delay queue plug-in (rabbitmq-delayed-message-exchange): the delay queue can be achieved by adding a delay parameter (headers.put ("x-delay", 5000)) to the request header when sending messages. (by the way, Aliyun's paid version of rabbitMQ currently supports delayed messages within one day.) limitation: the current design of the plug-in is not really suitable for scenarios that contain a large number of delayed messages (such as hundreds of thousands or millions). For details, see # / issues/72 another source of variability of the plug-in is dependent on Erlang timers, after a certain number of long-time timers are used in the system. They begin to compete for scheduler resources.

3. Use the zset ordering of redis to poll each element in zset, and then migrate the content to the queue to be consumed (redisson has been implemented)

4. Use the expiration notification policy of redis's key to set the expiration time of a key as the delay time, and notify the client after expiration (this method relies on the redis expiration check mechanism key for more delay; the pubsub of Redis will not be persisted, and the server will be discarded if it goes down).

II. Installation of components

The installation of rabbitMQ depends on the erlang locale, so we need to download the environment installer for erlang. There are many installation tutorials on the Internet, which will not be repeated here. It is important to note that the versions supported by the deferred plug-in match.

Plug-in Git official address: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

When you successfully install the plug-in and run the rabbitmq management background, you can see that this option is added to the type type in the new exchange.

Delay queue implementation of RabbitMQ delay queue plug-in 1. Basic principles

Through the switch declared by x-delayed-message, its messages will not be queued immediately after they are released, but will be saved to Mnesia (a distributed database management system), which is suitable for telecom and other Erlang applications that need to run continuously and have soft real-time features. At present, there is not a lot of information.

The plug-in will try to confirm whether the message has expired. First, make sure that the delay range of the message is Delay > 0, Delay =

< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。 2、核心组件开发走起 引入maven依赖 org.springframework.boot spring-boot-starter-amqp application.yml简单配置 rabbitmq: host: localhost port: 5672 virtual-host: / RabbitMqConfig配置文件 package com.example.code.bot_monomer.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.ExchangeBuilder;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * @author: shf description: date: 2022/1/5 15:00 */@Configurationpublic class RabbitMQConfig { /** * 普通 */ public static final String EXCHANGE_NAME = "test_exchange"; public static final String QUEUE_NAME = "test001_queue"; public static final String NEW_QUEUE_NAME = "test002_queue"; /** * 延迟 */ public static final String DELAY_EXCHANGE_NAME = "delay_exchange"; public static final String DELAY_QUEUE_NAME = "delay001_queue"; public static final String DELAY_QUEUE_ROUT_KEY = "key001_delay"; //由于阿里rabbitmq增加队列要额外收费,现改为各业务延迟任务共同使用一个queue:delay001_queue //public static final String NEW_DELAY_QUEUE_NAME = "delay002_queue"; @Bean public CustomExchange delayMessageExchange() { Map args = new HashMap(); args.put("x-delayed-type", "direct"); //自定义交换机 return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Queue delayMessageQueue() { return new Queue(DELAY_QUEUE_NAME, true, false, false); } @Bean public Binding bindingDelayExchangeAndQueue(Queue delayMessageQueue, Exchange delayMessageExchange) { return new Binding(DELAY_QUEUE_NAME, Binding.DestinationType.QUEUE, DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUT_KEY, null); //return BindingBuilder.bind(delayMessageQueue).to(delayMessageExchange).with("key001_delay").noargs(); } /** * 交换机 */ @Bean public Exchange orderExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); //return new TopicExchange(EXCHANGE_NAME, true, false); } /** * 队列 */ @Bean public Queue orderQueue() { //return QueueBuilder.durable(QUEUE_NAME).build(); return new Queue(QUEUE_NAME, true, false, false, null); } /** * 队列 */ @Bean public Queue orderQueue1() { //return QueueBuilder.durable(NEW_QUEUE_NAME).build(); return new Queue(NEW_QUEUE_NAME, true, false, false, null); } /** * 交换机和队列绑定关系 */ @Bean public Binding orderBinding(Queue orderQueue, Exchange orderExchange) { //return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs(); return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null); } /** * 交换机和队列绑定关系 */ @Bean public Binding orderBinding1(Queue orderQueue1, Exchange orderExchange) { //return BindingBuilder.bind(queue).to(exchange).with("#.delay").noargs(); return new Binding(NEW_QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "test001_common", null); }} MqDelayQueueEnum枚举类 package com.example.code.bot_monomer.enums;import lombok.AllArgsConstructor;import lombok.Getter;import lombok.NoArgsConstructor;/** * @author: shf description: 延迟队列业务枚举类 * date: 2021/8/27 14:03 */@Getter@NoArgsConstructor@AllArgsConstructorpublic enum MqDelayQueueEnum { /** * 业务0001 */ YW0001("yw0001", "测试0001", "yw0001"), /** * 业务0002 */ YW0002("yw0002", "测试0002", "yw0002"); /** * 延迟队列业务区分唯一Key */ private String code; /** * 中文描述 */ private String name; /** * 延迟队列具体业务实现的 Bean 可通过 Spring 的上下文获取 */ private String beanId; public static String getBeanIdByCode(String code) { for (MqDelayQueueEnum queueEnum : MqDelayQueueEnum.values()) { if (queueEnum.code.equals(code)) { return queueEnum.beanId; } } return null; }} 模板接口处理类:MqDelayQueueHandle package com.example.code.bot_monomer.service.mqDelayQueue;/** * @author: shf description: RabbitMQ延迟队列方案处理接口 * date: 2022/1/10 10:46 */public interface MqDelayQueueHandle { void execute(T t);} 具体业务实现处理类 @Slf4j@Component("yw0001")public class MqTaskHandle01 implements MqDelayQueueHandle { @Override public void execute(String s) { log.info("MqTaskHandle01.param=[{}]",s); //TODO }} 注意:@Component("yw0001") 要和业务枚举类MqDelayQueueEnum中对应的beanId保持一致。 统一消息体封装类 /** * @author: shf description: date: 2022/1/10 10:51 */@Data@NoArgsConstructor@AllArgsConstructor@Builderpublic class MqDelayMsg { /** * 业务区分唯一key */ @NonNull String businessCode; /** * 消息内容 */ @NonNull T content;} 统一消费分发处理Consumer package com.example.code.bot_monomer.service.mqConsumer;import com.alibaba.fastjson.JSONObject;import com.example.code.bot_monomer.config.common.MqDelayMsg;import com.example.code.bot_monomer.enums.MqDelayQueueEnum;import com.example.code.bot_monomer.service.mqDelayQueue.MqDelayQueueHandle;import org.apache.commons.lang3.StringUtils;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;/** * @author: shf description: date: 2022/1/5 15:12 */@Slf4j@Component//@RabbitListener(queues = "test001_queue")@RabbitListener(queues = "delay001_queue")public class TestConsumer { @Autowired ApplicationContext context; /** * RabbitHandler 会自动匹配 消息类型(消息自动确认) * * @param msgStr * @param message */ @RabbitHandler public void taskHandle(String msgStr, Message message) { try { MqDelayMsg msg = JSONObject.parseObject(msgStr, MqDelayMsg.class); log.info("TestConsumer.taskHandle:businessCode=[{}],deliveryTag=[{}]", msg.getBusinessCode(), message.getMessageProperties().getDeliveryTag()); String beanId = MqDelayQueueEnum.getBeanIdByCode(msg.getBusinessCode()); if (StringUtils.isNotBlank(beanId)) { MqDelayQueueHandle handle = (MqDelayQueueHandle) context.getBean(beanId); handle.execute(msg.getContent()); } else { log.warn("TestConsumer.taskHandle:MQ延迟任务不存在的beanId,businessCode=[{}]", msg.getBusinessCode()); } } catch (Exception e) { log.error("TestConsumer.taskHandle:MQ延迟任务Handle异常:", e); } }} 最后简单封装个工具类 package com.example.code.bot_monomer.utils;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.example.code.bot_monomer.config.RabbitMQConfig;import com.example.code.bot_monomer.config.common.MqDelayMsg;import org.apache.commons.lang3.StringUtils;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.lang.NonNull;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import java.time.temporal.ChronoUnit;import java.util.Objects;import lombok.extern.slf4j.Slf4j;/** * @author: shf description: MQ分布式延迟队列工具类 date: 2022/1/10 15:20 */@Slf4j@Componentpublic class MqDelayQueueUtil { @Autowired private RabbitTemplate template; @Value("${mqdelaytask.limit.days:2}") private Integer mqDelayLimitDays; /** * 添加延迟任务 * * @param bindId 业务绑定ID,用于关联具体消息 * @param businessCode 业务区分唯一标识 * @param content 消息内容 * @param delayTime 设置的延迟时间 单位毫秒 * @return 成功true;失败false */ public boolean addDelayQueueTask(@NonNull String bindId, @NonNull String businessCode, @NonNull Object content, @NonNull Long delayTime) { log.info("MqDelayQueueUtil.addDelayQueueTask:bindId={},businessCode={},delayTime={},content={}", bindId, businessCode, delayTime, JSON.toJSONString(content)); if (StringUtils.isAnyBlank(bindId, businessCode) || Objects.isNull(content) || Objects.isNull(delayTime)) { return false; } try { //TODO 延时时间大于2天的先加入数据库表记录,后由定时任务每天拉取2次将低于2天的延迟记录放入MQ中等待到期执行 if (ChronoUnit.DAYS.between(LocalDateTime.now(), LocalDateTime.now().plus(delayTime, ChronoUnit.MILLIS)) >

= mqDelayLimitDays) {/ / TODO} else {this.template.convertAndSend (RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUT_KEY, JSONObject.toJSONString (MqDelayMsg.builder (). BusinessCode (businessCode) .content (content) .build ()) Message-> {/ / Note the long type can be used here, in millisecond units. Set header message.getMessageProperties () .setHeader ("x-delay", delayTime) Return message;});} catch (Exception e) {log.error ("MqDelayQueueUtil.addDelayQueueTask:bindId= {} businessCode= {} exception:", bindId, businessCode, e); return false;} return true } / * * undo delay message * @ param bindId service binding ID, used to associate specific message * @ param businessCode business differentiation unique identifier * @ return success true; failed false * / public boolean cancelDelayQueueTask (@ NonNull String bindId, @ NonNull String businessCode) {if (StringUtils.isAnyBlank (bindId,businessCode)) {return false } try {/ / TODO query DB, if the message still exists, delete} catch (Exception e) {log.error ("MqDelayQueueUtil.cancelDelayQueueTask:bindId= {} businessCode= {} exception:", bindId, businessCode, e); return false;} return true } / * modify delay message * @ param bindId service binding ID, which is used to associate specific message * @ param businessCode business differentiation unique identifier * @ param content message content * @ param delayTime set delay time unit * @ return successful true Failed false * / public boolean updateDelayQueueTask (@ NonNull String bindId, @ NonNull String businessCode, @ NonNull Object content, @ NonNull Long delayTime) {if (StringUtils.isAnyBlank (bindId, businessCode) | | Objects.isNull (content) | | Objects.isNull (delayTime)) {return false } try {/ / TODO query DB, there is no false returned for the message, and there are database table records that are added to the database for a long time when the delay is judged or the delay time for entering mq / / TODO is more than 2 days. Later, the scheduled task pulls the delay record less than 2 days twice a day into MQ to wait for the expiration of if (ChronoUnit.DAYS.between (LocalDateTime.now (), LocalDateTime.now (). Plus (delayTime, ChronoUnit.MILLIS)) > = mqDelayLimitDays) {/ / TODO} else {this.template.convertAndSend (RabbitMQConfig.DELAY_EXCHANGE_NAME) RabbitMQConfig.DELAY_QUEUE_ROUT_KEY, JSONObject.toJSONString (MqDelayMsg.builder (). BusinessCode (businessCode) .content (content) .build ()), message-> {/ / Note that the time can be of long type, millisecond unit Set header message.getMessageProperties () .setHeader ("x-delay", delayTime) Return message;});} catch (Exception e) {log.error ("MqDelayQueueUtil.updateDelayQueueTask:bindId= {} businessCode= {} exception:", bindId, businessCode, e); return false;} return true;}}

Attach the test class:

/ * description: delay queue Test * * @ author: shf date: 14:18 on 2021-8-27 * / @ RestController@RequestMapping ("/ mq") @ Slf4jpublic class MqQueueController {@ Autowired private MqDelayQueueUtil mqDelayUtil; @ PostMapping ("/ addQueue") public String addQueue () {mqDelayUtil.addDelayQueueTask ("00001", MqDelayQueueEnum.YW0001.getCode (), "delay0001 Test", 3000L); return "SUCCESS";}}

Paste the field settings of the DB record table

Cooperate with xxl-job scheduled tasks.

As the delivered message cannot be modified, you should be careful to set the delayed message! And need to cooperate with the business side, such as: the delay time is less than 2 days (the number of days of this time can be adjusted, you can also set the threshold unit to hours, depending on business requirements) messages do not support modification and cancellation. Delayed messages beyond 2 days support revocation and modification. It is important to note that you need to bind and associate the specific operation business unique ID ID to revoke or modify the associated operation. (PS: if the delay time is set beyond 2 days, it will first be saved to the DB record table, which will be pulled by the scheduled task every day and released to the delayed alignment within 2 days.)

At a more stable point, in order to prevent inconsistencies caused by operation time errors in messages entering DB records, you can query the DB record table before consuming unified Consumer consumption and distribution to see whether the message has been revoked and deleted (adding a delete mark field record), and the current time is greater than or equal to the expiration execution time recorded in the DB table before it can be distributed for execution, otherwise it can be discarded.

After reading this, the article "how to implement distributed deferred tasks based on rabbitmq delay plug-in" has been introduced. If you want to master the knowledge points of this article, you still need to practice and use it yourself. If you want to know more about related articles, please follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report