How to deliver Apache Pulsar delayed messages

2025-04-19 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)05/31 Report--

Introduction

Apache Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. It supports multi-tenancy, low latency, separation of reads and writes, cross-region replication, rapid scaling, and flexible fault tolerance. The MQ team of Tencent's data platform has studied Pulsar in depth and made many performance and stability improvements, which have been launched in TDMQ, Tencent Cloud's message queue service. This article introduces how Pulsar implements delayed message delivery and shares our experience with it.

I. What is delayed message delivery

Delayed message delivery is common in MQ applications. A delayed message is not delivered as soon as it reaches the MQ server; instead, it is held and delivered to the consumer later, at a time determined by the message's attributes. There are generally two types, timed messages and delayed messages:

Timed message: the producer sends the message to the MQ server but does not want it delivered immediately; instead, it is delivered to the consumer at a specific point in time in the future.

Delayed message: the producer sends the message to the MQ server but does not want it delivered immediately; instead, it is delivered to the consumer after a specified period of time has elapsed.

Elsewhere in the industry, Tencent Cloud's CMQ and Alibaba Cloud's (Aliyun) RocketMQ also support delayed message delivery:

CMQ: the delay period of a message is called its "in-flight" state. The delay is configured with DelaySeconds, which ranges from 0 to 3600 seconds, so a message can stay invisible for at most one hour.

RocketMQ: the open-source version temporarily stores delayed messages in an internal topic and supports only specific delay levels, such as 5s, 10s, 1m, and so on. The commercial version supports delays of arbitrary precision.

The open-source NSQ, RabbitMQ, ActiveMQ, and Pulsar also provide ways to handle delayed messages. Although each project differs in usage and implementation, the core idea is the same: the producer sends a delayed message to a topic, the broker holds it in temporary storage, and a delay tracking service (Delayed Tracker Service) checks whether each message has expired and delivers those that have.
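The shared pattern (park delayed messages in temporary storage, let a tracker release them once they expire) can be sketched in-process with the JDK's java.util.concurrent.DelayQueue, which combines both roles. This is a minimal illustration under that assumption, not how any of the brokers above actually store messages; the class names DelayQueueBroker and DelayedMessage are hypothetical.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueBroker {

    // A delayed message held in temporary storage until its delivery time.
    static final class DelayedMessage implements Delayed {
        final String payload;
        final long deliverAtNanos; // absolute deadline on the System.nanoTime() clock

        DelayedMessage(String payload, long delay, TimeUnit unit) {
            this.payload = payload;
            this.deliverAtNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time before delivery; zero or negative means the message has expired.
            return unit.convert(deliverAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Earlier deadline first; nanoTime values are compared by subtraction.
            long diff = deliverAtNanos - ((DelayedMessage) other).deliverAtNanos;
            return Long.signum(diff);
        }
    }

    // The DelayQueue is both the temporary storage and the expiry tracker.
    private final DelayQueue<DelayedMessage> pending = new DelayQueue<>();

    public void publish(String payload, long delay, TimeUnit unit) {
        pending.put(new DelayedMessage(payload, delay, unit));
    }

    // Non-blocking: returns an expired message, or null if nothing is due yet.
    public String pollDue() {
        DelayedMessage m = pending.poll();
        return m == null ? null : m.payload;
    }

    // Blocking: waits until the earliest message expires, then delivers it.
    public String takeDue() throws InterruptedException {
        return pending.take().payload;
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueueBroker broker = new DelayQueueBroker();
        broker.publish("later", 500, TimeUnit.MILLISECONDS);
        broker.publish("now", 0, TimeUnit.MILLISECONDS);
        System.out.println(broker.pollDue()); // now
        System.out.println(broker.pollDue()); // null, "later" is not due yet
        System.out.println(broker.takeDue()); // later, after roughly 500 ms
    }
}
```

A real broker persists the messages and tracks only lightweight indexes, which is where the designs discussed below differ.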

II. Use cases for delayed message delivery

Delayed message delivery postpones processing of the current message and triggers its delivery at a specific time in the future. It has many practical uses, such as retrying after failures, cancelling orders that time out, and sending appointment reminders.

A service request fails, and the failed request must be put into a separate queue and retried every 5 minutes.

A user places an order but does not pay. The user should be reminded to pay at regular intervals, and the order is closed once it times out.

An interview or meeting is scheduled, and a reminder is sent half an hour before it starts.
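Each of these scenarios reduces to computing a delivery time: a relative delay for the retry and the unpaid order (delayed messages), and an absolute time derived from the meeting start (a timed message). A minimal sketch with java.time; the class DelayPlanner, its method names, and the 30-minute payment window in the example are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class DelayPlanner {

    // Retry a failed request after a fixed back-off (5 minutes in the scenario above).
    static Instant retryAt(Instant failedAt) {
        return failedAt.plus(Duration.ofMinutes(5));
    }

    // Close an unpaid order once its payment window has elapsed.
    static Instant closeOrderAt(Instant createdAt, Duration paymentWindow) {
        return createdAt.plus(paymentWindow);
    }

    // Remind attendees half an hour before the meeting starts.
    static Instant reminderAt(Instant meetingStart) {
        return meetingStart.minus(Duration.ofMinutes(30));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-05-31T09:00:00Z");
        System.out.println(retryAt(now));                              // 2024-05-31T09:05:00Z
        System.out.println(closeOrderAt(now, Duration.ofMinutes(30))); // 2024-05-31T09:30:00Z
        System.out.println(reminderAt(Instant.parse("2024-05-31T14:00:00Z"))); // 2024-05-31T13:30:00Z
    }
}
```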

TDMQ recently had a customer case that uses Pulsar delayed messages: the business needed to correlate log messages from two systems. One of the systems could time out or fail when querying HBase, so the failed correlation tasks had to be rescheduled for when the cluster was idle.

III. How to use delayed message delivery in Pulsar

Pulsar first introduced delayed message delivery in 2.4.0. With delayed messages in Pulsar, you can specify the delivery time precisely, in one of two ways: deliverAfter and deliverAt. deliverAt specifies an absolute timestamp, while deliverAfter specifies how long after the current time the message should be delivered. The two methods are essentially the same: the client computes the delivery timestamp and sends it to the broker.

1. Send by deliverAfter

producer.newMessage().deliverAfter(3L, TimeUnit.MINUTES).value("Hello Pulsar!").send(); // delivered about 3 minutes after sending

2. Send by deliverAt

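producer.newMessage().deliverAt(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)).value("Hello Pulsar!").send(); // absolute Unix time in milliseconds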
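Since both methods end up as one absolute timestamp, deliverAfter can be read as shorthand for deliverAt(now + delay). The sketch below mirrors that conversion with the current time passed in explicitly so it can be checked deterministically; it illustrates the idea and is not the Pulsar client's source. DeliveryTime is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

public class DeliveryTime {

    // deliverAt: the caller supplies the absolute delivery time (epoch millis).
    static long deliverAt(long timestampMillis) {
        return timestampMillis;
    }

    // deliverAfter: the client adds the relative delay to its current time,
    // producing the same kind of absolute timestamp that deliverAt carries.
    static long deliverAfter(long nowMillis, long delay, TimeUnit unit) {
        return deliverAt(nowMillis + unit.toMillis(delay));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long viaAfter = deliverAfter(now, 3, TimeUnit.MINUTES);
        long viaAt = deliverAt(now + 180_000L);
        System.out.println(viaAfter == viaAt); // true
    }
}
```

Because the timestamp is computed from the producer's clock, any drift between the producer host and the broker shifts the actual delivery time by the same amount.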

Pulsar supports delays with a long span, such as one month or half a year, and a single topic can carry both delayed and non-delayed messages. The following figure shows how delayed messages flow through Pulsar:

Messages m1, m3, m4, and m5 sent by the producer each have a different delay, while m2 is a normal message with no delay. The consumer receives and acknowledges each delayed message once its delay has elapsed.

IV. How Pulsar implements delayed message delivery

As the usage above shows, Pulsar supports delayed delivery with second-level precision, unlike the fixed delay levels supported by open-source RocketMQ.
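To make the contrast concrete, the sketch compares a level-based scheme, where a requested delay must be mapped onto one of a fixed set of levels, with keeping the exact requested delay as a timestamp does. The level list is modeled on open-source RocketMQ's default messageDelayLevel setting; rounding up to the next level is a hypothetical policy for illustration, since RocketMQ itself has the producer choose a level.

```java
import java.util.concurrent.TimeUnit;

public class DelayPrecision {

    // Delay levels in seconds, modeled on RocketMQ's default messageDelayLevel setting.
    static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    // Level-based: round the requested delay up to the nearest supported level
    // (hypothetical policy); delays beyond the largest level cannot be expressed.
    static long levelBasedDelaySeconds(long requestedSeconds) {
        for (long level : LEVEL_SECONDS) {
            if (level >= requestedSeconds) {
                return level;
            }
        }
        throw new IllegalArgumentException("delay exceeds the largest level: " + requestedSeconds + "s");
    }

    // Timestamp-based (Pulsar style): the exact requested delay is kept.
    static long timestampBasedDelaySeconds(long requestedSeconds) {
        return requestedSeconds;
    }

    public static void main(String[] args) {
        System.out.println(levelBasedDelaySeconds(7));     // 10
        System.out.println(timestampBasedDelaySeconds(7)); // 7
        System.out.println(timestampBasedDelaySeconds(TimeUnit.DAYS.toSeconds(30))); // 2592000
    }
}
```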

Pulsar's implementation of delayed message delivery is relatively simple. The Delayed Message Tracker records an index for every delayed message. Each index has three parts, timestamp | LedgerID | EntryID: LedgerID and EntryID locate the message, while timestamp records the delivery time and also serves as the sort key of the delayed index priority queue.
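The index can be modeled as three longs ordered by timestamp. This is a simplified on-heap stand-in using java.util.PriorityQueue rather than Pulsar's off-heap structure; the class name DelayedIndex is hypothetical, and breaking timestamp ties by ledger and entry ID is an assumption made here so that the order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class DelayedIndexDemo {

    // One index entry: delivery time plus the (ledgerId, entryId) position of the message.
    static final class DelayedIndex implements Comparable<DelayedIndex> {
        final long timestamp;
        final long ledgerId;
        final long entryId;

        DelayedIndex(long timestamp, long ledgerId, long entryId) {
            this.timestamp = timestamp;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(DelayedIndex o) {
            // Earliest delivery time first; ties broken by message position.
            int c = Long.compare(timestamp, o.timestamp);
            if (c != 0) return c;
            c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }

        @Override
        public String toString() {
            return timestamp + "|" + ledgerId + "|" + entryId;
        }
    }

    // Drain the queue to show the order in which indexes come due.
    static List<String> drainInOrder(PriorityQueue<DelayedIndex> queue) {
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll().toString());
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityQueue<DelayedIndex> queue = new PriorityQueue<>();
        queue.add(new DelayedIndex(3_000, 7, 2));
        queue.add(new DelayedIndex(1_000, 7, 5));
        queue.add(new DelayedIndex(3_000, 7, 1));
        System.out.println(drainInOrder(queue)); // [1000|7|5, 3000|7|1, 3000|7|2]
    }
}
```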

The Delayed Message Tracker keeps the delayed index priority queue in off-heap memory, organized as a heap ordered by delivery time: the index with the earliest delivery time is at the head, and later ones follow behind it. When dispatching to a consumer, the broker first checks the Delayed Message Tracker for messages whose delivery time has passed. If there are any, it takes the corresponding indexes from the tracker and reads those messages for consumption; otherwise, it delivers normal messages directly.
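The read path described above can be sketched as follows: each read first asks the tracker for an index whose time has passed and falls back to normal messages only when none is due. This is a simplified, hypothetical model (in-memory collections instead of stored ledgers, the current time passed in by the caller, made-up delays); the class DelayedDispatcher is not Pulsar code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;

public class DelayedDispatcher {

    // Minimal index: delivery time and the id of the message it points to.
    static final class Index {
        final long deliverAt;
        final String messageId;

        Index(long deliverAt, String messageId) {
            this.deliverAt = deliverAt;
            this.messageId = messageId;
        }
    }

    // Tracker: min-heap ordered by delivery time.
    private final PriorityQueue<Index> tracker =
            new PriorityQueue<>((a, b) -> Long.compare(a.deliverAt, b.deliverAt));
    private final Deque<String> normal = new ArrayDeque<>();

    // Delayed messages are indexed in the tracker.
    void publish(String messageId, long deliverAt) {
        tracker.add(new Index(deliverAt, messageId));
    }

    // Normal messages are readable immediately.
    void publish(String messageId) {
        normal.addLast(messageId);
    }

    // Expired delayed messages take priority over normal messages.
    String next(long nowMillis) {
        Index head = tracker.peek();
        if (head != null && head.deliverAt <= nowMillis) {
            return tracker.poll().messageId;
        }
        return normal.pollFirst(); // null when nothing is deliverable
    }

    public static void main(String[] args) {
        DelayedDispatcher d = new DelayedDispatcher();
        d.publish("m1", 5_000L); // hypothetical delay
        d.publish("m2");         // normal message
        d.publish("m3", 2_000L); // hypothetical delay
        System.out.println(d.next(0L));     // m2: nothing has expired yet
        System.out.println(d.next(0L));     // null
        System.out.println(d.next(2_500L)); // m3
        System.out.println(d.next(6_000L)); // m1
    }
}
```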

If a broker goes down or ownership of a topic is transferred to another broker, Pulsar rebuilds the delayed index queue so that delayed messages continue to work correctly.
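Rebuilding can be pictured as replaying the subscription's unacknowledged messages and re-inserting an index for every one that carries a delivery time. The sketch assumes a hypothetical StoredEntry shape with a deliverAt field (0 meaning "not delayed") and an acked flag; it illustrates why rebuild cost grows with the backlog and does not reproduce Pulsar's recovery code.

```java
import java.util.List;
import java.util.PriorityQueue;

public class TrackerRebuild {

    // A stored entry as the broker might replay it (hypothetical shape).
    static final class StoredEntry {
        final long ledgerId;
        final long entryId;
        final long deliverAt; // 0 means a normal, non-delayed message
        final boolean acked;  // already acknowledged by this subscription

        StoredEntry(long ledgerId, long entryId, long deliverAt, boolean acked) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.deliverAt = deliverAt;
            this.acked = acked;
        }
    }

    // Replays the whole backlog once and re-inserts a {timestamp, ledgerId, entryId}
    // index for every unacknowledged delayed message: O(n log n) in the backlog size.
    static PriorityQueue<long[]> rebuild(List<StoredEntry> backlog) {
        PriorityQueue<long[]> queue = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        for (StoredEntry e : backlog) {
            if (!e.acked && e.deliverAt > 0) {
                queue.add(new long[] {e.deliverAt, e.ledgerId, e.entryId});
            }
        }
        return queue;
    }

    public static void main(String[] args) {
        List<StoredEntry> backlog = List.of(
                new StoredEntry(1, 0, 9_000, false),
                new StoredEntry(1, 1, 0, false),     // normal message, no index needed
                new StoredEntry(1, 2, 4_000, false),
                new StoredEntry(1, 3, 2_000, true)); // already acked, skipped
        PriorityQueue<long[]> queue = rebuild(backlog);
        System.out.println(queue.size());    // 2
        System.out.println(queue.poll()[2]); // 2: entry 1:2 is due first
    }
}
```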

V. Challenges Pulsar faces with delayed message delivery

The implementation described above is simple and efficient, is minimally intrusive to the Pulsar core, and supports arbitrary delivery times. However, it cannot support delayed messages at large scale, for two main reasons:

1. The delayed index queue is limited by memory.

Each delayed index consists of three longs, which costs little when there are few delayed messages. However, index queues are maintained per subscription, so a single topic partition has as many index queues as it has subscriptions. In addition, the more delayed messages there are and the longer their delays, the more memory the index queues consume.
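A rough estimate follows directly from the paragraph above: each index holds three longs (24 bytes of payload), and one queue exists per subscription, while a longer delay span simply means more messages are waiting at the same time. The helper ignores data-structure overhead, so real usage will be higher; the message and subscription counts in the example are hypothetical.

```java
public class IndexMemoryEstimate {

    // timestamp + ledgerId + entryId = 3 * 8 = 24 bytes
    static final int BYTES_PER_INDEX = 3 * Long.BYTES;

    // Lower-bound payload size of all delayed index queues for one topic partition.
    static long estimateBytes(long delayedMessages, int subscriptions) {
        return delayedMessages * subscriptions * BYTES_PER_INDEX;
    }

    public static void main(String[] args) {
        long bytes = estimateBytes(10_000_000L, 5);  // hypothetical workload
        System.out.println(bytes);                   // 1200000000
        System.out.println(bytes / 1_000_000 + " MB"); // 1200 MB
    }
}
```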

2. The time cost of rebuilding the delayed index queue

As mentioned above, if a broker goes down or topic ownership is transferred, Pulsar rebuilds the delayed index queue. For large volumes of delayed messages with long delays, a rebuild can take hours. Assigning more partitions to the topic increases rebuild concurrency and shortens the rebuild, but it does not fully eliminate the time and cost of rebuilding.
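A back-of-the-envelope model shows why more partitions help only partly: if the backlog is split evenly and each partition replays at the same rate in parallel, wall-clock rebuild time falls with the partition count, but the total replay work stays the same. The linear model and the rates used are assumptions for illustration, not measurements.

```java
public class RebuildModel {

    // Total replay work is set by the backlog size, however it is partitioned.
    static double totalWorkSeconds(long backlogMessages, double replayRatePerPartition) {
        return backlogMessages / replayRatePerPartition;
    }

    // Wall-clock time if the backlog is split evenly and partitions rebuild in parallel.
    static double wallClockSeconds(long backlogMessages, int partitions, double replayRatePerPartition) {
        return totalWorkSeconds(backlogMessages, replayRatePerPartition) / partitions;
    }

    public static void main(String[] args) {
        long backlog = 3_600_000L; // hypothetical backlog of delayed messages
        double rate = 1_000.0;     // hypothetical messages replayed per second per partition
        for (int partitions : new int[] {1, 4, 16}) {
            System.out.printf("partitions=%d wall-clock=%.0fs total work=%.0fs%n",
                    partitions, wallClockSeconds(backlog, partitions, rate),
                    totalWorkSeconds(backlog, rate));
        }
    }
}
```

The model is optimistic: partitions served by the same broker share its disk and CPU, so the real speedup is usually smaller.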

VI. The future of delayed message delivery in Pulsar

Pulsar's current delayed message delivery scheme is simple and efficient, but it still carries risks when handling delayed messages at large scale. The next step for the community and the data platform MQ team is to support large-scale delayed messages. The solution under discussion partitions the delayed index queue by time: the broker loads only the delayed indexes of the nearest time slice into memory and persists the other time partitions to disk. An example is shown in the figure below:

In the figure above, the delayed index queue is partitioned into 5-minute intervals. m5 and m1 fall into time partition 1, which is held in memory because its delivery times are the nearest. m4 and m3 fall into time partition 2, whose indexes are stored on disk. This scheme reduces both the time needed to rebuild the delayed index queue and its dependence on memory.
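The proposed scheme can be sketched as bucketing indexes by time slice: only the current slice lives in memory, later slices are parked in a store standing in for disk, and the next slice is loaded once its time window begins. The 5-minute width follows the figure; the class TimeSlicedTracker, the TreeMap used as "disk", and the example timestamps are hypothetical simplifications, not Pulsar code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

public class TimeSlicedTracker {

    static final long SLICE_MILLIS = 5 * 60 * 1000L; // 5-minute time partitions, as in the figure

    // Indexes {timestamp, ledgerId, entryId} of the slice currently held in memory.
    private final PriorityQueue<long[]> inMemory =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    // Stand-in for disk: slice number -> indexes persisted for that slice.
    private final TreeMap<Long, List<long[]>> onDisk = new TreeMap<>();
    private long loadedSlice;

    TimeSlicedTracker(long nowMillis) {
        this.loadedSlice = nowMillis / SLICE_MILLIS;
    }

    // Indexes in the loaded (or an earlier) slice go to memory; later slices go to "disk".
    void add(long deliverAt, long ledgerId, long entryId) {
        long[] index = {deliverAt, ledgerId, entryId};
        long slice = deliverAt / SLICE_MILLIS;
        if (slice <= loadedSlice) {
            inMemory.add(index);
        } else {
            onDisk.computeIfAbsent(slice, s -> new ArrayList<>()).add(index);
        }
    }

    // Returns the indexes due at nowMillis; when the in-memory slice is used up and
    // the next slice has begun, that slice is loaded from "disk".
    List<long[]> pollExpired(long nowMillis) {
        List<long[]> due = new ArrayList<>();
        while (true) {
            while (!inMemory.isEmpty() && inMemory.peek()[0] <= nowMillis) {
                due.add(inMemory.poll());
            }
            boolean nextSliceStarted =
                    !onDisk.isEmpty() && onDisk.firstKey() <= nowMillis / SLICE_MILLIS;
            if (!inMemory.isEmpty() || !nextSliceStarted) {
                return due;
            }
            Map.Entry<Long, List<long[]>> next = onDisk.pollFirstEntry();
            loadedSlice = next.getKey();
            inMemory.addAll(next.getValue());
        }
    }

    int inMemorySize() {
        return inMemory.size();
    }

    int onDiskSize() {
        return onDisk.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        TimeSlicedTracker tracker = new TimeSlicedTracker(0L);
        tracker.add(60_000L, 1, 1);  // m1, slice 0: kept in memory
        tracker.add(120_000L, 1, 5); // m5, slice 0: kept in memory
        tracker.add(400_000L, 1, 3); // m3, slice 1: persisted
        tracker.add(500_000L, 1, 4); // m4, slice 1: persisted
        System.out.println(tracker.inMemorySize() + " in memory, " + tracker.onDiskSize() + " on disk");
        System.out.println(tracker.pollExpired(300_000L).size() + " due; slice 1 now loaded");
    }
}
```

Only the indexes for the current slice occupy memory, and a rebuild after failover would need to reload just that slice rather than replay the whole backlog.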

