How to implement Kafka delayed messages based on SQLite

2025-03-31 Update From: SLTechnology News&Howtos

This article describes how to implement Kafka delayed messages on top of SQLite. It covers the requirements, the design, a simple performance test, deployment, and points to watch out for in operation.

1. Requirements

Delayed messages (or timed messages) are a common feature in business systems. Typical scenarios include:

1) cancelling an order when it times out

2) sending a recall notification to users who have been offline for longer than a specified period

3) notifying a guardian when a phone has been unreachable for a specified length of time

The popular implementation schemes are as follows:

1) polling the database at regular intervals, scanning for records whose delay time has been reached, processing them, and deleting them

2) the JDK's built-in delay queue (DelayQueue), or an optimized time-wheel algorithm

3) a Redis sorted set

4) a distributed message queue that supports delayed messages

However, each of these schemes has drawbacks:

1) If the polling interval is short, polling puts a lot of pressure on the database, and it does not fit a distributed micro-service architecture well.

2) The JDK delay queue uses a lot of memory, and messages are lost when the service restarts, so it is hard to adapt to a distributed micro-service architecture.

3) A Redis sorted set is functionally suitable, but memory is expensive, and it is not a good fit for a distributed micro-service architecture.

4) At the time of writing, mainstream RocketMQ does not support delayed messages with an arbitrary delay. RabbitMQ and ActiveMQ do not perform well enough, and sending and configuring delayed messages with them is cumbersome. Kafka does not support delayed messages at all.

Therefore, the goal is a delayed message forwarding middleware that fits a distributed micro-service architecture, performs well, and is convenient for business systems to use.

2. Design

To ensure high performance, Kafka or RocketMQ is recommended as the distributed message queue. The current implementation provides Kafka delayed messages backed by SQLite.

The design is described in terms of Kafka, but it applies to any MQ product.

2.1 Overall design

2.2 Program business logic

1) The business system pushes the delayed message to the unified delayed message queue.

2) The program regularly reads messages from the delayed message queue, saves them locally, and commits the offset.

3) The program regularly scans the local store for messages whose delay time has been reached and forwards them to the actual business message queue.

4) The program deletes the forwarded messages from the local store.
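The four steps above can be sketched in a few lines. This is only an illustration under stated assumptions, not the program's actual code: the real program is a Java application using kafka-client, while the sketch uses Python and its standard sqlite3 module for brevity. The table name `delay_msg`, its columns, and the functions `open_store`, `save_batch` and `forward_due` are hypothetical, the Kafka consumer and producer are replaced by a plain list and a `send` callback, and row ids are assigned by SQLite here rather than by a snowflake generator.

```python
import json
import sqlite3
import time


def open_store(path=":memory:"):
    """Open the local store. Table and column names are hypothetical."""
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS delay_msg (
            id INTEGER PRIMARY KEY,       -- assigned by SQLite in this sketch
            delay_time INTEGER NOT NULL,  -- due time, Unix seconds
            body TEXT NOT NULL            -- the original JSON message
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_delay_time ON delay_msg (delay_time)"
    )
    return conn


def save_batch(conn, raw_messages):
    """Step 2: persist messages read from the delay topic.

    The caller would commit the Kafka offset after this returns.
    """
    rows = [(json.loads(raw)["delayTime"], raw) for raw in raw_messages]
    with conn:  # one transaction per batch
        conn.executemany(
            "INSERT INTO delay_msg (delay_time, body) VALUES (?, ?)", rows
        )


def forward_due(conn, send, now=None, limit=10000):
    """Steps 3 and 4: forward due messages, then delete them locally."""
    now = int(time.time()) if now is None else now
    due = conn.execute(
        "SELECT id, body FROM delay_msg"
        " WHERE delay_time <= ? ORDER BY delay_time LIMIT ?",
        (now, limit),
    ).fetchall()
    for _, body in due:
        msg = json.loads(body)
        send(msg["topic"], msg["messageKey"], msg["message"])
    with conn:  # delete only after every send call has returned
        conn.executemany(
            "DELETE FROM delay_msg WHERE id = ?",
            [(row_id,) for row_id, _ in due],
        )
    return len(due)
```

Because rows are deleted only after they have been sent, a crash between sending and deleting leads to a repeated send rather than a lost message, which matches the duplicate-on-restart behaviour the article describes.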

2.3 Implementation details

1) Each worker uses its own SQLite database file, so workers can run concurrently to improve performance.

2) IDs are generated with the snowflake algorithm.

3) When there are no delayed messages, the thread sleeps for a while to reduce the load on the Kafka cluster and on local I/O.

4) SQLite is used for local storage.

2.4 Dependencies
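For readers unfamiliar with the snowflake algorithm mentioned above, here is a minimal Python sketch. It assumes the commonly used layout (41-bit millisecond timestamp, 10-bit worker id, 12-bit sequence); the article does not say which bit layout or epoch the program uses, so the class name `Snowflake`, the epoch and the field widths are all illustrative assumptions.

```python
import threading
import time


class Snowflake:
    """41-bit millisecond timestamp | 10-bit worker id | 12-bit sequence.

    The layout and epoch are assumptions, not taken from the program.
    """

    EPOCH_MS = 1640995200000  # 2022-01-01T00:00:00Z, an arbitrary custom epoch

    def __init__(self, worker_id, clock=lambda: int(time.time() * 1000)):
        if not 0 <= worker_id < 1024:
            raise ValueError("worker_id must fit in 10 bits")
        self.worker_id = worker_id
        self.clock = clock
        self.last_ms = -1
        self.sequence = 0
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            now = self.clock()
            if now < self.last_ms:
                raise RuntimeError("clock moved backwards")
            if now == self.last_ms:
                self.sequence = (self.sequence + 1) & 0xFFF
                if self.sequence == 0:
                    # 4096 ids already issued in this millisecond: wait for the next
                    while now <= self.last_ms:
                        now = self.clock()
            else:
                self.sequence = 0
            self.last_ms = now
            return (
                ((now - self.EPOCH_MS) << 22)
                | (self.worker_id << 12)
                | self.sequence
            )
```

Ids produced this way increase over time within one worker, which suits an integer primary key, and the worker id bits keep ids from different workers distinct.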

1) kafka-client

2) sqlite

3) slf4j+log4j2

4) jackson

3. Performance test

Test machine: i5-6500, 16 GB memory, mechanical hard disk

Delayed message size: 1 KB

Concurrent workers: 1

A simple local test gave the following results:

1) One worker can store, forward and delete about 15,000 delayed messages per second; two workers reach about 30,000 messages/s.

2) Processing 10,000 records per batch turned out to be a suitable batch size after many comparative tests.

Two other local storage schemes were also tested:

1) Saving and reading JSON files directly has poor read and write performance (about 1,200 records/s), because frequently creating, opening and closing files is slow and causes random disk I/O.

2) RocksDB has very good raw read and write performance (97,000 records/s), but filtering for due messages is too slow. Once the data volume exceeds 1,000,000 records it performs worse than SQLite, and it uses a lot of memory and CPU at runtime.
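A batch-size comparison of this kind can be reproduced with a small harness like the one below. It is a sketch under assumptions: it measures only SQLite inserts (not forwarding or deletion), the function name `measure_insert_rate` and the table layout are hypothetical, and the numbers it yields depend entirely on the machine, so none are quoted here.

```python
import os
import sqlite3
import tempfile
import time


def measure_insert_rate(batch_size, total=20000, payload_size=1024):
    """Insert `total` rows of about 1 KB in batches; return rows per second."""
    payload = "x" * payload_size
    with tempfile.TemporaryDirectory() as tmp:
        conn = sqlite3.connect(os.path.join(tmp, "bench.db"))
        conn.execute(
            "CREATE TABLE delay_msg ("
            "id INTEGER PRIMARY KEY, delay_time INTEGER, body TEXT)"
        )
        conn.commit()
        start = time.perf_counter()
        for first in range(0, total, batch_size):
            last = min(first + batch_size, total)
            rows = [(i, payload) for i in range(first, last)]
            with conn:  # one transaction, and so one commit, per batch
                conn.executemany(
                    "INSERT INTO delay_msg (delay_time, body) VALUES (?, ?)",
                    rows,
                )
        elapsed = time.perf_counter() - start
        count = conn.execute("SELECT COUNT(*) FROM delay_msg").fetchone()[0]
        conn.close()
    if count != total:
        raise RuntimeError("row count mismatch")
    return total / elapsed
```

Calling `measure_insert_rate` with batch sizes such as 100, 1,000 and 10,000 on the target disk shows how the per-commit cost is spread over larger batches; the best value should be confirmed on the actual hardware.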

4. Deployment

4.1 System environment dependencies

1) JDK 1.8

2) Kafka 1.1.0

The kafka-client jar can be replaced with the version that matches your actual Kafka version; this does not cause conflicts. If the jar version does not match the Kafka server version, errors may occur (for example, messages cannot be pulled or offset commits fail).

To do so, change kafka_version in pom.xml (default: 1.1.0) and repackage. The program is deployed independently and is non-intrusive to existing projects.

4.2 Installation

1) Run the Maven package build in the project root directory; this generates the dev_ops directory.

2) In the dev_ops directory, run java -jar kafka_delay_sqlite-20220102.jar to start the program.

3) To change the configuration, create a kafka.properties file in the dev_ops directory and set your custom values there.

The default configuration is as follows:

# Kafka connection URL [ip:port,ip:port …]
kafka.url=127.0.0.1:9092
# Local storage path for delayed messages; an absolute path is recommended
kafka.delay.store.path=/data/kafka_delay
# Unified delayed message topic
kafka.delay.topic=common_delay_msg
# Consumer group id
kafka.delay.group.id=common_delay_app
# Number of concurrent workers. Restriction: workers must be less than or equal to the number of topic partitions
kafka.delay.workers=2
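The restriction on the worker count is easy to check before starting the program. The sketch below is an illustration in Python, not the program's own loader: it overlays key=value lines on the defaults listed above and rejects a worker count larger than the partition count. The function `load_config` is hypothetical, and the partition count is passed in by the caller because looking it up would require a Kafka client.

```python
DEFAULTS = {
    "kafka.url": "127.0.0.1:9092",
    "kafka.delay.store.path": "/data/kafka_delay",
    "kafka.delay.topic": "common_delay_msg",
    "kafka.delay.group.id": "common_delay_app",
    "kafka.delay.workers": "2",
}


def load_config(text, partitions):
    """Overlay key=value lines on the defaults and check the workers limit."""
    config = dict(DEFAULTS)
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    workers = int(config["kafka.delay.workers"])
    if not 1 <= workers <= partitions:
        raise ValueError(
            f"workers={workers} must be between 1 and {partitions} (topic partitions)"
        )
    return config
```

For example, `load_config("kafka.delay.workers=4", partitions=2)` raises a `ValueError`, since the constraint implies that the extra workers would have no partition to read from.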

4) The business side sends Kafka messages to the unified delay topic (common_delay_msg).

Message body parameters:

{"topic": "actual business topic", "messageKey": "key of the message; determines which partition it is sent to", "message": "business message content", "delayTime": 1641470704}

delayTime: the time at which the message becomes due, as a timestamp in seconds
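Since delayTime is an absolute timestamp in seconds rather than a duration, the sender has to add the desired delay to the current time. A minimal Python sketch of building such a message body follows; the helper name `build_delay_message` is hypothetical, and actually sending the result to common_delay_msg with a Kafka producer is left out.

```python
import json
import time


def build_delay_message(topic, key, payload, delay_seconds, now=None):
    """Return the JSON body for the unified delay topic.

    `payload` is serialized to a JSON string, because the "message" field
    carries the business message as text.
    """
    now = int(time.time()) if now is None else int(now)
    return json.dumps(
        {
            "topic": topic,
            "messageKey": key,
            "message": json.dumps(payload),
            "delayTime": now + delay_seconds,  # absolute due time in seconds
        }
    )
```

For instance, an order that should be cancelled after 30 minutes would use `delay_seconds=1800`.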

Message body example:

{"topic": "cancel_order", "messageKey": "123456", "message": "{\"orderId\": 123456789123456, \"userId\": \"yhh\"}", "delayTime": 1641470704}

4.3 Program migration

Copy the delayed message storage directory to the new machine, then redeploy and start the program. (The directory is the one set by the configuration item kafka.delay.store.path, /data/kafka_delay by default.)

4.4 Troubleshooting with logs

Logs are written to /logs/kafka_delay/ by default, and log output is asynchronous.

system.log records logs at info level and above. Info-level logs are not flushed immediately, so some entries may be lost when the program restarts.

exception.log records logs at warn level and above, and is configured to flush immediately, so a normal restart does not lose entries. This is the log to focus on.

To customize the log configuration, edit log4j2.xml.

To debug locally, uncomment the console output section of the log configuration; otherwise nothing is logged to the console.

5. Points to note

1) Because threads sleep when idle, a delayed message may be delivered up to 8 seconds late.

If that is too long for your use case, change the sleep settings in the source code and repackage. The relevant methods are:

KafkaUtils.subscribe()

MsgTransferTask.run()

2) The program depends strictly on the system clock. Make sure the clock of the server running this program is in sync with the clocks of the business servers.

3) It is recommended to give the unified delayed message queue (common_delay_msg) a number of partitions that is a multiple of 2.

4) Each worker (kafka.delay.workers) needs about 200 MB of memory, and the default is 2 workers. Configuring the JVM with more than 1 GB of memory is recommended to avoid frequent GC.

Once workers has been increased, do not reduce it again; otherwise some SQLite databases will no longer be read by any thread and their messages will be lost.

The more workers there are, the higher the throughput of delayed message processing, but the number must not exceed the number of partitions of the topic.

You need to test how many workers it takes to reach the limits of disk I/O and network bandwidth.

At present the main bottlenecks of the program are disk I/O and network bandwidth; actual memory and CPU usage is very low.

5) While the program is running, do not touch the delayed message storage directory or the files in it.

6) Under normal circumstances messages are not dropped, but messages may be sent more than once when the program restarts, so downstream business systems must handle them idempotently.

If the Kafka cluster is abnormal, sending is retried 16 times with the current configuration; if it still fails, the current message is discarded. In a real production environment this scenario is almost impossible.

If messages must never be discarded, you need to modify the source code (MsgTransferTask.run, KafkaUtils.send(…)), then repackage and redeploy.

7) If an unknown exception occurs (for example, SQLite was modified manually or the disk is full), the program terminates immediately.
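Point 6 asks downstream systems to be idempotent. One common way to do that is to record a deduplication key for every message processed and skip keys that were already seen. The Python sketch below illustrates the idea with SQLite; it is not part of the program described here, and the table `processed`, the wrapper `make_idempotent_handler` and the choice of key (for example the topic plus an order id) are assumptions made for illustration.

```python
import sqlite3


def make_idempotent_handler(conn, handler):
    """Wrap `handler` so that each dedupe key is processed at most once."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed (dedupe_key TEXT PRIMARY KEY)"
    )

    def handle(dedupe_key, message):
        # If handler raises, the transaction rolls back and the marker is
        # removed, so a later redelivery of the same message is retried.
        with conn:
            cur = conn.execute(
                "INSERT OR IGNORE INTO processed (dedupe_key) VALUES (?)",
                (dedupe_key,),
            )
            if cur.rowcount == 0:
                return False  # already processed: skip the duplicate
            handler(message)
            return True

    return handle
```

A consumer of the cancel_order topic could, for example, call `handle("cancel_order:123456", message)`; the second delivery of the same key returns `False` without running the business logic again.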

This concludes the walkthrough of how to implement Kafka delayed messages based on SQLite.
