What are the scenarios that lead to repeated consumption in Kafka, and how can they be solved? This article analyzes the problem in detail and offers corresponding solutions, in the hope of helping readers who face this issue find a simple and feasible approach.
Kafka consumers consume a topic as part of a consumer group (Consumer Group): each record published to a topic is delivered to one consumer instance within each subscribing group. Consumer groups are independent of one another and can subscribe to the same set of topics without interfering with each other. In a production environment, consumers that ignore these consumer-related characteristics may end up consuming the same messages more than once.
Before discussing repeated consumption, let's look at several important consumer configuration parameters in Kafka.
enable.auto.commit: defaults to true, which means the consumer automatically commits its offsets periodically.
auto.commit.interval.ms: the interval between automatic commits when enable.auto.commit is true; defaults to 5000 ms.
max.poll.records: the maximum number of records the consumer pulls in a single poll; defaults to 500.
max.poll.interval.ms: defaults to 5 minutes; if the consumer has not finished processing the messages from the last poll within this interval, it sends a request to leave the group.
In common usage scenarios our consumer configuration is fairly simple, especially when Spring components are integrated for message consumption: usually a single annotation is enough to consume messages, as in the code shown below.
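The original code sample was not preserved in this copy of the article. Below is a minimal sketch of what such an annotated listener typically looks like, assuming Spring for Apache Kafka is on the classpath; the topic name "test1" and group "group1" come from the article's own description, while the class and method names are hypothetical:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Test1Consumer {

    // Consume messages from topic "test1" as a member of consumer group "group1".
    @KafkaListener(topics = "test1", groupId = "group1")
    public void onMessage(String message) {
        // Business processing of the received message goes here.
        System.out.println("received: " + message);
    }
}
```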
This code configures a Kafka listener that consumes the topic named "test1" as part of the "group1" consumer group; the developer only needs to process the messages it receives. So how does the consumer in this code pull messages, and how does it commit the offsets of the messages it has consumed?
In fact, when enable.auto.commit=true, the next poll happens only after the messages pulled by the previous poll have been processed, and the offsets of all consumed messages are committed on the next poll that occurs after the auto.commit.interval.ms interval has elapsed.
To verify when the consumer auto-commits, configure the consumer parameters as follows:
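The original configuration snippet was not preserved. Based on the description that follows (20 records per poll, a 30-second auto-commit interval), a plain kafka-clients setup along these lines would reproduce the experiment; the broker address and serializer choices are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitDemoConfig {
    static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");           // auto-commit on (the default)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");     // 30 s, as used in the experiment below
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");               // pull 20 records per poll
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
```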
To observe the consumer group's progress, the following code uses the interfaces provided by Kafka to fetch the group's consumption progress every 5 seconds and print it to the console.
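The article's monitoring code was not preserved either. A minimal sketch of the idea, using the kafka-clients AdminClient to read the group's committed offsets (the broker address is an assumption, and the class name is hypothetical):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupProgressPrinter {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            while (true) {
                // Fetch the committed offsets of consumer group "group1" for every partition.
                Map<TopicPartition, OffsetAndMetadata> offsets =
                        admin.listConsumerGroupOffsets("group1")
                             .partitionsToOffsetAndMetadata()
                             .get();
                offsets.forEach((tp, om) -> {
                    if (om != null) {
                        System.out.println(tp + " committed offset = " + om.offset());
                    }
                });
                Thread.sleep(5000); // print the progress every 5 seconds
            }
        }
    }
}
```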
For topic test1 we created only one partition so that consumption is easier to observe. With the configuration of consumer group group1 above, the consumer pulls 20 messages per poll and processing each message takes 1 second. Part of the printed output looks like this:
As can be seen from the log, the committed offset of the consumer group advances every 40 seconds. Each poll pulls 20 messages and each message takes 1 second to process, so one poll cycle lasts 20 seconds. After the first poll, the auto.commit.interval.ms=30s interval has not yet elapsed when the second poll starts, so no offsets are committed. By the time of the third poll, 40 seconds have passed, so that poll commits the offsets of the messages consumed in the previous two polls, and the offset increases by 40. In other words, the offsets of consumed messages are committed only on the next poll after the auto.commit.interval.ms interval has elapsed.
Given how consumers consume and commit as described above, when enable.auto.commit is left at its default value of true, repeated consumption can occur in the following scenarios.
The application process is forcibly killed or exits abnormally while the consumer is processing messages.
For example, after a poll pulls 500 messages, the process is killed when 200 of them have been processed, so the offset is never committed; or an abnormal exit prevents the consumed offset from being committed. On the next restart, the same 500 messages are pulled again, so the 200 messages that had already been processed are consumed a second time. Therefore, in applications that run consumer threads, we should avoid forcibly killing the process with commands such as kill -9.
The consumer takes too long to process messages
The max.poll.interval.ms parameter defines the maximum allowed interval between two calls to poll. Its default value is 5 minutes: if your Consumer program cannot finish processing the messages returned by poll within 5 minutes, the Consumer sends a "leave group" request and the Coordinator starts a new round of rebalance. This can happen when individual messages are expensive to process.
To reproduce this scenario, we reconfigure the consumer with the following parameters:
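The article's reproduction code is missing from this copy. A minimal sketch of the setup it describes (11 records per poll, slow per-message processing, max.poll.interval.ms left at its 5-minute default) might look like this; the broker address, topic name reuse, and the 30-second sleep are assumptions drawn from the surrounding description:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "11");              // pull 11 records per poll
        // max.poll.interval.ms is left at its default of 5 minutes
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Simulate slow processing: 30 s per message, 11 messages per poll = 5 min 30 s,
                    // which exceeds the 5-minute max.poll.interval.ms and triggers a rebalance.
                    Thread.sleep(30_000);
                    System.out.println("processed offset " + record.offset());
                }
            }
        }
    }
}
```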
During consumption the consumer pulls 11 messages per poll, and each message takes 30 seconds to process, so processing all of them takes 5 minutes and 30 seconds. Because max.poll.interval.ms defaults to 5 minutes, the consumer cannot finish within 5 minutes, so it leaves the group and triggers a rebalance.
The actual running log is as follows:
You can see that after the 11th message was processed, the total processing time had exceeded the max.poll.interval.ms default of 5 minutes, so the consumer had already left the group and a rebalance had started, and the offset commit therefore failed. After the rebalance, once the consumer is reassigned the partition, the next poll starts again from the previously consumed messages, which results in repeated consumption. Moreover, if we do not fix the excessive processing time, this batch of messages may be consumed repeatedly forever.
If the repeated-consumption scenarios above are not handled, they may cause problems in production. To avoid such problems, here are two approaches to dealing with repeated consumption.
The first approach is to improve consumption capacity, that is, speed up the processing of a single message, for example by handling time-consuming steps asynchronously or with multiple threads. Besides shortening the processing time of a single message, you can raise max.poll.interval.ms to a value suited to the actual scenario to avoid unnecessary rebalances, and you can also lower max.poll.records from its default of 500 to match the actual message processing rate, as in the sketch below. This approach solves repeated consumption caused by overly long processing times and requires only minor code changes, but it cannot completely eliminate repeated consumption.
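As a minimal sketch of this tuning, assuming the same Properties object as in the earlier examples, the two parameters could be adjusted like this; the concrete values are illustrative, not from the article:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TunedConsumerConfig {
    // Illustrative values only; tune them to the actual per-message processing time.
    static void applyTuning(Properties props) {
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // allow up to 10 minutes between polls
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");        // pull fewer records than the default 500
    }
}
```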
The second approach is to introduce a separate deduplication mechanism, for example by attaching a unique identifier such as a message id when the message is produced. On the consumer side, the ids of the most recent 1000 messages can be kept in Redis or in a MySQL table, and max.poll.records is configured to be smaller than 1000. Before a message is processed, it is checked against this table and duplicates are discarded.
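A minimal sketch of this idea, using an in-memory LRU set of recent message ids as a stand-in for the Redis or MySQL table the article mentions; the class and method names are hypothetical:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class RecentMessageIdDeduplicator {
    private static final int CAPACITY = 1000; // keep the ids of the most recent 1000 messages

    // LinkedHashMap-backed set that evicts the oldest id once the capacity is exceeded;
    // in production this would be a Redis set or a MySQL table instead.
    private final Set<String> recentIds = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>(CAPACITY, 0.75f, false) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > CAPACITY;
                }
            });

    /** Returns true if the message id has not been seen yet and records it; false if it is a duplicate. */
    public synchronized boolean markIfFirstSeen(String messageId) {
        return recentIds.add(messageId);
    }
}
```

On the consumer side, a message would be processed only when markIfFirstSeen(id) returns true, so messages re-delivered after a failed commit or a rebalance are skipped.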
In addition, in some consumption scenarios we can make the consuming interface idempotent. For example, a database query is naturally idempotent, so repeated consumption is not a concern. For operations such as inserting data, a unique key can ensure that performing the operation once or several times has the same effect on the system, which makes the interface idempotent.
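As a sketch of the unique-key idea, assuming a table with a UNIQUE constraint on the message id column (the table, column, and class names are hypothetical), an insert that runs more than once leaves the data unchanged:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

public class IdempotentWriter {
    /** Inserts the record exactly once; a repeated insert with the same message id is ignored. */
    public void saveOrder(Connection conn, String messageId, String payload) throws SQLException {
        String sql = "INSERT INTO orders (message_id, payload) VALUES (?, ?)"; // message_id has a UNIQUE constraint
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, messageId);
            ps.setString(2, payload);
            ps.executeUpdate();
        } catch (SQLIntegrityConstraintViolationException duplicate) {
            // The message was already processed; repeating the operation changes nothing.
        }
    }
}
```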
This concludes the discussion of Kafka's repeated consumption scenarios and their solutions. I hope the content above is of some help to you.