2025-01-15 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
In this issue, the editor explains how to analyze the time wheel in Kafka. The article is detailed and approaches the topic from a professional point of view. I hope you get something out of it.
Preface
Kafka is a distributed message middleware. Thanks to its high availability and high throughput, it is the preferred message middleware in the big data field. Kafka pioneered the design of organizing a distributed message queue around sequential reads and writes of segmented files. Message queues such as RocketMQ evolved from Kafka's early architecture and design ideas, so there is a lot to learn from Kafka at the architecture level. PS: the execution flow and code are from Kafka 0.10.2.
Introduction:
Let's start with two interview questions. The first: if a machine holds 100,000 scheduled tasks, how can they be triggered efficiently?
The specific scenario is:
An app has a real-time message channel system that maintains one TCP connection from the app to the server for each user, used to send and receive messages in real time. This connection has the following requirement: "if no request packet (such as a login, message, or keepalive packet) arrives for 30 consecutive seconds, the server sets the user's status to offline."
A single machine holds about 100,000 TCP connections online at the same time, keepalive request packets are spread out at roughly 30 s intervals, and throughput is about 3,000 QPS.
How would you do it?
The common scheme uses a scheduled task that scans the Map of all connections once a second and picks out the connections whose last-request time (updated on every new request for that connection) is more than 30 s behind the current time.
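The scan-based scheme can be sketched as follows. This is a minimal illustration, not code from the article: the class name ScanTimeoutChecker and its methods are hypothetical, and timestamps are passed in explicitly so the behaviour is deterministic.

```scala
import scala.collection.mutable

// Hypothetical sketch of the "scan everything once a second" scheme.
final class ScanTimeoutChecker(timeoutMs: Long) {
  // uid -> time of the most recent request packet
  private val lastSeen = mutable.Map.empty[String, Long]

  // Called for every request packet (login, message, keepalive).
  def onRequest(uid: String, nowMs: Long): Unit = {
    lastSeen.update(uid, nowMs)
  }

  // Called once per second. It visits every connection, so each call is O(n)
  // even when nothing has timed out.
  def scan(nowMs: Long): Set[String] = {
    val expired: Set[String] =
      lastSeen.iterator.collect { case (uid, ts) if nowMs - ts >= timeoutMs => uid }.toSet
    lastSeen --= expired
    expired
  }
}
```

With 100,000 connections, every one-second scan walks all 100,000 entries, which is the cost the ring-queue scheme avoids.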
In another scheme, the circular queue method is used:
(figure 1)
Three important data structures:
1) for a 30 s timeout, create a circular queue with indexes 0 to 30 (essentially an array)
2) every slot on the ring is a Set, a set of tasks
3) there is also a Map that records which slot on the ring each uid falls in
With these in place, when a request packet arrives for a user uid:
1) look up in the Map which slot the uid is stored in
2) delete the uid from the Set of that slot
3) add the uid to a new slot: the slot immediately before the one the Current Index pointer points at, because the timer will scan that slot 30 seconds from now
4) update the Map so that this uid maps to the index of the new slot
Which elements are timed out?
Current Index moves one slot per second, and every uid in the Set of the slot it arrives at is timed out together. If a request packet arrived within the last 30 s, the uid must have been moved to the slot before Current Index, so the Set of the slot where Current Index sits holds exactly the uids for which no request packet arrived in the last 30 s.
So, when nothing has timed out, the Set of every slot that Current Index scans should be empty.
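The ring-queue scheme described above can be sketched like this. The names RingTimeoutChecker, onRequest and tick are hypothetical, and the sketch assumes tick is called exactly once per second by some external timer.

```scala
import scala.collection.mutable

// Hypothetical sketch of the ring-queue scheme for a timeout of `timeoutSeconds`.
final class RingTimeoutChecker(timeoutSeconds: Int) {
  private val size = timeoutSeconds + 1                       // indexes 0..timeoutSeconds
  private val slots = Array.fill(size)(mutable.Set.empty[String])
  private val slotOf = mutable.Map.empty[String, Int]         // uid -> slot index
  private var current = 0                                     // Current Index

  // A request packet arrived: move the uid to the slot just before Current Index,
  // which the pointer will reach `timeoutSeconds` ticks from now.
  def onRequest(uid: String): Unit = {
    slotOf.get(uid).foreach(i => slots(i) -= uid)
    val target = (current - 1 + size) % size
    slots(target) += uid
    slotOf.update(uid, target)
  }

  // Called once per second: advance Current Index and time out the whole slot.
  def tick(): Set[String] = {
    current = (current + 1) % size
    val expired: Set[String] = slots(current).toSet
    slots(current).clear()
    slotOf --= expired
    expired
  }
}
```

Each tick only touches the one slot under Current Index, so a tick with no timeouts does no per-connection work.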
Comparison of the two schemes:
Scheme 1 polls all the data every time, while scheme 2 only touches the data that expires at this moment. If nothing expires there is nothing to process, and timeouts are handled in batches. Because the ring structure is also compact, it is well suited to high-performance scenarios.
The second question: during development, how do you run a task after a certain delay?
If we don't reinvent the wheel, the choice is of course a delay queue or Timer.
Both the delay queue and Timer store delayed tasks in a min-heap backed by an array, so inserting a new element and removing the head of the queue each cost O(log n).
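As a small illustration of the JDK approach, the sketch below puts a few tasks into java.util.concurrent.DelayQueue and drains the ones that are already due. DelayedTask and DelayQueueDemo are hypothetical names introduced only for this example; DelayQueue, Delayed and TimeUnit are the standard JDK types.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical task type: due when the wall clock reaches expirationMs.
final class DelayedTask(val name: String, val expirationMs: Long) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)

  // The queue's internal heap orders elements with this comparison.
  override def compareTo(other: Delayed): Int = other match {
    case t: DelayedTask => java.lang.Long.compare(expirationMs, t.expirationMs)
    case _ =>
      java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
  }
}

object DelayQueueDemo {
  // poll() returns null when the head is not yet due, so this collects only due tasks,
  // earliest expiration first.
  def drainExpired(queue: DelayQueue[DelayedTask]): List[String] =
    Iterator.continually(queue.poll()).takeWhile(_ != null).map(_.name).toList
}
```

Every add and every successful poll reorders the heap, which is where the O(log n) per-operation cost comes from.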
Time wheel
The ring queue used in the second scheme is the underlying data structure of the time wheel, which groups the data to be processed (an abstraction of tasks) by expiry. Kafka has a large number of delayed operations, such as delayed produce, delayed fetch, and delayed delete. Instead of using the Timer or DelayQueue that ships with the JDK, Kafka implements its own timer (SystemTimer) on top of a time wheel. Insert and delete operations on the JDK's Timer and DelayQueue cost O(log n), which does not meet Kafka's performance requirements. With a time wheel, both insert and delete drop to O(1). The time wheel is not unique to Kafka: components such as Netty, Akka, Quartz, and ZooKeeper also use it.
Data structure of time wheel
Referring to the figure below, the TimingWheel in Kafka is a circular queue that stores scheduled tasks. It is backed by an array, and each array element holds a scheduled task list (TimerTaskList). TimerTaskList is a circular doubly linked list. Each item in the list is a timer task entry (TimerTaskEntry), which wraps the actual scheduled task, TimerTask. In the Kafka source the array of TimerTaskList is named buckets, so a TimerTaskList is also called a bucket in the rest of this article.
(figure 2: the picture is from "Kafka time Wheel (TimingWheel)")
A brief explanation of the terms in the figure above:
tickMs: the time wheel is made up of time slots. Each slot spans tickMs, which is the basic time unit of the current time wheel.
wheelSize: the number of time slots in each layer of the time wheel.
interval: the overall time span of the current time wheel, interval = tickMs × wheelSize.
startMs: the time at which this layer of the time wheel is constructed. The startMs of the first layer is TimeUnit.NANOSECONDS.toMillis(nanoseconds()), and the startMs of an upper layer is the currentTime of the layer below it.
currentTime: the current time of the time wheel. currentTime is always an integral multiple of tickMs, which is enforced with currentTime = startMs - (startMs % tickMs). This is like a clock whose minute hand still points at 1 minute when 65 seconds have elapsed. currentTime splits the whole time wheel into an expired part and an unexpired part. The time slot that currentTime points at belongs to the expired part: it has just become due, and all tasks in the TimerTaskList of that slot need to be processed.
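The two formulas in the definitions above can be checked with a few lines of Scala. WheelParams and its method names are hypothetical helpers for illustration, not part of the Kafka source.

```scala
// Hypothetical helpers mirroring the two formulas in the term list.
object WheelParams {
  // interval = tickMs × wheelSize
  def interval(tickMs: Long, wheelSize: Int): Long = tickMs * wheelSize

  // currentTime = timeMs - (timeMs % tickMs): round down to a multiple of tickMs
  def alignToTick(timeMs: Long, tickMs: Long): Long = timeMs - (timeMs % tickMs)
}
```

For example, WheelParams.alignToTick(65000L, 60000L) rounds 65 seconds down to 60000 ms, which matches the clock analogy of a minute hand that still points at 1 minute.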
Task storage in the time wheel
Suppose the time wheel has tickMs=1ms and wheelSize=20, so interval is 20ms. Initially the pointer currentTime points at time slot 0, and a task with a 2ms delay is inserted and stored in the TimerTaskList of time slot 2. As time passes, currentTime keeps moving forward. After 2ms it reaches time slot 2, and the tasks in the TimerTaskList of time slot 2 are expired. If another task with an 8ms delay is inserted at this moment, it is stored in time slot 10, and currentTime will point at time slot 10 after another 8ms. What if a task with a 19ms delay is inserted at the same moment? The new TimerTaskEntry reuses an existing TimerTaskList, so it is inserted into time slot 1, which has already expired. In short, the overall span of the time wheel does not change. As currentTime advances, the window the wheel can handle moves with it, and it always covers the range from currentTime to currentTime + interval.
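The slot arithmetic in this example can be reproduced with a small sketch. SingleWheel and bucketFor are hypothetical names, and the range checks are an assumption modelled on the conditions that appear later in the article's TimingWheel.add excerpt.

```scala
// Hypothetical single-layer wheel with the parameters used in the text.
object SingleWheel {
  val tickMs = 1L
  val wheelSize = 20
  val interval: Long = tickMs * wheelSize

  // Returns the slot index for a task, or None when the task is already due
  // or lies at or beyond currentTime + interval.
  def bucketFor(currentTime: Long, delayMs: Long): Option[Int] = {
    val expiration = currentTime + delayMs
    if (expiration < currentTime + tickMs || expiration >= currentTime + interval) None
    else Some(((expiration / tickMs) % wheelSize).toInt)
  }
}
```

With currentTime at 2, an 8ms delay lands in slot 10 and a 19ms delay wraps around to slot 1, while a 350ms delay does not fit in this wheel at all.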
The rise and fall of the time wheel
What if a task with a 350ms delay arrives now? Should wheelSize simply be enlarged? Kafka has plenty of scheduled tasks with delays of tens of thousands or even hundreds of thousands of milliseconds, so there is no natural bound on how far wheelSize would have to grow. Even if every scheduled task were capped at, say, 1 million milliseconds, a time wheel with a wheelSize of 1 million would take up a lot of memory and would also be less efficient. For this reason, Kafka introduces hierarchical time wheels. When a task's expiration time exceeds the range covered by the current time wheel, Kafka tries to add it to the upper-level time wheel.
(figure 3: the picture is from "Kafka time Wheel (TimingWheel)")
Referring to the figure above and reusing the earlier example, the first-layer time wheel has tickMs=1ms, wheelSize=20, and interval=20ms. The tickMs of the second-layer time wheel is the interval of the first layer, that is, 20ms. The wheelSize of every time wheel is fixed at 20, so the overall time span (interval) of the second layer is 400ms. By the same logic, 400ms is the tickMs of the third layer, and the overall time span of the third layer is 8000ms.
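The per-layer parameters follow mechanically from the rule that each layer's tickMs equals the interval of the layer below. The sketch below derives them; WheelLevels and Level are hypothetical names used only for this illustration.

```scala
// Hypothetical helper that derives tickMs and interval for each layer.
object WheelLevels {
  final case class Level(tickMs: Long, wheelSize: Int) {
    def interval: Long = tickMs * wheelSize
  }

  // Layer n+1 uses the interval of layer n as its tickMs; wheelSize is the same everywhere.
  def levels(baseTickMs: Long, wheelSize: Int, count: Int): List[Level] =
    Iterator
      .iterate(Level(baseTickMs, wheelSize))(lower => Level(lower.interval, wheelSize))
      .take(count)
      .toList
}
```

Calling WheelLevels.levels(1L, 20, 3) yields layers with tickMs 1, 20, 400 and intervals 20, 400, 8000, matching the numbers in the text.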
The 350ms task just mentioned is not inserted into the first-layer time wheel. It goes into the second-layer time wheel, whose interval is 20*20. Which bucket does it go into? First compute virtualId = 350 / tickMs (20) = 17 using integer division, then virtualId (17) % wheelSize (20) = 17, so the 350ms task is placed in bucket 17. If a task due after 450ms arrives at this point, it is placed in the third-layer time wheel. By the same formula, 450 / 400 = 1 and 1 % 20 = 1, so it goes into bucket 1, which holds tasks in the range [400, 800) ms. As time passes and 400ms have elapsed, the task that was due after 450ms still has 50ms left. At this point the time wheel is demoted: the task, now 50ms away, is resubmitted to the hierarchical time wheel, and by the formula (450 / 20 = 22, 22 % 20 = 2) it is put into bucket 2 of the second-layer time wheel. This bucket covers [40, 60) ms measured from the 400ms mark. After another 40ms the task is picked up again. It is now 10ms away from execution, so it is submitted to the hierarchical time wheel once more and added to bucket 10 of the first-layer time wheel. After 10ms the task expires and is finally executed.
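The promotion and demotion walk-through can be traced with a small placement function. This is a simplified sketch, not the Kafka implementation: HierarchicalPlacement and place are hypothetical names, layers are counted from 1, and each layer's currentTime is assumed to be the given time rounded down to that layer's tickMs.

```scala
// Hypothetical sketch: which layer and bucket would hold a task at a given moment.
object HierarchicalPlacement {
  val baseTickMs = 1L
  val wheelSize = 20

  // Returns Some((layer, bucketIndex)), or None when the task is already due.
  def place(nowMs: Long, expirationMs: Long): Option[(Int, Int)] = {
    @annotation.tailrec
    def loop(layer: Int, tickMs: Long): Option[(Int, Int)] = {
      val currentTime = nowMs - (nowMs % tickMs)   // aligned to this layer's tick
      val interval = tickMs * wheelSize
      if (expirationMs < currentTime + tickMs) None
      else if (expirationMs < currentTime + interval)
        Some((layer, ((expirationMs / tickMs) % wheelSize).toInt))
      else loop(layer + 1, interval)               // overflow into the upper layer
    }
    loop(1, baseTickMs)
  }
}
```

Tracing the task that expires at 450ms: place(0L, 450L) gives layer 3, bucket 1. Resubmitting at 400ms gives layer 2, bucket 2. Resubmitting at 440ms gives layer 1, bucket 10. At 450ms the result is None, meaning the task is due and should be executed.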
Isn't the promotion and demotion across the time wheels very similar to a clock? The first layer has tickMs=1s, wheelSize=60, interval=1min: this is the second hand. The second layer has tickMs=1min, wheelSize=60, interval=1hour: this is the minute hand. The third layer has tickMs=1hour, wheelSize=12, interval=12hours: this is the hour hand. The hands of the clock correspond to currentTime in the program, which comes up again in the code analysis below (understanding this is both the key point and the hard part of understanding the time wheel).
Core flowchart: adding tasks and driving the time wheel
(figure 4)
Key code walkthrough
Adding a task to SystemTimer:
// Add a task to the SystemTimer. The task is wrapped in a TimerTaskEntry.
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  // First try to add the entry to the time wheel. If it cannot be added, the task has
  // already expired or has been cancelled. Note that timingWheel holds a reference to
  // its upper-level time wheel, so add may be called recursively.
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      // Expired tasks are executed asynchronously on the thread pool
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

Adding a task to the TimingWheel. The call recurses until the task lands in a bucket of the appropriate time wheel:

def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs

  if (timerTaskEntry.cancelled) {
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // Already expired; the caller will execute the task
    false
  } else if (expiration < currentTime + interval) {
    // The expiration time is less than currentTime + interval, so the task falls
    // within the span of this time wheel
    val virtualId = expiration / tickMs
    // Find the bucket of this time wheel that the task belongs to
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)

    // Set the bucket expiration time.
    // setExpiration returns true only when the bucket's expiration time actually changes
    // (for example, when the bucket is reused after its previous tasks have expired).
    // Only then is the bucket put into the delay queue.
    if (bucket.setExpiration(virtualId * tickMs)) {
      // bucket is a TimerTaskList. It implements java.util.concurrent.Delayed and holds
      // a linked list of tasks, as shown in figure 2.
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval. Put it into the parent timer.
    // The expiration time is outside this wheel's span, so the task must be promoted.
    // Create the upper-level time wheel if it does not exist yet, then add the task to it.
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}

Creating the upper-level time wheel from the current one. Note that the interval of the lower wheel becomes the tickMs of the upper wheel:

private[this] def addOverflowWheel(): Unit = {
  synchronized {
    if (overflowWheel == null) {
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
}

Driving the time wheel forward. Note the recursion here: the pointer of each wheel is advanced in turn until the elapsed time is too small to move the next upper-level wheel.

def advanceClock(timeMs: Long): Unit = {
  if (timeMs >= currentTime + tickMs) {
    // Round the current time down to an integral multiple of this wheel's tickMs
    currentTime = timeMs - (timeMs % tickMs)

    // Try to advance the clock of the overflow wheel if present.
    // The currentTime passed up has been rounded by this wheel; the upper wheel
    // rounds it again with its own tickMs.
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
Driver source:
// Each task in a bucket's task list is re-added to the time wheel one by one, which
// promotes or demotes it to the matching time wheel, or executes it if it has expired.
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

/*
 * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
 * waits up to timeoutMs before giving up.
 */
def advanceClock(timeoutMs: Long): Boolean = {
  // Taking a bucket out of the delay queue blocks until the bucket's delay has elapsed,
  // so a returned bucket is one that has expired.
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    writeLock.lock()
    try {
      while (bucket != null) {
        // Drive the time wheel
        timingWheel.advanceClock(bucket.getExpiration())
        // The bucket is a task list. Its tasks are re-added to the time wheel one by one,
        // which promotes or demotes them and executes the ones that have expired.
        bucket.flush(reinsert)
        // Keep draining buckets that have already expired
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}
Kafka's delayed operations are implemented with the time wheel, which supports efficient triggering of a large number of tasks, yet DelayQueue is still visible in the implementation. The DelayQueue only holds the time wheel's buckets and is used to drive the wheel forward, while task inserts and deletes go to the time wheel itself, which brings the time complexity of those operations down to O(1). Kafka's pursuit of performance leads it to put the most appropriate component in the most appropriate place.
That concludes the editor's analysis of the time wheel in Kafka. If you have had similar questions, the analysis above may help. If you want to know more, you are welcome to follow the industry information channel.