Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze the principle of kafka

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

Today, I will talk to you about how to analyze the principle of kafka. Many people may not know much about it. In order to make you understand better, the editor has summarized the following contents for you. I hope you can get something according to this article.

Brief introduction

Kafka is a distributed message queue. With high performance, persistence, multi-copy backup, scale-out ability. The producer writes the message to the queue, and the consumer takes the message from the queue for business logic. Generally, it plays the role of decoupling, peak cutting and asynchronous processing in architecture design.

Kafka uses the concept of topic externally. Producers write messages to topic and consumers read messages. In order to scale horizontally, a topic is actually composed of multiple partition. When you encounter a bottleneck, you can scale up horizontally by increasing the number of partition. Within a single parition, messages are guaranteed to be orderly.

Every time a new message is written, kafka is written in the corresponding file append, so the performance is very high.

The overall data flow of kafka is as follows:

Kafka data flow

The approximate usage is that Producers writes a message to the specified Topic in the Brokers, and Consumers pulls the message of the specified Topic from the Brokers, and then carries on the business processing.

In the figure, there are two topic,topic 0s, two partition,topic 1s, one partition and three copies. You can see that consumer 2 in consumer gourp 1 is not assigned to partition processing, which is possible, as we will talk about below.

Some meta-information about broker, topics and partitions is stored in zk, and zk is also used in monitoring and routing.

Production

The basic process is as follows:

Kafka sdk product flow.png

Create a record in which you want to specify the corresponding topic and value,key and partition options. Serialize first, and then put it in the corresponding send queue according to topic and partition. Kafka produce is a batch of requests, which will be accumulated and then sent together. Instead of calling send (), you will immediately send packets to the network.

If the partition is not filled in, the situation will look like this:

Key is filled in.

Hash according to key, and the same key goes to a partition. (if the number of partition is expanded, then there is no guarantee.)

Key is not filled in

Round-robin to choose partition

These requests are sent to the same partition according to the configuration, save a wave, and then be sent by a separate thread at once.

API

There is high level api, doing a lot of things for us, offset, routing everything for us, it's very easy to use.

And simple api,offset and everything is for us to record.

Partition

When there are multiple copies, try to assign multiple copies to different broker. Kafka selects a leader for partition, and then all requests for that partition are actually leader, and then synchronized to other follower. When a broker shuts down, all leader partition on that broker will be re-elected to elect a leader. (unlike distributed file storage systems, replicas are automatically replicated to keep the number of copies.)

Then there are two details: how to allocate partition and how to choose leader.

With regard to the allocation of partition and the election of leader, there has to be an executor. In kafka, the executor is called controller. Kafka uses zk to select a controller in broker for partition allocation and leader election.

Allocation of partition

Sort all Broker (assuming a total of n Broker) and Partition to be assigned

Assign the I Partition to the (i mod n) Broker (this is leader)

Assign the j th Replica of the I th Partition to the ((I + j) mode n) Broker

Leader disaster recovery

Controller registers Watch on the / brokers/ids node of Zookeeper, and it will know if broker goes down. When broker goes down, controller selects a new leader for the affected partition. Controller reads the ISR (synchronized copy of in-sync replica) list of the corresponding partition from the / brokers/topics/ [topic] / partitions/ [partition] / state of zk, and selects one to be leader.

After the leader is selected, update the zk, and then send the LeaderAndISRRequest to the affected broker to let them know about it. Why not use zk notifications here, but send rpc requests directly to broker? my understanding may be that there are performance problems with zk.

If the ISR list is empty, then according to the configuration, randomly choose a replica to do the leader, or simply stop the partition. If there is a machine in the ISR list, but it is not working, then you can wait for ISR's machine to come back to life.

Multi-copy synchronization

The strategy here is that follower pulls data in batches from leader to synchronize on the server side. But the specific reliability is determined by the producer.

When the producer produces the message, the request.required.acks parameter is used to set the reliability of the data.

Ackswhat happen0which means that the producer never waits for an acknowledgement from the broker. Just send it. I don't care whether broker is successful or not, and the data may be lost. 1which means that the producer gets an acknowledgement after the leader replica has received the data. When the Leader is successfully written, it will be returned. All other replica are synchronized through fetcher, so the kafka is written asynchronously, and data may be lost when switching between master and slave. -1which means that the producer gets an acknowledgement after all in-sync replicas have received the data. Success cannot be returned until all the machines in the isr are synchronized, and the delay depends on the slowest machine. Strong consistency, will not lose data.

In acks=-1, if the number of ISR is less than the number specified by min.insync.replicas, it will be returned unavailable.

Here the machines in the ISR list will change, and according to the configuration of replica.lag.time.max.ms, how long they are not synchronized will be removed from the ISR list. In the past, ISR was kicked out based on how many messages fell behind, but it was removed after version 1.0, because this value is difficult to get, and it is easy to see nodes constantly entering and leaving the ISR list during peak times.

After selecting leader from ISA, follower removes the record after the last high water level in his log and goes to leader to get the new data. Because after the new leader is selected, there may be more data on the follower than the new leader, so it has to be intercepted. The high water level here means that for partition and leader, it is the latest record in all ISR. Consumers can only read the high water level at most.

From the leader point of view, the update of the high water level will be delayed by one round, for example, a new message has been written, and all the broker in the ISR have arrived, but the broker in the ISR can only be told to leader in the next round of fetch.

It is precisely because the high water level delays one round, in some cases, the data loss of kafka is inconsistent with the master / standby data. Starting from 011, leader epoch is used to replace the high water level. (https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)

Think about:

When acks=- 1

Will all follwers come to fetch and return success, or wait for the second round of fetch of follwers?

Leader has been written locally, but some machines in ISR have failed, so what to do?

Consumption

Subscription topic is subscribed to by a consumer group, and there can be multiple consumers in a consumer group. Two consumers in the same consumer group will not consume the same partition at the same time. In other words, a partition can only be consumed by one consumer in a consumer group, but can be consumed by multiple consumer groups at the same time. Therefore, if there are more consumers in the consumer group than partition, then individual consumers will be idle all the time.

Untitled_page.png

API

When subscribing to topic, you can use regular expressions, and if there is a new topic match, it can be subscribed automatically.

Preservation of offset

A consumer group consumes partition and needs to save offset records of where it is consumed. Previously, it was saved in zk. Due to the poor write performance of zk, the previous solution was to report consumer every other minute. Here, the performance of zk seriously affects the speed of consumption, and it is easy to repeat consumption.

After version 0.10, kafka strips the offset from the zk and stores it in a topic called _ _ consumeroffsets topic. The key written into the message consists of groupid, topic, and partition, and value is the offset offset. The cleanup policy configured by topic is compact. Always keep the latest key and delete the rest. In general, the offset of each key is cached in memory. When querying, you do not have to traverse the partition. If there is no cache, you will traverse the partition for the first time to establish the cache, and then the query returns.

Determine which partition of consumer group displacement information is written into _ _ consumers_offsets, and the specific calculation formula:

_ _ consumers_offsets partition = Math.abs (groupId.hashCode ()% groupMetadataTopicPartitionCount) / / groupMetadataTopicPartitionCount is specified by offsets.topic.num.partitions and defaults to 50 partitions.

Think about:

If the running service modifies the offsets.topic.num.partitions, is the offset saved out of order?

Assign partition--reblance

In the production process, broker needs to allocate partition, and here in the consumption process, partition should also be assigned to consumers. If a controller is selected from the broker, the consumer also chooses a coordinator from the broker to allocate the partition.

Let's elaborate on it from top to bottom.

How to choose coordinator.

Interactive process.

The process of reblance.

Choose coordinator

Look at the offset saved in that partition.

The broker where the partition leader is located is the selected coordinator.

Here we can see that the coordinator of consumer group is the same machine as the partition leader that holds the consumer group offset.

Interactive flow

After the coordinator is selected, it is time to assign it.

The whole process goes like this:

If consumer starts or coordinator goes down, consumer will request a broker at will and send a ConsumerMetadataRequest request. Broker will select the address of the consumer corresponding to coordinator according to the method mentioned above.

If consumer sends a heartbeat request to coordinator and returns IllegalGeneration, it means that the information of consumer is old and needs to be rejoined to reblance. If success is returned, the consumer continues execution from the last allocated partition.

Reblance process

Consumer sends a JoinGroupRequest request to coordinator.

At this point, when other consumer sends a heartbeat request, coordinator will tell them that it is time for reblance.

Other consumer sends JoinGroupRequest requests.

After all the recorded consumer have sent a JoinGroupRequest request, coordinator will choose any leader from the consumer here. Then go back to JoinGroupRespone, which will tell consumer whether you are follower or leader, and for leader, it will also bring follower information to it and ask it to allocate partition based on this information.

5. Consumer sends SyncGroupRequest to coordinator, where the SyncGroupRequest of leader contains the allocation.

6. Coordinator returns the packet and tells consumer about the allocation, including leader.

When the number of partition or consumers changes, you have to do reblance.

List the situation of the meeting reblance:

Add partition

Increase consumers

Consumers shut down voluntarily.

Consumers are down.

Coordinator himself is down.

Message delivery semantics

Kafka supports three message delivery semantics

At most once: once at most, the message may be lost, but will not be repeated

At least once: at least once, the message will not be lost and may be repeated

Exactly once: only once, the message is not lost, not repeated, and consumed only once (implemented in 0.11, only downstream is also kafka)

In a business, the At least once model is often used, and if reentrant is needed, the business is often implemented on its own.

At least once

First obtain the data, and then carry on the business processing, after the business processing is successful, commit offset.

1. Producer production message exception, whether the message was successfully written is uncertain, redo, may write duplicate message

2. Consumers process messages. After successful business processing, the update of offset fails, and consumers will repeat consumption if they restart.

At most once

First get the data, then commit offset, and finally carry on the business processing.

1. The producer's production message is abnormal. No matter if the next message is produced, the message will be lost.

2. When the consumer processes the message, update the offset first, and then do the business processing. If the business processing fails, the consumer restarts and the message is lost.

Exactly once

The idea is like this, first of all, to ensure that the information is not lost, and then to ensure that there is no repetition. So the reason to stare at At least once. The first thing I came up with:

Producer redo results in repeated writing of messages-production guarantees idempotency

Consumers' repeated consumption-it is no problem to eliminate repeated consumption, or the business interface ensures idempotent repeated consumption.

Since whether the business interface is idempotent or not is not guaranteed by kafka, the exactly once provided by kafka is limited, and the downstream of consumers must also be kafka. So what I'm going to discuss, without special note, the downstream systems of consumers are all kafka (Note: use kafka conector, which adapts to some systems and implements exactly once).

Producer idempotency is easy to do, there is no problem.

There are two ways to solve the problem of repeated consumption:

The downstream system is idempotent, and repeated consumption will not lead to multiple records.

Bind commit offset and business processing into one transaction.

Originally, exactly once implemented the first point of ok.

But in some usage scenarios, our data source may be multiple topic, processed and output to multiple topic, then we will want to output either all successful or all failed. This requires transactionality. Since you want to do a transaction, you might as well solve the problem of repeated consumption at its root and bind commit offset and output to other topic into a transaction.

Production idempotency

The idea is that each producer is assigned a pid as the unique identity of that producer. Producer maintains a monotonously increasing seq for each. Similarly, broker records the latest seq for each. Broker accepts the message only when req_seq = = broker_seq+1. Because:

When the seq of the message is larger than the seq of broker, it means that some data has not been written in the middle, that is, it is out of order.

The seq of the message is no less than the seq of broker, so the message has been saved.

Solve the problem of repetitive production

Transactional / atomic broadcasting

The scene goes like this:

First get the data from multiple source topic.

Do business processing, write to multiple destinations downstream topic.

Update the offset of multiple source topic.

Among them, points 2 and 3 are either all successful or all failed as a transaction. Here the benefits and offset are actually saved with a special topic, both of which are normalized into the transactional processing of writing multiple topic.

The basic idea is as follows:

The introduction of tid (transaction id), unlike pid, is provided by the application to identify transactions, regardless of who the producer is. That is, any producer can use this tid to do transactions, so that transactions that die in the middle of the process can be recovered by another producer.

At the same time, in order to record the status of the transaction, similar to the processing of offset, transaction coordinator is introduced to record transaction log. There will be multiple transaction coordinator in the cluster, and each tid corresponds to a unique transaction coordinator.

Note: the transaction log deletion policy is compact, and completed transactions are marked as null,compact and are not retained.

When doing a transaction, first mark the open transaction and write data, and all successes are recorded as prepare commit status in transaction log, otherwise the state of prepare abort is written. A marker (commit or abort) message is then written to each relevant partition, marking that the message of the transaction can be read or deprecated. After success, record the commit/abort status in transaction log, and the transaction ends.

Data flow:

Kafka Transactions Data Flow.png

First use tid to request any broker (the broker with the least load is written in the code) to find the corresponding transaction coordinator.

Request transaction coordinator to get the corresponding pid and epoch corresponding to pid. This epoch is used to prevent message confusion caused by the resurrection of zombie processes. When the epoch of the message is smaller than the currently maintained epoch, reject it. Tid and pid have an one-to-one correspondence, so the same pid is returned for the same tid.

Client first requests the transaction status recorded by transaction coordinator. The initial state is BEGIN. If it is the first one in the transaction, the transaction will be timed; client outputs data to the relevant partition; client requests transaction coordinator to record the transaction status of offset; and client sends offset commit to the corresponding offset partition.

Client sends a commit request, transaction coordinator records the prepare commit/abort, and then sends marker to the relevant partition. After all success, record the status of the commit/abort, and the last record does not need to wait for the ack of other replica, because the final correctness can be guaranteed if the prepare is not lost.

Here, the status of prepare is mainly used for transaction recovery. For example, sending a control message to the relevant partition causes downtime before it is sent. After the standby is up, when producer sends a request to obtain pid, it will complete the outstanding transaction.

When the marker of commit is written in partition, the related messages can be read. So during the period from prepare commit to commit for kafka transactions, messages are gradually visible, not at the same time.

For details, please see: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees

Consumer affairs

The foregoing is to look at affairs from the perspective of production. There are some issues that need to be considered from the perspective of consumption.

When consuming, there will be some messages in the partition that are not commit, that is, messages that should not be seen by the business, which need to be filtered so that the business does not see them. Kafka chooses to filter in the consumer process rather than in the broker, and the main consideration is performance. One of the key points of the high performance of kafka is zero copy. If you need to filter in broker, then you must read the message contents into memory, and you will lose the features of zero copy.

Document organization

The data of kafka is actually stored in the file system as a file. Topic has partition,partition and segment,segment is an actual file, topic and partition are both abstract concepts.

In the directory / ${topicName}-{$partitionid} /, the actual log file (that is, segment) is stored, along with the corresponding index file.

Each segment file is equal in size and the file name is named after the smallest offset in the segment with a .log file extension; the segment corresponding index has the same file name with a .index extension. There are two index files, one is offset index to check message by offset, the other is time index to check according to time, in fact, here can be optimized together, the following only say offset index. The overall organization is as follows:

Kafka file organization.png

In order to reduce the size of the index file, reduce the use of space, and facilitate direct loading into memory, the index here uses a sparse matrix, which does not record the specific location of every message, but builds an index every certain number of bytes. The index consists of two parts, baseOffset and position.

BaseOffset: it means that this index corresponds to the number of message in the segment file. This facilitates the use of numerical compression algorithms to save space. For example, kafka uses varint.

Position: absolute position in segment.

When looking up the records corresponding to offset, we will first use dichotomy to find out which segment the corresponding offset is in, and then use the index to locate the approximate location of offset in segment, and then traverse to find message.

Common configuration item broker configuration item the unique identity auto.create.topics.auto of broker.idbroker is set to true, that is, the topic is automatically created if there is no topic. The number of directories in log.dirslog. Put partition in the directory. When you generate a new partition, you will choose the directory with the least number of partition in the directory. The topic configuration item functions as num.partitions to create a new topic, and there will be several partition. Log.retention.ms corresponds to the unit of minutes,hours. Log retention time, because deletion is a file dimension rather than a message dimension, which depends on the mtime of the log file. Log.retention.bytespartion maximum capacity, more than clean up the old one. Note that this is the partion dimension, that is, if your topic has 8 partition and is configured with 1G, then on average, the theoretical maximum value of topic is 8G. Log.segment.bytes the size of a segment. Roll when you exceed it. Log.segment.ms the opening time of a segment, scroll after it has passed. What is the maximum size of message.max.bytesmessage

With regard to log cleanup, the log currently being written by default will not be cleaned up by any means.

There are versions before 0.10, the time is to look at the log file mtime, but this is not accurate, it is possible that the file was touch, mtime changed. So starting with version 0.10, use the time of the latest message in the file instead.

Clean up by size also note here that Kafka attempts to compare whether the total size of the current log exceeds the threshold size of at least one log segment in a scheduled task. If it exceeds but does not exceed one log segment, it will not be deleted.

After reading the above, do you have any further understanding of how to analyze the principle of kafka? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report