2025-02-24 Update From: SLTechnology News&Howtos (Shulou.com)
This article explains how Kafka works and how its partition allocation strategies behave.
1. Introduction
Apache Kafka is a distributed stream processing platform (a distributed message queue based on the publish/subscribe model).
A stream processing platform has the following three features:
It lets you publish and subscribe to streams of records, much like a message queue or an enterprise messaging system.
It stores streams of records with good fault tolerance.
It can process records as soon as they are produced.
1.1 Two modes of message queuing
1.1.1 Point-to-point mode
The producer sends a message to a queue, and a consumer takes the message out of the queue and consumes it. Once consumed, the message is no longer stored in the queue, so no consumer can read it a second time. A queue supports multiple consumers, but any given message is consumed by only one of them.
1.1.2 Publish/subscribe mode
Producers publish messages to a topic, and multiple consumers can subscribe to it at the same time. Unlike point-to-point mode, a message published to a topic is consumed by every subscriber.
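The difference between the two modes can be sketched with a small in-memory model. This is only an illustration: the class names PointToPointQueue and Topic are hypothetical and are not Kafka APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative in-memory model only; these classes are hypothetical, not Kafka APIs.
final class MessagingModels {

    // Point-to-point: a message leaves the queue when one consumer takes it.
    static final class PointToPointQueue {
        private final Queue<String> queue = new ArrayDeque<>();

        void send(String message) {
            queue.add(message);
        }

        // Returns the next message, or null once every message has been consumed.
        String receive() {
            return queue.poll();
        }
    }

    // Publish/subscribe: every subscriber receives its own copy of each message.
    static final class Topic {
        private final List<List<String>> subscriberInboxes = new ArrayList<>();

        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscriberInboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : subscriberInboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        PointToPointQueue queue = new PointToPointQueue();
        queue.send("m1");
        System.out.println(queue.receive()); // the first consumer gets the message
        System.out.println(queue.receive()); // a second consumer finds nothing

        Topic topic = new Topic();
        List<String> subscriberA = topic.subscribe();
        List<String> subscriberB = topic.subscribe();
        topic.publish("m1");
        System.out.println(subscriberA + " " + subscriberB); // both see the message
    }
}
```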
1.2 What kind of scenarios is Kafka suitable for?
Kafka is used in two broad categories of applications:
Building real-time streaming data pipelines that reliably move data between systems or applications (equivalent to a message queue).
Building real-time streaming applications that transform or react to streams of data (stream processing, where Kafka Streams transforms data from one topic into another).
To understand how Kafka provides these capabilities, the following sections look at its features in more depth.
First, some concepts:
Kafka runs as a cluster on one or more servers.
Kafka organizes the stored streams of records into topics.
Each record contains a key, a value, and a timestamp.
1.3 Topics and partitions
Kafka messages are classified by topic, much like tables in a database or folders in a file system. A topic can be divided into several partitions, each of which is a commit log. Messages are appended to a partition and then read in first-in-first-out order. Note that because a topic generally contains several partitions, message order cannot be guaranteed across the entire topic, but it is guaranteed within a single partition. A topic is a logical concept; physically, a topic spans multiple servers.
The Kafka cluster retains all published records, whether or not they have been consumed, according to a configurable retention policy (you can configure a time limit and a size limit, and whichever limit is reached first applies). For example, if the retention period is set to 2 days, a record can be consumed at any time within two days of being published; after two days it is discarded to free disk space.
Sometimes we need to increase the number of partitions, for example to expand the capacity of a topic, to reduce the throughput of a single partition, or to run more consumers within a single consumer group (because a partition can be read by only one consumer in a group). From the consumer's point of view, adding partitions to a key-based topic is difficult, because the key-to-partition mapping changes when the number of partitions changes. For key-based topics, it is therefore recommended to choose the partition count up front and avoid adjusting it later.
(Note: the number of partitions cannot be reduced, because deleting a partition would also delete its data and leave the topic inconsistent. If you must reduce the number of partitions, the only option is to delete the topic and recreate it.)
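The key remapping problem described above can be seen in a few lines. This is a minimal sketch, assuming a generic "hash of the key modulo the partition count" scheme; the helper partitionFor is hypothetical and uses Arrays.hashCode rather than the murmur2 hash that Kafka's default partitioner uses, but the effect is the same for any such scheme.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not Kafka code: any "hash(key) mod partition count"
// scheme shows the same remapping effect when the partition count changes.
final class KeyRemappingDemo {

    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // mask keeps the value non-negative
    }

    // Counts how many keys land on a different partition after resizing the topic.
    static int countMoved(String[] keys, int partitionsBefore, int partitionsAfter) {
        int moved = 0;
        for (String key : keys) {
            if (partitionFor(key, partitionsBefore) != partitionFor(key, partitionsAfter)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        String[] keys = new String[1000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "user-" + i;
        }
        System.out.println("keys moved going from 4 to 6 partitions: " + countMoved(keys, 4, 6));
    }
}
```

Any key that moves will have its older records in one partition and its newer records in another, which is why per-key ordering is lost after a resize.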
1.4 Producers and consumers
The producer (publisher) creates messages, and in general a message is published to a specific topic. By default, producers distribute messages evenly across all partitions of the topic without caring which partition a particular message is written to. However, a producer can also direct a message to a specific partition. This is usually done with a message key and a partitioner, which hashes the key and maps the hash to a partition. Producers can also supply a custom partitioner to map messages to partitions according to business rules.
Consumers (subscribers) read messages. A consumer can subscribe to one or more topics and reads messages in the order in which they were produced. The consumer tracks which messages it has already read by means of the message offset. An offset is a piece of metadata: a monotonically increasing integer that Kafka attaches to each message when it is written. The offset of each message is unique within its partition. The consumer stores the last offset it has read for each partition in ZooKeeper or in Kafka, so its read position is not lost if the consumer shuts down or restarts.
Consumers belong to a consumer group, in which one or more consumers read a topic together. The group guarantees that each partition is consumed by only one consumer in the group. If a consumer fails, the other consumers in the group take over its partitions.
1.5 Brokers and clusters
Broker: a single Kafka server is called a broker. A broker receives messages from producers, assigns offsets to them, and commits them to disk. It also serves consumers by responding to fetch requests for partitions and returning messages that have been committed to disk.
Cluster: the brokers managed by the same ZooKeeper ensemble form a Kafka cluster.
A broker is part of a cluster, and in each cluster one broker also acts as the cluster controller. The controller is responsible for administration, including assigning partitions to brokers and monitoring brokers. Within the cluster, each partition is owned by one broker, which is called the partition leader. A partition can be assigned to multiple brokers (when the topic is configured with multiple replicas), in which case the partition is replicated.
[Figure: partition replication across the brokers of a cluster]
How a broker handles requests: the broker runs an Acceptor thread on each port it listens on. The Acceptor creates a connection and hands it to a Processor thread. The number of Processor threads (also known as network threads) is configurable. A Processor thread reads requests from clients and puts them on the request queue, then takes responses from the response queue and sends them back to the clients.
[Figure: broker request handling with Acceptor and Processor threads]
Both produce requests and fetch requests must be sent to the leader replica of the partition. If a broker receives a request for a partition whose leader is on another broker, the client that sent the request receives a "not leader for partition" error response. The Kafka client is responsible for sending produce and fetch requests to the correct broker.
How does the client know where to send a request? It uses another type of request, the metadata request. This request contains the list of topics the client is interested in, and the server's response lists the partitions of those topics, the replicas of each partition, and which replica is the leader. Metadata requests can be sent to any broker, because every broker caches this information. The client caches the metadata and refreshes it periodically. In addition, if the client receives a "not leader" error, it refreshes the metadata before trying to resend the request.
1.6 Kafka Infrastructure
2. Kafka architecture in depth
2.1 Kafka workflow and file storage mechanism
2.1.1 Workflow
Messages in Kafka are classified by topic. Producers produce messages to a topic, and consumers consume messages from a topic.
A topic is a logical concept, while a partition is a physical concept. Each partition corresponds to a log file that stores the data produced by producers. Data is continuously appended to the end of the log file, and each record has its own offset. Each consumer in a consumer group records, in real time, the offset it has consumed up to, so that after recovering from a failure it can continue from where it left off.
2.1.2 File storage mechanism
Because producers keep appending messages to the end of the log, Kafka splits each partition into multiple segments and indexes them, so that a single oversized log file does not make locating data inefficient. (The size of each segment is controlled by log.segment.bytes, and the time after which a segment is closed is controlled by the parameter this article calls log.segment.ms; in Kafka's broker configuration that time-based setting is named log.roll.ms, with segment.ms as the topic-level equivalent.) Each segment corresponds to two files: an ".index" file and a ".log" file. These files live in a folder named after the topic name plus the partition number. For example, if the topic bing has three partitions, the corresponding folders are bing-0, bing-1, and bing-2.
Index and log file naming rules: each LogSegment has a base offset, which is the offset of the first message in that LogSegment. The offset is a 64-bit long integer, and the file name is fixed at 20 digits, left-padded with zeros when the number is shorter.
[Figure: segment files named by base offset]
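The naming rule can be sketched as follows. The class and method names are hypothetical; only the 20-digit zero-padded format and the ".log" and ".index" suffixes come from the description above.

```java
import java.util.Locale;

// Hypothetical helper that applies the 20-digit, zero-padded naming rule.
final class SegmentFileNames {

    static String logFileName(long baseOffset) {
        // %020d pads the base offset with leading zeros to a width of 20 digits.
        return String.format(Locale.ROOT, "%020d.log", baseOffset);
    }

    static String indexFileName(long baseOffset) {
        return String.format(Locale.ROOT, "%020d.index", baseOffset);
    }

    public static void main(String[] args) {
        System.out.println(logFileName(0L));        // 00000000000000000000.log
        System.out.println(indexFileName(170410L)); // 00000000000000170410.index
    }
}
```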
The index and log files are named after the offset of the first message in the segment. The index file maps message offsets to their physical positions in the log file. With this index, Kafka can locate a message by offset quickly instead of scanning the whole log. The granularity of the index is controlled by the parameter log.index.interval.bytes; by default an index entry is written every 4096 bytes.
[Figure: structure of the index file and the log file]
The process for finding a message (for example, the message with offset 170417):
First, use binary search to determine which segment file contains it. Suppose the first file is 00000000000000000000.index, the second is 00000000000000170410.index (base offset 170410), and the third is 00000000000000239430.index (base offset 239430). Offset 170417 therefore falls in the second file. Later files are named and ordered by base offset in the same way, so binary search locates the right file quickly.
Subtract the base offset of the index file from the target offset, that is, 170417 - 170410 = 7, and use binary search to find the largest index entry whose relative offset is less than or equal to 7. Here that is the entry [4, 476]: the message with offset 170410 + 4 = 170414 starts at position 476 in the log file.
Open the data file (00000000000000170410.log) and scan sequentially from position 476 until you find the message with offset 170417.
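The lookup steps above can be sketched with an in-memory model. This is an illustration under stated assumptions: the class and method names are hypothetical, the index is held in an array rather than in a file, and apart from the entry [4, 476] taken from the example the index entries are made up.

```java
import java.util.TreeMap;

// Hypothetical in-memory model of the lookup; it does not read real Kafka files.
final class OffsetLookupSketch {

    // Step 1: pick the segment whose base offset is the largest one <= the target.
    static long findSegment(TreeMap<Long, ?> segmentsByBaseOffset, long targetOffset) {
        Long baseOffset = segmentsByBaseOffset.floorKey(targetOffset);
        if (baseOffset == null) {
            throw new IllegalArgumentException("offset precedes the first segment");
        }
        return baseOffset;
    }

    // Step 2: binary-search the sparse index for the largest relative offset <= the target.
    // index[i] = {relativeOffset, filePosition}, sorted by relativeOffset.
    // Returns the byte position to start scanning from (0 if no entry qualifies).
    static int findScanStart(int[][] index, int relativeOffset) {
        int low = 0;
        int high = index.length - 1;
        int position = 0;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            if (index[mid][0] <= relativeOffset) {
                position = index[mid][1];
                low = mid + 1;
            } else {
                high = mid - 1;
            }
        }
        return position;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> segments = new TreeMap<>();
        segments.put(0L, "00000000000000000000");
        segments.put(170410L, "00000000000000170410");
        segments.put(239430L, "00000000000000239430");

        long target = 170417L;
        long baseOffset = findSegment(segments, target);

        // Made-up sparse index, except for the entry {4, 476} from the example.
        int[][] index = {{1, 0}, {4, 476}, {8, 1325}};
        int start = findScanStart(index, (int) (target - baseOffset));

        // Step 3 (not modeled): scan the .log file from 'start' until the offset matches.
        System.out.println("segment " + segments.get(baseOffset) + ", scan from byte " + start);
    }
}
```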
2.1.3 Data expiration mechanism
When a log segment reaches the size limit set by log.segment.bytes (1 GB by default), or has been open for the configured time limit (called log.segment.ms in this article; the broker setting is log.roll.ms), it is closed and a new segment is opened. Once a segment is closed, it starts waiting to expire. The segment currently being written is called the active segment, and the active segment is never deleted. So if you want to keep data for 1 day but the active segment contains 5 days of data, that data is retained for 5 days, because it cannot be deleted until the segment is closed.
2.2 Kafka producer
2.2.1 Partitioning strategy
Why partition
Storing data across multiple partitions helps balance data across the cluster.
Partitions can be read and written in parallel, which speeds up reads and writes.
Data recovery is faster: when a machine fails, each topic only needs to recover part of its data, and multiple machines can recover concurrently.
Partitioning rules
If a partition is specified, use the specified partition.
If no partition is specified but a key is present, take the hash of the key modulo the number of partitions of the topic to get the partition.
If neither a partition nor a key is specified, a random integer is generated on the first call and incremented on each subsequent call; this value modulo the number of available partitions of the topic gives the partition. This is the round-robin algorithm.
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // Key is null: take an auto-incrementing counter modulo the partition count.
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // No partitions are available, give a non-available partition.
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // Key is not null: hash the keyBytes to choose a partition.
            // Question: why not use availablePartitions here, as above?
            // Per "Kafka: The Definitive Guide", p. 45: the same key must always be routed
            // to the same partition. With availablePartitions, the same key could be routed
            // to different partitions as the number of available partitions changes.
            // So if you rely on keys to map to partitions, plan the partition count
            // when you create the topic.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        // One AtomicInteger is kept per topic and incremented on every call.
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

2.2.2 Data reliability guarantee
What guarantees does Kafka provide?
Kafka guarantees the order of messages within a partition. If the same producer writes message A and then message B to the same partition, Kafka guarantees that the offset of B is larger than that of A and that consumers read A before B.
A message is considered "committed" only when it has been written to all in-sync replicas of the partition. The producer can choose which acknowledgement to wait for: when the message is fully committed, when it has been written to the partition leader, or as soon as it has been sent over the network.
As long as at least one replica remains alive, committed messages are not lost.
Consumers can only read committed messages.
Replicas
Kafka's replication mechanism and its multi-replica partition architecture are the core of its reliability guarantees. Writing each message to multiple replicas keeps the message durable if a broker crashes.
A Kafka topic is divided into multiple partitions, which are the basic blocks of data. Each partition can have multiple replicas, one of which is the leader. All events are written to the leader replica and read from the leader replica. The other replicas only need to stay in sync with the leader and copy the latest events in a timely manner.
The leader maintains a dynamic in-sync replica set (ISR), the set of followers that are in sync with the leader. When the followers in the ISR have finished replicating a message, the leader sends the ack. If a follower does not sync with the leader for too long, it is removed from the ISR; the time threshold is set by the replica.lag.time.max.ms parameter. When the leader becomes unavailable, a new leader is elected from the ISR. A follower is considered in sync when both of the following hold:
It has an active session with ZooKeeper, that is, it has sent a heartbeat to ZooKeeper within the past 6 s (configurable).
It has fetched the latest data from the leader within the past 10 s (configurable).
Configurations that affect the reliability of Kafka message storage
Ack response mechanism
For less important data, the reliability requirements are not very high and a small amount of data loss can be tolerated, so there is no need to wait until every follower in the ISR has received the message. Kafka therefore offers three reliability levels, which let users trade reliability against latency. The acks parameter:
0: the producer does not wait for the broker's ack. This gives the lowest latency. The broker responds as soon as it receives the message, before writing it to disk, so data may be lost if the broker fails.
1: the producer waits for the broker's ack, and the partition leader returns the ack once it has written the message to disk successfully. If the leader fails before the followers have replicated the message, the data is lost.
-1 (all): the producer waits for the broker's ack, and the ack is returned only after the partition leader and all followers in the ISR have written the message. However, if the leader fails after the followers have replicated the message but before the broker sends the ack, the producer resends and the data is duplicated. (Data can also be lost in an extreme case: when the ISR contains only the leader, this level behaves like acks=1.)
Consumption consistency guarantee
(1) Follower failure
After a failure, the follower is temporarily removed from the ISR. When it recovers, it reads the last HW (high watermark) recorded on its local disk, truncates the part of its log file above the HW, and synchronizes from the leader starting at the HW.
Once the follower's LEO (log end offset) is greater than or equal to the partition's HW, that is, once the follower has caught up with the leader, it can rejoin the ISR.
(2) Leader failure
When the leader fails, a new leader is elected from the ISR. To keep the replicas consistent, the remaining followers first truncate the part of their log files above the HW and then synchronize from the new leader.
Note: this only ensures that the replicas are consistent with each other. It does not guarantee that data is never lost or duplicated.
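The truncation step can be sketched as follows. This is a simplified model, not Kafka code: the class and method names are hypothetical, a log is a plain list whose index plays the role of the offset, and the HW is computed as the smallest LEO among the in-sync replicas, which is the usual definition but is not spelled out in the text above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of HW-based truncation; not Kafka code.
final class HighWatermarkSketch {

    // Assumption: HW is the smallest LEO among the in-sync replicas.
    static long highWatermark(List<Long> isrLogEndOffsets) {
        if (isrLogEndOffsets.isEmpty()) {
            throw new IllegalArgumentException("ISR must contain at least the leader");
        }
        long hw = Long.MAX_VALUE;
        for (long leo : isrLogEndOffsets) {
            hw = Math.min(hw, leo);
        }
        return hw;
    }

    // A recovering replica keeps only the records below the HW.
    // The list index stands in for the message offset.
    static List<String> truncateToHighWatermark(List<String> log, long hw) {
        int keep = (int) Math.min(hw, log.size());
        return new ArrayList<>(log.subList(0, keep));
    }

    public static void main(String[] args) {
        // Leader has LEO 10, the two followers have LEO 8 and LEO 6.
        long hw = highWatermark(List.of(10L, 8L, 6L));

        List<String> followerLog = List.of("m0", "m1", "m2", "m3", "m4", "m5", "m6", "m7");
        System.out.println("HW = " + hw);
        System.out.println("after truncation: " + truncateToHighWatermark(followerLog, hw));
    }
}
```

After truncation, every replica agrees on the records below the HW, and the follower fetches everything from the HW onward from the current leader.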
2.2.3 Message sending process
Kafka's producer sends messages asynchronously. Two threads are involved, the main thread and the sender thread, together with a variable they share, the RecordAccumulator. The main thread puts messages into the RecordAccumulator, and the sender thread continuously pulls messages from the RecordAccumulator and sends them to the Kafka broker.
To improve efficiency, messages are written to Kafka in batches. A batch is a set of messages that belong to the same topic and partition. Sending each message over the network separately would incur a lot of network overhead, and batching reduces it. There is, however, a trade-off between latency and throughput: the larger the batch, the more messages are processed per unit of time, but the longer any single message takes to be delivered. Batches can also be compressed, which improves transfer and storage efficiency at the cost of more computation.
Related parameters:
batch.size: the sender sends the data only after it has accumulated to batch.size (in bytes; note that this is not a number of messages).
linger.ms: if the accumulated data has not reached batch.size, the sender sends it anyway after waiting linger.ms (in milliseconds).
client.id: this can be any string. The server uses it to identify the source of messages, and it appears in logs and in quota metrics.
max.in.flight.requests.per.connection: this specifies how many messages the producer can send before receiving a response from the server. A higher value uses more memory but also increases throughput. Setting it to 1 ensures that messages are written to the server in the order in which they were sent, even when retries occur.
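The parameters above, together with acks, could be collected as follows. This is a sketch with illustrative values, not recommended settings: the class name, the client.id value, and the chosen numbers are assumptions. In a real application these properties, plus key.serializer and value.serializer, would be passed to the KafkaProducer constructor from the kafka-clients library, which is left out here so the block stays self-contained.

```java
import java.util.Properties;

// Builds producer settings as plain properties; values are illustrative only.
final class ProducerConfigSketch {

    static Properties orderedReliableProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("client.id", "order-service-producer"); // hypothetical name
        props.setProperty("acks", "all");        // wait for the leader and all ISR followers
        props.setProperty("batch.size", "16384"); // bytes per batch, not a message count
        props.setProperty("linger.ms", "5");      // send a partial batch after 5 ms
        // One in-flight request keeps messages in send order even when retries occur.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = orderedReliableProducer("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```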
2.3 Kafka consumers
2.3.1 Consumption mode
The consumer reads data from the broker in pull mode.
Push mode adapts poorly to consumers with different consumption rates, because the delivery rate is decided by the broker. The broker's goal is to deliver messages as fast as possible, which can easily overwhelm a consumer that cannot keep up; the typical symptoms are denial of service and network congestion. Pull mode lets each consumer fetch messages at a rate that matches its own processing capacity.
The drawback of pull mode is that when Kafka has no data, the consumer may spin in a loop that keeps returning empty results. To address this, the Kafka consumer passes a timeout parameter when it fetches data. If no data is currently available, the consumer waits for up to that duration before returning.
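The effect of the timeout can be sketched with a BlockingQueue standing in for a partition. This is an analogy only: the class and method names are hypothetical, and a real consumer fetches from the broker over the network rather than from an in-process queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pull with a timeout; not the Kafka consumer API.
final class PullWithTimeoutSketch {

    // Waits up to timeoutMs for a record and returns null if none arrives,
    // instead of returning immediately and forcing the caller to busy-loop.
    static String pull(BlockingQueue<String> partition, long timeoutMs) {
        try {
            return partition.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> partition = new LinkedBlockingQueue<>();
        System.out.println(pull(partition, 100)); // no data: returns after the timeout
        partition.add("record-1");
        System.out.println(pull(partition, 100)); // data available: returns at once
    }
}
```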
2.3.2 Partition allocation strategy
A consumer group contains multiple consumers and a topic contains multiple partitions, so partitions must be allocated, that is, Kafka must decide which consumer consumes which partition. Kafka provides three consumer partition allocation strategies: RangeAssignor, RoundRobinAssignor, and StickyAssignor.
The PartitionAssignor interface lets users implement their own allocation algorithm for distributing partitions among consumers. Members of a consumer group subscribe to the topics they are interested in and send these subscriptions to the broker that acts as the group coordinator. The coordinator selects one consumer to perform the partition allocation for the group and forwards the result to all consumers in the group. Kafka uses the RangeAssignor algorithm by default.
2.3.2.1 RangeAssignor
RangeAssignor allocates partitions topic by topic. For each topic, it sorts the partitions by partition ID, sorts the consumers in the group that subscribe to the topic, and then distributes the partitions among those consumers as evenly as possible. The result is only as even as possible, because the number of partitions may not be divisible by the number of consumers, in which case some consumers receive one more partition.
[Figure: RangeAssignor allocation of 4 partitions among 3 consumers]
The allocation algorithm is as follows:
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // The loop handles each subscribed topic separately.
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            // Sort the consumers.
            Collections.sort(consumersForTopic);

            // Number of partitions each consumer gets on average.
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // Number of partitions left over after the even split.
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                // Start position of the partitions assigned to the i-th consumer.
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                // Number of partitions assigned to the i-th consumer.
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
An obvious problem with this allocation method is that the imbalance grows as consumers subscribe to more topics. For example, with four partitions and three consumers, C0 is assigned one more partition than the others. If the group also subscribes to a second topic with 4 partitions, C0 again receives one extra partition, so it ends up with two more partitions than C1 and C2, and the gap keeps widening as topics are added. Allocation result:
Subscribing to 2 topics, each with 4 partitions, with 3 consumers in total
C0: [T0P0, T0P1, T1P0, T1P1]
C1: [T0P2, T1P2]
C2: [T0P3, T1P3]
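The allocation result above can be reproduced with a small stand-alone program that applies the same start and length formulas to each topic. This is a sketch, not Kafka code: the class name is hypothetical, partitions are represented as strings such as "T0P1", and every consumer is assumed to subscribe to every topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-alone illustration of range assignment; assumes every consumer
// subscribes to every topic and that the consumer list is already sorted.
final class RangeAssignmentDemo {

    static Map<String, List<String>> assign(List<String> sortedConsumers,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : sortedConsumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        int n = sortedConsumers.size();
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int perConsumer = topic.getValue() / n; // even share
            int extra = topic.getValue() % n;       // leftovers go to the first consumers
            for (int i = 0; i < n; i++) {
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i + 1 > extra ? 0 : 1);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sortedConsumers.get(i)).add(topic.getKey() + "P" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("T0", 4);
        topics.put("T1", 4);
        // Prints {C0=[T0P0, T0P1, T1P0, T1P1], C1=[T0P2, T1P2], C2=[T0P3, T1P3]}
        System.out.println(assign(List.of("C0", "C1", "C2"), topics));
    }
}
```

Because the leftover partition of every topic goes to the first consumer in sorted order, C0 accumulates one extra partition per topic.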
2.3.2.2 RoundRobinAssignor
RoundRobinAssignor sorts all partitions of all topics subscribed to by the consumer group, sorts all consumers, and then hands out partitions one by one so that the allocation is as even as possible (RangeAssignor, by contrast, works on the partitions of a single topic at a time). If every consumer in the group subscribes to the same list of topics, the result is as balanced as possible (the number of partitions assigned to any two consumers differs by at most 1). If the subscription lists differ, the result is not guaranteed to be as balanced as possible, because some consumers do not take part in the allocation of some topics.
For the two-topic case above, this allocation is more balanced than RangeAssignor's. But consider the following situation: there are three consumers C0, C1, and C2, and three topics T0, T1, and T2 with 1, 2, and 3 partitions respectively. C0 subscribes to T0; C1 subscribes to T0 and T1; C2 subscribes to T0, T1, and T2. The RoundRobinAssignor result is then:
C0: [T0P0]
C1: [T1P0]
C2: [T1P1, T2P0, T2P1, T2P2]
This looks as balanced as it can be, yet C2 ends up consuming 4 partitions while C1, which also subscribes to T1, has only one. Would it not be more balanced to hand T1P1 to C1?
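The round-robin behavior with unequal subscriptions can be sketched as follows. This is a simplified stand-alone model, not Kafka's implementation: the class name is hypothetical, partitions are represented as strings, and the subscriptions are assumed to be C0: T0; C1: T0 and T1; C2: T0, T1, and T2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified stand-alone model of round-robin assignment; not Kafka code.
final class RoundRobinAssignmentDemo {

    static Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionsPerTopic) {
        // TreeMap keeps consumers sorted by name.
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : subscriptions.keySet()) {
            assignment.put(consumer, new ArrayList<>());
        }
        List<String> consumers = new ArrayList<>(assignment.keySet());
        Map<String, Integer> sortedTopics = new TreeMap<>(partitionsPerTopic);

        int next = 0; // position in the circular list of consumers
        for (Map.Entry<String, Integer> topic : sortedTopics.entrySet()) {
            String name = topic.getKey();
            boolean hasSubscriber = false;
            for (Set<String> topics : subscriptions.values()) {
                hasSubscriber |= topics.contains(name);
            }
            if (!hasSubscriber) {
                continue; // nobody can take this topic's partitions
            }
            for (int p = 0; p < topic.getValue(); p++) {
                // Skip consumers that do not subscribe to this topic.
                while (!subscriptions.get(consumers.get(next)).contains(name)) {
                    next = (next + 1) % consumers.size();
                }
                assignment.get(consumers.get(next)).add(name + "P" + p);
                next = (next + 1) % consumers.size();
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subscriptions = Map.of(
                "C0", Set.of("T0"),
                "C1", Set.of("T0", "T1"),
                "C2", Set.of("T0", "T1", "T2"));
        Map<String, Integer> partitions = Map.of("T0", 1, "T1", 2, "T2", 3);
        // Prints {C0=[T0P0], C1=[T1P0], C2=[T1P1, T2P0, T2P1, T2P2]}
        System.out.println(assign(subscriptions, partitions));
    }
}
```

C2 receives T1P1 only because it happens to be next in the rotation when that partition comes up, even though C1 could have taken it and holds fewer partitions.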
2.3.2.3 StickyAssignor
The StickyAssignor algorithm aims to change the existing allocation as little as possible when partitions are reallocated, which saves the overhead caused by moving partitions between consumers. "Sticky" means that the allocation sticks: each reallocation changes as little as possible compared with the previous one. Its goals are:
The distribution of partitions is as balanced as possible.
The result of each reallocation stays as close as possible to the previous allocation.
When the two goals conflict, the first takes priority. Every allocation algorithm tries to achieve the first goal; the second is what distinguishes StickyAssignor.
The StickyAssignor algorithm is complex. The following example illustrates its effect in comparison with RoundRobinAssignor. Preconditions:
There are 4 topics, T0, T1, T2, and T3, and each topic has 2 partitions.
There are 3 consumers, C0, C1, and C2, and all of them subscribe to these four topics.
[Figure: reallocation under RoundRobinAssignor and StickyAssignor; red arrows mark the partitions whose assignment changes]
As the figure shows, the StickyAssignor strategy changes few assignments.
2.3.3 Maintenance of offsets
A consumer may fail during consumption, for example because of a power outage, and after it recovers it must continue from the position it had reached before the failure. The consumer therefore records in real time the offset it has consumed up to. Before Kafka 0.9, the consumer stored offsets in ZooKeeper by default. Starting with version 0.9, the consumer stores offsets in Kafka, in a built-in topic named __consumer_offsets. This topic cannot be read by default; it becomes readable when exclude.internal.topics=false is set in consumer.properties.
2.3.4 How Kafka reads and writes data efficiently (background)
Sequential disk writes
Data produced by a Kafka producer is written to a log file by appending to the end of the file, that is, sequentially. Reported figures show that sequential writes to a given disk can reach about 600 MB/s, while random writes reach only about 100 KB/s. This follows from the mechanical structure of the disk: sequential writes are fast because they avoid most of the head seek time.
Zero-copy technology
The purpose of zero copy is to keep the CPU from copying data from one memory area to another. Zero-copy techniques reduce unnecessary copies or hand simple data transfers to other components, so that the CPU is free for other work and system resources are used more efficiently.
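In Java, one way to request a zero-copy transfer is FileChannel.transferTo, which lets the operating system move bytes between channels without staging them in an application buffer where the platform supports it. The following is a minimal sketch, not Kafka's code: the class and method names are hypothetical, and a second file stands in for the network socket a broker would write to so that the example stays self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of a transferTo-based copy; a file stands in for a socket.
final class ZeroCopySketch {

    // Moves the whole source file into the target channel with transferTo.
    static long transfer(Path source, Path target) {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long position = 0;
            long size = in.size();
            // transferTo may move fewer bytes than requested, so loop until done.
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
            return position;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes data to a temporary file, transfers it, and returns what arrived.
    static byte[] roundTrip(byte[] data) {
        try {
            Path source = Files.createTempFile("segment", ".log");
            Path target = Files.createTempFile("socket-stand-in", ".bin");
            try {
                Files.write(source, data);
                transfer(source, target);
                return Files.readAllBytes(target);
            } finally {
                Files.deleteIfExists(source);
                Files.deleteIfExists(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] copied = roundTrip("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(copied, StandardCharsets.UTF_8));
    }
}
```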