Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does KAFKA handle deferred tasks

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

This article introduces the knowledge of "how KAFKA handles delayed tasks". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

1. What are the delayed tasks on the kafka server?

First of all, we need to know what tasks in kafka need to be delayed, and how to check them?

Quite simply, the design of kafka is based on interfaces, so we only need to find the top-level interface for deferred tasks, and then look at the implementation classes of that interface to see which deferred tasks are available.

The top-level abstract class interface is: DelayedOperation

Corresponding subcategories:

DelayedHeartbeat: it is used to detect consumer heartbeat timeout.

DelayedProduce: when the producer sets ack=- 1, you need to wait for all copies to confirm that the write is successful.

DelayedFetch: when consuming, the partition has no data and needs to wait for a delay.

DelayedJoin: when you go to add a group of consumers, you need to wait in the JOIN phase.

Second, how is the delay task in kafka realized?

This answer has already been answered in the title, that is, the wheel of time.

So how is the time wheel implemented in kafka?

The time wheel ontology in kafka is a 20-length array, but internally holds a reference to the upper array, and each element in the array is a List that holds all tasks in that time period.

Finally, these List references with tasks are put into the DelayQueue to achieve the flow of time, and each time the expired List is extracted from the DelayQueue for the corresponding operation.

Translate:

Originally, it was too much to put all the delayed tasks into DelayQueue. Because the underlying data structure of DelayQueue is a small top heap, the time complexity of inserting and deleting is O (nlog (n)).

N represents the number of specific tasks, when the n value is very large, the corresponding performance is very poor, which can not meet the requirements of a high-performance middleware. So I thought of a way to reduce the number of n.

That is, the original delayed tasks are encapsulated into a List through the time interval, and the List is stored in the DelayQueue as a basic unit, so that the time complexity of insertion and deletion can be reduced.

From O (nlog (n)) to close to O (1) [why is it approximate O (1) here? You can understand that the time wheel is a hash-like table structure. In addition, the most important thing is to greatly reduce the number of elements n in DelayQueue.

Because there are only 20 List,10 layers in a layer of time, DelayQueue can live in hold for such a small number of elements.

To sum up:

In fact, the design idea of time wheel is the idea of batch processing, which encapsulates a batch of tasks into a List according to the time interval, and finally puts the List into the DelayQueue to achieve the effect of rotation.

There are mainly two optimization points, one is that the time complexity of insertion / deletion is reduced from O (nlog (n)) to approximately O (1), and the second is that the number of DelayQueue elements is greatly reduced.

To understand the design idea, let's look at the principle of implementation:

1. Core function: add Task to the time wheel

There are three steps:

Return false if the task has expired

If the task is within its own time span, calculate which bucket (in which time interval) it should be put in; if the bucket is not in the DelayQueue, add it to the DelayQueue.

If the timeout of the task exceeds its own time span, pass it to the upper level until you find a time wheel that meets the time span.

Def add (timerTaskEntry: TimerTaskEntry): Boolean = {val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) {/ / cancelled / / Cancelled false} else if (expiration

< currentTime + tickMs) { // 已经过期 // Already expired false } else if (expiration < currentTime + interval) { // 在有效期内 // Put in its own bucket val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry)// Set the bucket expiration time // 设置超时时间,如果该桶已经设置了超时时间则说明已经存在于DelayQueue中了 // 如果不存在超时时间,则需要将当前桶加入DelayQueue中 if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) }true } else { // 超过了当前层时间轮的时间跨度 需要向上层时间轮传递,如果上层不存在则新建 // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry) }} 2、时间轮如何推进? 每一个DelayedOperationPurgatory,都有一个线程expirationReaper,去负责推进时间轮,如果当前没有task到期就挂起200ms等待。 如果有task到期,就取出对应的桶,然后将桶中的数据全都执行reinsert,也就是从最底层的时间轮重新执行一遍add操作。 /** * A background reaper to expire delayed operations that have timed out */private class ExpiredOperationReaper extends ShutdownableThread( "ExpirationReaper-%d-%s".format(brokerId, purgatoryName), false) { override def doWork() { advanceClock(200L) }}def advanceClock(timeoutMs: Long): Boolean = { // 从延时队列中取出到期的桶 var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) {writeLock.lock()try { // 一次性把到期的全部取出来 while (bucket != null) { // 时间轮的时间推进 timingWheel.advanceClock(bucket.getExpiration())// 把桶中的所有数据都拿去执行reinsert函数 // 本质就是去执行addTimerTaskEntry(timerTaskEntry) bucket.flush(reinsert)bucket = delayQueue.poll() } } finally { writeLock.unlock() }true } else {false }} 3、到期的任务如何执行? 其实就是接着上面的源码,当任务到期之后,reinsert函数会返回false,代表已经超期/被取消了,每个DelayedOperationPurgatory又有一个单线程的taskExecutor, 超期的任务就提交到线程池中去执行即可。 private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) =>

AddTimerTaskEntry (timerTaskEntry) private def addTimerTaskEntry (timerTaskEntry: TimerTaskEntry): Unit = {/ / if the time round adds and returns false, it means that the expiration / has been cancelled and submitted directly to your own single-thread thread pool to execute the task if (! timingWheel.add (timerTaskEntry)) {/ / Already expired or cancelled if (! timerTaskEntry.cancelled) taskExecutor.submit (timerTaskEntry.timerTask)}}.

4. The operation diagram of the whole process

To summarize the whole process, the business code wants TimingWheel to execute add and submit tasks.

TimingWheel finds the appropriate time round and inserts the corresponding bucket into the corresponding bucket and puts the bucket into the DelayQueue

There is a harvesting thread in the DelayedOperationPurgatory component to constantly poll the expired task from the DelayQueue.

Finally, task re-executes the reinsert, and if it expires, it is submitted to the taskExecutor to execute the corresponding business handler logic.

Third, compared with the time wheel, why not use DelayQueue to achieve delayed tasks?

This answer is actually given in the second section. Here is a summary:

1. The underlying data structure of DelayQueue is a small top heap, and the time complexity of insertion and deletion is O (nlog (n)). Therefore, in the face of a large number of delay operations, this structure can not meet the high performance requirements of kafka.

2. The time round adopts the idea of batch processing to encapsulate the task according to the interval to form a kind of structure similar to hash table, which reduces the time complexity of insert / delete to O (1) and greatly reduces the number of DelayQueue elements.

In addition, each delay scenario in kafka creates a separate time wheel, and only one type of delay task is stored in each time round, because different Task have different logic to perform when overdue / completion.

It needs to be carried out one by one. Take Chestnut, the heartbeat delay scene has its own heartbeatPurgatory, production delay has its own delayedProducePurgatory, and so on.

IV. Case study of delay-- maintenance of consumers' heartbeat

1. Processing of HEARTBEAT request

From the source code, we can know that the maintenance of heartbeat and session timeout, the implementation of kafka is very ingenious.

Normally, the heartbeat is 3 seconds, and the session timeout is 10 seconds.

Immediately after receiving the HEARTBEAT request, kafka creates a DelayedHeartbeat deferred task with the corresponding session.timeout value of 10s.

If the HEARTBEAT request corresponding to consumer is received within 10 seconds, the delayed task submitted last time will be completed.

If the HEARTBEAT request for the corresponding consumer is not received within 10 seconds, there is something wrong with the task consumer, and the corresponding expiration logic is executed.

Private def completeAndScheduleNextHeartbeatExpiration (group: GroupMetadata, member: MemberMetadata) {/ / complete current heartbeat expectation member.latestHeartbeat = time.milliseconds () val memberKey = MemberKey (member.groupId, member.memberId) / / complete the last delay task heartbeatPurgatory.checkAndComplete (memberKey) / / reschedule the next heartbeat expiration deadline / / the server can get the session.timeout, and then generate a delay task based on this time, / / for example, 30s, if no heartbeat request is received for such a long time If you think there is something wrong with the consumer, you will kick it out and implement rebalance. Val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs val delayedHeartbeat = new DelayedHeartbeat (this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs) heartbeatPurgatory.tryCompleteElseWatch (delayedHeartbeat, Seq (memberKey))} private [group] class DelayedHeartbeat (coordinator: GroupCoordinator, group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long SessionTimeout: Long) extends DelayedOperation (sessionTimeout, Some (group.lock)) {override def tryComplete (): Boolean = coordinator.tryCompleteHeartbeat (group, member, heartbeatDeadline, forceComplete _) override def onExpiration () = coordinator.onExpireHeartbeat (group, member, heartbeatDeadline) override def onComplete () = coordinator.onCompleteHeartbeat ()}

2. Logic after DelayedHeartbeat task expires

This piece is also very simple, which is to execute the coordinator.onExpireHeartbeat function.

The specific logic is to print an identification log: Member xxx has failed, why should this log be talked about separately? Because this is the core log when we troubleshoot consumer problems.

When we are watching server.log, if we find that a consumer in a consumer group appears this log, then we can be sure that the consumer in this consumer group has been excluded because of the session timeout.

So we can continue to analyze the reason why the consumer dropped the line because the consumer process failed? Or is it that the load on the client machine is too high and the heartbeat thread is a daemon thread with a low priority and cannot get the CPU resources?

Wait, a series of location clues. This log is mainly used to locate consumer problems, as well as consumer group rebalance reasons, is a very important identification log!

After talking about the log, we can see that the next step is to turn on rebalance, because the number of consumers has changed, and we need to re-allocate partitions, which has failed over.

Def onExpireHeartbeat (group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {group.inLock {if (! shouldKeepMemberAlive (member, heartbeatDeadline)) {/ / identity log info (s "Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group") removeMemberAndUpdateGroup (group, member)}} private def removeMemberAndUpdateGroup (group: GroupMetadata) Member: MemberMetadata) {group.remove (member.memberId) group.currentState match {case Dead | Empty = > case Stable | CompletingRebalance = > maybePrepareRebalance (group) case PreparingRebalance = > joinPurgatory.checkAndComplete (GroupKey (group.groupId))}} "how KAFKA handles deferred tasks" ends here. Thank you for your reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report