In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-15 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
Kafka time round TimingWheel example analysis, I believe that many inexperienced people do not know what to do, so this paper summarizes the causes of the problem and solutions, through this article I hope you can solve this problem.
There are a large number of delayed operations in Kafka, such as delayed production, delayed pull, delayed deletion and so on. Instead of using the Timer or DelayQueue that comes with JDK to implement the delay function, Kafka customizes a timer (SystemTimer) based on the time wheel. The average time complexity of JDK's Timer and DelayQueue insert and delete operations is O (nlog (n)), which can not meet the high performance requirements of Kafka. Based on the time round, the time complexity of both insert and delete operations can be reduced to O (1). The application of time wheel is not unique to Kafka, and there are many application scenarios, such as Netty, Akka, Quartz, Zookeeper and other components.
Referring to the following figure, the TimingWheel in Kafka is a circular queue that stores scheduled tasks. The underlying queue is implemented in an array, and each element in the array can hold a scheduled task list (TimerTaskList). TimerTaskList is a circular two-way linked list, each item in the linked list represents a timing task item (TimerTaskEntry), which encapsulates the real scheduled task TimerTask.
The time wheel consists of multiple time lattices, each of which represents the basic time span (tickMs) of the current time wheel. The number of time cells of the time wheel is fixed, which can be expressed by wheelSize, so the total time span (interval) of the whole time wheel can be calculated by the formula tickMs × wheelSize. The time wheel also has a dial pointer (currentTime), which is used to indicate the current time of the time wheel, where currentTime is an integral multiple of tickMs. CurrentTime can divide the entire time wheel into the expiration part and the unexpired part, and the time lattice currently pointed to by currentTime also belongs to the expiration part, which means that it is just due and needs to deal with all the tasks of the TimerTaskList corresponding to this time lattice.
If the tickMs=1ms,wheelSize=20 of the time wheel, then the interval can be calculated to be 20ms. In the initial case, the dial pointer currentTime points to timeframe 0, and a task with a timed 2ms is inserted and stored in the TimerTaskList with timeframe 2. With the passage of time, the pointer currentTime continues to move forward. After the 2ms, when the time grid 2 is reached, it is necessary to do the corresponding expiration operation of the task in the TimeTaskList corresponding to time grid 2.
At this time, if another task with the timing of 8ms is inserted, it will be stored in timeframe 10, and currentTime will point to timeframe 10 after 8ms. What if a task scheduled for 19ms is plugged in at the same time? The new TimerTaskEntry will reuse the original TimerTaskList, so it will be inserted into timeframe 1 that has already expired. In short, the overall span of the whole time wheel is unchanged, with the continuous advance of the pointer currentTime, the time period that the current time wheel can handle is also moving backward, and the overall time range is between currentTime and currentTime+interval.
What if there is a task scheduled to 350ms at this time? Directly expand the size of wheelSize? there is no lack of tens of thousands or even hundreds of thousands of milliseconds of scheduled tasks in Kafka. There is no bottom line for the expansion of wheelSize. Even if the expiration time of all scheduled tasks is set an upper limit, such as 1 million milliseconds, then the time wheel with a wheelSize of 1 million milliseconds not only takes up a lot of memory space, but also reduces efficiency. Kafka introduces the concept of hierarchical time wheel for this purpose. When the expiration time of a task exceeds the time range represented by the current time wheel, it will try to add it to the upper time wheel.
Refer to the above figure, the case before reuse, the time round of * layer tickMs=1ms, wheelSize=20, interval=20ms. The tickMs of the second layer time wheel is the interval of the * * layer time wheel, that is, 20ms. The wheelSize of each time wheel is fixed, which is 20, so the overall time span interval of the second layer time wheel is 400ms. By analogy, this 400ms is also the size of the third-tier tickMs, and the overall time span of the third-tier time wheel is 8000ms.
For the 350ms scheduled tasks mentioned earlier, it is obvious that the * layer time wheel can not meet the conditions, so it is upgraded to the second layer time wheel, and finally inserted into the TimerTaskList corresponding to the time grid 17 in the second layer time wheel. If there is another task scheduled to 450ms at this time, it is obvious that the second tier time wheel cannot meet the conditions, so it is upgraded to the third tier time wheel and finally inserted into the TimerTaskList of time grid 1 in the third tier time wheel. Notice that multiple tasks (such as the scheduled tasks of 446ms, 455ms and 473ms) in the expiration time range of [400ms800ms) are put into the time grid 1 of the third layer time wheel, and the TimerTaskList timeout corresponding to time grid 1 is 400ms.
With the passage of time, when the TimerTaskList expires, the task originally scheduled for 450ms still has time for 50ms, and the expiration operation of this task cannot be performed. Here, there is a time round degraded operation, which resubmits the scheduled task with the remaining time of 50ms to the hierarchical time wheel. At this time, the overall time span of the * * layer time wheel is not enough, while the second layer is sufficient, so the task is placed in the time grid with the expiration time of the second layer time round [40ms, 60ms). After going through the 40ms, the task is "detected" again, but there is still 10ms left, and the expiration operation cannot be performed immediately. Therefore, there will be another degradation of the time round. This task is added to the time grid where the expiration time of the time round of the * * layer is [10ms, 11ms). After going through the 10ms, the task really expires, and finally performs the corresponding expiration operation.
Design originates from life. Our common clock is a three-tier structure of the time wheel, * layer time wheel tickMs=1ms, wheelSize=60,interval=1min, this is seconds; the second layer tickMs=1min,wheelSize=60,interval=1hour, this is minutes; the third layer tickMs=1hour,wheelSize is 12 12hours interval, this is the clock.
In Kafka, the parameters of the * * layer time wheel are the same as in the above case: tickMs=1ms, wheelSize=20, interval=20ms, and the wheelSize of each level is also fixed at 20, so the tickMs and interval of each level can also be calculated accordingly. Kafka also has some small details when implementing the time round TimingWheel:
TimingWheel is created with the current system time as the start time of the * layer time wheel (startMs). The current system time here does not simply call System.currentTimeMillis (), but calls Time.SYSTEM.hiResClockMs. This is because the time accuracy of the currentTimeMillis () method depends on the specific implementation of the operating system, and some operating systems cannot achieve millisecond precision. In fact, Time.SYSTEM.hiResClockMs uses System.nanoTime () / 1000000 to adjust the accuracy to millisecond. There are other coquettish operations that can achieve millisecond precision, but the author does not recommend that System.nanoTime () / 1000 is the most effective method. If you have any thoughts on this, you can discuss it in the message area.)
Each two-way circular linked list TimerTaskList in TimingWheel has a sentinel node (sentinel). The introduction of sentinel node can simplify the boundary conditions. Sentinel node, also known as dumb node (dummy node), is an additional linked list node. As * * nodes, it does not store anything in its value range, but is introduced for convenience. If a linked list has a sentinel node, then the elements of the linear list should be the second node of the linked list.
Except for the * layer time wheel, the start time (startMs) of the other high-level time rings is set to the currentTime of the front * round when creating this layer time wheel. The currentTime of each layer must be an integer multiple of tickMs, and if it is not satisfied, the currentTime will be trimmed to an integral multiple of tickMs to correspond to the expiration time range of the time lattice in the time wheel. The pruning method is: currentTime = startMs-(startMs% tickMs). CurrentTime will be recommended over time, but it will not change to an integer multiple of tickMs. If the time of a certain time is timeMs, then the currentTime of the time wheel is currentTime = timeMs-(timeMs% tickMs), and the currentTime of each level of the time wheel will advance according to this formula every time the time advances.
The timer in Kafka only needs to hold the reference of the first layer time wheel of TimingWheel, and will not directly hold other high-level time wheel, but each layer time wheel will have a reference (overflowWheel) pointing to a higher-level application. By calling at this level, the timer can indirectly hold the reference of each level time wheel.
The details of the time wheel are described here, and the implementation of the time wheel is more or less the same in each component. When you read this, will readers be curious about a situation that has been described in the article-"with the passage of time" or "with the passage of time"? so how do you advance time in Kafka? Similar to using scheduleAtFixedRate in JDK to push the time wheel per second? Obviously this doesn't make sense, and TimingWheel has lost most of its meaning.
The timer in Kafka uses DelayQueue in JDK to help advance the time wheel. The specific approach is that each TimerTaskList used is added to the DelayQueue, and "each TimerTaskList used" specifically refers to the TimerTaskList with the scheduled task item TimerTaskEntry of the non-sentinel node. The DelayQueue will be sorted according to the timeout expiration corresponding to the TimerTaskList, and the TimerTaskList of the shortest expiration will be at the head of the DelayQueue queue. There is a thread in Kafka to get the list of expired tasks in DelayQueue. Interestingly, the corresponding name of this thread is "ExpiredOperationReaper", which can be literally translated as "expired harvester", which is comparable to "SkimpyOffsetMap". When the "harvester" thread gets the timed-out task list TimerTaskList in DelayQueue, it can either advance the time of the time wheel according to the expiration of the TimerTaskList, or perform the corresponding operation on the acquired TimerTaskList. The opposite TimerTaskEntry performs the expiration operation if it should perform the expiration operation, and the degraded time wheel degrades the time wheel.
Readers may be very confused when they read this. The DelayQueue clearly stated at the beginning of the article is not suitable for high-performance scheduled tasks such as Kafka, so why introduce DelayQueue here? Note that for scheduled task item TimerTaskEntry insert and delete operations, the time complexity of TimingWheel is O (1), and the performance is much higher than that of DelayQueue. If you insert TimerTaskEntry directly into DelayQueue, then the performance is obviously difficult to support. Even if we divide several TimerTaskEntry into the TimerTaskList group according to certain rules, and then insert the TimerTaskList into the DelayQueue, imagine what if we want to add another TimerTaskEntry to this TimerTaskList? For DelayQueue, this kind of operation has obviously become inadequate.
From the analysis here, it can be found that the TimingWheel in Kafka is used specifically to insert and delete TimerTaskEntry, while DelayQueue is responsible for time-advancing tasks. Imagine again that the expiration of the * * timeout task list in DelayQueue is 200ms, and the second timeout task is 840ms. Here, it only takes O (1) time complexity to get the queue head of DelayQueue. If timing push per second is used, 199 of the 200 propulsions performed when getting * timeout task list are "empty propulsion", while 639 "air propulsion" is required when obtaining the second timeout task, which will consume the machine's performance resources for no reason. Here, DelayQueue is used to assist in exchanging a small amount of space for time, thus achieving "precise propulsion". The timer in Kafka is really "knowing people to make good use of", using TimingWheel to do the best tasks to add and delete operations, while using DelayQueue to do the best time promotion work, complement each other.
After reading the above, have you mastered the method of example analysis of the time round TimingWheel in Kafka? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.