In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article introduces the relevant knowledge of "technical analysis of Kafka cluster breaking through millions of partitions". In the operation process of actual cases, many people will encounter such difficulties. Next, let Xiaobian lead you to learn how to deal with these situations! I hope you can read carefully and learn something!
1. Number of ZK nodes
The topic of Kafka is stored and replicated in the broker with partition as the minimum unit, so the cluster needs to maintain the Leader information of each partition, which broker nodes multiple copies of a single partition are stored on, and which copies are in the replication synchronization state. To store this metadata, the kafka cluster creates a node on the zk cluster for each partition, and the number of partitions directly determines the number of nodes on zk.
Assuming that there are 10,000 topics on the cluster, each topic contains 100 partitions, the number of nodes on ZK is about 2 million, and the snapshot size is about 300MB. When ZK node data changes, the data will be written to the transaction log for persistent storage. When the transaction log reaches a certain number of entries, all data will be written to the persistent snapshot file. The expansion of partition nodes means that the snapshot file is also large. Full-write snapshots and transaction log writes interact, affecting client responsiveness and taking longer for the zk node to restart loading snapshots.
2. Partition copy
Kafka's partition replication is responsible for independent replication threads. Multiple partitions share replication threads. When the partition on a single broker increases, the number of partitions responsible for a single replication thread will also increase. Each partition corresponds to a log file. When a large number of partitions are written at the same time, the writing of files on the disk will be more dispersed, and the writing performance will deteriorate. Replication may not keep up, resulting in frequent fluctuations in ISR. Adjusting the number of replication threads reduces the number of partitions a single thread is responsible for, but also increases disk contention.
3. Controller Switching duration
Due to network or machine failure, there may be controller switching in the running cluster. When controller switching, it is necessary to recover broker node information, partition replication relationship of topic, which node partition current leader is on, etc. from ZK, and then synchronize the complete partition information to each broker node.
When tested on a virtual machine, it takes about 37s for metadata of 1 million partitions to be restored from ZK to broker, and about 80MB for metadata generated by 1 million partitions to be serialized (the data size is related to the number of copies and the length of topic name, etc.). After other brokers receive metadata, they deserialize and update it into the local broker memory, and the response time takes about 40s (the test duration is related to the network environment).
Controller controls leader switching and metadata delivery to other broker nodes in the cluster. The recovery time of controller becomes longer, which increases the risk of cluster unavailability. If there is partition Leader to switch when controller switches, there may be clients who cannot get a new leader for a long time, resulting in service interruption.
4. brokerUp-down line recovery time
During daily maintenance, it may be necessary to restart the broker. In order not to affect user use, the broker will notify the controller to switch leaders before stopping. Similarly, leader switching will be performed when the broker fails. The leader switching information needs to update the partition status node data on ZK, and update metadata information for other brokers synchronously. When the number of partitions increases, it means that the partition Leader switching time on a single broker node becomes longer.
Through the above several influencing factors, we know that when the number of partitions increases, it will directly affect the controller failure recovery time; the number of partitions on a single broker will affect the disk performance, the stability of replication; the broker restart Leader switching time increases, etc. Of course, we can limit the number of partitions on each broker under the existing architecture to avoid the impact of the number of partitions on a single broker, but this means that the number of broker nodes in the cluster will increase, and the number of broker nodes in charge of the controller will increase. At the same time, the number of partitions that the controller needs to manage will not decrease. If we want to solve the scenario where a large number of partitions share a cluster, Then the core problem to be solved is either to improve the processing performance of a single controller or to increase the number of controllers.
03 Solutions
1. Single ZK cluster
The following optimizations can be made to improve the processing performance of a single controller:
parallel pull zk node
Controller pulls metadata on zk, although asynchronous waiting for data response, request and response non-serial waiting, but single thread processing consumes about 37s, we can pull metadata through multi-thread parallel, each thread is responsible for a part of partition, thus reducing the time to pull metadata.
A simple simulation on the virtual machine to obtain 1 million node data, a single thread takes about 28s, distributed to 5 threads parallel processing, each thread is responsible for pulling 200 thousand partition data, the total time is reduced to about 14s (This time is affected by the performance of the virtual machine itself. If a single thread pulls 200,000 partitions on the same virtual machine, it only takes about 6s), so when the controller recovers, pulling partitions in parallel can significantly shorten the recovery time.
Change the way metadata is synchronized
As mentioned above, the metadata generated by 1 million partitions is about 80MB. If we limit the number of partitions on a single broker, it means that we need to increase the number of broker nodes. Switching parallel synchronization to a large number of brokers will bring traffic impact to controller nodes. At the same time, synchronizing 80MB metadata will also consume a long time. Therefore, it is necessary to change the current way of synchronizing metadata in clusters. For example, like storing consumption locations, metadata is stored through a built-in topic. The controller sends the data written to ZK to the topic where metadata is stored through messages. Brokers consume these data from the topic and update metadata in memory respectively. Although this kind of scheme can synchronize metadata in full when the controller switches, However, it will require some major adjustments to the current kafka architecture (although there are many other options, such as not using ZK to manage metadata, etc., which are beyond the scope of this article).
Is there any other way to support large-scale partition scenarios with minimal changes to the kafka architecture? We know that when the kafka client interacts with broker, it will first pull topic metadata through the specified address, and then produce and consume according to the corresponding Leader of the metadata connection partition. By controlling metadata, we can control the machines that the client produces and consumes connections. These machines do not necessarily need to be in the same cluster on the client side, but only the client can obtain the status information of these partitions. Therefore, we can distribute different topics to different clusters, and then find a way to combine the topic information on different clusters and return it to the client, so that the client can connect different clusters at the same time. From the perspective of the client, it is a large cluster. In this way, there is no need for a single physical cluster to support a very large scale, and multiple physical clusters can be combined to support a larger scale. In this way, users do not need to stop to modify the service during expansion. Let's describe how to implement this solution.
2. Small clusters form logical clusters
When we need to set up logical clusters, there are several core problems to be solved: 1. When the client needs to pull metadata, how to assemble the metadata on multiple small physical clusters and return it to the client;2. How to notify the change in time when the metadata on different clusters changes;3. How to distribute the topics that store the consumption location and transaction state on multiple clusters.
These questions are explained below:
metadata service
To solve the metadata assembly problem, we can select one of the multiple physical clusters in the logical cluster as the main cluster, and the other clusters as the extension clusters. The main cluster is responsible for providing metadata, consumption location, and transaction-related services to the outside world. Of course, the main cluster can also provide message production and consumption services at the same time. The extension clusters can only be used for the production and consumption of business messages. We know that when the partition Leader switches, the controller in the cluster needs to synchronize the new metadata data to the broker in the cluster. When a logical cluster is composed of multiple independent physical clusters, the controller cannot sense Broker nodes in other clusters.
We can simply modify the metadata interface in the primary cluster. When the client pulls metadata, we can jump to other clusters to pull metadata, and then merge and assemble it on the primary cluster and return it to the client.
Although the way to jump and pull metadata has some performance consumption, it is not on the path of message production and consumption under normal circumstances, and has little impact on the client. By reassembling metadata when the client pulls, the problem of updating metadata across physical clusters can be avoided, and real-time performance can be guaranteed.
Consumption grouping and transaction coordination
When members of consumption groups need to coordinate the partition of data pulled, the server will return the corresponding coordination node according to the partition information that stores the consumption location topic. Therefore, we need to determine the clusters with the distribution of consumption location topics in a logical cluster, so as to avoid problems such as different coordinators returned from nodes visiting different physical clusters and different consumption locations pulled from different clusters. We can choose the broker node of the primary cluster to provide consumption and transaction coordination services, and the consumption location is only stored on the primary cluster.
Through some of the above modifications, we can support a larger business scale, users only need to know the address of the main cluster when using it.
In addition to the core issues mentioned above, we also need to pay attention to topic allocation. Since ckafka of Tencent Cloud itself forwards the request to create topic on broker to the control module, it can easily solve the problem of topic distribution in multiple physical clusters, and can also avoid the problem that topics with the same name may appear in different physical clusters on the same logical cluster.
single physical cluster split
As described above, multiple physical clusters are organized into a single logical cluster. Sometimes, a single physical cluster needs to continuously expand partition on the existing topic due to some reasons. If multiple topics need to be expanded at the same time, a single physical cluster may be too large, so it is necessary to split the existing cluster. A physical cluster is split into two physical clusters.
Cluster splitting involves splitting the ZK cluster and splitting the broker nodes into groups. First, the broker nodes in the cluster are divided into two groups, each group connects different ZK nodes. For example, we can add observer nodes to the original zk cluster, and the newly added brokers are a group. The brokers in the original cluster are a group. We let the new brokers only fill in the address of the observer. Before the ZK cluster splits, different topics can be easily migrated to their respective broker groups through the built-in migration tool of KAFKA. The partition of the same topic will only be distributed on the broker nodes of the same group. Subsequently, the observer node will be removed from the existing ZK cluster, and then the observer and other ZK nodes will form a new ZK cluster, thus realizing the split of the kafka cluster.
The content of "Technical Analysis of Kafka Cluster Breaking through Million Partition" is introduced here. Thank you for reading it. If you want to know more about industry-related knowledge, you can pay attention to the website. Xiaobian will output more high-quality practical articles for everyone!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.