2025-03-29 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains how to optimize the thread model used for Kafka sequential consumption. The method is simple, fast and practical.
Practice and Optimization of the Kafka Sequential Consumption Thread Model
Message middleware generally implements ordered messages by sending each class of ordered messages to the same topic partition; in Kafka this only requires giving those messages the same Key. Kafka guarantees that, within a consumer group, each partition is consumed by only one consumer at any time, so the messages of a partition are consumed in order (local ordering). Because the order within a partition must be preserved, the messages of a partition cannot be consumed concurrently, which limits consumption throughput. So how can we maximize consumer throughput while still preserving message order?
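The key-to-partition rule above can be illustrated with a minimal sketch. Kafka's default partitioner hashes the serialized key with murmur2; here `String.hashCode()` stands in for that hash, and `KeyPartitioning` and `partitionFor` are hypothetical names introduced only for this illustration.

```java
// Hypothetical illustration: String.hashCode() stands in for the murmur2 hash
// that Kafka's default partitioner applies to the serialized key.
final class KeyPartitioning {
    private KeyPartitioning() {}

    // Maps a message key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Calling `KeyPartitioning.partitionFor("order-42", 6)` twice returns the same index, which is why all messages sharing a Key end up in one partition and can be consumed in order.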
This article first analyzes in depth how a Kafka consumer pulls messages, and then puts the sequential consumption thread model into practice and optimizes it.
Analysis of how Kafka consumers pull messages
Before implementing the Kafka sequential consumption thread model, we need to analyze the consumer's message pull mechanism in depth. Only with a solid understanding of the whole pull process can the thread model changes described later be understood properly.
Let me first describe what is actually observed when pulling messages, with max.poll.records = 500.
1. When messages are not piled up:
When there is no backlog and a partition has fewer than 500 messages available, the consumer tops up the batch with messages from other partitions, up to 500 in total, before returning.
2. When multiple partitions are piled up:
When there is a backlog, each poll returns messages from a single partition. Repeated debugging shows, however, that the consumer does not drain one partition's backlog completely before moving on to the next; it keeps cycling through the partitions. This cycle does not mean that after pulling 500 messages from partition p0 the next poll pulls from p1. It is quite likely that the next poll returns messages from p0 again. To understand this behavior I read the relevant source code carefully.
org.apache.kafka.clients.consumer.KafkaConsumer#poll
    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        try {
            // poll for new data until the timeout expires
            do {
                // core logic of the client pulling messages
                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
                if (!records.isEmpty()) {
                    // before returning the data, send the next fetch request
                    // so that the user does not block the next time data is requested
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        // call ConsumerNetworkClient#poll to actually send the FetchRequest
                        client.pollNoWakeup();
                    }
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            } while (timer.notExpired());
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
When consuming with a Kafka consumer we usually pass a timeout, for example:
    consumer.poll(Duration.ofMillis(3000));
As the code above shows, this timeout is the maximum time poll waits for messages. As soon as pollForFetches returns any records, poll returns them, even if there are fewer than max.poll.records. If no messages arrive before the timeout expires, poll returns an empty result.
pollForFetches is the core of the client's pull logic, but it does not actually pull from the broker; it takes messages from a local cache. After pollForFetches returns a non-empty result, fetcher.sendFetches() and client.pollNoWakeup() are called. What are these two calls for?
From reading the source, fetcher.sendFetches() builds the pull request (FetchRequest) and "sends" it. The request is not really sent at this point: the FetchRequest object is stored in the unsent cache and is only transmitted when ConsumerNetworkClient#poll is called.
Before building the FetchRequest, fetcher.sendFetches() filters the partitions that may currently be pulled. This filtering is what determines how messages are pulled across multiple partitions, and I will come back to it later.
As the source of KafkaConsumer#poll shows, pulling messages involves two separate activities. The user's thread calls pollForFetches to take messages from the local cache for consumption. Once it has messages, it calls ConsumerNetworkClient#poll to send the next pull request to the broker, and the messages returned are cached locally. Why is ConsumerNetworkClient#poll called eagerly right after messages are obtained? I think the goal is that the next call to poll can take messages from the cache immediately.
The pollForFetches method calls the Fetcher#fetchedRecords method to get and parse the message from the cache:
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
        int recordsRemaining = maxPollRecords;
        try {
            while (recordsRemaining > 0) {
                // if the current PartitionRecords is null or has been fully read,
                // take the next CompletedFetch from completedFetches and parse it into PartitionRecords
                if (nextInLineRecords == null || nextInLineRecords.isFetched) {
                    // if the cache holds no more data, stop this round
                    // and return whatever has been collected so far
                    CompletedFetch completedFetch = completedFetches.peek();
                    if (completedFetch == null) break;
                    try {
                        // parse the CompletedFetch (locally cached pulled data) into PartitionRecords
                        nextInLineRecords = parseCompletedFetch(completedFetch);
                    } catch (Exception e) {
                        // ...
                    }
                    completedFetches.poll();
                } else {
                    // take the requested number of messages from the partition cache
                    List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
                    // ...
                    fetched.put(partition, records);
                    recordsRemaining -= records.size();
                }
            }
        } catch (KafkaException e) {
            // ...
        }
        return fetched;
    }
completedFetches is the cache of messages that have already been pulled from the broker. The code above is all about taking messages out of this cache, and it shows the following.
maxPollRecords is the maximum number of messages returned by one poll. It is configured through max.poll.records and defaults to 500. The method takes one CompletedFetch from completedFetches and parses it into a readable PartitionRecords object, which is nextInLineRecords in the method. Note that a PartitionRecords may hold more than 500 messages, so a single poll may take all 500 messages from one PartitionRecords and return. If the PartitionRecords holds fewer than 500 messages, the next partition's data is taken from completedFetches. recordsRemaining records how many messages are still missing from this poll, and the loop keeps taking messages from the cache until recordsRemaining reaches 0 or the cache is empty.
This explains why, when there is a backlog, each poll most likely returns messages from a single partition: a cached CompletedFetch usually holds more messages than one poll returns. The amount of data the Kafka client pulls from the broker per request is not determined by max.poll.records. That parameter only determines how many messages the user takes from the local cache per poll. The amount actually pulled from the broker is determined by parameters such as fetch.min.bytes, max.partition.fetch.bytes and fetch.max.bytes.
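The draining behavior described above can be reproduced with a simplified model that uses only the JDK. This is a sketch under stated assumptions, not Kafka's implementation: `FetchCacheModel` and `CachedFetch` are hypothetical names, and messages are represented by their offsets only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of Fetcher#fetchedRecords: each queue entry is one cached
// fetch result for a single partition. Names are illustrative, not Kafka's.
final class FetchCacheModel {
    static final class CachedFetch {
        final int partition;
        final Deque<Long> offsets = new ArrayDeque<>();

        CachedFetch(int partition, long firstOffset, int count) {
            this.partition = partition;
            for (int i = 0; i < count; i++) offsets.add(firstOffset + i);
        }
    }

    final Deque<CachedFetch> completedFetches = new ArrayDeque<>();
    private CachedFetch nextInLine;

    // Returns up to maxPollRecords offsets, grouped by partition.
    Map<Integer, List<Long>> fetchedRecords(int maxPollRecords) {
        Map<Integer, List<Long>> fetched = new LinkedHashMap<>();
        int remaining = maxPollRecords;
        while (remaining > 0) {
            if (nextInLine == null || nextInLine.offsets.isEmpty()) {
                nextInLine = completedFetches.poll();   // move on to the next cached fetch
                if (nextInLine == null) break;          // cache is empty: return what we have
            } else {
                List<Long> out = fetched.computeIfAbsent(nextInLine.partition, p -> new ArrayList<>());
                while (remaining > 0 && !nextInLine.offsets.isEmpty()) {
                    out.add(nextInLine.offsets.poll());
                    remaining--;
                }
            }
        }
        return fetched;
    }
}
```

With a cached fetch of 3 messages for partition 0 followed by one of 10 messages for partition 1, `fetchedRecords(5)` first returns 3 messages from partition 0 plus 2 from partition 1, and the next call returns 5 messages from partition 1 only. The second call is the single-partition behavior seen under backlog.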
Let's think about it further. Suppose one partition has a persistent backlog: will Kafka keep pulling from that partition every time until its backlog is consumed? (The assumption is that the consumer pulls from this partition every time and stores the messages in the CompletedFetch cache for that partition. By the logic above, nextInLineRecords would never be fully read, so every poll would return messages from that partition.)
The answer is clearly no. If you don't believe it, open Kafka-manager and watch the consumption progress of each partition: every partition is being consumed.
So how does a Kafka consumer cycle through the partitions assigned to it? Let's move on.
Send pull request logic:
org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches
    public synchronized int sendFetches() {
        // work out which partitions can be pulled
        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            // build the request object
            final FetchRequest.Builder request = FetchRequest.Builder
                    .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                    .isolationLevel(isolationLevel)
                    .setMaxBytes(this.maxBytes)
                    .metadata(data.metadata())
                    .toForget(data.toForget());
            // "send" the request: it is not really sent here, only saved in unsent
            client.send(fetchTarget, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            synchronized (Fetcher.this) {
                                // ...
                                // create a CompletedFetch and cache it in the completedFetches queue
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData,
                                        metricAggregator, resp.requestHeader().apiVersion()));
                            }
                        }
                        // ...
                    });
        }
        return fetchRequestMap.size();
    }
The logic above is easy to follow. Before sending pull requests, the consumer checks which partitions can be pulled and then builds one FetchRequest per target broker node (the keys of fetchRequestMap are Node objects). minBytes and maxBytes in the FetchRequest are set by the fetch.min.bytes and fetch.max.bytes parameters respectively. This is why the number of messages pulled from the broker per request is not necessarily equal to max.poll.records.
The prepareFetchRequests method calls Fetcher#fetchablePartitions to filter the partitions that can be pulled. Let's look at how the consumer filters them:
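For reference, the parameters mentioned so far can be collected in one place. The sketch below uses only `java.util.Properties` and sets each key to the default documented for the Kafka consumer; `FetchTuning` is a hypothetical helper name.

```java
import java.util.Properties;

// Consumer settings discussed above, set to Kafka's documented defaults.
final class FetchTuning {
    private FetchTuning() {}

    static Properties defaults() {
        Properties props = new Properties();
        // records handed to the user per poll() from the local cache
        props.setProperty("max.poll.records", "500");
        // the broker answers a fetch once at least this much data is available
        props.setProperty("fetch.min.bytes", "1");
        // upper bound on the data returned for one fetch request (50 MiB)
        props.setProperty("fetch.max.bytes", "52428800");
        // upper bound on the data returned per partition (1 MiB)
        props.setProperty("max.partition.fetch.bytes", "1048576");
        return props;
    }
}
```

Only the first setting affects how many records a poll returns; the other three shape what each request brings back from the broker into the local cache.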
org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions
    private List<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> exclude = new HashSet<>();
        List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
        if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
            exclude.add(nextInLineRecords.partition);
        }
        for (CompletedFetch completedFetch : completedFetches) {
            exclude.add(completedFetch.partition);
        }
        fetchable.removeAll(exclude);
        return fetchable;
    }
nextInLineRecords is the object parsed from one partition's CompletedFetch, as described above. If nextInLineRecords has not been fully read, its partition is not pulled from the broker again. Likewise, a partition that still has an entry in the completedFetches cache is not pulled.
We can draw a clear conclusion:
As long as the local cache still holds message data for a partition, the consumer does not send another pull request for that partition until the partition's local cache has been consumed.
To make this logic clearer, here is an example, with the whole process expressed in a diagram:
Suppose a consumer is assigned three partitions, each pull from the broker returns 4 messages per partition, and the user takes 2 messages from the local cache per poll:
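The scenario above can also be sketched as a small simulation. This is a toy model under stated assumptions: fetch responses are assumed to arrive before the next poll, a single queue stands in for both completedFetches and nextInLineRecords, and `FetchRotationDemo` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the fetch rule: a partition is fetched again only when none of
// its data is left in the local cache. All names here are illustrative.
final class FetchRotationDemo {
    private final int partitions;
    private final int fetchSize;                            // messages per partition per fetch
    private final Deque<int[]> cache = new ArrayDeque<>();  // entries: {partition, remaining}

    FetchRotationDemo(int partitions, int fetchSize) {
        this.partitions = partitions;
        this.fetchSize = fetchSize;
    }

    // Fetches every partition that has no cached data (mirrors fetchablePartitions).
    private void sendFetches() {
        for (int p = 0; p < partitions; p++) {
            final int partition = p;
            boolean cached = cache.stream().anyMatch(e -> e[0] == partition);
            if (!cached) cache.addLast(new int[] {partition, fetchSize});
        }
    }

    // One poll: take up to maxPollRecords from the cache, then prefetch.
    // Returns the partition of each record handed to the user.
    List<Integer> poll(int maxPollRecords) {
        if (cache.isEmpty()) sendFetches();                 // first call: nothing cached yet
        List<Integer> out = new ArrayList<>();
        while (out.size() < maxPollRecords && !cache.isEmpty()) {
            int[] head = cache.peekFirst();
            out.add(head[0]);
            if (--head[1] == 0) cache.pollFirst();          // this partition's cache is drained
        }
        sendFetches();                                      // refill only the drained partitions
        return out;
    }
}
```

With `new FetchRotationDemo(3, 4)` and repeated calls to `poll(2)`, the polls return records from partitions 0, 0, 1, 1, 2, 2, 0 and so on. A partition is served twice in a row because its cached fetch holds more than one poll takes, and it is fetched again only after that cache is drained, which places it behind the other partitions.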
The first consumption model creates multiple KafkaConsumer objects, with each thread maintaining its own KafkaConsumer, so consumption is isolated per thread. Since each partition can be consumed by only one consumer at a time, this model naturally supports sequential consumption.
Its disadvantage is that the consumption capacity of a single partition cannot be improved. If a topic has many partitions, capacity can only be increased by adding KafkaConsumer instances. That leads to too many threads and a large socket connection overhead, so this model is generally not used in real projects.
2. Single KafkaConsumer instance + multiple worker threads
During initialization, the consumer thread pools are created. Specifically, threadsNumMax single-threaded thread pools are created. Single-threaded pools are used so that the pool selected by taking the partition number modulo the number of pools consumes that partition's messages serially. Creating threadsNumMax thread pools here is not reasonable, however, as I will explain later.
com.zto.consumer.KafkaConsumerProxy#submitRecords
The above is the current thread model of ZMS sequential consumption. The code logic is shown in a diagram:
When message traffic is high, each poll mostly returns messages from a single partition (as analyzed above), so sequential consumption degenerates into single-threaded consumption.
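The routing described above, where each partition is mapped to one single-threaded pool by modulo, can be sketched as follows. This is not the ZMS source: `PartitionOrderedDispatcher`, `submit` and `shutdownAndWait` are hypothetical names, and only the `threadsNumMax` parameter name is taken from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not ZMS source: one single-threaded executor per slot,
// chosen by partition number modulo the slot count.
final class PartitionOrderedDispatcher {
    private final List<ExecutorService> pools = new ArrayList<>();

    PartitionOrderedDispatcher(int threadsNumMax) {
        for (int i = 0; i < threadsNumMax; i++) {
            pools.add(Executors.newSingleThreadExecutor());
        }
    }

    // Tasks for the same partition always land on the same thread,
    // so they run in submission order.
    void submit(int partition, Runnable task) {
        pools.get(Math.floorMod(partition, pools.size())).execute(task);
    }

    // Stops accepting work and waits for queued tasks.
    // Returns false on timeout or interruption.
    boolean shutdownAndWait(long timeoutMillis) {
        pools.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService pool : pools) {
                if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) return false;
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

If a poll returns 500 records that all belong to one partition, every task goes to the same executor and the remaining threads stay idle, which is the degeneration noted above.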
How to improve the concurrency of Kafka sequential consumption?
With a solid understanding of the ZMS consumption thread model and of how Kafka consumers pull messages, I came up with the following ways to optimize the ZMS consumption thread model:
1. Refine the ordering granularity of messages
Previously each partition was consumed by one dedicated thread, so consumption capacity could not be increased within a partition. When a business sends ordered messages, it gives messages of the same kind the same Key so that they go to the same partition and are consumed in order. A partition, however, receives messages of many kinds (that is, with different Keys), and the messages returned by one poll are likely to carry different Keys. We can therefore give each partition its own independent thread pool, take the message Key modulo the pool size, and hand the message to the corresponding thread. This allows concurrent consumption without breaking the order of messages that share a Key.
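A minimal sketch of this key-level routing is shown below. It is an assumption-laden illustration rather than the ZMS implementation: `KeyOrderedDispatcher` and its methods are hypothetical names, keys are assumed to be non-null strings, and `String.hashCode()` is used as the hash.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of key-level ordering: each partition owns a small group
// of single-threaded workers, and a record goes to worker hash(key) % parallelism.
final class KeyOrderedDispatcher {
    private final int parallelism;
    private final Map<Integer, ExecutorService[]> workersByPartition = new ConcurrentHashMap<>();

    KeyOrderedDispatcher(int parallelism) {
        this.parallelism = parallelism;
    }

    // Index of the worker that handles this key inside its partition's group.
    int workerIndex(String key) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Records with the same partition and key always reach the same worker thread.
    void submit(int partition, String key, Runnable task) {
        ExecutorService[] workers = workersByPartition.computeIfAbsent(partition, p -> {
            ExecutorService[] group = new ExecutorService[parallelism];
            for (int i = 0; i < parallelism; i++) group[i] = Executors.newSingleThreadExecutor();
            return group;
        });
        workers[workerIndex(key)].execute(task);
    }

    // Shuts every worker down and waits for queued tasks.
    // Returns false on timeout or interruption.
    boolean shutdownAndWait(long timeoutMillis) {
        try {
            for (ExecutorService[] group : workersByPartition.values()) {
                for (ExecutorService worker : group) {
                    worker.shutdown();
                    if (!worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Records with different Keys in the same partition may now run in parallel, while records that share a Key still run one after another on a single thread.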
2. Refine the offset commit granularity
ZMS commits offsets manually, so the messages of each pull must be fully consumed before their offsets can be committed. Since the messages of each partition are consumed by that partition's own thread pool, and offsets are committed partition by partition, we can hand offset committing over to each partition to manage. The pulling main thread then no longer has to wait for consumption to finish before starting the next round of pulling.
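One possible way to manage offsets per partition is sketched below. The text does not describe how ZMS tracks offsets, so this design is an assumption: `PartitionOffsetTracker` is a hypothetical class that only lets the commit position advance over a contiguous run of finished records, because records may finish out of order once several workers share a partition.

```java
import java.util.TreeSet;

// Hypothetical per-partition offset tracker: records finish out of order when
// several workers share a partition, so only the contiguous prefix is committed.
final class PartitionOffsetTracker {
    private final TreeSet<Long> done = new TreeSet<>();
    private long nextToCommit;   // first offset not yet known to be finished

    PartitionOffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Called by a worker thread when it finishes the record at this offset.
    synchronized void markDone(long offset) {
        done.add(offset);
        while (done.remove(nextToCommit)) {   // advance across the contiguous prefix
            nextToCommit++;
        }
    }

    // Offset that is safe to commit: every record below it has been processed.
    synchronized long committableOffset() {
        return nextToCommit;
    }
}
```

Starting at offset 10, finishing offset 12 first leaves the committable offset at 10. After 10 and 11 finish, it jumps to 13, which matches Kafka's convention of committing the offset of the next record to consume.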
3. Asynchronous pull and flow control
Asynchronous pulling has one problem: if the node cannot consume fast enough and too many pulled messages are held locally, it may run out of memory. We therefore need to apply flow control to pulling and stop pulling once the number of locally cached messages reaches a certain threshold.
From the analysis of the pull flow above, we know that before sending a pull request the consumer first checks whether the partition still has data in the local cache, and sends no request if it does. After ZMS switches to asynchronous pulling, however, Consumer#poll no longer waits for messages to be consumed before the next round of pulling, so Kafka's local cache is almost always empty. Kafka therefore sends pull requests on every poll, which effectively moves Kafka's local cache into ZMS. So the flow control has to be done at the ZMS level. KafkaConsumer has two methods that control whether an assigned partition may send pull requests:
    // suspend partition consumption (that is, stop sending pull requests for the partition)
    org.apache.kafka.clients.consumer.KafkaConsumer#pause
    // resume partition consumption (that is, resume sending pull requests for the partition)
    org.apache.kafka.clients.consumer.KafkaConsumer#resume
These two methods simply change the paused flag in the state of the consumer's assigned partition. When paused = true, consumption of the partition is suspended; when paused = false, it is resumed. Where is this flag used? As mentioned in the analysis of the pull flow, the consumer filters the fetchable partitions before sending a pull request, and one of the conditions is that the partition has paused = false:
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable
    private boolean isFetchable() {
        return !paused && hasValidPosition();
    }
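The decision of when to pause and resume can be sketched independently of Kafka. In the sketch below, `PartitionFlowLimiter`, its thresholds and its method names are all assumptions made for illustration; the sets it returns would be translated into TopicPartition collections and passed to KafkaConsumer#pause and KafkaConsumer#resume by the poll thread.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical flow limiter: decides which partitions the poll thread should
// pass to KafkaConsumer#pause or KafkaConsumer#resume. Thresholds are assumptions.
final class PartitionFlowLimiter {
    private final int pauseAt;       // pause once this many records are buffered
    private final int resumeBelow;   // resume once the backlog drops under this
    private final Map<Integer, Integer> buffered = new HashMap<>();
    private final Set<Integer> paused = new HashSet<>();

    PartitionFlowLimiter(int pauseAt, int resumeBelow) {
        this.pauseAt = pauseAt;
        this.resumeBelow = resumeBelow;
    }

    synchronized void onRecordsPulled(int partition, int count) {
        buffered.merge(partition, count, Integer::sum);
    }

    synchronized void onRecordsConsumed(int partition, int count) {
        buffered.merge(partition, -count, Integer::sum);
    }

    // Partitions that reached the high-water mark and are not paused yet.
    synchronized Set<Integer> partitionsToPause() {
        Set<Integer> result = new HashSet<>();
        for (Map.Entry<Integer, Integer> e : buffered.entrySet()) {
            if (e.getValue() >= pauseAt && paused.add(e.getKey())) result.add(e.getKey());
        }
        return result;
    }

    // Paused partitions whose backlog has dropped below the low-water mark.
    synchronized Set<Integer> partitionsToResume() {
        Set<Integer> result = new HashSet<>();
        for (Integer partition : new HashSet<>(paused)) {
            if (buffered.getOrDefault(partition, 0) < resumeBelow) {
                paused.remove(partition);
                result.add(partition);
            }
        }
        return result;
    }
}
```

Using two thresholds instead of one avoids pausing and resuming a partition on every poll when its backlog hovers around a single limit.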
Because KafkaConsumer is not thread-safe, calling KafkaConsumer methods from an asynchronous thread fails with the following error:
KafkaConsumer is not safe for multi-threaded access
To avoid this, make sure that all KafkaConsumer methods are called from the thread that calls KafkaConsumer#poll. One way is to set up a thread-safe context container: when an asynchronous thread needs to operate on the KafkaConsumer, it only puts the partition concerned into the container, and the poll thread later performs the operations in one place.
So we only need to make good use of pause and resume to control the flow of pulling. The Consumer#poll call in the consumer's main thread still takes messages from the cache without waiting for them to be consumed, so the consumer is not kicked out of the consumer group because too much time passes between two calls to poll.
The core of the optimization above is to use the message Key to consume as concurrently as possible without breaking message order. If, however, all messages in a partition share the same Key and there is a backlog, the model will not perform as well as in the ideal case. One option is to set fetch.max.bytes and max.partition.fetch.bytes smaller, so that each partition's local cache holds fewer than 500 messages and each poll can return messages from several partitions. This increases the number of RPC requests, so these parameters need to be tuned according to the size of the business messages.
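The thread-safe context container mentioned above could look like the following sketch. `PollThreadActions` is a hypothetical name, and the type parameter `C` stands for whatever consumer object the poll thread owns; the class itself only relies on the JDK.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical "context container": worker threads enqueue actions, and only
// the poll thread runs them, so the consumer is never touched concurrently.
final class PollThreadActions<C> {
    private final Queue<Consumer<C>> pending = new ConcurrentLinkedQueue<>();

    // Safe to call from any thread.
    void enqueue(Consumer<C> action) {
        pending.add(action);
    }

    // Call from the poll thread only, for example right before each poll().
    // Returns the number of actions that were executed.
    int drainTo(C consumer) {
        int executed = 0;
        Consumer<C> action;
        while ((action = pending.poll()) != null) {
            action.accept(consumer);
            executed++;
        }
        return executed;
    }
}
```

A worker thread would enqueue something like a pause, resume or commit request for its partition, and the poll loop would call `drainTo` with the KafkaConsumer before its next poll.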
The thread model above needs a new parameter, orderlyConsumePartitionParallelism, which sets the consumption parallelism of a partition. Suppose a consumer group member is assigned 5 partitions. By default each partition starts one consumption thread, for a total of 5 * 1 = 5 threads. With orderlyConsumePartitionParallelism = 3, each partition starts 3 consumption threads, for a total of 5 * 3 = 15 threads. With orderlyConsumePartitionParallelism = 1, all messages in a partition are consumed sequentially (serially). With orderlyConsumePartitionParallelism > 1, messages are assigned to threads by their Key, which no longer guarantees ordering across the whole partition but does guarantee ordering among messages with the same Key.
Note that when orderlyConsumePartitionParallelism > 1, how well the partition's consumption threads are used depends on the Keys of the partition's messages:
1. If all messages in the partition have the same Key, the Key modulo always selects the same thread, and the parallelism degrades to the equivalent of orderlyConsumePartitionParallelism = 1.
2. If messages with the same Key are too concentrated in the partition, each pull returns a batch of messages with the same Key, and the parallelism likewise degrades to the equivalent of orderlyConsumePartitionParallelism = 1.
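The two degenerate cases can be checked with a small helper that counts how many workers a batch of Keys would occupy. `BatchParallelism` is a hypothetical name, and `String.hashCode()` is assumed as the hash used for the modulo.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: counts how many workers a batch of keys would occupy
// when records are routed by hash(key) % parallelism.
final class BatchParallelism {
    private BatchParallelism() {}

    static int effectiveParallelism(List<String> keys, int parallelism) {
        Set<Integer> workers = new HashSet<>();
        for (String key : keys) {
            workers.add(Math.floorMod(key.hashCode(), parallelism));
        }
        return workers.size();
    }
}
```

A batch whose Keys are all identical occupies one worker no matter how large orderlyConsumePartitionParallelism is, while the Keys "a", "b" and "c" spread over three workers when the parallelism is 3.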
Overall comparison:
Before optimization, ZMS guarantees the order of messages across the whole partition. After optimization, messages are consumed concurrently by Key without breaking the order of messages that share a Key, which effectively improves the consumption throughput of a single partition. Before optimization, consumption is very likely to degenerate into single-threaded consumption. After optimization, each partition is guaranteed at least one thread, and in favorable cases each partition is consumed by several threads.
As the scenarios above show, this optimization is not a silver bullet for the throughput of sequential consumption. It has significant limitations, and a business should not rely heavily on sequential consumption if that would compromise its performance requirements.