How to implement a Redis delay queue


This article explains how to implement a delay queue with Redis. It walks through the motivation, the design, and a reference implementation; I hope you get something out of it.

A delay queue, as its name implies, is a message queue with a delay capability. In what scenarios do we need such a queue?

1. Background

Let's take a look at the following business scenarios:

- When an order remains unpaid, how do we close it in time?
- How do we periodically check whether an order in the refund state has actually been refunded?
- When an order has not received a status notification from the downstream system for a long time, how do we implement a laddered strategy for synchronizing the order status?
- When the system notifies the upstream system that payment succeeded and the upstream system returns a failure, how do we resend the asynchronous notification at different frequencies: 15s, 3m, 10m, 30m, 30m, 1h, 2h, 6h, 15h?

1.1 Solutions

The simplest approach is to scan the table periodically. For example, where the requirement on closing unpaid orders is strict, scan the table every 2s, find the expired orders and actively close them. The advantage is simplicity; the disadvantage is that a full table scan every cycle wastes resources, and when a large batch of orders is about to expire at the same time, the close operations are delayed.

Another option is to build the delay queue on RabbitMQ or another MQ. The advantage is that there are open-source, ready-made, stable implementations. The disadvantage is that MQ is message middleware: if the team's technology stack already includes an MQ, fine; if not, deploying a whole MQ cluster just for a delay queue is rather expensive.

Using Redis's ZSET and LIST features, we can implement a delay queue with Redis: RedisDelayQueue.
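
Before getting into the full design, the core trick fits in a few lines. The sketch below is only an illustration of the idea (the key names, the local Redis address and the 30-minute delay are made up, and it is not the project's actual code): the execution timestamp is stored as the ZSET score, and a poller moves due members into a ready LIST.

import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.redisson.Redisson;
import org.redisson.api.RList;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class DelayQueueSketch {

    public static void main(String[] args) {
        // Placeholder connection settings; adjust to your environment.
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);

        // Producer: use the execution timestamp as the ZSET score.
        long runAt = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(30);
        RScoredSortedSet<String> bucket = redisson.getScoredSortedSet("demo:delay:bucket");
        bucket.add(runAt, "order-123");

        // Poller: every member whose score is <= now is due; move it to a ready LIST.
        Collection<String> due = bucket.valueRange(0, true, System.currentTimeMillis(), true);
        RList<String> ready = redisson.getList("demo:delay:ready");
        ready.addAll(due);
        bucket.removeAll(due);

        redisson.shutdown();
    }
}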

2. Design goals

- Real-time: an error of a few seconds is acceptable
- High availability: supports both standalone and cluster deployments
- Message deletion: the business can delete a specified message at any time
- Reliability: a message is guaranteed to be consumed at least once
- Message persistence: relies on Redis's own persistence; if Redis loses data, the delayed messages are lost, although master/slave and cluster deployments reduce that risk. A later optimization could persist messages to MongoDB.

3. Design scheme

The design mainly includes the following points:

- Use Redis as the message pool and store the messages as key-value pairs
- Use a ZSET as the priority queue, maintaining priority by score
- Use LIST structures, consumed first-in-first-out, to hold the message addresses (the keys in the message pool)
- Use a custom routing object that records the ZSET and LIST names and routes messages from the ZSET to the correct LIST point-to-point
- Use a timer to maintain the routing
- Implement the message delay according to TTL rules

3.1 Design diagram

The overall approach follows Youzan's delay queue design (see the references at the end of the article), with some optimizations and our own code implementation.

3.2 Data structures

- ZING:DELAY_QUEUE:JOB_POOL: a hash table that stores the meta information of all delayed jobs. KV structure: K = prefix + projectName, field = topic + jobId, value = the data passed in by the client, which is handed back to the consumer at consumption time.
- ZING:DELAY_QUEUE:BUCKET: the delay queue's sorted set (ZSET). It stores K = ID with the required execution timestamp as the score, so entries are ordered by timestamp.
- ZING:DELAY_QUEUE:QUEUE: a LIST structure; each topic's list stores the JOBs that are ready to be consumed.

The design diagram is for reference only, but it broadly describes the whole execution flow; it comes from the reference blog at the end of the article.
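
The code in section 4 refers to these keys and locks through a RedisQueueKey constants class (JOB_POOL_KEY, RD_ZSET_BUCKET_PRE, RD_LIST_TOPIC_PRE, the lock names, and getTopicId). The real class lives in the source repository; the sketch below only shows one plausible shape for it, and the literal values are assumptions.

/**
 * Illustrative sketch of the RedisQueueKey constants referenced by the code below.
 * The actual values are defined in the source repository and may differ.
 */
public final class RedisQueueKey {

    /** Hash that stores job meta information (the message pool). */
    public static final String JOB_POOL_KEY = "ZING:DELAY_QUEUE:JOB_POOL";

    /** ZSET whose score is the execution timestamp. */
    public static final String RD_ZSET_BUCKET_PRE = "ZING:DELAY_QUEUE:BUCKET";

    /** LIST of jobs that are ready to be consumed. */
    public static final String RD_LIST_TOPIC_PRE = "ZING:DELAY_QUEUE:QUEUE";

    /** Locks guarding the carrying thread, the consumer loop and add/delete operations. */
    public static final String CARRY_THREAD_LOCK = "ZING:DELAY_QUEUE:CARRY_THREAD_LOCK";
    public static final String CONSUMER_TOPIC_LOCK = "ZING:DELAY_QUEUE:CONSUMER_TOPIC_LOCK";
    public static final String ADD_JOB_LOCK = "ZING:DELAY_QUEUE:ADD_JOB_LOCK:";
    public static final String DELETE_JOB_LOCK = "ZING:DELAY_QUEUE:DELETE_JOB_LOCK:";

    /** Illustrative lock wait / lease times in seconds (the lease outlives the 60s blocking poll, see 4.4). */
    public static final long LOCK_WAIT_TIME = 1L;
    public static final long LOCK_RELEASE_TIME = 61L;

    /** Builds the field used in the job pool hash: topic + jobId. */
    public static String getTopicId(String topic, String jobId) {
        return topic + ":" + jobId;
    }

    private RedisQueueKey() {
    }
}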

3.3 Lifecycle of the task

1. When a new JOB is added, a record describing the business side and the consumer side is inserted into ZING:DELAY_QUEUE:JOB_POOL, and a record carrying the execution timestamp is inserted into ZING:DELAY_QUEUE:BUCKET.
2. The carrying thread scans ZING:DELAY_QUEUE:BUCKET for records whose execution timestamp (RunTimeMillis) is smaller than the current time, deletes them all, parses the Topic of each task, and pushes the tasks to the LIST corresponding to that Topic in ZING:DELAY_QUEUE:QUEUE.
3. Each Topic's LIST has a listening thread that fetches the data to be consumed from the LIST in batches and hands everything it gets to that Topic's consumer thread pool.
4. The consumer thread pool looks the data up in ZING:DELAY_QUEUE:JOB_POOL, builds the callback structure from it, and executes the callback method.

3.4 Design essentials

3.4.1 Basic concepts

- JOB: a task that needs to be processed asynchronously; the basic unit in the delay queue.
- Topic: a set of Jobs of the same type (a queue), which consumers subscribe to.

3.4.2 Message structure

Each JOB must contain the following attributes

- jobId: unique identifier of the Job, used to retrieve and delete the specified Job information
- topic: Job type; can be understood as the specific business name
- delay: the delay required by the Job, in seconds (the server converts it to an absolute time)
- body: Job content, stored in JSON format, used by the consumer for its specific business processing
- retry: number of failed retries
- url: notification URL

3.5 Design details

3.5.1 How to consume ZING:DELAY_QUEUE:QUEUE quickly

The simplest implementation is to scan with a timer every second: to keep message execution timely, issue a Redis request every 1s and check whether there is a JOB to consume in the queue. The problem is that when the queue has no consumable JOB, the frequent scanning is pointless and wastes resources. Fortunately, LIST offers the blocking primitive BLPOP: if the list has data it returns immediately, otherwise it stays blocked until data arrives; a blocking timeout can be set, and on timeout it returns NULL. The concrete implementation and strategy are introduced with the code.
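
The consumer loop in section 4.4 gets this behavior through Redisson's RBlockingQueue.poll with a timeout, which provides BLPOP-style blocking semantics. A minimal sketch, with an assumed key name and a 60-second timeout:

import java.util.concurrent.TimeUnit;

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;

public class BlockingPollSketch {

    private final RedissonClient redissonClient;

    public BlockingPollSketch(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /** Returns the next ready job id, or null if nothing arrived within 60 seconds. */
    public String nextReadyJobId() throws InterruptedException {
        RBlockingQueue<String> ready = redissonClient.getBlockingQueue("ZING:DELAY_QUEUE:QUEUE");
        // Blocks until an element is pushed or the timeout expires,
        // instead of issuing a Redis request every second.
        return ready.poll(60, TimeUnit.SECONDS);
    }
}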

3.5.2 Avoiding repeated carrying and consumption caused by the timers

Redis distributed locks are used to control the carrying of messages, avoiding the problems that repeated carrying and consumption would cause, and to bound the effective execution frequency of the timers across nodes.
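
Both the carrying thread (section 4.3) and the consumer loop (section 4.4) follow the same Redisson tryLock pattern. A minimal, generic sketch of that pattern (lock name and timings are passed in, since the project's own values live in its constants class):

import java.util.concurrent.TimeUnit;

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

public class ExclusiveStepRunner {

    private final RedissonClient redissonClient;

    public ExclusiveStepRunner(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /** Runs the given step on at most one node at a time. */
    public void runExclusively(String lockName, long waitSec, long leaseSec, Runnable step)
            throws InterruptedException {
        RLock lock = redissonClient.getLock(lockName);
        // Wait up to waitSec to acquire; the lease must outlive the step so unlock() does not fail.
        if (!lock.tryLock(waitSec, leaseSec, TimeUnit.SECONDS)) {
            return; // another node holds the lock; skip this round
        }
        try {
            step.run();
        } finally {
            lock.unlock();
        }
    }
}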

4. Core code implementation

4.1 Technical Note

Technology stack: Spring Boot, Redisson, Redis, distributed locks, timers

Note: this project does not implement the multi-QUEUE consumption from the design; only one QUEUE is enabled for now. This will be optimized later.

4.2 Core entities

4.2.1 Job creation object

/**
 * Message structure
 *
 * @author look at the world
 * @date January 15, 2020
 */
@Data
public class Job implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Unique identifier of the Job, used to retrieve and delete the specified Job information */
    @NotBlank
    private String jobId;

    /** Job type; can be understood as the specific business name */
    @NotBlank
    private String topic;

    /** Delay required by the Job, in seconds (the server converts it to an absolute time) */
    private Long delay;

    /** Job content, stored in JSON format, for the consumer to do its specific business processing */
    @NotBlank
    private String body;

    /** Number of failed retries */
    private int retry = 0;

    /** Notification URL */
    @NotBlank
    private String url;
}

4.2.2 Job deletion object

/**
 * Message structure for deletion
 *
 * @author look at the world
 * @date January 15, 2020
 */
@Data
public class JobDie implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Unique identifier of the Job, used to retrieve and delete the specified Job information */
    @NotBlank
    private String jobId;

    /** Job type; can be understood as the specific business name */
    @NotBlank
    private String topic;
}

4.3 Carrying thread

/**
 * Carrying thread: moves due JOBs from the bucket to the ready queue
 *
 * @author open eyes to see the world
 * @date 17 January 2020
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Triggered every second: carry due JOB messages to the ready queue.
     */
    @Scheduled(cron = "*/1 * * * * ?")
    public void carryJobToQueue() {
        System.out.println("carryJobToQueue->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            // Everything whose score (execution timestamp) is not later than now is due
            RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
            List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            bucketSet.removeAllAsync(jobList);
        } catch (InterruptedException e) {
            log.error("carryJobToQueue error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}

4.4 Consumption thread

@Slf4j
@Component
public class ReadyQueueContext {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumerService consumerService;

    /** TOPIC consumption thread */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "start TOPIC consumption thread");
    }

    /**
     * Start the TOPIC consumption loop.
     * Catch every possible exception so that while(true) is never interrupted.
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // The distributed lock lease is 1s longer than the BLPOP blocking time;
                // otherwise the lock may already have expired on unlock, causing an unlock error
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }

                // 1. Get the data to be consumed from the ReadyQueue
                RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }

                // 2. Get the job meta information
                RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);

                // 3. Consume
                FutureTask<Boolean> taskResult = TaskManager.doFutureTask(
                        () -> consumerService.consumerMessage(job.getUrl(), job.getBody()),
                        job.getTopic() + "->consumption JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 Consumption succeeded: delete the job information from JobPool and DelayBucket
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 Consumption failed: rejoin the Bucket according to the retry policy.
                    // If the number of retries is greater than 5, delete the data from the JobPool and persist it to the DB
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 Consumption failed: update the meta information
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                if (lock != null) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}
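
The ConsumerService injected above is not shown in the article. Below is only a hedged sketch of what an HTTP-callback implementation might look like, assuming the interface declares a single consumerMessage(url, body) method returning true on success (the method name and arguments come from the call in the loop above; the RestTemplate-based body, the class name and the 2xx convention are assumptions):

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class HttpConsumerService implements ConsumerService {

    private final RestTemplate restTemplate = new RestTemplate();

    /**
     * Posts the job body to the notification URL and treats any 2xx answer as success.
     * The consumer loop retries with the ladder strategy whenever this returns false.
     */
    @Override
    public Boolean consumerMessage(String url, String body) {
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            ResponseEntity<String> response =
                    restTemplate.postForEntity(url, new HttpEntity<>(body, headers), String.class);
            return response.getStatusCode().is2xxSuccessful();
        } catch (Exception e) {
            log.error("consumerMessage error, url={}", url, e);
            return false;
        }
    }
}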

4.5 Adding and removing a JOB

/**
 * Interface provided to external services
 *
 * @author why
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Add job meta information.
     *
     * @param job meta information
     */
    @Override
    public void addJob(Job job) {
        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. Add the job to the JobPool
            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }
            jobPool.put(topicId, job);

            // 2. Add the job to the DelayBucket
            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    /**
     * Delete job information.
     *
     * @param jobDie meta information of the job to delete
     */
    @Override
    public void deleteJob(JobDie jobDie) {
        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);
            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            log.error("deleteJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}
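
For completeness, here is a hypothetical example of how business code might use this service to close an unpaid order. Note that the posted addJob uses job.getDelay() directly as the ZSET score, and the carrying thread compares scores against System.currentTimeMillis(), so the example passes an absolute millisecond timestamp; the topic, URL and payload are made up.

// Imports of Job, JobDie and RedisDelayQueueService from the project are omitted.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OrderCloseScheduler {

    @Autowired
    private RedisDelayQueueService redisDelayQueueService;

    /** Schedules a "close unpaid order" job 30 minutes after the order is created. */
    public void scheduleClose(String orderId) {
        Job job = new Job();
        job.setJobId(orderId);
        job.setTopic("ORDER_CLOSE");
        job.setDelay(System.currentTimeMillis() + 30 * 60 * 1000L); // absolute execution time in ms
        job.setBody("{\"orderId\":\"" + orderId + "\"}");
        job.setUrl("http://order-service/api/order/close");         // callback invoked on consumption
        redisDelayQueueService.addJob(job);
    }

    /** Cancels the pending close job once the order has been paid. */
    public void cancelClose(String orderId) {
        JobDie jobDie = new JobDie();
        jobDie.setJobId(orderId);
        jobDie.setTopic("ORDER_CLOSE");
        redisDelayQueueService.deleteJob(jobDie);
    }
}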

5. Content to be optimized

Currently a single QUEUE stores all ready messages. When a large number of messages to be consumed pile up, the timeliness of notifications suffers. The improvement is to enable multiple QUEUEs and route messages across them, then start multiple consumption threads to raise throughput. Messages are also not persisted outside Redis, which is a risk; they will be persisted to MongoDB later.
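
One possible shape for the multi-queue improvement (an assumption, not the project's code) is to route each job to one of N ready LISTs by hashing its topic, and to give each list its own consumer thread:

/**
 * Illustrative router for the planned multi-queue optimization:
 * each topicId maps to one of QUEUE_COUNT ready lists so several
 * consumer threads can drain them in parallel.
 */
public final class ReadyQueueRouter {

    private static final int QUEUE_COUNT = 4;
    private static final String RD_LIST_TOPIC_PRE = "ZING:DELAY_QUEUE:QUEUE";

    /** Returns the ready-list key that a given topicId should be carried into. */
    public static String routeKey(String topicId) {
        int slot = Math.floorMod(topicId.hashCode(), QUEUE_COUNT);
        return RD_LIST_TOPIC_PRE + ":" + slot;
    }

    private ReadyQueueRouter() {
    }
}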

6. Source code

For the full source code, see the repositories below.

- RedisDelayQueue implementation: zing-delay-queue (https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)
- RedissonStarter: redisson-spring-boot-starter (https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)
- Project application: zing-pay (https://gitee.com/whyCodeData/zing-pay)

7. References

https://tech.youzan.com/queuing_delay/
https://blog.csdn.net/u010634066/article/details/98864764
