How to analyze the Kafka source code: the broker side

Updated 2025-04-12. From: SLTechnology News&Howtos (Shulou.com), Internet Technology

This article walks through Kafka source code analysis on the broker side in detail. Interested readers can use it as a reference; hopefully it is helpful.

Let's start with how kafka creates a topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

There are several parameters:

--zookeeper: address of ZooKeeper

--replication-factor: replication factor

--partitions: number of partitions (default is 1)

--topic: topic name

2. What is a partition?

A topic can have multiple partitions, and each partition holds different messages. Although partitions provide higher throughput, more partitions is not always better: each partition consumes memory and file handles. Generally, the number of partitions should not exceed the number of machines in the Kafka cluster, and a typical setting is 3-10. For example, suppose the cluster has three machines and we create a topic named test with 2 partitions, as shown in the figure:

A partition is an ordered, immutable sequence of records that is continually appended to a log file. Every message in a partition is assigned an id, its offset, which identifies that record within the partition. Here we use the diagram from the official website, since my own drawing did not turn out well:
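The append-only behaviour described above can be illustrated with a minimal in-memory sketch. The class PartitionLogSketch and its methods are hypothetical names for illustration only; this is not Kafka's log implementation, which writes to segment files on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition's log (not Kafka's own class).
class PartitionLogSketch {
    private final List<String> records = new ArrayList<>();

    // Appends a record at the end and returns the offset assigned to it.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Reads the record stored at the given offset; existing records are never modified.
    String read(long offset) {
        return records.get((int) offset);
    }

    // Offset that the next appended record will receive.
    long nextOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        long first = log.append("a");
        long second = log.append("b");
        System.out.println(first + " " + second + " next=" + log.nextOffset());
    }
}
```

Offsets are simply positions in the append order, which is why a record can always be addressed by (partition, offset).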

2.1 Producers and partitions

In the situation shown in the figure, which partition will the producer send a message to? This is governed by the partitioner.class parameter mentioned in the previous section. The default partitioning logic is: if the record has a key, compute the key's hash with the murmur2 algorithm and take it modulo the total number of partitions to get the partition number; if there is no key, assign partitions round-robin (org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition). We can also customize the partitioning policy by implementing the org.apache.kafka.clients.producer.Partitioner interface:

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // no key: rotate through the partitions using a per-topic counter
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
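To try the two branches without a Kafka dependency, here is a small self-contained sketch of the same decision. It makes two loud assumptions: java.util.Arrays.hashCode stands in for Kafka's murmur2 hash, so the partition numbers will not match a real producer, and the class PartitionChoiceSketch with its single shared counter is hypothetical (the method above keeps one counter per topic).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch; not the Kafka Partitioner interface.
class PartitionChoiceSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Clears the sign bit so the result of % is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    int choose(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: rotate through the partitions.
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Key present: hash the key (ASSUMPTION: Arrays.hashCode replaces murmur2 here).
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch sketch = new PartitionChoiceSketch();
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("keyed record -> partition " + sketch.choose(key, 3));
        System.out.println("unkeyed record -> partition " + sketch.choose(null, 3));
    }
}
```

The property that matters is that equal keys always map to the same partition, which is what preserves per-key ordering.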

2.2 Consumers and partitions

First, let's look at the definition of a consumer group on the official website: Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.

In other words, every consumer belongs to a named group, and each message in a topic is delivered to exactly one consumer instance in every group that subscribes to that topic.

A consumer group is a mechanism for making consumption highly scalable and fault tolerant. If a consumer fails or a new consumer joins, the consumer group is rebalanced. The rebalance mechanism is explained in detail in the consumer section, not here. Continuing from the figure above, we now draw the consumer side:

This is the best case: two partitions map to two consumers in one group. So what happens if the number of consumers in a group is greater than the number of partitions? Or less?

If the number of consumers in a group is greater than the number of partitions, the excess consumers are wasted: they are assigned no partition and cannot consume any messages.

If the number of consumers in a group is less than the number of partitions, a partition assignment strategy decides which consumer gets which partitions. One is Range (the default), another is RoundRobin, and you can also implement a custom strategy. The aim is to spread the load evenly across consumers. The details are explained in the consumer section, not here.

Recommendation: configure the number of partitions as an integer multiple of the number of consumers.
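The three cases above (more consumers than partitions, fewer, and an exact multiple) can be seen in a small sketch of range-style assignment for a single topic. RangeAssignmentSketch is a hypothetical helper that works on plain partition numbers and consumer indexes; it is an approximation of the idea under those assumptions, not Kafka's assignor class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of range-style assignment for one topic.
class RangeAssignmentSketch {
    // Returns, for each consumer index, the list of partition numbers it owns.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers; // every consumer gets at least this many
        int extra = numPartitions % numConsumers;       // the first `extra` consumers get one more
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, extra);
            int length = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 3)); // more consumers than partitions
        System.out.println(assign(7, 3)); // uneven split
        System.out.println(assign(4, 2)); // partitions are a multiple of consumers
    }
}
```

With 2 partitions and 3 consumers the third consumer receives an empty list, and with 7 partitions and 3 consumers the first consumer carries one partition more than the others, which is why an integer multiple is recommended.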

3. Replicas and ISR design

3.1 What is a replica

When creating a topic, the --replication-factor parameter sets the number of replicas. Kafka keeps the system highly available by maintaining multiple identical backups, which are called replicas. Replicas fall into three categories:

Leader replica: serves read and write requests from clients

Follower replica: backs up the leader replica's data and does not serve client read or write requests

ISR (in-sync replica) set: contains the leader replica and every follower replica that is in sync with it (there may be no such follower)

Kafka distributes all replicas evenly over the brokers in the cluster. For each partition it selects one replica as the leader, and the others become followers. If the broker hosting the leader replica goes down, one of the follower replicas becomes the new leader. The leader replica handles read and write requests from clients, while follower replicas only fetch data from the leader and do not handle client read or write requests.

3.2 Replica synchronization mechanism

As mentioned above, the ISR is a dynamically maintained set of in-sync replicas, and the leader replica is always part of it. Only replicas in the ISR are eligible to be elected leader. When the producer's acks parameter is set to all (-1), a message written by the producer is considered committed only after every replica in the ISR has received it. As mentioned in the previous section, acks must be combined with the broker-side min.insync.replicas parameter (default 1) to be effective: that parameter controls the minimum number of ISR replicas that must have the write for it to succeed. If the number of replicas in the ISR is less than min.insync.replicas, the client receives the exception org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
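The interaction between acks and min.insync.replicas boils down to one condition, sketched below. MinIsrCheckSketch and canAccept are hypothetical names; the real broker performs this check inside its append path and raises NotEnoughReplicasException, whereas this sketch only returns a boolean.

```java
// Hypothetical sketch of the acks / min.insync.replicas rule described above.
class MinIsrCheckSketch {
    // Returns true if the leader may accept the write, false if it must be rejected.
    static boolean canAccept(String acks, int isrSize, int minInsyncReplicas) {
        // The ISR-size requirement only applies when the producer asked for acks=all (-1).
        boolean requiresAllIsr = acks.equals("all") || acks.equals("-1");
        return !requiresAllIsr || isrSize >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(canAccept("all", 1, 2)); // ISR too small for acks=all
        System.out.println(canAccept("1", 1, 2));   // acks=1 ignores min.insync.replicas
        System.out.println(canAccept("all", 2, 2)); // enough in-sync replicas
    }
}
```

This also shows why the default min.insync.replicas of 1 gives little extra protection: with acks=all the write still succeeds when the leader is the only replica left in the ISR.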

To understand the replica synchronization mechanism, you need to learn a few terms:

High Watermark (HW): messages with offsets below the HW are considered backed up (committed); the HW itself points to the next message. The HW of the leader replica determines which messages a consumer can poll: a consumer can only consume messages whose offsets are less than the HW.

LEO (log end offset): the offset of the next message to be written. In other words, there is no message yet at the position the LEO points to.

Remote LEO: strictly speaking, this is a collection. The broker hosting the leader replica keeps a Partition object in memory with the partition's information, and this Partition maintains a list of Replica objects for all replicas of the partition. Apart from the leader's own Replica, the LEO of every other Replica object in the list is called a remote LEO.

Here is a practical example (taken from Hu Xi's blog). The topic has a single partition and a replication factor of 2: one leader replica and one follower replica, both in the ISR. Let's look at what happens to the replica objects on the leader and follower brokers when the producer sends a message, and how the partition HW is updated. First, the initial state:

At this point, the producer sends a message to the topic partition. The state is now as shown in the following figure:

As shown in the figure above, after the producer has sent the message successfully (assuming acks=1, so the request returns once the leader has written it), the follower sends a new FETCH request, still with fetchOffset = 0. Unlike last time, there is now data to read, so the whole process looks like the following figure:

Both the leader and the follower have now saved the message at offset 0, but the HW on both sides has not been updated yet. It is updated during the next round of FETCH request processing, as shown in the following figure:

In brief: in the second round, the follower sends a FETCH request with fetchOffset = 1, because the message at offset 0 has already been written to the follower's local log. On receiving the request, the leader broker first updates the LEO it holds for the other replica, that is, sets the remote LEO to 1, and then updates the partition HW to 1 (see the explanation above for the update rules). It then puts the current partition HW (1) into the FETCH response and sends it to the follower. On receiving the response, the follower broker extracts the partition HW of 1, compares it with its own LEO, and updates its HW to 1. This completes one full HW and LEO update cycle.
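The two FETCH rounds in this example can be replayed with a tiny simulation. It assumes the commonly described update rules: the leader sets its HW to the smallest LEO among the ISR replicas, and the follower sets its HW to the smaller of its own LEO and the HW carried in the FETCH response. HighWatermarkSketch and its fields are hypothetical names, and the model covers exactly one leader and one follower.

```java
// Hypothetical simulation of the one-leader, one-follower example above.
class HighWatermarkSketch {
    long leaderLeo = 0, leaderHw = 0, remoteLeo = 0; // state kept on the leader broker
    long followerLeo = 0, followerHw = 0;            // state kept on the follower broker

    // The producer appends one message to the leader's log.
    void produce() {
        leaderLeo++;
    }

    // One FETCH round trip: the follower asks for data starting at its own LEO.
    void fetchRound() {
        long fetchOffset = followerLeo;
        // Leader side: record the follower's LEO, then move HW to the smallest LEO in the ISR.
        remoteLeo = fetchOffset;
        leaderHw = Math.max(leaderHw, Math.min(leaderLeo, remoteLeo));
        long hwInResponse = leaderHw;
        // Follower side: append whatever the leader returned, then update its own HW.
        followerLeo = leaderLeo;
        followerHw = Math.min(followerLeo, hwInResponse);
    }

    public static void main(String[] args) {
        HighWatermarkSketch s = new HighWatermarkSketch();
        s.produce();
        s.fetchRound();
        System.out.println("after round 1: leaderHw=" + s.leaderHw + " followerHw=" + s.followerHw);
        s.fetchRound();
        System.out.println("after round 2: leaderHw=" + s.leaderHw + " followerHw=" + s.followerHw);
    }
}
```

After the first round both logs contain the message but both HW values are still 0; only the second round, which reports fetchOffset = 1 to the leader, lets the HW advance to 1 on both sides.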

3.3 ISR maintenance

Since version 0.9.0.0, a single parameter, replica.lag.time.max.ms (default 10 s), determines whether a replica belongs in the ISR. If a follower replica lags behind the leader replica for longer than 10 s, Kafka considers it out of sync and removes it from the ISR.
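As a rough illustration, the time-based rule can be written as a single comparison. The names IsrLagSketch, shouldRemoveFromIsr and lastCaughtUpTimeMs are hypothetical, and the 10 000 ms constant simply mirrors the default quoted above; this is a sketch of the idea, not the broker's code.

```java
// Hypothetical sketch of the time-based ISR membership rule.
class IsrLagSketch {
    // Mirrors replica.lag.time.max.ms with the default value quoted in the text.
    static final long REPLICA_LAG_TIME_MAX_MS = 10_000L;

    // lastCaughtUpTimeMs: the last time the follower was fully caught up with the leader.
    static boolean shouldRemoveFromIsr(long nowMs, long lastCaughtUpTimeMs) {
        return nowMs - lastCaughtUpTimeMs > REPLICA_LAG_TIME_MAX_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(shouldRemoveFromIsr(now, now - 3_000L));  // lagging for 3 s
        System.out.println(shouldRemoveFromIsr(now, now - 15_000L)); // lagging for 15 s
    }
}
```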

4. Log design

Topics in Kafka are isolated from one another. Each topic can have one or more partitions, and each partition has log files that record its message data:

The figure shows a topic named demo-topic with eight partitions. Each partition has a message log directory named [topic-partition]. Inside a partition's log directory there are several files with the same prefix but different file types, for example the three files in the figure (00000000000000000000.index, 00000000000000000000.timeindex, 00000000000000000000.log). Such a group of files is called a LogSegment (log segment).

4.1 LogSegment

As a concrete example from a test environment, let's look at the log files of partition 0 of a topic named ALC.ASSET.EQUITY.SUBJECT.CHANGE:

Each LogSegment consists of a set of files that share the same file name. The file name is always 20 digits long and gives the offset of the first message in the segment: 00000000000000000000 means the first message of the LogSegment has offset 0, and 00000000000000000097 means the first message has offset 97. Log files come with a variety of suffixes; the important ones are .index, .timeindex and .log.

.index: offset index file

.timeindex: time index file

.log: log file

.snapshot: snapshot file

.swap: temporary file produced by log compaction
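The naming rule (base offset zero-padded to 20 digits, followed by a suffix) is easy to reproduce. The following sketch uses hypothetical helper names, fileName and baseOffsetOf, and relies only on standard Java formatting and parsing.

```java
// Hypothetical helpers illustrating the 20-digit segment file naming rule.
class SegmentNameSketch {
    // Builds a segment file name from the offset of its first message and a suffix such as ".log".
    static String fileName(long baseOffset, String suffix) {
        return String.format("%020d", baseOffset) + suffix;
    }

    // Recovers the base offset from a segment file name.
    static long baseOffsetOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    public static void main(String[] args) {
        System.out.println(fileName(0L, ".index"));
        System.out.println(fileName(97L, ".log"));
        System.out.println(baseOffsetOf("00000000000000000097.timeindex"));
    }
}
```

Because the names are fixed-width, sorting the file names lexicographically also sorts the segments by base offset.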

4.2 Index and log files

Kafka has two kinds of index files. The first is the offset index file, ending in .index. The second is the timestamp index file, ending in .timeindex.

We can use kafka-run-class.sh to view the contents of the offset index file:

Each line has the form offset:xxx position:xxxx. The two values are not directly related to each other.

Offset: relative offset

Position: physical address

So what does the first line, offset:12 position:4423, mean? It means that the message with offset 12 can be found starting at physical position 4423 in the .log file.

By the same token, the second line, offset:24 position:8773, means that the message with offset 24 can be found starting at physical position 8773.

We can also use kafka-run-class.sh to view the contents of the .log file. Pay attention to the baseOffset and position values and compare them with the index entries above.

4.3 How to search by offset

Using the example above, how do we look up the message with offset 60?

Use the offset to find the corresponding LogSegment first; here that is 00000000000000000000.index.

Use binary search to find the largest index entry whose offset is not greater than the target; here that is offset:24 position:8773.

Open 00000000000000000000.log and scan sequentially from position 8773 until the message with offset=60 is found.
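The first two steps above can be sketched as follows. OffsetLookupSketch is a hypothetical class: segments are modelled as a TreeMap keyed by base offset, index entries as {offset, position} pairs sorted by offset, and the second segment with base offset 97 is invented for the demo. The final sequential scan of the .log file is left out.

```java
import java.util.TreeMap;

// Hypothetical sketch of locating where to start scanning for a target offset.
class OffsetLookupSketch {
    // Step 1: the segment whose base offset is the largest one not greater than the target.
    // Assumes at least one segment has a base offset <= target.
    static long findSegmentBaseOffset(TreeMap<Long, String> segments, long target) {
        return segments.floorKey(target);
    }

    // Step 2: binary search for the largest index entry whose offset is <= target.
    // Returns the physical position from which the sequential scan should start.
    static long findScanStart(long[][] index, long target) {
        int lo = 0;
        int hi = index.length - 1;
        long position = 0; // no qualifying entry: scan from the beginning of the segment
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index[mid][0] <= target) {
                position = index[mid][1];
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return position;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> segments = new TreeMap<>();
        segments.put(0L, "00000000000000000000");
        segments.put(97L, "00000000000000000097"); // assumed second segment for the demo
        long[][] index = {{12, 4423}, {24, 8773}};  // the two entries quoted in the text
        System.out.println("segment base offset: " + findSegmentBaseOffset(segments, 60L));
        System.out.println("scan from position: " + findScanStart(index, 60L));
    }
}
```

The index is sparse, so the binary search only narrows the search down to a starting position; the remaining messages are found by reading the .log file forward from there.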

