An In-Depth Look at Kafka Data Reliability

2025-01-16 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article takes an in-depth look at the reliability of Kafka data: how messages are stored, replicated, and consumed, and which settings control the guarantees.

1 Overview

Kafka was originally a distributed messaging system developed by LinkedIn and later became part of Apache. It is written in Scala and is widely used for its horizontal scalability and high throughput. A growing number of open source distributed processing systems, such as Cloudera, Apache Storm, and Spark, support integration with Kafka.

Thanks to these advantages, Kafka is increasingly favored by Internet companies. VIPSHOP also uses Kafka as one of its core internal message engines. For message middleware used in commercial settings, message reliability is obviously critical. How can messages be transmitted precisely? How can they be stored accurately? How can they be consumed correctly? All of these questions need to be considered. This article starts with Kafka's architecture and basic principles, then analyzes Kafka's reliability step by step through its storage mechanism, replication principle, synchronization principle, and reliability and persistence guarantees. Finally, benchmarks are used to deepen the understanding of Kafka's high reliability.

2 Kafka architecture

As shown in the figure above, a typical Kafka architecture includes several producers (server logs, business data, page views generated at the front end, and so on), several brokers (Kafka scales horizontally; in general, the more brokers, the higher the cluster throughput), several consumers (consumer groups), and a Zookeeper cluster. Kafka uses Zookeeper to manage cluster configuration, elect leaders, and rebalance when a consumer group changes. Producers publish messages to brokers in push mode, and consumers subscribe to and consume messages from brokers in pull mode.

Key terms are explained below.

2.1 Topic & Partition

A topic can be thought of as a category of messages. Each topic is divided into multiple partitions, and at the storage level each partition is an append-only log file. Any message published to a partition is appended to the end of its log file. The position of each message in the file is called the offset, a long integer that uniquely identifies a message within the partition. Because messages are appended to the partition and written to disk sequentially, writes are very efficient (sequential disk writes have been shown to be able to outperform random memory writes, which is an important basis for Kafka's high throughput).

When a message is sent to the broker, the partition that stores it is chosen according to the partition rules. If the rules are set properly, messages are distributed evenly across partitions, which is how horizontal scaling is achieved. If a topic corresponded to a single file, the machine holding that file would become the topic's performance bottleneck; partitions solve this problem. The default number of partitions for a new topic can be specified in $KAFKA_HOME/config/server.properties (shown below), and the number of partitions can also be changed after the topic is created.

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

When sending a message, you can specify a key for it; the producer decides which partition the message goes to based on the key and the partition mechanism. The partition mechanism is selected with the producer's partitioner.class parameter, and the specified class must implement the kafka.producer.Partitioner interface.
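The key-to-partition mapping described above can be sketched as follows. This is a minimal illustration, not Kafka's own partitioner: the function name `choose_partition` and the use of CRC32 as the hash are assumptions made for the example.

```python
import zlib

def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index in [0, num_partitions)."""
    # CRC32 is used only because it is stable across runs; it is an
    # assumption for this sketch, not the hash Kafka itself uses.
    return zlib.crc32(key) % num_partitions

# Spread 3000 hypothetical keys over 3 partitions and count the result.
counts = [0, 0, 0]
for i in range(3000):
    counts[choose_partition(f"user-{i}".encode(), 3)] += 1
print(counts)
```

The same key always maps to the same partition, which is what keeps messages with one key in order, while distinct keys spread across partitions.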

For more details on Topic and Partition, you can refer to the "Kafka file storage mechanism" section below.

3 High reliability storage analysis

Kafka's high reliability comes from its robust replication strategy. By tuning the replication-related parameters, Kafka can be moved easily along the trade-off between performance and reliability. Kafka has provided partition-level replication since version 0.8.x, and the replication factor can be configured in $KAFKA_HOME/config/server.properties (default.replication.factor).

We start with Kafka's file storage mechanism to understand storage at the lowest level and build a micro-level picture. We then cover the macro-level concepts through Kafka's replication and synchronization principles. Finally, we round out the understanding of ISR, HW, and leader-related topics from several angles, including leader election and data reliability and persistence guarantees.

3.1 Kafka file storage mechanism

Messages in Kafka are classified by topic: producers send messages to Kafka brokers by topic, and consumers read data by topic. At the physical level, however, a topic is divided into several partitions, so how are topics and partitions stored? A partition is in turn subdivided into segments, and is physically composed of multiple segments, so what are these segments? Let's go through them one by one.

For illustration, assume the Kafka cluster has a single broker, that is, a single physical machine. In this broker's $KAFKA_HOME/config/server.properties, set log.dirs=/tmp/kafka-logs as the directory for Kafka message files, and create a topic named topic_zzh_test with 4 partitions ($KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 4 --topic topic_zzh_test --replication-factor 1). Four directories then appear under /tmp/kafka-logs:

drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3

In Kafka file storage, a topic has multiple partitions, and each partition is a directory. A partition is named as topic name + ordinal number: the first ordinal is 0 and the largest is the number of partitions minus 1. A partition is an actual physical concept, while a topic is a logical one.

As mentioned above, a partition is subdivided into segments. What is a segment? If the partition were the smallest storage unit, then as producers keep sending messages the partition file would grow without bound, which would seriously complicate maintaining message files and cleaning up messages that have already been consumed. Partitions are therefore subdivided into segments. Each partition (directory) is equivalent to one giant file split across multiple segment data files of equal size (the number of messages per segment file is not necessarily equal). This makes it easy to delete old segments, that is, to clean up consumed messages and improve disk utilization. Each partition only needs to support sequential reads and writes, and the life cycle of segment files is determined by server configuration parameters (log.segment.bytes, log.roll.{ms,hours}, and others).

A segment consists of two files, a ".index" file and a ".log" file, which are the segment's index file and data file respectively. They are named as follows: the first segment of a partition starts from 0, and each subsequent segment file is named after the offset of the last message in the previous segment file. The value is a 64-bit number written as 20 decimal digits, left-padded with zeros, as follows:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
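The naming rule for segment files can be reproduced with a zero-padded format string. The helper name `segment_file_names` is hypothetical and exists only for this illustration.

```python
def segment_file_names(name_offset: int) -> tuple[str, str]:
    """Return the (.index, .log) file names for a segment's name offset."""
    # 20 decimal digits, left-padded with zeros, as described in the text.
    base = f"{name_offset:020d}"
    return base + ".index", base + ".log"

for name_offset in (0, 170410, 239430):
    print(*segment_file_names(name_offset))
```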

Taking the segment files above as an example, the following figure shows how the ".index" file and the ".log" file of segment 00000000000000170410 correspond:

As shown in the figure above, the ".index" file stores a large amount of metadata and the ".log" file stores a large number of messages; each metadata entry in the index file points to the physical offset of a message in the corresponding data file. Take the entry [3, 348] in the ".index" file as an example: it refers to the 3rd message in the ".log" data file, that is, message 170410 + 3 = 170413 in the partition as a whole, and the physical offset of that message is 348.

So how is a message located in a partition by its offset?

Using the figure above as an example, to read the message with offset=170418, first find the segment file. 00000000000000000000.index is the first file, the second is 00000000000000170410.index (starting offset 170410 + 1 = 170411), and the third is 00000000000000239430.index (starting offset 239430 + 1 = 239431), so offset=170418 falls in the second file. Subsequent files are named and ordered by offset in the same way, so the right file can be found quickly with binary search. Next, the 00000000000000170410.index file is used to locate position 1325 in the 00000000000000170410.log file, and the read starts there.
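The two-step lookup can be sketched with Python's `bisect` module. The sketch follows this article's convention that a segment named N starts at offset N + 1. The index entries are hypothetical except for (3, 348), which comes from the text, and (8, 1325), which is chosen so that the result matches the position quoted above.

```python
from bisect import bisect_left, bisect_right

# Name offsets of the three segments in the example.
SEGMENT_NAMES = [0, 170410, 239430]

# Hypothetical sparse index for segment 170410: (relative offset, byte position).
INDEX_170410 = [(1, 0), (3, 348), (6, 1021), (8, 1325)]

def find_segment(offset: int) -> int:
    """Return the name offset of the segment that holds `offset`."""
    # Last segment whose name offset is strictly below the requested offset.
    i = bisect_left(SEGMENT_NAMES, offset) - 1
    return SEGMENT_NAMES[max(i, 0)]

def find_position(index, name_offset: int, offset: int) -> int:
    """Return the byte position of the nearest index entry at or before `offset`."""
    relative = offset - name_offset
    keys = [rel for rel, _ in index]
    i = bisect_right(keys, relative) - 1
    return index[max(i, 0)][1]

segment = find_segment(170418)
print(segment, find_position(INDEX_170410, segment, 170418))
```

Because the index is sparse, a lookup may land on an entry slightly before the requested message; the reader then scans the ".log" file forward from that position.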

When reading the message with offset=170418 starting at position 1325 in the 00000000000000170410.log file, how does the reader know where this message ends, so that it does not read into the next message?

The answer lies in the physical structure of a message. Each message has a fixed layout: offset (8 bytes), message body size (4 bytes), crc32 (4 bytes), magic (1 byte), attributes (1 byte), key length (4 bytes), key (K bytes), payload (N bytes), and other fields. These determine the size of a message, that is, where the read should stop.
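How the size field marks the end of a message can be illustrated with a simplified framing sketch. It uses only the fields listed above and omits the "other fields" the text mentions, so it is an assumption-laden illustration rather than Kafka's actual on-disk format; `encode` and `decode_all` are hypothetical helpers.

```python
import struct
import zlib

HEADER = struct.Struct(">qi")   # offset (8 bytes), message body size (4 bytes)
FIXED = struct.Struct(">IBBi")  # crc32 (4), magic (1), attributes (1), key length (4)

def encode(offset: int, key: bytes, payload: bytes) -> bytes:
    """Serialize one message in the simplified layout."""
    tail = struct.pack(">BBi", 0, 0, len(key)) + key + payload
    body = struct.pack(">I", zlib.crc32(tail)) + tail
    return HEADER.pack(offset, len(body)) + body

def decode_all(log: bytes):
    """Yield (offset, key, payload) for every message in a byte string."""
    pos = 0
    while pos < len(log):
        offset, size = HEADER.unpack_from(log, pos)
        body_start = pos + HEADER.size
        _crc, _magic, _attrs, key_len = FIXED.unpack_from(log, body_start)
        key_start = body_start + FIXED.size
        key = log[key_start:key_start + key_len]
        payload = log[key_start + key_len:body_start + size]
        yield offset, key, payload
        # The size field tells the reader exactly where this message ends.
        pos = body_start + size

log = encode(170417, b"k1", b"hello") + encode(170418, b"k2", b"world!")
print(list(decode_all(log)))
```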

3.2 replication principles and synchronization methods

Each partition of a Kafka topic has a write-ahead log file. Although a partition is subdivided into several segment files, to upper-layer applications the partition can be regarded as the smallest storage unit (one "giant" file stitched together from multiple segment files). Each partition is an ordered, immutable sequence of messages that is continuously appended to.

The figure above introduces two new terms: HW and LEO. LEO is short for LogEndOffset and indicates the position of the last message in each partition's log. HW is short for HighWatermark and is the highest position in the partition that a consumer can see. HW involves the concept of multiple replicas; it is mentioned here and covered in detail in the next section.

Back to the point: to improve message reliability, Kafka keeps N replicas of each topic partition, where N (greater than or equal to 1) is the topic's replication factor. Through this multi-replica mechanism Kafka implements automatic failover, so the service remains available when a broker in the cluster fails. When replicating, Kafka ensures that the partition log is written to the other nodes in order. Of the N replicas, one is the leader and the others are followers. The leader handles all read and write requests for the partition, while followers passively and periodically replicate data from the leader.

As shown in the following figure, there are 4 broker in a Kafka cluster, 3 partition in a topic, and 3 replicas as a replication factor:

Kafka provides a data replication algorithm that ensures that if the leader fails, a new leader is elected and clients can continue to write messages successfully. Kafka ensures that the new leader is chosen from the list of in-sync replicas, that is, from followers that have caught up with the leader's data. The leader maintains and tracks the lag of every follower in the ISR (short for In-Sync Replicas, the set of in-sync replicas; see the next section). When a producer sends a message to the broker, the leader writes the message and replicates it to all followers. A message is committed only after it has been successfully copied to all in-sync replicas. Replication latency is bounded by the slowest follower, so it is important to detect slow replicas quickly. If a follower lags too far behind or fails, the leader removes it from the ISR.

3.3 ISR

The previous section mentioned the ISR (In-Sync Replicas), the set of replicas that are in sync with the leader. The number of replicas has some impact on Kafka's throughput but greatly improves availability. By default, Kafka's replica count is 1, that is, each partition has only its leader. To ensure message reliability, this value (specified by the broker parameter default.replication.factor) is usually set greater than 1, for example 3. All replicas together are called the Assigned Replicas, or AR. The ISR is a subset of AR and is maintained by the leader. Followers synchronize data from the leader with some delay, measured in two dimensions: elapsed time (replica.lag.time.max.ms) and number of messages (replica.lag.max.messages); the latest 0.10.x version supports only replica.lag.time.max.ms. A follower that exceeds the threshold is removed from the ISR and placed in the OSR (Out-of-Sync Replicas) list. Newly added followers also start in the OSR. AR = ISR + OSR.

The replica.lag.max.messages parameter has been removed as of Kafka 0.10.x, leaving only replica.lag.time.max.ms to manage ISR membership. Why? replica.lag.max.messages means that when a replica falls behind the leader by more than this number of messages, the leader removes the follower from the ISR. Suppose replica.lag.max.messages=4. If the producer sends fewer than 4 messages at a time, then between the moment the leader receives them and the moment the followers start pulling them, no follower is more than 4 messages behind, so none is moved out of the ISR and the setting looks reasonable. However, when the producer generates a burst of traffic and sends more than 4 messages at once, that is, more than replica.lag.max.messages, the followers are considered out of sync with the leader and are kicked out of the ISR, even though they are alive and have no performance problems. Shortly afterwards they catch up with the leader and rejoin the ISR. They therefore keep leaving and re-entering the ISR, which adds unnecessary overhead. In addition, this parameter is global to the broker. Set too large, it delays the removal of followers that are genuinely behind; set too small, it causes followers to enter and leave the ISR frequently. No single appropriate value for replica.lag.max.messages can be given, so newer versions of Kafka remove the parameter.
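The time-based rule can be sketched as below. This is a simplified model that assumes ISR membership depends only on how long ago a follower last caught up with the leader, in the spirit of replica.lag.time.max.ms; the function `update_isr` and its arguments are hypothetical and do not mirror Kafka's internal code.

```python
def update_isr(isr: set, osr: set, last_caught_up_ms: dict,
               now_ms: int, max_lag_ms: int, leader: int):
    """Recompute ISR and OSR from the time each follower last caught up."""
    new_isr, new_osr = {leader}, set()  # the leader is always in the ISR
    for replica in (isr | osr) - {leader}:
        lag_ms = now_ms - last_caught_up_ms[replica]
        if lag_ms <= max_lag_ms:
            new_isr.add(replica)
        else:
            new_osr.add(replica)
    return new_isr, new_osr

# Follower 1 caught up 500 ms ago, follower 2 caught up 9000 ms ago.
isr, osr = update_isr({0, 1, 2}, set(), {1: 9500, 2: 1000},
                      now_ms=10000, max_lag_ms=5000, leader=0)
print(isr, osr)
```

A burst of messages does not change the outcome here as long as followers keep catching up within the time limit, which is the flapping problem that the message-count threshold could not avoid.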

Note: the ISR includes both the leader and the followers.

The previous section also touched on another concept, the HW. HW, short for HighWatermark, is commonly called the high water mark. It is the smallest LEO among the replicas in a partition's ISR, and a consumer can consume only up to the position of the HW. Each replica has its own HW, and the leader and followers are each responsible for updating their own. A message newly written to the leader cannot be consumed immediately: the leader waits until all replicas in the ISR have synchronized the message and then advances the HW, and only then can the consumer read it. This ensures that if the broker hosting the leader fails, the message can still be obtained from the newly elected leader. Read requests from other brokers are not restricted by the HW.
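The "smallest LEO in the ISR" rule is a one-line computation. In this sketch LEO values are plain integers and the replica names are made up for illustration.

```python
def high_watermark(leo_by_replica: dict, isr: set) -> int:
    """HW is the smallest LEO among the replicas currently in the ISR."""
    return min(leo_by_replica[replica] for replica in isr)

leo = {"leader": 5, "follower1": 4, "follower2": 3}
hw = high_watermark(leo, {"leader", "follower1", "follower2"})
print(hw)
```

If follower2 were dropped from the ISR, the same call over the remaining two replicas would return 4, so a lagging replica holds the HW back only while it is an ISR member.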

The following figure illustrates in detail the flow process of ISR, HW and LEO when the producer production message reaches broker:

Kafka's replication mechanism is therefore neither fully synchronous nor purely asynchronous. Synchronous replication requires every working follower to have replicated a message before it is committed, which greatly reduces throughput. In asynchronous replication, followers copy data from the leader asynchronously, and a message is considered committed as soon as the leader writes it to its log; if the leader suddenly goes down while followers are still behind, data is lost. Kafka's use of the ISR strikes a good balance between avoiding data loss and maintaining throughput.

Kafka's ISR management is ultimately reflected in a Zookeeper node, located at /brokers/topics/[topic]/partitions/[partition]/state. This Zookeeper node is currently maintained in two places:

Maintained by the Controller: one broker in the Kafka cluster is elected Controller. It is mainly responsible for partition management and replica state management, and also performs administrative tasks such as reassigning partitions. Under certain conditions, the Controller's LeaderSelector elects a new leader and writes the new leader, ISR, leader_epoch, and controller_epoch to the relevant Zookeeper node. It also sends a LeaderAndIsrRequest to notify all replicas.

Maintained by the leader: the leader has a separate thread that periodically checks whether any follower in the ISR has fallen out of sync. If the ISR changes, the leader writes the new ISR to the relevant Zookeeper node.

3.4 data reliability and persistence assurance

When producer sends data to leader, you can set the level of data reliability through the request.required.acks parameter:

1 (default): the producer sends the next message once the leader in the ISR has successfully received the data and acknowledged it. If the leader goes down, data may be lost.

0: the producer does not wait for any acknowledgment from the broker before sending the next batch of messages. Transmission efficiency is highest in this case, but data reliability is lowest.

-1: the producer waits until all followers in the ISR have confirmed receipt of the data before a send is considered complete. This gives the highest reliability, but it still does not guarantee that no data is lost. For example, when the ISR contains only the leader (as noted in the ISR section, ISR membership grows and shrinks and may be reduced to the leader alone), this degenerates into the acks=1 case.

To improve data reliability, request.required.acks=-1 should be combined with the min.insync.replicas parameter (which can be set at the broker or topic level) to get the maximum effect. min.insync.replicas sets the minimum number of replicas in the ISR; its default value is 1, and it takes effect only when request.required.acks is set to -1. If the number of replicas in the ISR is less than min.insync.replicas, the client receives an exception: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
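The interaction between the acks setting and min.insync.replicas can be summarized in a small decision function. This is a model of the rules described above, not broker code: `acks_needed` is a hypothetical helper and `NotEnoughReplicas` merely stands in for the Java exception named in the text.

```python
class NotEnoughReplicas(Exception):
    """Stand-in for org.apache.kafka.common.errors.NotEnoughReplicasException."""

def acks_needed(acks: int, isr_size: int, min_insync_replicas: int = 1) -> int:
    """Return how many replicas must hold a message before the producer is answered."""
    if acks == 0:
        return 0  # the producer does not wait at all
    if acks == 1:
        return 1  # the leader alone
    if acks == -1:
        # min.insync.replicas is only checked when acks is -1.
        if isr_size < min_insync_replicas:
            raise NotEnoughReplicas("fewer in-sync replicas than required")
        return isr_size  # every replica currently in the ISR
    raise ValueError(f"unsupported acks value: {acks}")

print(acks_needed(-1, isr_size=3, min_insync_replicas=2))
print(acks_needed(-1, isr_size=1, min_insync_replicas=1))
```

The second call shows the degenerate case from the text: with only the leader left in the ISR and min.insync.replicas=1, acks=-1 waits for a single replica, just like acks=1.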

Next, the acks=1 and acks=-1 cases are analyzed in detail:

1. request.required.acks=1

The producer sends data to the leader, the leader writes it to its local log and returns success to the client. If the leader then goes down before the replicas in the ISR have pulled the message, the message is lost.

2. request.required.acks=-1

In synchronous send mode (Kafka's default, that is, producer.type=sync), with replication.factor >= 2 and min.insync.replicas >= 2, no data is lost.

There are two typical situations. In the first, with acks=-1 (unless otherwise specified, acks below refers to the request.required.acks parameter), the data is sent to the leader and all followers in the ISR finish synchronizing it before the leader goes down. A new leader is then elected and no data is lost.

In the second, with acks=-1, the data is sent to the leader and only some of the ISR replicas have synchronized it when the leader goes down. Either follower1 or follower2 may become the new leader. The producer receives an exception and resends the data, so the data may be duplicated.

Of course, if follower2 had not synchronized any of this data when the leader crashed and follower2 is elected as the new leader, the message is not duplicated.

Note: Kafka only deals with fail/recover issues, not Byzantine issues.

3.5 further discussion on HW

Consider another situation in the figure above (acks=-1, with only some ISR replicas synchronized). Suppose that when the leader fails, follower1 has synchronized messages 4 and 5 while follower2 has synchronized only message 4, and follower2 is elected leader. What happens to the extra message 5 on follower1?

Here we need the cooperation of HW. As mentioned earlier, in an ISR list in partition, the HW of leader is the smallest LEO of all the copies in the ISR list. Similar to the barrel principle, the water level depends on the short board.

As shown in the figure above, a partition of a topic has three replicas, A, B, and C. A, as the leader, necessarily has the highest LEO; B follows closely; and machine C synchronizes the slowest because of its weaker hardware and poor network. Suppose machine A goes down and B becomes the leader. Without the HW, when A recovers and performs the makeFollower operation it would simply append after the end of its log as it stood at the time of the crash. If B's LEO has by then reached A's LEO, the logs become inconsistent, so the HW is used to avoid this situation.

When A synchronizes, it first truncates its log file to the position of its own HW, that is, 3, and then pulls messages from B.

When a failed follower recovers, it first truncates its log file to the HW recorded at its last checkpoint and then synchronizes messages from the leader. If the leader fails, a new one is elected, and the new leader instructs the remaining followers to truncate to their own HW positions and then pull new messages.
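The truncate-then-fetch procedure can be sketched with logs modeled as Python lists. The sketch assumes the HW is expressed as a count of messages known to be committed; the function `resync` and the message labels are hypothetical.

```python
def resync(follower_log: list, follower_hw: int, leader_log: list) -> list:
    """Truncate the follower's log to its HW, then copy the rest from the leader."""
    truncated = follower_log[:follower_hw]        # discard everything past the HW
    return truncated + leader_log[follower_hw:]   # pull the remainder from the leader

# A was the old leader with HW=3; B is the new leader and has since moved on.
a_log = ["m1", "m2", "m3", "m4", "m5"]
b_log = ["m1", "m2", "m3", "m4", "n5", "n6"]
print(resync(a_log, 3, b_log))
```

Without the truncation step, A would keep its own "m5" in the slot where B holds "n5", which is the inconsistency the HW is meant to prevent.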

When the LEOs of the replicas in the ISR differ and the leader fails, the new leader is not chosen by LEO but by the order of the replicas in the ISR.

3.6 Leader election

A message is considered committed only once all followers in the ISR have copied it from the leader. This prevents data loss in the case where data is written to the leader and the leader goes down before any follower has copied it. The producer can choose whether to wait for the commit, which is set through request.required.acks. This mechanism ensures that as long as the ISR contains one or more followers, a committed message is not lost.

An important question is how to elect a new leader among the followers when the leader goes down. Because followers may lag far behind or may themselves have crashed, the most up-to-date follower must be chosen as the new leader. A basic principle is that the new leader must hold every message the original leader committed. This requires a compromise: if the leader waits for more follower acknowledgments before declaring a message committed, more followers are eligible to become the new leader after it fails, but throughput drops.

A very common way to elect a leader is majority vote ("the minority is subordinate to the majority"), which Kafka does not use. In this mode, with 2f+1 replicas, f+1 replicas must have copied a message before it is committed, and to guarantee that a new leader can be elected correctly, no more than f replicas may fail. This approach has one big advantage: system latency depends on the fastest machines. With 3 replicas, for example, the delay is determined by the faster follower rather than the slower one. Majority vote also has disadvantages. To keep leader election working, it tolerates relatively few follower failures: tolerating one failure requires at least three replicas, and tolerating two requires at least five. In other words, a high level of fault tolerance in production requires a large number of replicas, and with large data volumes a large number of replicas causes a sharp drop in performance. This algorithm is therefore used more often in systems that share cluster configuration, such as Zookeeper, than in systems that store large amounts of data. HDFS's HA feature is also based on majority vote, but its data storage is not.

In fact, there are many leader election algorithms, such as Zookeeper's Zab, Raft, and Viewstamped Replication. The leader election algorithm used by Kafka is more like Microsoft's PacificA algorithm.

Kafka dynamically maintains an ISR for each partition in Zookeeper. All replicas in the ISR have kept up with the leader, and only ISR members can be elected leader (unclean.leader.election.enable=false). In this mode, with f+1 replicas, a Kafka topic can tolerate the failure of f replicas without losing committed messages, which is advantageous in most usage scenarios. To tolerate f failures, majority vote and the ISR approach wait for the same number of replicas before commit, but the total number of replicas the ISR approach needs is almost half of what majority vote needs.
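The replica counts compared above follow directly from the two formulas, 2f+1 for majority vote and f+1 for the ISR approach. The helper below is only an arithmetic illustration with a hypothetical name.

```python
def replicas_needed(f: int) -> dict:
    """Total replicas required to tolerate f failures under each scheme."""
    return {
        "majority_vote": 2 * f + 1,    # a quorum of f + 1 must survive f failures
        "isr": f + 1,                  # one surviving in-sync replica is enough
        "acks_before_commit": f + 1,   # the same for both schemes
    }

for f in (1, 2, 3):
    print(f, replicas_needed(f))
```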

As mentioned above, as long as the ISR contains at least one follower, Kafka can ensure that committed data is not lost. If all replicas of a partition go down, however, there is no such guarantee. In this case there are two feasible options:

Wait for any replica in the ISR to come back to life and choose it as the leader

Choose the first replica that comes back to life (not necessarily in the ISR) as the leader

This is a straightforward trade-off between availability and consistency. If you wait for a replica in the ISR to come back, the partition may be unavailable for a relatively long time, and if none of the ISR replicas can recover, or their data is lost, the partition is never available again. If you choose the first replica that comes back to life and it is not in the ISR, it becomes the leader and the data source for consumers even though it is not guaranteed to contain all committed messages. In the versions discussed here, Kafka uses the second policy by default (unclean.leader.election.enable=true); setting this parameter to false enables the first policy.
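The two policies can be contrasted in a short sketch. It models replicas as broker ids and is an illustration of the rules described above, not the Controller's implementation; `elect_leader` and its parameters are hypothetical.

```python
def elect_leader(isr_order: list, alive: set, assigned: list, unclean_enabled: bool):
    """Pick a new leader, or return None if no eligible replica is alive."""
    # Preference follows the order of the ISR, not the size of each replica's LEO.
    for replica in isr_order:
        if replica in alive:
            return replica
    # Unclean election: fall back to any live assigned replica, risking data loss.
    if unclean_enabled:
        for replica in assigned:
            if replica in alive:
                return replica
    return None

# AR=(0,1,2), ISR=(0,1); brokers 0 and 1 are down, only broker 2 is alive.
print(elect_leader([0, 1], {2}, [0, 1, 2], unclean_enabled=False))
print(elect_leader([0, 1], {2}, [0, 1, 2], unclean_enabled=True))
```

With unclean election disabled the partition stays leaderless until an ISR member returns; with it enabled, broker 2 takes over even though it may be missing committed messages.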

The parameter unclean.leader.election.enable has a major impact on leader election, system availability, and data reliability. Let's analyze several typical scenarios.

As shown in the figure above, suppose a partition has 3 replicas, replica-0, replica-1, and replica-2, stored on broker0, broker1, and broker2 respectively. AR=(0,1,2), ISR=(0,1).

Set request.required.acks=-1, min.insync.replicas=2, unclean.leader.election.enable=false. Here replica-0 is also referred to as broker0. Initially broker0 is the leader and broker1 is a follower.

When replica-0 in the ISR crashes, broker1 is elected as the new leader [ISR=(1)]. Writes cannot be served because min.insync.replicas=2, but reads continue to be served normally. Recovery options for this scenario:

Try to restore (restart) replica-0; if it comes back up, the system returns to normal

If replica-0 cannot be restored, set min.insync.replicas to 1 to restore writes.

When replica-0 in the ISR crashes and replica-1 then crashes as well [ISR=(1), leader=-1], the service is unavailable. Recovery options for this scenario:

Try to restore replica-0 and replica-1. If both come back up, the system returns to normal.

If replica-0 comes back up but replica-1 does not, no leader can be elected, because with unclean.leader.election.enable=false the leader can only be elected from the ISR. When all replicas in the ISR have failed, the replica that failed last must recover before a leader can be elected; here replica-0 failed first and replica-1 failed later, so a leader can be elected only after replica-1 is restored. The conservative option is to set unclean.leader.election.enable to true, which restores the read service at the cost of possible data loss. You also need to set min.insync.replicas to 1 to restore writes.

If replica-1 is restored but replica-0 cannot be, the situation is the same as the one already described above: the read service is available, and min.insync.replicas must be set to 1 to restore writes.

If neither replica-0 nor replica-1 can be restored, refer to scenario 2.

When replica-0 and replica-1 in the ISR go down at the same time [ISR=(0,1)], the service is unavailable. In this case, try to restore replica-0 and replica-1. As soon as either replica returns to normal, the read service is available again. Writes are restored only when both replicas are back, or when min.insync.replicas is set to 1.

3.7 the transmission mode of Kafka

Kafka's sending mode is set by the producer configuration parameter producer.type, which specifies whether messages are sent synchronously or asynchronously in a background thread. The default is synchronous, that is, producer.type=sync. In asynchronous mode (producer.type=async), the producer can push data in batches, which greatly improves broker performance but increases the risk of data loss. If message reliability must be guaranteed, set producer.type to sync.

For asynchronous mode, there are four matching parameters, as follows:

Pushing data in batches can greatly improve processing efficiency. The Kafka producer can accumulate a certain number of messages in memory and send them as a single batch request. The batch size is controlled by the producer parameter batch.num.messages. A larger batch reduces the number of network requests and disk I/O operations, but the setting is a trade-off between efficiency and timeliness. Newer versions also provide the batch.size parameter.

4 use analysis of high reliability

4.1 message transmission guarantee

The previous sections covered how Kafka stores data effectively and how producers and consumers work. This section discusses how Kafka guarantees message delivery between producer and consumer. There are three possible delivery guarantees:

At most once: messages may be lost, but will never be transmitted repeatedly

At least once: messages are never lost, but may be transmitted repeatedly

Exactly once: each message must be transmitted once and only once

The message transmission guarantee of Kafka is straightforward. When a producer sends a message to a broker, once the message is committed it will not be lost, thanks to the replication mechanism. However, if a network problem interrupts communication after the producer has sent the data, the producer cannot tell whether the message was committed. Although Kafka cannot determine what happened during the network failure, the producer can retry multiple times to make sure the message reaches the broker, so Kafka currently implements at least once.
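The retry behaviour described above is also where duplicates come from. The following sketch simulates it with a toy broker that stores every message but loses some acknowledgements; FlakyBroker and send_with_retries are hypothetical names used only for this illustration and are not part of any Kafka API.

```python
class FlakyBroker:
    """Toy broker that stores every message but drops some acknowledgements."""

    def __init__(self, lose_ack_on_attempts):
        self.log = []
        self.attempts = 0
        self.lose_ack_on_attempts = set(lose_ack_on_attempts)

    def append(self, message):
        self.attempts += 1
        self.log.append(message)  # the write itself succeeds
        if self.attempts in self.lose_ack_on_attempts:
            # The producer never sees the acknowledgement.
            raise ConnectionError("ack lost")


def send_with_retries(broker, message, retries):
    """Send a message, retrying when no acknowledgement arrives."""
    for _ in range(retries + 1):
        try:
            broker.append(message)
            return True
        except ConnectionError:
            continue  # cannot tell whether the write happened, so try again
    return False
```

If the acknowledgement of the first attempt is lost and retries are allowed, the send eventually succeeds but the log holds the message twice, which is at least once. With no retries, the producer reports a failure even though the message was in fact stored.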

After a consumer reads messages from a broker, it can choose to commit, which stores in Zookeeper the offset of the messages the consumer has read in that partition. The next time the consumer reads the partition, it starts from the next entry. If it does not commit, the next read starts from the same position as after the last commit. The consumer can also be set to autocommit, in which case it commits automatically as soon as it reads the data. If we consider only the process of reading messages, Kafka ensures exactly once; but if messages were duplicated between the producer and the broker for some reason, the result is at least once.

Consider the case where the consumer commits after reading a message but before processing it. In this mode, if the consumer crashes after the commit but before it has processed the message, the message that was committed but not processed will not be read again after the restart. This corresponds to at most once.

The other case is to read the message, process it first, and then commit. In this mode, if the consumer crashes after processing the message but before committing, the uncommitted message is processed again after the restart, even though it has in fact already been processed. This corresponds to at least once.
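The two orderings can be compared with a small simulation. This is a sketch under simplifying assumptions: the partition is a Python list, the committed offset is a plain integer, and a crash is modelled by returning early between the two steps. The function names run_consumer and consume_with_one_crash are invented for the example.

```python
def run_consumer(messages, committed_offset, commit_first, crash_at=None):
    """Consume from committed_offset onwards.

    If crash_at is given, the consumer crashes on that message index,
    in between the commit step and the process step.
    Returns (processed_messages, new_committed_offset).
    """
    processed = []
    offset = committed_offset
    for i in range(committed_offset, len(messages)):
        if commit_first:
            offset = i + 1                 # commit
            if i == crash_at:
                return processed, offset   # crash before processing
            processed.append(messages[i])  # process
        else:
            processed.append(messages[i])  # process
            if i == crash_at:
                return processed, offset   # crash before committing
            offset = i + 1                 # commit
    return processed, offset


def consume_with_one_crash(messages, commit_first, crash_at):
    """Run once with a crash, restart from the committed offset, run again."""
    first, offset = run_consumer(messages, 0, commit_first, crash_at)
    second, _ = run_consumer(messages, offset, commit_first)
    return first + second
```

For the messages a, b, c with a crash on b, committing first yields a, c (b is lost, at most once), while processing first yields a, b, b, c (b is handled twice, at least once).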

To achieve exactly once, it is necessary to introduce the mechanism of message deduplication.

4.2 Message deduplication

As mentioned in the previous section, Kafka can produce duplicate messages on both the producer side and the consumer side, so deduplication is needed.

The Kafka documentation mentions the concept of a GUID (Globally Unique Identifier). A unique id for each message is generated by an algorithm on the client and can be mapped to the address stored on the broker, so that the message content can be looked up and retrieved by GUID; this also makes it easier to guarantee idempotence on the sending side. This requires a deduplication module on the broker, which the current version does not support.

With GUIDs, deduplicating on the client side requires a centralized cache, which inevitably adds dependency complexity, and the cache size is hard to determine.

Not only Kafka, but also commercial-grade middleware such as RabbitMQ and RocketMQ guarantee only at least once and cannot deduplicate messages by themselves. We therefore suggest that the business side deduplicate according to its own business characteristics, for example by relying on the idempotence of the business messages themselves, or with the help of other products such as Redis.
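A minimal sketch of business-side deduplication is shown below. It assumes every message carries a unique id chosen by the sender, and it uses an in-memory set as a stand-in for an external store such as Redis; the class DeduplicatingHandler is a hypothetical name, not part of Kafka or any client library.

```python
class DeduplicatingHandler:
    """Runs the business handler at most once per message id.

    The in-memory set stands in for an external store such as Redis
    and is an assumption made for this sketch.
    """

    def __init__(self, handler):
        self.handler = handler
        self.seen_ids = set()

    def handle(self, message_id, payload):
        """Return True if the message was processed, False if it was a duplicate."""
        if message_id in self.seen_ids:
            return False
        self.handler(payload)
        # Record the id only after the handler has finished, so a failed
        # attempt can still be retried.
        self.seen_ids.add(message_id)
        return True
```

Delivering the same id twice runs the handler once. The set here grows without bound, which is the cache-sizing problem mentioned above; a real deployment would need some expiry policy.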

4.3 High reliability configuration

Kafka provides a high degree of flexibility in data redundancy. For scenarios that require high data reliability, we can increase the number of redundant replicas (replication.factor), increase the minimum number of in-sync replicas required for a write (min.insync.replicas), and so on, but this affects performance. Conversely, lowering them improves performance and reduces reliability, so users need to make the tradeoff according to their own business characteristics.

To ensure that data is written to Kafka safely and reliably, the following configuration is required:

Topic configuration: replication.factor >= 3, that is, at least 3 replicas.

The higher the number of replicas, the lower the TPS. When the number of replicas is the same, min.insync.replicas does not affect the TPS.

With acks=0 or acks=1, TPS is independent of min.insync.replicas and the number of replicas and depends only on the acks policy.

Set the number of partitions to 1 to further confirm the impact of different acks policies, min.insync.replicas settings and replica counts on the sending speed. For details, see scenario 2 and scenario 3.

Scenario 2: with the number of partitions fixed at 1, test the impact of different replica counts and min.insync.replicas settings on the sending speed.

Specific configuration: one producer; sending method: sync; message body size: 1 KB; producer-side acks=-1 (all). Replica count varied: 2/3/4; min.insync.replicas set to 1/2/4.

The test results are as follows:

Analysis of test results: the higher the number of replicas, the lower the TPS (consistent with the conclusion of scenario 1), but the difference is small when the number of partitions is 1. min.insync.replicas does not affect TPS.

Scenario 3: with the number of partitions fixed at 1, test the impact of different acks policies and replica counts on the sending speed.

Specific configuration: one producer; sending method: sync; message body size: 1 KB; min.insync.replicas=1. Topic replica count: 1/2/4; acks: 0/1/-1.

The test results are as follows:

Analysis of test results (consistent with scenario 1):

The more replicas, the lower the TPS.

The acks policy of the client has a great influence on TPS: acks_0 > acks_1 > acks_-1.

Scenario 4: test the effect of different partition numbers on the transmission rate

Specific configuration: one producer; message body size: 1 KB; sending method: sync; topic replica count: 2; min.insync.replicas=2. The number of partitions is set to 1/2/4/8/12.

Test results:

Analysis of test results: the number of partitions affects TPS. As the number of partitions increases, TPS increases, but not always proportionally; beyond a certain critical value, adding partitions slightly reduces TPS.

Scenario 5: test the impact on clients and messages by setting part of the broker in the cluster to an unserviceable state.

Specific configuration: one producer; message body size: 1 KB; sending method: sync; topic replica count: 4; min.insync.replicas set to 2; number of partitions: 12.

The specific test data are as follows:

Error message:

Error 1: the client returns an exception; some of the data is persisted to disk and some is not: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

Error 2: [warn] internals.Sender-Got error produce response with correlation id 19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NETWORK_EXCEPTION

Error 3: [WARN] internals.Sender-Got error produce response with correlation id 77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859 attempts left) Error: NOT_ENOUGH_REPLICAS

Error 4: [WARN] internals.Sender-Got error produce response with correlation id 77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left) Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND

Analysis of test results:

After two brokers are killed, the client can continue to send. With fewer brokers, the partition leaders are concentrated on the remaining two brokers, which lowers the TPS.

After three brokers are killed, the client can no longer send. The automatic retry function of Kafka starts to work. Once the number of available brokers is again greater than or equal to min.insync.replicas, sending can continue.

When retries is not 0, some messages are persisted to disk more than once. Every message that the client reports as successfully sent is persisted to disk, and when an exception is returned, some of the affected messages may still have been persisted.

Scenario 6: test the transmission delay of a single producer, as well as the end-to-end delay.

Specific configuration: one producer; message body size: 1 KB; sending method: sync; topic replica count: 4; min.insync.replicas set to 2; number of partitions: 12.

Test data and results (in ms):

Test summary for each scenario:

When acks=-1, the TPS of the Kafka sender is limited by the number of replicas of the topic (in the ISR). The more replicas, the lower the TPS.

TPS is highest with acks=0, second with acks=1, and lowest with acks=-1, that is, TPS: acks_0 > acks_1 > acks_-1.

The min.insync.replicas parameter does not affect TPS.

The number of partitions affects TPS. As the number of partitions increases, TPS increases, but not always proportionally; beyond a certain critical value, adding partitions slightly reduces TPS.

Kafka is highly reliable when acks=-1 and min.insync.replicas >= 1: every message that is reported as successfully sent is persisted to disk.
