How to solve the Kafka problem

Updated 2025-02-24. From: SLTechnology News&Howtos (shulou), Development


This article walks through a real case of how to solve a Kafka problem: a consumer that keeps failing to commit its offsets because the group has rebalanced. Many people run into this situation in practice, so let's go through how to deal with it step by step.

Preface

The operations staff probably forgot to pay their respects to the servers before the holiday: the Nginx problem had only just been fixed, and now Kafka was failing. I wanted to sleep in today, but the phone rang again. It was the operations side again: "Hey, Glacier, have you arrived at the company? Hurry up and take a look at the server. There's a problem again." "I'm on my way. Isn't the operations guy back at work yet?" "He's still on vacation." Me: "..." Did this guy run away? Never mind him, the problem still needs to be solved.

Reproducing the problem

After arriving at the company, I put down my backpack, took out my laptop, and quickly logged in to the monitoring system. There was nothing wrong with the main business system, but a non-core service had raised an alarm, and the monitoring system showed that the service was frequently throwing the following exception.

2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - commit failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
    at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

From the exception output above, you can roughly tell what went wrong: after processing a batch of messages returned by poll(), the Kafka consumer failed when synchronously committing the offset to the broker. This is most likely because the partition owned by the current consumer thread had already been revoked by the broker, since Kafka considered the consumer dead, as the following part of the output shows.

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

In other words, Kafka triggered its Rebalance mechanism internally. With the problem identified, we can start to analyze it.

Analyze the problem

Since Kafka triggered the Rebalance mechanism, let me explain when Kafka triggers a Rebalance.

What is Rebalance?

Take a concrete example: a group with 10 Consumer instances subscribes to a topic with 50 partitions. Normally, Kafka assigns five partitions to each consumer. This assignment process is called Rebalance.
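To make the numbers above concrete, here is a minimal sketch of an even distribution of partitions over consumers. It is only an illustration: the class RebalanceSketch and the simple round-robin rule are assumptions made for this example, not Kafka's actual assignor implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: spreads partitions evenly over consumers (hypothetical helper). */
final class RebalanceSketch {

    /** Returns, for each consumer index, the list of partition numbers it owns. */
    static List<List<Integer>> assign(int consumers, int partitions) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        // Round-robin: partition p goes to consumer p % consumers.
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 10 consumers, 50 partitions: every consumer owns 5 partitions.
        System.out.println(assign(10, 50).get(0));
        // One consumer leaves: all 50 partitions are redistributed over 9 consumers.
        System.out.println(assign(9, 50).get(0));
    }
}
```

When a member leaves, every remaining consumer may end up with a different set of partitions, which is why a Rebalance interrupts consumption for the whole group.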

When Rebalance is triggered

Kafka triggers a Rebalance when any of the following conditions is met:

The number of members in the group has changed, for example a new consumer joins the consumer group or an existing one leaves it. A member leaves the group either because it crashes or because it leaves voluntarily.

The number of topics subscribed to has changed.

The number of partitions of a subscribed topic has changed.

The latter two situations can be avoided by hand. In actual work, the most common cause of a Kafka Rebalance is a change in consumer group membership.

A Rebalance caused by consumers joining or leaving normally is unavoidable. In some cases, however, the Coordinator mistakenly believes that a Consumer instance has "stopped" and "kicks it out" of the Group, which also results in a Rebalance.

Once a Consumer Group has completed a Rebalance, each Consumer instance periodically sends heartbeat requests to the Coordinator to show that it is still alive. If a Consumer instance fails to send these heartbeats in time, the Coordinator assumes that the Consumer is "dead", removes it from the Group, and starts a new round of Rebalance. This timeout is configured by the session.timeout.ms parameter on the Consumer side. The default value is 10 seconds in older client versions (since Kafka 3.0 the default is 45 seconds).

In addition to this parameter, the Consumer provides another parameter that controls how often heartbeat requests are sent: heartbeat.interval.ms. The lower the value, the more frequently the Consumer instance sends heartbeats. Frequent heartbeats consume extra bandwidth, but the advantage is that the Consumer learns sooner that a Rebalance has started, because the Coordinator notifies each Consumer instance of a Rebalance through the response to its heartbeat request (the REBALANCE_IN_PROGRESS error code).

Besides these two parameters, there is a third one on the Consumer side that controls how the Consumer's actual processing speed affects Rebalance: max.poll.interval.ms. It limits the maximum interval between two calls to the poll method by the Consumer application. Its default value is 5 minutes, which means that if the Consumer program cannot finish consuming the messages returned by poll within 5 minutes, the Consumer initiates a "leave group" request and the Coordinator starts a new round of Rebalance.
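The two timeouts described above can be pictured as two independent checks. The following is a simplified model, not Kafka's real implementation: the class LivenessSketch, its Status enum, and the timestamps passed in are all assumptions made for illustration.

```java
/** Illustrative model of the two ways a consumer can drop out of its group. */
final class LivenessSketch {

    enum Status { ALIVE, SESSION_TIMED_OUT, POLL_INTERVAL_EXCEEDED }

    static Status check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                        long sessionTimeoutMs, long maxPollIntervalMs) {
        // No heartbeat within session.timeout.ms: the coordinator removes the member.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Status.SESSION_TIMED_OUT;
        }
        // No poll() within max.poll.interval.ms: the consumer leaves the group itself.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Status.POLL_INTERVAL_EXCEEDED;
        }
        return Status.ALIVE;
    }

    public static void main(String[] args) {
        // Heartbeats are recent, but the last poll() happened 6 minutes ago.
        System.out.println(check(360_000, 358_000, 0, 10_000, 300_000));
    }
}
```

The point of the model is that a consumer whose heartbeats are perfectly healthy can still lose its partitions if message processing keeps it away from poll() for too long, which matches the exception in this case.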

Based on the analysis above, let's look at which Rebalances can be avoided:

The first type of unnecessary Rebalance happens when heartbeats are not sent in time and the Consumer is "kicked out" of the Group. In this case, we can tune session.timeout.ms and heartbeat.interval.ms to avoid the Rebalance as much as possible. (The following configuration is a best practice found online and has not been tested yet.)

Set session.timeout.ms = 6s.

Set heartbeat.interval.ms = 2s.

Make sure that the Consumer instance can send at least 3 heartbeat requests before it is declared "dead", that is, session.timeout.ms >= 3 * heartbeat.interval.ms.

The main reason for setting session.timeout.ms to 6s is to let the Coordinator detect dead Consumers faster and kick them out of the Group as soon as possible.
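The rule of thumb above is easy to check in code. This is a small sketch with hypothetical helper names (HeartbeatConfigCheck, satisfiesRule); it only does the arithmetic and does not talk to Kafka.

```java
/** Illustrative check of the rule session.timeout.ms >= 3 * heartbeat.interval.ms. */
final class HeartbeatConfigCheck {

    /** Number of heartbeats that fit into one session timeout window. */
    static long heartbeatsPerSession(long sessionTimeoutMs, long heartbeatIntervalMs) {
        return sessionTimeoutMs / heartbeatIntervalMs;
    }

    /** True if at least three heartbeats can be sent before the session expires. */
    static boolean satisfiesRule(long sessionTimeoutMs, long heartbeatIntervalMs) {
        return sessionTimeoutMs >= 3 * heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        // The suggested values: 6s session timeout, 2s heartbeat interval.
        System.out.println(heartbeatsPerSession(6_000, 2_000));
        System.out.println(satisfiesRule(6_000, 2_000));
    }
}
```

A check like this could run at application startup, so that a misconfigured pair of values is caught before the consumer joins the group.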

The second type of unnecessary Rebalance happens when the Consumer takes too long to process messages. Here the value of max.poll.interval.ms is particularly critical. To avoid unexpected Rebalances, it is best to set this parameter somewhat higher than the maximum processing time of the downstream logic.

In short, leave plenty of time for the business processing logic. That way, the Consumer does not trigger a Rebalance just because it takes too long to process a batch of messages.
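A rough way to size these values is to compare the worst-case time for one batch with max.poll.interval.ms. The sketch below assumes a hypothetical worst case of 1,000 ms per record; the class and method names are made up for illustration, while 500 and 300,000 ms are the client defaults for max.poll.records and max.poll.interval.ms.

```java
/** Illustrative budget check: does one poll() batch fit into max.poll.interval.ms? */
final class PollBudgetSketch {

    /** Worst-case time to process one batch returned by poll(). */
    static long worstCaseBatchMs(int maxPollRecords, long maxPerRecordMs) {
        return (long) maxPollRecords * maxPerRecordMs;
    }

    /** True if the whole batch can be processed before the next poll() is due. */
    static boolean fits(int maxPollRecords, long maxPerRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, maxPerRecordMs) < maxPollIntervalMs;
    }

    /** Largest max.poll.records whose worst case stays strictly below the interval. */
    static int largestSafeBatch(long maxPerRecordMs, long maxPollIntervalMs) {
        return (int) ((maxPollIntervalMs - 1) / maxPerRecordMs);
    }

    public static void main(String[] args) {
        // Defaults (500 records, 5 minutes) with an assumed 1s per record.
        System.out.println(fits(500, 1_000, 300_000));
        // A smaller batch of 50 records under the same assumption.
        System.out.println(fits(50, 1_000, 300_000));
    }
}
```

Under that assumption the default batch of 500 records does not fit into 5 minutes, so either max.poll.records has to go down or max.poll.interval.ms has to go up.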

Pull offset and commit offset

Kafka offsets are managed by the consumer, and there are two kinds: the pull offset (position) and the commit offset (committed). The pull offset represents the consumer's current consumption progress in a partition. After messages have been consumed, the offset needs to be committed. When committing, Kafka sends the value of the pull offset to the coordinator as the commit offset of that partition.

If the offset is not committed, then the next time the consumer reconnects to the broker, it starts consuming from the offset that the consumer group last committed to the broker.

So the problem is this: when we take too long to process messages, the consumer is removed from the group by the broker, and committing the offset fails. The pull offset is therefore never committed to the broker, and the partition is rebalanced. When the partition is reassigned, the consumer starts consuming from the last committed offset, so the same messages are consumed again. This is the repeated-consumption problem.
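The repeated consumption described above can be reproduced with a tiny in-memory model. Everything here is a simplification under stated assumptions: OffsetSketch is a hypothetical class, a failed commit is modeled as a false return value instead of a CommitFailedException, and the partition is handed back to the same consumer after the Rebalance.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of the pull offset (position) and the commit offset for one partition. */
final class OffsetSketch {
    private long position;                 // next offset to fetch (pull offset)
    private long committed;                // last offset successfully committed
    private boolean ownsPartition = true;  // false once the partition has been revoked
    private final List<Long> processed = new ArrayList<>();

    /** Processes `count` records starting at the current position. */
    void pollAndProcess(int count) {
        for (int i = 0; i < count; i++) {
            processed.add(position++);
        }
    }

    /** Tries to commit the current position; fails if the partition was revoked. */
    boolean commit() {
        if (!ownsPartition) {
            return false; // stands in for CommitFailedException
        }
        committed = position;
        return true;
    }

    /** The group rebalanced while this consumer was still busy processing. */
    void revoke() {
        ownsPartition = false;
    }

    /** After reassignment, consumption resumes from the committed offset. */
    void reassign() {
        ownsPartition = true;
        position = committed;
    }

    List<Long> processed() {
        return processed;
    }
}
```

For example, processing offsets 0 to 2 and committing, then processing 3 and 4, losing the partition before the commit, and getting it back makes the next poll deliver offsets 3 and 4 a second time.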

The solution suggested by the exception log

In fact, the exception log printed by the Kafka consumer already suggests a solution.

Put simply: increase max.poll.interval.ms and session.timeout.ms, reduce max.poll.records, and make sure the consumer commits the offset promptly once a message has been processed.

Solving the problem

After the analysis above, we should know how to approach this problem. One thing to note is that I integrated Kafka through SpringBoot and a Kafka consumer listener. The main code structure on the consumer side is as follows.

@KafkaListener(
        topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = {"0"})},
        groupId = "kafka-consumer",
        containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord record, Acknowledgment ack) {
    logger.info("topic is {}, offset is {}, value is {}", record.topic(), record.offset(), record.value());
    try {
        Object value = record.value();
        logger.info(value.toString());
        // Acknowledge manually so that the offset is committed.
        ack.acknowledge();
    } catch (Exception e) {
        // Pass the exception as the last argument so that the stack trace is logged.
        logger.error("log consumer exception", e);
    }
}

The logic of the code above is simple: it reads a message from Kafka and prints it directly to the log file.

Trying to solve the problem

I first followed the hints in the exception log and added the following configuration to the application.yml file of the SpringBoot project.

spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 3600000
        max.poll.records: 50
        session.timeout.ms: 60000
        heartbeat.interval.ms: 3000
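For readers who do not use SpringBoot, the same four settings can be written as plain consumer properties. This is only a sketch: the class name ConsumerPropsSketch is made up, and a real consumer would also need bootstrap.servers, group.id, and the deserializers, which are left out here.

```java
import java.util.Properties;

/** Illustrative only: the settings from application.yml as plain consumer properties. */
final class ConsumerPropsSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        // Allow up to one hour between two poll() calls.
        props.setProperty("max.poll.interval.ms", "3600000");
        // Return at most 50 records per poll() so each batch is processed quickly.
        props.setProperty("max.poll.records", "50");
        // Declare the consumer dead after 60s without heartbeats.
        props.setProperty("session.timeout.ms", "60000");
        // Send a heartbeat every 3s (well within the 3x rule).
        props.setProperty("heartbeat.interval.ms", "3000");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A Properties object like this is what would be handed to the Kafka consumer constructor once the connection settings have been added.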

After applying the configuration, I tested the consumer logic again and found that the Rebalance exception was still thrown.

Final solution

Let's look at the problem from another angle: one Consumer is producing messages and another Consumer is consuming them, and the two cannot be under the same groupId. Changing the groupId of one of them is enough.

Our business project is developed as separate modules and subsystems. For example, module A produces messages and module B consumes the messages produced by module A. In this situation, changing configuration parameters such as session.timeout.ms: 60000 has no effect at all, and the Rebalance exception is still thrown. Presumably this is because clients that share a groupId belong to the same consumer group, so membership changes caused by one of them trigger a Rebalance for the others as well.

So I tried changing the groupId of the consumer group, modifying the following code:

@KafkaListener(
        topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = {"0"})},
        groupId = "kafka-consumer",
        containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord record, Acknowledgment ack) {

to the code shown below.

@KafkaListener(
        topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = {"0"})},
        groupId = "kafka-consumer-logs",
        containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord record, Acknowledgment ack) {
