How does the timing thread pool achieve delayed execution and periodic execution

2025-04-07 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

This article explains how the timed thread pool (ScheduledThreadPoolExecutor) implements delayed and periodic execution. The explanation is straightforward and easy to follow; let's work through the source step by step.

1 Introduction

ScheduledThreadPoolExecutor, the timed thread pool, is used to execute delayed or periodic tasks. Compared with the single-threaded Timer, the timed thread pool is more robust: a task that throws an exception does not bring down the whole pool. (It should be mentioned that in ScheduledThreadPoolExecutor, just as in ThreadPoolExecutor, a task that throws an exception during execution is discarded. Exceptions therefore need to be caught inside the task, and compensating action taken if necessary.)
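To make the point above concrete, here is a minimal sketch (the class and helper names are my own, not from the JDK): a periodic task that lets an exception escape runs exactly once and is then silently cancelled, while a task that catches internally keeps running.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SwallowedExceptionDemo {

    // Returns {runs of the unsafe task, runs of the safe task} after ~300 ms.
    public static int[] runDemo() throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        AtomicInteger unsafeRuns = new AtomicInteger();
        AtomicInteger safeRuns = new AtomicInteger();

        // The exception escapes: this periodic task is cancelled after its first run.
        pool.scheduleAtFixedRate(() -> {
            unsafeRuns.incrementAndGet();
            throw new RuntimeException("boom");
        }, 0, 20, TimeUnit.MILLISECONDS);

        // The exception is caught inside the task: the schedule survives.
        pool.scheduleAtFixedRate(() -> {
            safeRuns.incrementAndGet();
            try {
                riskyWork();
            } catch (RuntimeException e) {
                // log here and, if necessary, take compensating action
            }
        }, 0, 20, TimeUnit.MILLISECONDS);

        Thread.sleep(300);
        pool.shutdownNow();
        return new int[]{unsafeRuns.get(), safeRuns.get()};
    }

    private static void riskyWork() {
        throw new RuntimeException("boom");
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = runDemo();
        System.out.println("unsafe ran " + r[0] + " time(s), safe ran " + r[1] + " time(s)");
    }
}
```

Note that no error is printed anywhere when the first task dies: the exception is captured inside the returned ScheduledFuture and only surfaces if you call get() on it.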

An incoming task is wrapped as a ScheduledFutureTask. It inherits from FutureTask, so it can run asynchronously and return a result; it also implements the Delayed interface, so the remaining delay can be obtained through the getDelay method.

Unlike the queues used with ThreadPoolExecutor, the queue in ScheduledThreadPoolExecutor is a DelayedWorkQueue, which is unbounded. The maximum pool size is therefore meaningless in a timed thread pool (it is fixed at Integer.MAX_VALUE and is not a constructor parameter). In ThreadPoolExecutor, if the current number of threads is below the core size, a core thread is created to execute the task directly, and if the thread count is at or above the core size, the task goes into the blocking queue. ScheduledThreadPoolExecutor behaves differently: a submitted task is always put into the delay queue first and then waits there to be executed.

1.1 The small top heap (min-heap)

The implementation of DelayedWorkQueue is somewhat special: it is built on a small top heap, i.e. a min-heap (similar to DelayQueue and PriorityQueue). Since every task taken from the delay queue should be the one whose trigger time is closest to now, a min-heap is a perfect fit (heap structures are also commonly used for top-N-smallest and top-N-largest problems). The min-heap guarantees that each node's value is not less than its parent's and not greater than its children's, with no ordering constraint between siblings. The minimum value is therefore always at the root node; if the heap is stored in an array, the smallest element sits in the first position of the array.

The red numbers in the figure were the array indices of the nodes, which reveal another property of the heap: if the current node's index is k, then its parent's index is (k - 1) / 2, its left child's index is k * 2 + 1, and its right child's index is k * 2 + 2.
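The index arithmetic above can be checked with a few one-liners (a standalone sketch of my own, not JDK code):

```java
public class HeapIndexDemo {

    // Array index of the parent of node k (valid for k > 0).
    public static int parent(int k) { return (k - 1) / 2; }

    // Array indices of the two children of node k.
    public static int left(int k)  { return 2 * k + 1; }
    public static int right(int k) { return 2 * k + 2; }

    public static void main(String[] args) {
        // Node 5's parent is node 2, whose children are nodes 5 and 6.
        System.out.println(parent(5)); // 2
        System.out.println(left(2));   // 5
        System.out.println(right(2));  // 6
    }
}
```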

The two core methods for building the heap are siftUp and siftDown. The siftUp method performs the upward pass when a node is added; the siftDown method performs the downward pass when a node is deleted. The actual source is analyzed below; here the idea is easiest to grasp with a diagram (what follows describes the classic textbook implementation of adding and deleting nodes in a min-heap; the implementation in the source differs slightly, but the core idea is the same):

1.1.1 Adding a node

If, during the siftUp process, the current node's value is found to be no smaller than its parent's, siftUp terminates early. Note also that in siftUp, and in the siftDown operation discussed below, each step compares and exchanges only the current node with its parent or children rather than rebuilding the whole heap, which keeps the time complexity down.

1.1.2 Deleting a node

There are three cases when deleting a node. First, consider deleting the root node:

Next is deleting the last node. This is the easiest case: the node is simply removed, because removing it does not affect the structure of the min-heap and no adjustment is needed, so it is not illustrated here. (Note that deleting a leaf node other than the last one is not this case but the third case below. In other words, such leaf nodes cannot simply be removed, because the heap must remain a complete binary tree.)

Finally, deleting a node that is neither the root nor the last node:

When the node to be deleted is neither the root nor the last node, a siftDown is executed and may be followed by a siftUp. The siftUp is not always triggered: it happens only when the last node's value is smaller than the value of the deleted node's parent. (This can be reasoned out. In a min-heap, if the last node's value is smaller than the deleted node's parent's value, then the deleted node's children must have values greater than the last node's (ignoring ties), so no siftDown movement occurs. Conversely, if siftDown does move something, the last node's value is larger than at least one of the deleted node's children (if any); and since a grandchild's value is always greater than its grandparent's (ignoring ties), the last node's value is then also larger than the deleted node's parent's value. So after siftDown swaps it downward, the min-heap property already holds above it and no extra siftUp is needed. In the remaining case, where the last node's value lies between the deleted node's parent's value and its children's values, neither siftDown nor siftUp moves anything.)

The biggest difference between the source-code implementation and the classic one above is that nodes are never actually swapped with each other. In the classic siftUp and siftDown, whenever a node must move, parent and child are exchanged (including the exchange between the first and last nodes when deleting). On reflection, this is wasteful. It is enough to move only the parent (in siftUp) or the child (in siftDown) into the slot currently being compared, keep the node being placed out of the array, and go straight to the next comparison, repeating the siftUp or siftDown step until its final insertion position is found, writing it exactly once there. This saves half of the assignment operations and improves efficiency. It also means the node being placed must be carried along as a parameter, which is exactly how the source code is written.
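As a sketch of this "move, don't swap" technique (a minimal heap of my own, not the JDK class), note how siftUp/siftDown carry the key as a parameter and write it exactly once, and how removing at an arbitrary index uses the siftDown-then-maybe-siftUp trick from the previous section:

```java
import java.util.Arrays;

// A minimal long-valued min-heap using the "hole" style sift described above.
public class MiniMinHeap {
    private long[] heap = new long[16];
    private int size;

    public void offer(long key) {
        if (size >= heap.length)
            heap = Arrays.copyOf(heap, heap.length + (heap.length >> 1));
        siftUp(size++, key);
    }

    public long poll() {
        long root = heap[0];
        long last = heap[--size];
        if (size != 0)
            siftDown(0, last); // re-insert the last element from the root down
        return root;
    }

    // Delete the element at an arbitrary index: siftDown first; if the
    // replacement stayed put, a siftUp may still be needed (see above).
    public void removeAt(int i) {
        long replacement = heap[--size];
        if (i != size) {
            siftDown(i, replacement);
            if (heap[i] == replacement)
                siftUp(i, replacement);
        }
    }

    private void siftUp(int k, long key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (key >= heap[parent])
                break;              // found the slot
            heap[k] = heap[parent]; // move the parent down instead of swapping
            k = parent;
        }
        heap[k] = key;              // write the carried key exactly once
    }

    private void siftDown(int k, long key) {
        int half = size >>> 1;      // first index with no children
        while (k < half) {
            int child = (k << 1) + 1;
            if (child + 1 < size && heap[child + 1] < heap[child])
                child++;            // pick the smaller child
            if (key <= heap[child])
                break;
            heap[k] = heap[child];  // move the child up instead of swapping
            k = child;
        }
        heap[k] = key;
    }

    public static void main(String[] args) {
        MiniMinHeap h = new MiniMinHeap();
        for (long v : new long[]{7, 3, 9, 1, 5}) h.offer(v);
        StringBuilder sb = new StringBuilder();
        while (h.size > 0) sb.append(h.poll()).append(' ');
        System.out.println(sb.toString().trim()); // 1 3 5 7 9
    }
}
```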

1.2 Leader-Follower mode

ScheduledThreadPoolExecutor uses the Leader-Follower pattern. The idea is this: suppose many tasks are waiting to be executed (usually sorted in a queue). Among all worker threads, only one is the leader thread; the rest are follower threads, which do not execute tasks and stay dormant. Only the leader may take a task. When the leader obtains a task, it promotes a new leader and becomes a follower itself before executing, so if another task arrives, the new leader handles it, and the process repeats. When a thread finishes its task and comes back, it checks the situation: if there is no task, or there is a task but some other thread is already the leader, it goes to sleep; if there is a task and no leader, it becomes the leader and executes again.

Unlike ThreadPoolExecutor, where tasks must be executed immediately, tasks in ScheduledThreadPoolExecutor are delayed, so fetching a task is also a delayed wait. It is therefore pointless for every thread to sit awake waiting out the delay: in the end only one thread would get the current task, and the others would just go back to sleep (this describes one task and many threads, but the same holds with many tasks; in short, Leader-Follower wakes only the threads that actually have work to do). Waking them all would be unnecessary and a waste of resources. The benefit of the Leader-Follower pattern is precisely that it avoids needless wake-up and blocking operations, making the pool more efficient and economical.

2 Constructor

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public ScheduledThreadPoolExecutor(int corePoolSize) {
 5     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 6           new DelayedWorkQueue());
 7 }
 8 
 9 /**
10  * ThreadPoolExecutor:
11  */
12 public ThreadPoolExecutor(int corePoolSize,
13                           int maximumPoolSize,
14                           long keepAliveTime,
15                           TimeUnit unit,
16                           BlockingQueue<Runnable> workQueue) {
17     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
18          Executors.defaultThreadFactory(), defaultHandler);
19 }

As you can see, the constructor of ScheduledThreadPoolExecutor simply calls the constructor of its parent class ThreadPoolExecutor. I covered that constructor and all of its parameters in my earlier analysis of the ThreadPoolExecutor source, so I won't repeat them here.
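A quick self-contained usage sketch (the class name and helper methods are mine) confirming two consequences of this constructor: a scheduled delay is honored as a minimum, and the maximum pool size is pinned to Integer.MAX_VALUE:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolBasics {

    // Schedules a no-op with the given delay and returns the observed wait in ms.
    public static long observedDelayMillis(long delayMillis) throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        long start = System.nanoTime();
        // get() blocks until the delayed task has actually run
        pool.schedule(() -> { }, delayMillis, TimeUnit.MILLISECONDS).get();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        pool.shutdown();
        return elapsed;
    }

    // The maximum pool size of a fresh timed pool, per the constructor above.
    public static int maxPoolSizeOfNewPool() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        int max = pool.getMaximumPoolSize();
        pool.shutdown();
        return max;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(maxPoolSizeOfNewPool() == Integer.MAX_VALUE); // true
        System.out.println(observedDelayMillis(50) >= 50);               // true
    }
}
```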

3 The schedule method

Both the execute method and the submit method internally delegate to the schedule method, so let's look at its implementation:
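One detail worth isolating before reading the code: the pool stores delays as an absolute trigger time in nanoseconds, and now() + delay can overflow a long. A pure-arithmetic sketch of the clamp that the overflow handling performs (the method name here is mine, mirroring the idea rather than copying the JDK method):

```java
public class OverflowClampDemo {

    // Clamp `delay` so that (delay - headDelay) cannot overflow a long,
    // mirroring the idea behind DelayedWorkQueue's overflow handling.
    public static long clamp(long delay, long headDelay) {
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
        return delay;
    }

    public static void main(String[] args) {
        // Adding to Long.MAX_VALUE wraps around to a negative number:
        System.out.println(Long.MAX_VALUE + 1 == Long.MIN_VALUE); // true

        // An expired head (headDelay < 0) plus a near-MAX delay would overflow
        // when their difference is taken; the clamp keeps it representable:
        long clamped = clamp(Long.MAX_VALUE, -100);
        System.out.println(clamped - (-100) > 0); // true: comparison is now safe

        // A normal delay passes through untouched:
        System.out.println(clamp(1000, -5)); // 1000
    }
}
```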

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public ScheduledFuture<?> schedule(Runnable command,
 5                                    long delay,
 6                                    TimeUnit unit) {
 7     // null check
 8     if (command == null || unit == null)
 9         throw new NullPointerException();
10     // wrap the task
11     RunnableScheduledFuture<?> t = decorateTask(command,
12         new ScheduledFutureTask<Void>(command, null,
13                                       triggerTime(delay, unit)));
14     // delayed execution
15     delayedExecute(t);
16     return t;
17 }
18 
19 /**
20  * At line 13:
21  * computes the trigger time of the delayed task
22  */
23 private long triggerTime(long delay, TimeUnit unit) {
24     // treat a negative delay as zero
25     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
26 }
27 
28 long triggerTime(long delay) {
29     /*
30     The now method is a single line, "System.nanoTime();", i.e. it reads the current
31     time. Here we add the delay to the current time; if the delay exceeds the upper
32     bound, it is handled in the overflowFree method
33     */
34     return now() +
35         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
36 }
37 
38 private long overflowFree(long delay) {
39     // peek at the head node (without removing it)
40     Delayed head = (Delayed) super.getQueue().peek();
41     if (head != null) {
42         // remaining delay of the head of the queue
43         long headDelay = head.getDelay(NANOSECONDS);
44         /*
45         Reaching this method means delay is close to Long.MAX_VALUE. If headDelay is
46         less than 0, the head's delay has expired but it has not yet executed; if
47         delay - headDelay is also less than 0, the difference between headDelay and
48         delay has overflowed the range of long
49         */
50         if (headDelay < 0 && (delay - headDelay < 0))
51             // update delay so that its difference from headDelay fits in a long;
52             // delay also becomes a positive number again
53             delay = Long.MAX_VALUE + headDelay;
54     }
55     return delay;
56 }
57 
58 /**
59  * At line 40:
60  * the overridden peek method of DelayedWorkQueue returns the head node
61  */
62 public RunnableScheduledFuture<?> peek() {
63     final ReentrantLock lock = this.lock;
64     lock.lock();
65     try {
66         return queue[0];
67     } finally {
68         lock.unlock();
69     }
70 }
71 
72 /**
73  * At line 43:
74  * this method returns the difference between the trigger time and now
75  */
76 public long getDelay(TimeUnit unit) {
77     return unit.convert(time - now(), NANOSECONDS);
78 }

4 Wrapping the task

At lines 11 and 12 above the task is wrapped:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 ScheduledFutureTask(Runnable r, V result, long ns) {
 5     // call the constructor of the parent class FutureTask
 6     super(r, result);
 7     // the trigger time is assigned to this.time
 8     this.time = ns;
 9     // period indicates the task type: 0 means a delayed task, otherwise periodic
10     this.period = 0;
11     // each task gets a unique sequence number; when two delays are equal, the one
12     // with the smaller sequence number is dequeued first
13     this.sequenceNumber = sequencer.getAndIncrement();
14 }
15 
16 /**
17  * At line 11 of the schedule method:
18  * wraps the task; here it just returns the task, but subclasses can override this
19  */
20 protected <V> RunnableScheduledFuture<V> decorateTask(
21         Runnable runnable, RunnableScheduledFuture<V> task) {
22     return task;
23 }

5 The delayedExecute method

At line 15 of the schedule method the delayed task is executed: the task is added to the queue and worker threads are topped up as needed:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  */
  4 private void delayedExecute(RunnableScheduledFuture<?> task) {
  5     if (isShutdown())
  6         /*
  7         isShutdown of the parent class ThreadPoolExecutor checks whether the pool
  8         is shut down or shutting down; if so, execute the rejection policy
  9         */
 10         reject(task);
 11     else {
 12         // otherwise add the task to the delay queue
 13         super.getQueue().add(task);
 14         /*
 15         after adding, check again whether the pool is shut down or shutting down;
 16         if so, check whether tasks may still run in this state, and if not, remove
 17         the task that was just added
 18         */
 19         if (isShutdown() &&
 20             !canRunInCurrentRunState(task.isPeriodic()) &&
 21             remove(task))
 22             // and cancel the task as well
 23             task.cancel(false);
 24         else
 25             // otherwise the pool can still run tasks, so check whether a worker
 26             // thread needs to be added
 27             ensurePrestart();
 28     }
 29 }
 30 
 31 /**
 32  * At line 20:
 33  * periodic says whether the task is a periodic one (determined via "period != 0")
 34  */
 35 boolean canRunInCurrentRunState(boolean periodic) {
 36     return isRunningOrShutdown(periodic ?
 37         // on shutdown, should periodic tasks keep running?
 38         continueExistingPeriodicTasksAfterShutdown :
 39         // on shutdown, should delayed tasks keep running?
 40         executeExistingDelayedTasksAfterShutdown);
 41 }
 42 
 43 /**
 44  * ThreadPoolExecutor:
 45  */
 46 final boolean isRunningOrShutdown(boolean shutdownOK) {
 47     // current run state of the pool
 48     int rs = runStateOf(ctl.get());
 49     // true if RUNNING, or SHUTDOWN while tasks may still run
 50     return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
 51 }
 52 
 53 /**
 54  * ScheduledThreadPoolExecutor:
 55  * the remove call at line 21 goes through ThreadPoolExecutor's remove method,
 56  * which I analyzed in the earlier ThreadPoolExecutor source article. That method
 57  * in turn calls the delay queue's overridden remove logic, i.e. this method
 58  * (the cancel method below also ends up here)
 59  */
 60 public boolean remove(Object x) {
 61     final ReentrantLock lock = this.lock;
 62     // lock
 63     lock.lock();
 64     try {
 65         // heap index of the node
 66         int i = indexOf(x);
 67         if (i < 0)
 68             // not found: return false
 69             return false;
 70 
 71         // set the node's index to -1, since it is about to be deleted
 72         setIndex(queue[i], -1);
 73         // size - 1
 74         int s = --size;
 75         // the last node of the min-heap, used as the replacement
 76         RunnableScheduledFuture<?> replacement = queue[s];
 77         // null out the last slot
 78         queue[s] = null;
 79         // if the node to delete is itself the last node, we are done, because
 80         // the min-heap structure is unaffected
 81         if (s != i) {
 82             /*
 83             otherwise run one siftDown pass to re-insert the last node's value
 84             into the min-heap; this removes the node at position i (siftDown is
 85             analyzed in detail later)
 86             */
 87             siftDown(i, replacement);
 88             /*
 89             if the last node's delay is already smaller than the deleted node's,
 90             the siftDown above just places it at the deleted position. Everything
 91             below that position now satisfies the min-heap property, but the
 92             replacement and its parent might not: the replacement's delay could
 93             still be smaller than its parent's
 94             */
 95             if (queue[i] == replacement)
 96                 // in that case run one siftUp pass to fix the heap above it
 97                 siftUp(i, replacement);
 98         }
 99         return true;
100     } finally {
101         // unlock
102         lock.unlock();
103     }
104 }
105 
106 /**
107  * At line 66:
108  */
109 private int indexOf(Object x) {
110     if (x != null) {
111         if (x instanceof ScheduledFutureTask) {
112             // for a ScheduledFutureTask, read its heap index directly
113             int i = ((ScheduledFutureTask) x).heapIndex;
114             // >= 0 and < size means the node is still in the min-heap; if it is
115             // really this node, return the index
116             if (i >= 0 && i < size && queue[i] == x)
117                 return i;
118         } else {
119             // otherwise fall back to a plain linear search for an equal node
120             for (int i = 0; i < size; i++)
121                 if (x.equals(queue[i]))
122                     return i;
123         }
124     }
125     // not found
126     return -1;
127 }
128 
129 /**
130  * At line 23 (task.cancel):
131  */
132 public boolean cancel(boolean mayInterruptIfRunning) {
133     // try to cancel the task via FutureTask's cancel method
134     boolean cancelled = super.cancel(mayInterruptIfRunning);
135     // if cancelled, and removal on cancel is allowed, and the node is still in
136     // the min-heap, remove it
137     if (cancelled && removeOnCancel && heapIndex >= 0)
138         remove(this);
139     return cancelled;
140 }
141 
142 /**
143  * ThreadPoolExecutor:
144  * at line 27 (ensurePrestart):
145  */
146 void ensurePrestart() {
147     // number of worker threads in the pool
148     int wc = workerCountOf(ctl.get());
149     if (wc < corePoolSize)
150         /*
151         below the core size: add a core thread. As covered in the earlier
152         ThreadPoolExecutor source article, addWorker also starts the thread. The
153         firstTask passed here is null because the task is not executed
154         immediately but taken from the delay queue
155         */
156         addWorker(null, true);
157     else if (wc == 0)
158         // no workers at all: add a non-core thread, guaranteeing at least one
159         addWorker(null, false);
160     /*
161     so once the worker count reaches the core size, no more workers are created:
162     the timed thread pool has at most "corePoolSize" threads, i.e. the size
163     passed to the constructor
164     */
165 }

6 Adding a task

Since the delay queue is built on a min-heap, adding a task involves adjusting the heap:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  * this calls the add method of DelayedWorkQueue
  4  */
  5 public boolean add(Runnable e) {
  6     return offer(e);
  7 }
  8 
  9 public boolean offer(Runnable x) {
 10     // null check
 11     if (x == null)
 12         throw new NullPointerException();
 13     // cast
 14     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
 15     final ReentrantLock lock = this.lock;
 16     // lock
 17     lock.lock();
 18     try {
 19         // current number of tasks
 20         int i = size;
 21         // grow the array if needed (initial capacity is 16)
 22         if (i >= queue.length)
 23             grow();
 24         // size + 1
 25         size = i + 1;
 26         if (i == 0) {
 27             // the very first task goes straight to the root of the min-heap
 28             // (first position in the queue)
 29             queue[0] = e;
 30             // and its heap index is set to 0
 31             setIndex(e, 0);
 32         } else {
 33             // otherwise siftUp inserts it at the position where it belongs
 34             siftUp(i, e);
 35         }
 36         // if after the insertion the root of the min-heap is still the newly
 37         // added node, its delay is the shortest
 38         if (queue[0] == e) {
 39             // whether or not there is a leader thread, reset it to null
 40             leader = null;
 41             /*
 42             and transfer a node from the condition queue to the CLH queue (if
 43             there is only one node, the signal method is still entered, but it
 44             does nothing since the condition queue is empty at that point). Note
 45             that, as the internals of signal show, signal does not normally do
 46             the waking itself; the actual wake-up happens in the unlock below
 47             */
 48             available.signal();
 49         }
 50     } finally {
 51         /*
 52         unlock (only the successor of the head node in the CLH queue is woken;
 53         it may be another thread blocked above while adding a task, a thread
 54         coming back for the next task after finishing one, a waiting follower
 55         thread, or something else. Whichever it is, what matters is that the
 56         wake-up keeps propagating. For the execution details of ReentrantLock
 57         and condition queues, see my earlier AQS source analysis)
 58         */
 59         lock.unlock();
 60     }
 61     return true;
 62 }
 63 
 64 /**
 65  * At line 23:
 66  */
 67 private void grow() {
 68     int oldCapacity = queue.length;
 69     // as you can see, the growth policy here is 1.5x:
 70     int newCapacity = oldCapacity + (oldCapacity >> 1);
 71     // if the new capacity overflows, fall back to the maximum value of int
 72     if (newCapacity < 0)
 73         newCapacity = Integer.MAX_VALUE;
 74     // copy the array via Arrays.copyOf (System.arraycopy)
 75     queue = Arrays.copyOf(queue, newCapacity);
 76 }
 77 
 78 /**
 79  * At lines 31, 107 and 118:
 80  * records f's index in the min-heap, so that deletion can later tell whether
 81  * the node is still in the heap by checking index >= 0
 82  */
 83 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
 84     if (f instanceof ScheduledFutureTask)
 85         ((ScheduledFutureTask) f).heapIndex = idx;
 86 }
 87 
 88 /**
 89  * At line 34:
 90  * the essence of heap ordering lies in siftUp and siftDown; unlike the classic
 91  * implementation, there is an extra parameter key, the task being inserted
 92  */
 93 private void siftUp(int k, RunnableScheduledFuture<?> key) {
 94     // while k > 0, i.e. until the root is reached
 95     while (k > 0) {
 96         // parent index ((current index - 1) / 2)
 97         int parent = (k - 1) >>> 1;
 98         // the parent's task
 99         RunnableScheduledFuture<?> e = queue[parent];
100         // if the inserted task's delay is not less than the parent's, stop the
101         // upward pass: the insertion position is found
102         if (key.compareTo(e) >= 0)
103             break;
104         // otherwise move the parent's content down to the current slot
105         queue[k] = e;
106         // and update the parent's heap index to the current slot
107         setIndex(e, k);
108         // continue the upward pass from the parent's position
109         k = parent;
110     }
111     /*
112     two possibilities here: either the upward pass ended with the final slot not
113     yet assigned, so assign it now; or the node satisfied the min-heap condition
114     right away and is written at its original slot
115     */
116     queue[k] = key;
117     // record the heap index of the inserted node
118     setIndex(key, k);
119 }
120 
121 /**
122  * At line 102:
123  */
124 public int compareTo(Delayed other) {
125     // comparing against itself: return 0, i.e. equal
126     if (other == this)
127         return 0;
128     if (other instanceof ScheduledFutureTask) {
129         // if the other task is also a ScheduledFutureTask, cast it first
130         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
131         // difference between the two trigger times
132         long diff = time - x.time;
133         if (diff < 0)
134             // less than 0: this task's delay is shorter, return -1
135             return -1;
136         else if (diff > 0)
137             // the other task's delay is shorter: return 1
138             return 1;
139         // equal delays: compare sequence numbers; the smaller (unique) sequence
140         // number should be dequeued first
141         else if (sequenceNumber < x.sequenceNumber)
142             return -1;
143         else
144             return 1;
145     }
146     // if the other task is not a ScheduledFutureTask, compare via getDelay
147     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
148     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
149 }

7 Getting a task

In the ensurePrestart method above, addWorker is called to top up the worker threads. As mentioned in my earlier article analyzing the ThreadPoolExecutor source, addWorker calls the getTask method to fetch a task from the queue:
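Before reading the take method below, its observable behavior can be seen with the public DelayQueue class, which uses the same leader-follower, min-heap design: take blocks until the head's delay expires and always hands out elements in trigger-time order. A small sketch (the class names are mine):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayOrderDemo {

    // A trivial Delayed element that "fires" delayMillis after construction.
    static final class Timed implements Delayed {
        final String name;
        final long triggerNanos;

        Timed(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(triggerNanos - System.nanoTime(),
                                TimeUnit.NANOSECONDS);
        }

        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Offers elements out of order and returns the names in take() order.
    public static String takeOrder() throws InterruptedException {
        DelayQueue<Timed> q = new DelayQueue<>();
        q.offer(new Timed("slow", 120));
        q.offer(new Timed("fast", 20));
        q.offer(new Timed("medium", 60));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 3; i++)
            sb.append(q.take().name).append(' '); // blocks until each delay expires
        return sb.toString().trim();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeOrder()); // fast medium slow
    }
}
```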

  1 /**
  2  * ThreadPoolExecutor:
  3  */
  4 private Runnable getTask() {
  5     // ...
  6     /*
  7     allowCoreThreadTimeOut here defaults to false (true would mean idle core
  8     threads also get destroyed on timeout), and as noted above the timed thread
  9     pool has at most "core" threads, so timed is false
 10     */
 11     boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 12     // ...
 13     // since timed is false, we follow the logic of the take branch
 14     Runnable r = timed ?
 15         workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 16         workQueue.take();
 17     // ...
 18 }
 19 
 20 /**
 21  * ScheduledThreadPoolExecutor:
 22  * at line 16: the take call above goes to DelayedWorkQueue's take method, which
 23  * is what implements the delayed fetching of tasks
 24  */
 25 public RunnableScheduledFuture<?> take() throws InterruptedException {
 26     final ReentrantLock lock = this.lock;
 27     // lock (in interruptible mode)
 28     lock.lockInterruptibly();
 29     try {
 30         for (;;) {
 31             // head node of the queue
 32             RunnableScheduledFuture<?> first = queue[0];
 33             if (first == null)
 34                 /*
 35                 no delayed task in the queue: block the current thread here
 36                 (through a condition queue in AQS) until a task arrives. Also,
 37                 when a thread finishes a task, it comes back into this method
 38                 via getTask; if there is no task at that point, it blocks and
 39                 sleeps here (as noted in my earlier AQS source article, await
 40                 releases all of the ReentrantLock's resources before blocking)
 41                 */
 42                 available.await();
 43             else {
 44                 // remaining delay of the head of the queue
 45                 long delay = first.getDelay(NANOSECONDS);
 46                 // if the delay has already expired, remove and return the head,
 47                 // meaning a task has been obtained
 48                 if (delay <= 0)
 49                     return finishPoll(first);
 50                 // don't retain the reference while waiting
 51                 first = null;
 52                 if (leader != null)
 53                     // another thread is already the leader: wait as a follower
 54                     available.await();
 55                 else {
 56                     // otherwise become the leader and wait out the delay
 57                     Thread thisThread = Thread.currentThread();
 58                     leader = thisThread;
 59                     try {
 60                         available.awaitNanos(delay);
 61                     } finally {
 62                         // give up leadership after waking
 63                         if (leader == thisThread)
 64                             leader = null;
 65                     }
 66                 }
 67             }
 68         }
 69     } finally {
 70         // if no thread is the leader and tasks remain, wake a follower
 71         if (leader == null && queue[0] != null)
 72             available.signal();
 73         lock.unlock();
 74     }
 75 }
 76 
 77 /**
 78  * At line 49:
 79  * removes and returns the head node, then re-maintains the min-heap
 80  */
 81 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
 82     // size - 1; the last node becomes the replacement
 83     int s = --size;
 84     RunnableScheduledFuture<?> x = queue[s];
 85     queue[s] = null;
 86     // if more than one node remained before the deletion, siftDown deletes the
 87     // root from the min-heap and re-maintains the heap
 88     if (s != 0)
 89         siftDown(0, x);
 90     // set the removed root's heap index to -1: it is no longer in the heap
 91     setIndex(f, -1);
 92     // finally return it
 93     return f;
 94 }
 95 
 96 /**
 97  * At line 89: the parameter key is the task being re-inserted
 98  */
 99 private void siftDown(int k, RunnableScheduledFuture<?> key) {
100     /*
101     half of the array length (note that size here is the size after the last
102     node has already been removed); half points to the slot just after the last
103     non-leaf node
104     */
105     int half = size >>> 1;
106     /*
107     the termination condition for the downward pass is k reaching half, i.e.
108     reaching positions with no children, which naturally need no adjustment
109     */
110     while (k < half) {
111         // index of the left child (k * 2 + 1)
112         int child = (k << 1) + 1;
113         RunnableScheduledFuture<?> c = queue[child];
114         int right = child + 1;
115         // pick the smaller of the two children
116         if (right < size && c.compareTo(queue[right]) > 0)
117             c = queue[child = right];
118         // if key is no greater than the smaller child, the slot is found
119         if (key.compareTo(c) <= 0)
120             break;
121         // otherwise move the child up and continue the downward pass
122         queue[k] = c;
123         setIndex(c, k);
124         k = child;
125     }
126     queue[k] = key;
127     setIndex(key, k);
128 }

8 Periodic tasks

The scheduleAtFixedRate method submits periodic tasks; its structure mirrors schedule, except that a non-zero period is recorded in the wrapped task:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
 5                                               long initialDelay,
 6                                               long period,
 7                                               TimeUnit unit) {
 8     // null check
 9     if (command == null || unit == null)
10         throw new NullPointerException();
11     // non-positive check: the period must be positive
12     if (period <= 0)
13         throw new IllegalArgumentException();
14     // wrap the task; the positive period marks it as a fixed-rate periodic task
15     ScheduledFutureTask<Void> sft =
16         new ScheduledFutureTask<Void>(command, null,
17                                       triggerTime(initialDelay, unit),
18                                       unit.toNanos(period));
19     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
20     sft.outerTask = t;
21     // delayed execution, same as before
22     delayedExecute(t);
23     return t;
24 }

9 Shutting down

When the pool is shut down, onShutdown decides which queued tasks survive, based on the two policies mentioned earlier:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */
 4 @Override void onShutdown() {
 5     BlockingQueue<Runnable> q = super.getQueue();
 6     // on shutdown, should delayed tasks keep running?
 7     boolean keepDelayed =
 8         getExecuteExistingDelayedTasksAfterShutdownPolicy();
 9     // on shutdown, should periodic tasks keep running?
10     boolean keepPeriodic =
11         getContinueExistingPeriodicTasksAfterShutdownPolicy();
12     if (!keepDelayed && !keepPeriodic) {
13         // neither kind may continue: cancel every task and clear the queue
14         for (Object e : q.toArray())
15             if (e instanceof RunnableScheduledFuture<?>)
16                 ((RunnableScheduledFuture<?>) e).cancel(false);
17         q.clear();
18     } else {
19         // otherwise examine the tasks one by one
20         for (Object e : q.toArray()) {
21             if (e instanceof RunnableScheduledFuture) {
22                 RunnableScheduledFuture<?> t =
23                     (RunnableScheduledFuture<?>) e;
24                 // if this kind of task may not continue after shutdown, or the
25                 // task has already been cancelled
26                 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
27                     t.isCancelled()) {
28                     // remove the node
29                     if (q.remove(t))
30                         // and cancel the task at the same time
31                         t.cancel(false);
32                 }
33             }
34         }
35     }
36     // finally decide, from the pool state, whether the pool should terminate
37     tryTerminate();
38 }
39 
40 /**
41  * At line 17 (q.clear):
42  */
43 public void clear() {
44     final ReentrantLock lock = this.lock;
45     // lock
46     lock.lock();
47     try {
48         for (int i = 0; i < size; i++) {
49             // walk every node in the delay queue
50             RunnableScheduledFuture<?> t = queue[i];
51             if (t != null) {
52                 // set the slot to null
53                 queue[i] = null;
54                 // and set the node's heap index to -1
55                 setIndex(t, -1);
56             }
57         }
58         // reset size to its initial value 0
59         size = 0;
60     } finally {
61         // release the lock
62         lock.unlock();
63     }
64 }

Thank you for reading. That covers how the timed thread pool achieves delayed and periodic execution. After working through this article you should have a deeper understanding of the mechanism; as always, the specifics are best verified in practice. More articles on related topics are on the way; welcome to follow along!



