
How to implement distributed delayed messages in a database

2025-02-27 Update From: SLTechnology News&Howtos


This article explains how to implement distributed delayed messages, a problem many people run into in real projects. It walks through the common approaches one by one.

Local delay

Before implementing delayed messages for a distributed message queue, let's consider how we usually implement delay functions inside a single application. In Java, we can do this in the following ways:

ScheduledThreadPoolExecutor: ScheduledThreadPoolExecutor extends ThreadPoolExecutor. A submitted task first goes into DelayedWorkQueue, a priority queue ordered by expiration time. This priority queue is a heap, so each submission costs O(log N). When a task is taken, it comes from the top of the heap, that is, the task with the nearest expiration time. One advantage of ScheduledThreadPoolExecutor is that, because it extends ThreadPoolExecutor, delayed tasks can run in parallel on multiple threads.

Timer: Timer also uses a priority queue, but it does not extend a thread pool. It is relatively standalone and runs all tasks on a single thread.
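As a minimal sketch of the local approach, the following schedules two delayed tasks with ScheduledThreadPoolExecutor. The class name LocalDelayDemo, the task labels, and the delay values are made up for illustration; only the java.util.concurrent calls are standard.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LocalDelayDemo {
    /** Schedules two tasks out of order and returns the order in which they ran. */
    static List<String> runDemo() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        List<String> fired = new CopyOnWriteArrayList<>();
        // Submitted first but due later: the internal delay queue orders tasks by expiration time.
        executor.schedule(() -> fired.add("slow"), 300, TimeUnit.MILLISECONDS);
        executor.schedule(() -> fired.add("fast"), 50, TimeUnit.MILLISECONDS);
        // With the default policy, delayed tasks that are already scheduled still run after shutdown().
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The task with the shorter delay runs first even though it was submitted second, which is the heap ordering described above.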

Delay in distributed message queues

Local delay is relatively simple to implement, since we can use what Java already provides. So what are the difficulties in a distributed message queue?

Many readers will first ask whether, to implement delayed tasks in a distributed message queue, we can simply reuse the local approach with ScheduledThreadPoolExecutor or Timer. This is possible, provided the message volume is very small. But a distributed message queue is usually enterprise-level middleware handling very large amounts of data, so a purely in-memory solution is not feasible. The following solutions address this problem.

1 Database

Generally speaking, a database is the first approach that comes to mind. We can usually create a table like this:

CREATE TABLE `delay_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `excute_time` bigint(16) DEFAULT NULL COMMENT 'execution time, ms level',
  `body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'message body',
  PRIMARY KEY (`id`),
  KEY `time_index` (`excute_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

In this table, excute_time holds the actual execution time and is indexed. In the message service, a scheduled task periodically scans the database for messages whose execution time has arrived and then executes them.

Using a database is a relatively primitive method. Before delayed messages existed as a concept, it was typically used for features such as expiring an order after a given number of minutes. It is usually limited to a single business. It does not extend to enterprise-level middleware, because with MySQL's B+tree indexes the cost of maintaining the secondary index keeps growing as the table grows, so writes become slower and slower. For this reason this scheme is usually not considered.
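The scan can be sketched as follows. To stay self-contained, this uses an in-memory TreeMap as a stand-in for the delay_message table and its time_index; the class DelayMessageScanner, its methods, and the SQL statements in the comments are illustrative assumptions, not code from any particular project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory stand-in for the delay_message table; a real service would issue the SQL in the comments. */
class DelayMessageScanner {
    static final class Row {
        final long id;
        final long executeTime;
        final String body;

        Row(long id, long executeTime, String body) {
            this.id = id;
            this.executeTime = executeTime;
            this.body = body;
        }
    }

    // Plays the role of the secondary index on excute_time.
    private final TreeMap<Long, List<Row>> timeIndex = new TreeMap<>();
    private long nextId = 1;

    // Roughly: INSERT INTO delay_message (excute_time, body) VALUES (?, ?)
    synchronized long insert(long executeTime, String body) {
        Row row = new Row(nextId++, executeTime, body);
        timeIndex.computeIfAbsent(executeTime, t -> new ArrayList<>()).add(row);
        return row.id;
    }

    // Roughly: SELECT body FROM delay_message WHERE excute_time <= ? ORDER BY excute_time,
    // followed by a DELETE of the rows that were handed off.
    synchronized List<String> pollDue(long now) {
        NavigableMap<Long, List<Row>> due = timeIndex.headMap(now, true);
        List<String> bodies = new ArrayList<>();
        for (List<Row> rows : due.values()) {
            for (Row row : rows) {
                bodies.add(row.body);
            }
        }
        due.clear(); // clearing the view removes the rows from the backing map
        return bodies;
    }
}
```

In a service, pollDue(System.currentTimeMillis()) would be called from a scheduled task, for instance one registered with ScheduledExecutorService.scheduleWithFixedDelay, and each returned body would then be executed or forwarded.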

RocksDB/LevelDB

As introduced previously, the open source version of RocketMQ implements only 18 fixed delay levels, but many companies have built their own delayed message layer on top of RocketMQ to support arbitrary delays. Meituan used LevelDB to wrap RocketMQ with delayed messages, and Didi's open source DDMQ uses RocksDB for the delayed message part on top of RocketMQ.

The principle is basically the same as with MySQL and works in three steps:

Step 1: when DDMQ sends a message, a proxy layer distributes it, because there are multiple message queues behind it (Kafka, RocketMQ, and so on). If it is a delayed message, the proxy writes it to RocksDB storage.

Step 2: a scheduled task polls the store and forwards due messages to the RocketMQ cluster.

Step 3: consumers consume the messages.

Why is RocksDB, which is also a database, a better fit than MySQL? RocksDB is built on an LSM tree, which suits write-heavy workloads and therefore matches the message queue scenario better. That is why Didi and Meituan chose this kind of store as the storage medium for their delayed message layers.
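One way to make a polling scan cheap on an ordered key-value store is to put the execution time at the front of the key, so that due messages form a contiguous key range. The sketch below illustrates that idea only. The key layout, the class TimeOrderedKeys, and the TreeMap standing in for the store are all assumptions for illustration; the source does not describe how DDMQ or Meituan actually lay out their keys.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class TimeOrderedKeys {
    /** Hypothetical 16-byte key: big-endian execution time (non-negative), then message id. */
    static byte[] key(long executeTimeMs, long messageId) {
        return ByteBuffer.allocate(16).putLong(executeTimeMs).putLong(messageId).array();
    }

    // Stand-in for an ordered key-value store that sorts keys as unsigned bytes.
    // Not thread-safe; a single-threaded sketch.
    private final TreeMap<byte[], String> store =
            new TreeMap<byte[], String>((a, b) -> Arrays.compareUnsigned(a, b));

    void put(long executeTimeMs, long messageId, String body) {
        store.put(key(executeTimeMs, messageId), body);
    }

    /** Returns and removes every message whose execution time is at or before nowMs. */
    List<String> scanDue(long nowMs) {
        // Every key with executeTime <= nowMs sorts strictly before key(nowMs + 1, 0).
        NavigableMap<byte[], String> due = store.headMap(key(nowMs + 1, 0L), false);
        List<String> bodies = new ArrayList<>(due.values());
        due.clear();
        return bodies;
    }
}
```

Because big-endian encoding of non-negative numbers preserves numeric order under unsigned byte comparison, the scan reads one prefix range instead of searching the whole store.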

2 Time wheel + disk storage

Before discussing the time wheel, let's go back to ScheduledThreadPoolExecutor and Timer, which we used for local delay. Both are built on priority queues, which are essentially heaps, and inserting into a heap costs O(log N). Suppose memory were unlimited and we stored delayed messages in a priority queue: insertion would still get slower as the number of messages grows. How can we keep insertion cost from growing with the number of messages? The answer is the time wheel.

What is a time wheel? We can simply think of it as a multidimensional array. Many frameworks use a time wheel for scheduled tasks in place of Timer. For example, in an earlier article I covered the local cache Caffeine. Caffeine uses a two-level time wheel, that is, a two-dimensional array: the first dimension represents the coarser time spans such as seconds, minutes, hours, and days, and the second dimension represents a finer division, such as an interval within a second. Each slot timeWheel[i][j] is a linked list that records the Nodes. Caffeine uses the time wheel to record which data expires at a given time and then processes it.
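A minimal time wheel can be sketched as below. Note that this is a simplified single-level wheel with a round counter per entry, not Caffeine's two-dimensional layout; the class SimpleTimeWheel and its methods are hypothetical names, and the caller is assumed to drive tick() once per time unit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class SimpleTimeWheel {
    private static final class Entry {
        final String message;
        long remainingRounds; // full turns of the wheel still to wait

        Entry(String message, long remainingRounds) {
            this.message = message;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<LinkedList<Entry>> slots = new ArrayList<>();
    private final int size;
    private int cursor = 0;

    SimpleTimeWheel(int slotCount) {
        this.size = slotCount;
        for (int i = 0; i < slotCount; i++) {
            slots.add(new LinkedList<>());
        }
    }

    /** O(1) insert: the slot is computed directly from the delay, with no sorting. */
    void add(String message, int delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int slot = (cursor + delayTicks) % size;
        long rounds = (delayTicks - 1) / size;
        slots.get(slot).add(new Entry(message, rounds));
    }

    /** Advances the wheel by one tick and returns the messages that expire on this tick. */
    List<String> tick() {
        cursor = (cursor + 1) % size;
        List<String> expired = new ArrayList<>();
        Iterator<Entry> it = slots.get(cursor).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                expired.add(entry.message);
                it.remove();
            } else {
                entry.remainingRounds--;
            }
        }
        return expired;
    }
}
```

Insertion only computes an index and appends to a list, so its cost does not depend on how many messages are already stored. The trade-off is that each tick has to walk the entries in the current slot.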

Because the time wheel is an array structure, its insertion complexity is O(1). That solves the efficiency problem, but memory is still not unlimited, so how do we use the time wheel in practice? The answer is disk. Time wheel + disk storage has been implemented in Qunar's open source QMQ. For ease of description, the design is mapped here onto RocketMQ's structure, and the implementation works as follows:

Step 1: the producer sends a delayed message to the CommitLog. The Topic is quietly replaced at this point so that the message can be handled later.

Step 2: a background Reput task periodically pulls the messages that belong to the delay Topic.

Step 3: determine whether the message falls within the current time wheel. If not, go to Step 4. If so, deliver the message directly into the time wheel.

Step 4: find the scheduleLog that the message belongs to and write it there. QMQ splits the scheduleLog by hour by default, which can be adjusted according to the business.

Step 5: the time wheel periodically preloads the scheduleLog for the next time period into memory.

Step 6: messages whose time has arrived have their original Topic restored and are delivered to the CommitLog again. If delivery succeeds, it is recorded in the dispatchLog. The record is needed because the time wheel lives in memory, so after a crash there is no way to know how far it had got. If the process dies in the last second of a period, all the data for that time wheel period has to be reloaded, and the dispatchLog is used to filter out messages that have already been delivered.

Personally, I think time wheel + disk storage is a little more orthodox than the RocksDB approach above. It works without relying on other middleware, so availability is naturally higher. As for how Aliyun's RocketMQ actually implements this, either of these two solutions is possible.
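Steps 3 to 6 can be sketched roughly as follows. This is not QMQ's code: the class ScheduleLogSketch, its fields, and the in-memory collections standing in for the on-disk scheduleLog, the dispatchLog, and the time wheel are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ScheduleLogSketch {
    static final long SEGMENT_MS = 60 * 60 * 1000L; // one hour per segment, the default named above

    static final class Msg {
        final long id;
        final long executeTimeMs;

        Msg(long id, long executeTimeMs) {
            this.id = id;
            this.executeTimeMs = executeTimeMs;
        }
    }

    // Stand-ins: segment index -> messages (files on disk in a real system), delivered ids, in-memory wheel.
    private final Map<Long, List<Msg>> scheduleLog = new HashMap<>();
    private final Set<Long> dispatchLog = new HashSet<>();
    final List<Msg> wheel = new ArrayList<>();

    static long segmentOf(long timeMs) {
        return timeMs / SEGMENT_MS;
    }

    /** Steps 3 and 4: current-period messages go to the wheel, later ones to their scheduleLog segment. */
    void accept(Msg msg, long nowMs) {
        if (segmentOf(msg.executeTimeMs) <= segmentOf(nowMs)) {
            wheel.add(msg);
        } else {
            scheduleLog.computeIfAbsent(segmentOf(msg.executeTimeMs), s -> new ArrayList<>()).add(msg);
        }
    }

    /** Step 5, also used after a restart: load one segment, skipping messages already delivered. */
    int load(long segment) {
        int loaded = 0;
        for (Msg msg : scheduleLog.getOrDefault(segment, Collections.<Msg>emptyList())) {
            if (!dispatchLog.contains(msg.id)) {
                wheel.add(msg);
                loaded++;
            }
        }
        return loaded;
    }

    /** Step 6: record a successful delivery so that a reload does not deliver it twice. */
    void markDelivered(Msg msg) {
        wheel.remove(msg);
        dispatchLog.add(msg.id);
    }
}
```

The segment stays in the scheduleLog after loading, so a crash only loses the in-memory wheel. Reloading the same segment and filtering by the dispatchLog restores exactly the undelivered messages.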

3 Redis

Many companies in the community also use Redis for delayed messages. Redis has a data structure called ZSET, the sorted set, which can serve as something like our priority queue. It is implemented with a skip list rather than a heap, but insertion is still O(log N), and because Redis is fast enough this cost can be ignored. (I have not benchmarked this; it is only a guess.) Some readers will ask: isn't Redis a purely in-memory k-v store, subject to the same memory limits, so why choose it?

In this scenario, Redis is easy to scale horizontally: when one Redis instance does not have enough memory, two or more can be used to meet the demand. The Redis delay message design (originally from: https://www.cnblogs.com/lylife/p/7881950.html) consists of the following components:

Delayed Messages Pool: a Redis Hash structure in which the key is the message ID and the value is the message itself. Disk or a database can be used instead. This is where the contents of all messages are stored.

Delayed Queue: a ZSET data structure in which the value is the message ID and the score is the execution time. The Delayed Queue can be scaled horizontally to increase the amount of data it supports.

Worker Thread Pool: there are multiple Workers, which can be deployed on multiple machines to form a cluster. All Workers in the cluster are coordinated through ZooKeeper (ZK), which assigns Delayed Queues to them.

How do we know that a message in the Delayed Queue has expired? There are two ways:

Each Worker scans periodically and takes the entry with the smallest execution time out of the ZSET if that time has arrived. This wastes resources when there are few messages, and when there are many messages the delay is inaccurate because polling is not timely enough.

Because the first method has these problems, we borrow an idea from Timer: with wait-notify we get better delay accuracy without wasting resources. The Worker first gets the minimum execution time in the ZSET and then waits for (execution time - current time), so it wakes up automatically when the time arrives without busy polling. If a new message enters the ZSET with an earlier time than the one being waited on, notify wakes the Worker directly; it fetches the earlier message, waits again, and so on.
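The wait-notify idea can be sketched in plain Java as follows. A PriorityQueue stands in for the Redis ZSET here so that the example runs without a Redis server; the class WaitNotifyDelayQueue and its methods are hypothetical, and a real Worker would read the earliest entry from Redis instead (for example with ZRANGEBYSCORE).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class WaitNotifyDelayQueue {
    private static final class Item {
        final String id;
        final long executeAtMs;

        Item(String id, long executeAtMs) {
            this.id = id;
            this.executeAtMs = executeAtMs;
        }
    }

    // Stand-in for the ZSET: ordered by score (execution time).
    private final PriorityQueue<Item> zset =
            new PriorityQueue<>(Comparator.comparingLong((Item item) -> item.executeAtMs));

    synchronized void add(String id, long executeAtMs) {
        Item head = zset.peek();
        zset.add(new Item(id, executeAtMs));
        if (head == null || executeAtMs < head.executeAtMs) {
            notifyAll(); // a new earliest message arrived: wake the waiting worker
        }
    }

    /** Blocks until the earliest message is due, then returns its id. */
    synchronized String take() throws InterruptedException {
        while (true) {
            Item head = zset.peek();
            if (head == null) {
                wait(); // nothing queued yet
                continue;
            }
            long remaining = head.executeAtMs - System.currentTimeMillis();
            if (remaining <= 0) {
                return zset.poll().id;
            }
            wait(remaining); // sleep until due, or until add() signals an earlier message
        }
    }

    /** Demo: the worker is already waiting on "late" when an earlier message arrives. */
    static List<String> demo() {
        WaitNotifyDelayQueue queue = new WaitNotifyDelayQueue();
        long now = System.currentTimeMillis();
        queue.add("late", now + 400);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(30);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.add("early", now + 100);
        });
        producer.start();
        List<String> order = new ArrayList<>();
        try {
            order.add(queue.take());
            order.add(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```

The worker re-checks the head after every wake-up, so a spurious wake-up or an earlier message simply results in a shorter wait on the next loop iteration.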

