
How to solve the problem of message loss in Kafka


This article explains where Kafka can lose messages and what to do about it, looking at each of the three places loss can occur: the Broker, the Producer, and the Consumer.

Broker

Message loss on the Broker side comes from Kafka itself: to achieve higher performance and throughput, Kafka writes data to disk asynchronously and in batches.

To improve performance and reduce the number of flushes, Kafka flushes in batches: data is written to disk once a certain amount of messages has accumulated or a certain time interval has elapsed.

This behaviour is also determined by the Linux operating system. When data is written on Linux, it first lands in the page cache (Page Cache) and is flushed to the file later, based on time or other conditions, or forced out by the fsync call.

If the system crashes while the data is still only in the Page Cache, that data is lost.

Figure: the Broker reads and writes at high speed on a Linux server and synchronizes to replicas

The figure above outlines how the Broker writes data and synchronizes it. The Broker writes data only into the Page Cache, and the Page Cache lives in memory.

That data is lost after a power failure. Page Cache data is flushed to disk by the Linux flusher threads.

There are three conditions that trigger a flush:

The sync or fsync function is called explicitly.

Available memory falls below a threshold.

Dirty data has been held longer than a threshold. "Dirty" is a flag on a Page Cache page: when data is written into the Page Cache, the page is marked dirty; after the data is flushed, the dirty flag is cleared.

The Broker's flush configuration takes over this flushing by calling fsync itself. From the point of view of a single Broker, data in the Page Cache can still be lost.

Kafka does not provide synchronous flushing. RocketMQ does: it blocks the asynchronous flush and waits for the result, similar to an Ajax callback or a Java Future.

The following is a snippet of RocketMQ source code:

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
// Block until the flush finishes or the sync-flush timeout expires
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

In other words, Kafka cannot, even in theory, guarantee that a single Broker will never lose messages; the situation can only be alleviated by adjusting the flush parameters.

For example, you can shorten the flush interval and reduce the amount of data per flush. The shorter the interval, the worse the performance and the better the reliability (as reliable as it can get). It is a trade-off; a rough broker-side configuration sketch follows.
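As a hedged illustration only (the property names are Kafka's standard broker flush settings, the values are made up for the example), server.properties could tighten flushing like this:

# Flush once this many messages have accumulated on a partition
log.flush.interval.messages=10000
# Flush once a message has waited in the log this long (in ms)
log.flush.interval.ms=1000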

To work around this, Kafka relies on cooperation between the Producer and the Broker to deal with message loss on a single Broker.

Once the Producer detects that a message failed to reach the Broker, it can automatically retry. Unless the number of retries exceeds the (configurable) threshold, the message is not lost.

Only at that point does the producer client need to handle the failure itself. So how does the Producer detect data loss? Through the ack mechanism, somewhat like TCP's three-way handshake.

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

http://kafka.apache.org/20/documentation.html

The above is Kafka's official description of the acks parameter (in earlier versions the parameter was request.required.acks):

① acks=0: the Producer does not wait for any response from the Broker. This is the most efficient setting, but messages can easily be lost.

② acks=1: after receiving the message, the leader Broker returns an ack without waiting for the other followers to respond. You can also read it as "the number of acks is 1".

If the leader fails before the followers have received the synchronized message, the message is lost.

In the example in the figure above, once the leader receives a message and successfully writes it to the Page Cache, it returns an ack, and the Producer considers the message sent successfully.

But at that moment, as the figure shows, the data has not yet been synchronized to the followers. If the leader loses power right then, the data is lost.

③ acks=-1: after receiving the message, the leader Broker waits until all followers in the ISR list have returned a result before returning the ack.

-1 is equivalent to all. With this configuration, the ack is not returned merely because the leader has written the data to its Page Cache; it is triggered only after every replica in the ISR has reported success.

If power is lost before the ack comes back, the Producer knows the message was not sent successfully and will resend it. If the followers had already received the data and the ack was returned before the leader lost power, the data still exists on the original followers, and after re-election the new leader holds that data.

Synchronizing data from the leader to a follower takes two steps:

The data is flushed from the Page Cache to disk, because only data on disk can be synchronized to a replica.

The data is synchronized to the replica, and the replica successfully writes it into its own Page Cache. Once the Producer gets the ack, even if every machine loses power, the data at least exists on the leader's disk.

The third case above mentions the followers in the ISR list; another parameter is needed to make the ack guarantee actually hold.

The ISR is the "reliable follower list" maintained by the Broker, the in-sync replica list; the Broker configuration includes a related parameter: min.insync.replicas.

This parameter is the minimum number of replicas in the ISR. If it is not set, the ISR may end up containing no followers at all, in which case acks=all is effectively equivalent to acks=1.
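A minimal producer-side sketch of this combination follows; the topic name "foo" and the retry count are illustrative, and min.insync.replicas itself is set on the broker or topic, not in producer code:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");   // wait for every replica in the ISR before the ack
props.put("retries", "3");  // resend automatically when no ack arrives
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Broker/topic side: min.insync.replicas >= 2, so acks=all cannot silently degrade to acks=1
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("foo", "key", "value"));
producer.close();

With acks=all plus retries on the producer and min.insync.replicas of at least 2 on the topic, an un-acked send is retried rather than silently dropped.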

As shown in the figure above:

acks=0: total time f(t) = f(1).

acks=1: total time f(t) = f(1) + f(2).

acks=-1: total time f(t) = f(1) + max(f(A), f(B)) + f(2).

Performance decreases and reliability increases in that order.

Producer

Producer-side message loss happens in the producer client.

To improve efficiency and reduce IO, the Producer can merge multiple requests and send them together. The requests waiting to be merged and sent are cached in a local buffer.

The buffering works much like the flushing described earlier: the Producer packs requests into "batches", or sends the buffered data out at fixed intervals.

The buffer turns the producer into an asynchronous sender, which improves throughput.

However, the data sitting in the buffer is at risk. Under normal circumstances, the client's asynchronous send can handle delivery failures or timeouts through a callback, for example:
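A sketch of such a callback, reusing the producer from the earlier sketch (the topic name and the error handling are illustrative):

producer.send(new ProducerRecord<>("foo", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        // The send failed after the configured retries, or timed out:
        // log it, persist it, or hand it back to the service for another attempt.
        System.err.println("send failed: " + exception.getMessage());
    }
});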

But once the Producer is killed abnormally, the data in the buffer is lost and the Broker never receives it.

Likewise, when the Producer client runs out of buffer memory, if the chosen strategy is to discard messages (the alternative is to block), messages are lost as well.

Or messages are produced (asynchronously) faster than they can be sent, leading to too many suspended threads, memory exhaustion, a program crash, and lost messages.

Figure: the Producer sending messages in batches

Figure: asynchronous sending outpaced by message production

The figures above suggest several solutions (a small configuration sketch follows the list):

Switch from asynchronous to synchronous sending, or have the service generate messages through a blocking thread pool with a bounded number of threads. The overall idea is to throttle the rate at which messages are produced.

Enlarge the buffer capacity. This alleviates the problem but does not eliminate it.

Instead of handing messages straight to the in-memory buffer, have the service write them to local storage (a database or a file) and let another (or a small pool of) sending threads forward them. This effectively inserts a much roomier buffer between the service and the producer buffer.
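As mentioned above the list, here is a rough producer-side sketch of the first two ideas (the values are illustrative, not tuned recommendations):

// Enlarge the in-memory buffer and block the caller, instead of failing, when it fills up
props.put("buffer.memory", "67108864");   // 64 MB, up from the 32 MB default
props.put("max.block.ms", "5000");        // send() blocks up to 5 s when the buffer is full
// Synchronous sending applies back-pressure: the caller waits for the ack before producing more
// (get() throws checked exceptions that a real caller would handle)
producer.send(new ProducerRecord<>("foo", "key", "value")).get();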

Consumer

The Consumer handles a message in the following steps:

Receive the message

Process the message

Report "processed" (commit the offset)

The Consumer has two main consumption modes:

Automatically commit the offset (Automatic Offset Committing)

Manually commit the offset (Manual Offset Control)

With auto-commit, the Consumer commits the offsets of received messages at a fixed time interval. The commit and the actual processing of the messages run asynchronously with respect to each other.

That is, processing may fail (for example, throw an exception) after the offset has already been committed. At that point the message is lost.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Auto-commit switch
props.put("enable.auto.commit", "true");
// Auto-commit interval, here 1 s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // About 1000 ms after poll(), the fetched offsets are committed automatically,
    // whether or not the processing below has finished
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        insertIntoDB(record); // storing the message may take longer than 1000 ms
}

The above is an auto-commit example. If insertIntoDB(record) throws an exception at that point, the message is lost.

Here is a manual-commit example:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Turn off auto-commit; offsets are committed manually
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    // poll() no longer commits automatically
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        // Commit only after the whole batch has been processed
        consumer.commitSync();
        buffer.clear();
    }
}

With manual commits you can guarantee that each message is consumed "at least once". Duplicate consumption then becomes possible, but that is outside the scope of this article.

Both examples above use the Consumer's high-level API directly, where offsets and the like are transparent to the client.

You can also use the low-level API to control offsets yourself; this too can ensure that messages are not lost, but it is more complicated.

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // Precise, per-partition control of the committed offset
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}

This concludes "How to solve the problem of message loss in Kafka". Thanks for reading.
