Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to analyze the implementation principle of JUC thread pool ThreadPoolExecutor from the source code

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

How to analyze the implementation principle of JUC thread pool ThreadPoolExecutor from the source code, I believe that many inexperienced people are at a loss about it. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

Premise

I planned to read the source code implementation of JUC thread pool ThreadPoolExecutor a long time ago. Due to the recent busy time, I have no time to sort out the article on source code analysis. In the previous analysis of extended thread pool implementation of callback Future, it was mentioned that the concurrency master Doug Lea has only one stateless execution method in the top-level interface Executor of the design thread pool ThreadPoolExecutor's submit task:

Public interface Executor {

Void execute (Runnable command)

}

While ExecutorService provides a lot of extension methods, the underlying layer is basically based on the Executor#execute () method. This paper focuses on the implementation of ThreadPoolExecutor#execute (). The author will analyze it in detail from the point of view of implementation principle, source code implementation and simplified examples. The source code of ThreadPoolExecutor is basically unchanged from JDK8 to JDK11, and JDK11 was used when this article was written.

The principle of ThreadPoolExecutor

ThreadPoolExecutor uses the JUC Synchronizer framework AbstractQueuedSynchronizer (commonly known as AQS), a large number of bit operations, CAS operations. ThreadPoolExecutor provides several important functions such as fixed active threads (core threads), additional threads (additional threads created in the thread pool capacity-number of core threads, hereinafter referred to as non-core threads), task queues, and reject policies.

JUC Synchronizer Framework

ThreadPoolExecutor uses the JUC Synchronizer framework, which is mainly used in four aspects:

The global lock mainLock member attribute, which is a reentrant lock ReentrantLock type, is mainly used for locking operations when accessing worker thread Worker collections and recording data statistics. The condition variable termination, Condition type, is mainly used for thread blocking with duration when waiting for the termination of the awaitTermination () method. Task queue workQueue, BlockingQueue type, task queue, used to store tasks to be executed. Worker threads, the inner class Worker type, are the real worker thread objects in the thread pool.

About AQS, the author wrote an article about source code analysis: JUC Synchronizer framework AbstractQueuedSynchronizer source code analysis.

Core thread

Here, refer to the implementation of ThreadPoolExecutor and simplify it to implement a thread pool with only core threads. The requirements are as follows:

Do not consider the processing in the case of abnormal task execution for the time being. The task force is listed as an unbounded queue. The thread pool capacity is fixed to the number of core threads. Don't consider the rejection strategy for the time being. Public class CoreThreadPool implements Executor {

Private BlockingQueue workQueue

Private static final AtomicInteger COUNTER = new AtomicInteger ()

Private int coreSize

Private int threadCount = 0

Public CoreThreadPool (int coreSize) {

This.coreSize = coreSize

This.workQueue = new LinkedBlockingQueue ()

}

@ Override

Public void execute (Runnable command) {

If (+ + threadCount pool.execute (()->)

System.out.println (String.format ("Thread:%s,value:%d", Thread.currentThread () .getName (), I)

Thread.sleep (Integer.MAX_VALUE)

}

}

The result of one run is as follows:

Thread:Worker-0,value:0

Thread:Worker-3,value:3

Thread:Worker-2,value:2

Thread:Worker-1,value:1

Thread:Worker-4,value:4

Thread:Worker-1,value:5

Thread:Worker-2,value:8

Thread:Worker-4,value:7

Thread:Worker-0,value:6

Thread:Worker-3,value:9

When designing this thread pool, the core thread is created lazily, blocking the take () method in the task queue when the thread is idle, but it is similar to the implementation for ThreadPoolExecutor, except that if keepAliveTime is used and the core thread timeout is allowed (allowCoreThreadTimeOut is set to true), then BlockingQueue#poll (keepAliveTime) is used for polling instead of permanent blocking.

Other additional features

When building a ThreadPoolExecutor instance, you need to define maximumPoolSize (maximum number of threads in the thread pool) and corePoolSize (number of core threads). When the task queue is a bounded blocking queue, the core thread is full and the task queue is full, an attempt is made to create additional maximumPoolSize-corePoolSize threads to execute the newly committed task. When ThreadPoolExecutor implements two main additional functions here are:

Under certain conditions, a non-core thread will be created to execute the task. The recycling cycle of the non-core thread (the end of the thread life cycle) is keepAliveTime. The condition for the end of the thread life cycle is that the next time a task is obtained through the task queue and the survival time exceeds keepAliveTime. Provide a deny policy, that is, when the core thread is full, the task queue is full, and the non-core thread is full, the reject policy will be triggered. Source code analysis

First analyze the key attributes of the thread pool, then analyze its state control, and finally focus on the ThreadPoolExecutor#execute () method.

Key attribute public class ThreadPoolExecutor extends AbstractExecutorService {

/ / Control variables-store status and number of threads

Private final AtomicInteger ctl = new AtomicInteger (ctlOf (RUNNING, 0))

/ / Task queue, which must be a blocking queue

Private final BlockingQueue workQueue

/ / worker thread collection, which stores all (active) worker threads in the thread pool. This collection can only be accessed if the global lock mainLock is held.

Private final HashSet workers = new HashSet ()

/ / Global lock

Private final ReentrantLock mainLock = new ReentrantLock ()

/ / wait condition variables used by the awaitTermination method

Private final Condition termination = mainLock.newCondition ()

/ / record peak threads

Private int largestPoolSize

/ / record the number of tasks that have been successfully executed

Private long completedTaskCount

/ / Thread factory, used to create a new thread instance

Private volatile ThreadFactory threadFactory

/ / refuses to execute the processor, corresponding to different rejection policies

Private volatile RejectedExecutionHandler handler

/ / the time period for idle threads to wait for a task, in nanoseconds

Private volatile long keepAliveTime

/ / whether to allow the core thread to time out. If true, keepAliveTime is also effective for the core thread.

Private volatile boolean allowCoreThreadTimeOut

/ / number of core threads

Private volatile int corePoolSize

/ / Thread pool capacity

Private volatile int maximumPoolSize

/ / other codes are omitted

}

Let's look at the constructor with the longest argument list:

Public ThreadPoolExecutor (int corePoolSize

Int maximumPoolSize

Long keepAliveTime

TimeUnit unit

BlockingQueue workQueue

ThreadFactory threadFactory

RejectedExecutionHandler handler) {

If (corePoolSize

< 0 || maximumPoolSize largestPoolSize) largestPoolSize = s; // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例 workerAdded = true; } } finally { mainLock.unlock(); } // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例 if (workerAdded) { t.start(); // 标记线程启动成功 workerStarted = true; } } } finally { // 线程启动失败,需要从工作线程集合移除对应的Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // 添加Worker失败 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 从工作线程集合移除之 if (w != null) workers.remove(w); // wc数量减1 decrementWorkerCount(); // 基于状态判断尝试终结线程池 tryTerminate(); } finally { mainLock.unlock(); } } 笔者发现了Doug Lea大神十分喜欢复杂的条件判断,而且单行复杂判断不喜欢加花括号,像下面这种代码在他编写的很多类库中都比较常见: if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; // .... // 代码拆分一下如下 boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN); # rs >

= SHUTDOWN (0)

Boolean atLeastStop = runStateAtLeast (c, STOP) | | firstTask! = null | | workQueue.isEmpty ()

If (atLeastShutdown & & atLeastStop) {

Return false

}

It should be noted in the above analysis logic that when a Worker instance is created, a Java thread Thread instance is created through ThreadFactory in its constructor. After locking, a second check is made to see if the Worker instance needs to be added to the worker thread collection workers and whether the Thread instance held in the Worker needs to be started. Only when the Thread instance is started, the Java instance really starts to operate, otherwise it is just a useless temporary object. Worker itself implements the Runnable interface, which can be thought of as an adapter for Runnable.

Worker source code analysis of inner class of worker thread

Each specific worker thread in the thread pool is packaged as an instance of the inner class Worker, and Worker inherits from AbstractQueuedSynchronizer (AQS) and implements the Runnable interface:

Private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

/ * *

* This class will never be serialized, but we provide a

* serialVersionUID to suppress a javac warning.

, /

Private static final long serialVersionUID = 6138294804551838833L

/ / Save the thread instance created by ThreadFactory. If ThreadFactory fails to create thread, it will be null.

Final Thread thread

/ / Save the incoming Runnable task instance

Runnable firstTask

/ / record the total number of tasks completed by each thread

Volatile long completedTasks

/ / unique constructor. Pass the task instance firstTask. Note that it can be null.

Worker (Runnable firstTask) {

/ / Thread interruption is prohibited until the runWorker () method executes

SetState (- 1); / / inhibit interrupts until runWorker

This.firstTask = firstTask

/ / create a thread instance through ThreadFactory. Note that the Worker instance itself is used as a Runnable to create a new thread instance.

This.thread = getThreadFactory () .newThread (this)

}

/ / delegate to the external runWorker () method. Note that the runWorker () method is a thread pool method, not a Worker method

Public void run () {

RunWorker (this)

}

/ / Lock methods

/ /

/ / The value 0 represents the unlocked state.

/ / The value 1 represents the locked state.

/ / whether to hold an exclusive lock. A state value of 1 indicates holding the lock, and a state value of 0 indicates that the lock has been released

Protected boolean isHeldExclusively () {

Return getState ()! = 0

}

/ / try to acquire resources in exclusive mode. The variables passed in are not judged here. CAS directly determines whether the update from 0 to 1 is successful, and if successful, the exclusive thread is set to the current thread.

Protected boolean tryAcquire (int unused) {

If (compareAndSetState (0,1)) {

SetExclusiveOwnerThread (Thread.currentThread ())

Return true

}

Return false

}

/ / whether the resource is attempted in exclusive mode. The passed variable is not determined here, and state is set to 0 directly.

Protected boolean tryRelease (int unused) {

SetExclusiveOwnerThread (null)

SetState (0)

Return true

}

/ / add lock

Public void lock () {acquire (1);}

/ / attempt to lock

Public boolean tryLock () {return tryAcquire (1);}

/ / unlock

Public void unlock () {release (1);}

/ / whether to lock

Public boolean isLocked () {return isHeldExclusively ();}

/ / Thread is interrupted after startup. Note that it will determine whether the interrupt flag bit of the thread instance is false, and only if the interrupt flag bit is false.

Void interruptIfStarted () {

Thread t

If (getState () > = 0 & & (t = thread)! = null & &! t.isInterrupted ()) {

Try {

T.interrupt ()

} catch (SecurityException ignore) {

}

}

}

}

The logic in the constructor of Worker is very important. The Thread instance created through ThreadFactory is also passed into the Worker instance. Because Worker itself implements Runnable, it can be submitted to the thread for execution as a task. As long as the thread instance w held by Worker calls the Thread#start () method, Worker#run () can be executed at the right time. Simplify the logic as follows:

/ / construct in the addWorker () method

Worker worker = createWorker ()

/ / passed in through thread pool construction

ThreadFactory threadFactory = getThreadFactory ()

/ / in the Worker constructor

Thread thread = threadFactory.newThread (worker)

/ / start in the addWorker () method

Thread.start ()

Worker inherits from AQS, and the exclusive mode of AQS is used here. A trick here is to set the resource (state) of AQS to-1 through setState (- 1) when constructing Worker. This is because the default value of state in AQS is 0 when the Worker instance is created, and the thread has not been started, so you cannot interrupt the thread at this time, as shown in the Worker#interruptIfStarted () method. Neither of the two overriding AQS methods in Worker, tryAcquire () and tryRelease (), directly CAS (0Jing 1) and tryRelease (0), determine the incoming variables. Then take a look at the core method ThreadPoolExecutor#runWorker ():

Final void runWorker (Worker w) {

/ / get the current thread, which is actually the same as the thread instance held by Worker

Thread wt = Thread.currentThread ()

/ / get the task object passed in during initialization held in Worker, which is stored in the temporary variable task.

Runnable task = w.firstTask

/ / set the task object passed in during initialization held in Worker to null

W.firstTask = null

/ / since the state in the AQS is set to-1 when Worker is initialized, you need to unlock the state first and update it to 0, allowing the thread to interrupt

W.unlock (); / / allow interrupts

/ / record whether the thread terminates abnormally because of the user. Default is true.

Boolean completedAbruptly = true

Try {

/ / the initialization task object is not null, or the task obtained from the task queue is not empty (the task obtained from the task queue will be updated to the temporary variable task)

/ / getTask () due to the use of blocking queues, the while loop will be blocked or timed out if it hits the second half of the loop. Returning getTask () to null will cause the thread to jump out of the dead loop and terminate the thread.

While (task! = null | | (task = getTask ())! = null) {

/ / Worker locking, which essentially means that AQS acquires resources and attempts CAS to update state from 0 to 1.

W.lock ()

/ / If pool is stopping, ensure thread is interrupted

/ / if not, ensure thread is not interrupted. This

/ / requires a recheck in second case to deal with

/ / shutdownNow race while clearing interrupt

/ / if the thread pool is stopping (that is, from RUNNING or SHUTDOWN state to STOP state), make sure the current worker thread is in an interrupted state

/ / otherwise, make sure that the current thread is not interrupted

If ((runStateAtLeast (ctl.get (), STOP)) | |

(Thread.interrupted () & &

RunStateAtLeast (ctl.get (), STOP)) & &

! wt.isInterrupted ()

Wt.interrupt ()

Try {

/ / Hook method, before task execution

BeforeExecute (wt, task)

Try {

Task.run ()

/ / Hook method, after task execution-normal condition

AfterExecute (task, null)

} catch (Throwable ex) {

/ / Hook method, after task execution-exception

AfterExecute (task, ex)

Throw ex

}

} finally {

/ / clear the task temporary variable, which is important, otherwise while will execute the same task in an endless loop

Task = null

/ / accumulate the number of tasks completed by Worker

W.completedTasksgiving +

/ / Worker is unlocked, which essentially means that AQS releases resources. Set state to 0.

W.unlock ()

}

}

/ / go here to indicate that when getTask () returns null, the thread exits normally.

CompletedAbruptly = false

} finally {

/ / the processing thread exits. CompletedAbruptly indicates that the thread exits abnormally due to user exception when it is true.

ProcessWorkerExit (w, completedAbruptly)

}

}

Here we focus on disassembling and analyzing the code that determines the current interrupt status of the worker thread:

If ((runStateAtLeast (ctl.get (), STOP)) | |

(Thread.interrupted () & &

RunStateAtLeast (ctl.get (), STOP)) & &

! wt.isInterrupted ()

Wt.interrupt ()

/ / simplify the judgment logic first, as follows

/ / determine whether the thread pool status is at least STOP,rs > = STOP (1)

Boolean atLeastStop = runStateAtLeast (ctl.get (), STOP)

/ / determine whether the thread pool state is at least STOP, at the same time determine the interrupt state of the current thread and clear the interrupt state of the current thread

Boolean interruptedAndAtLeastStop = Thread.interrupted () & & runStateAtLeast (ctl.get (), STOP)

If (atLeastStop | | interruptedAndAtLeastStop & &! wt.isInterrupted ()) {

Wt.interrupt ()

}

The Thread.interrupted () method gets the interrupt state of the thread and clears the interrupt state. This method is called here because it is possible to call the shutdownNow () method while executing the above if logic, and there is also logic in the shutdownNow () method to interrupt all Worker threads, but because the shutdownNow () method traverses all Worker threads to interrupt, it may not be possible to interrupt in time before the task is submitted to Worker for execution. So this interrupt logic is executed inside the Worker, which is the logic of the if code block. Also note here: in the STOP state, all newly submitted tasks are rejected, tasks in the task queue are no longer executed, and all Worker threads are interrupted. That is, "even if the task Runnable has been logically fetched in the first half of runWorker (), it may be interrupted as long as it hasn't reached the call to its Runnable#run ()." Assuming that the logic entering the if code block happens and the shutdownNow () method is called externally, then the thread interrupt state is determined and reset within the if logic, then the interruptWorkers () called in the shutdownNow () method will not interrupt the thread twice (which will cause an exception) because of a problem in the interrupt state judgment.

Summarize the core flow of the runWorker () method above:

Worker first performs an unlock operation to release the uninterruptible state. The getTask () method is called through the while loop to get the task from the task queue (of course, the first loop may also be an externally incoming firstTask task instance). If the thread pool changes to the STOP state, you need to ensure that the worker thread is in the interrupted state and handle the interrupt, otherwise the worker thread must not be in the interrupted state. Execute the task instance Runnale#run () method, and the hook methods beforeExecute () and afterExecute () are called before and after the task instance execution (including normal execution and exception execution), respectively. Jumping out of the while loop means the end of the runWorker () method and the end of the worker thread life cycle (the end of the Worker#run () life cycle), and processWorkerExit () is called to handle the subsequent work of the worker thread exiting. J-u-c-t-p-e-4

Next, take a look at the getTask () method to get the task from the task queue and the method processWorkerExit () to handle the subsequent work that the thread exits.

Source code analysis of getTask method

The getTask () method is the way that the worker thread gets the task object in the task queue in the while dead loop:

Private Runnable getTask () {

/ / record whether the last time you pulled from the queue timed out

Boolean timedOut = false; / / Did the last poll () time out?

/ / notice that this is an endless cycle

For (;;) {

Int c = ctl.get ()

/ / Check if queue empty only if necessary.

/ / the first if: if the thread pool state is at least SHUTDOWN, that is, rs > = SHUTDOWN (0), you need to determine two situations (or logic):

/ / 1. The thread pool state is at least STOP (1), that is, the thread pool is stopping, and the shutdownNow () method is generally called

/ / 2. Task team is listed as empty

/ / if the thread pool is at least SHUTDOWN and meets one of the above two conditions, the number of worker threads wc minus 1, and then directly returns null

If (runStateAtLeast (c, SHUTDOWN)

& & (runStateAtLeast (c, STOP) | | workQueue.isEmpty ()) {

DecrementWorkerCount ()

Return null

}

/ / running here indicates that the thread pool is still in RUNNING state, and the number of worker threads at a time is retrieved.

Int wc = workerCountOf (c)

/ / Are workers subject to culling?

/ / the timed temporary variable bravely controls the thread timeout to determine whether it is necessary to pull the task of the task queue through the non-blocking method of poll () with timeout.

/ / 1.allowCoreThreadTimeOut defaults to false. If set to true, core threads are also allowed to pull tasks from the task queue through the poll () method.

/ / 2. When the number of working threads is greater than the number of core threads, additional non-core threads are created in the thread pool. These non-core threads must pull tasks from the task queue through the poll () method.

Boolean timed = allowCoreThreadTimeOut | | wc > corePoolSize

/ / second if:

/ / 1.wc > maximumPoolSize indicates that the current total number of worker threads is larger than that of maximumPoolSize, indicating that the thread pool capacity is reduced through the setMaximumPoolSize () method.

/ / or 2.timed & & timedOut indicates that the thread hit the timeout control and the previous loop pulled the task as null from the task queue through the poll () method

/ and 3. If the total number of worker threads is greater than 1 or the task queue is empty, subtract 1 from the number of threads through CAS and return null at the same time

/ / if CAS subtracts 1 from the number of threads, it will enter the next cycle and try again.

If ((wc > maximumPoolSize | | (timed & & timedOut))

& & (wc > 1 | | workQueue.isEmpty ()) {

If (compareAndDecrementWorkerCount (c)

Return null

Continue

}

Try {

/ / if timed is true, timeout pull is done through the poll () method, and no valid task is waited within the keepAliveTime time, then null is returned

/ / if timed is false, blocking pull through take () will block until the next valid task is available before returning (usually not null)

Runnable r = timed?

WorkQueue.poll (keepAliveTime, TimeUnit.NANOSECONDS):

WorkQueue.take ()

/ / it is very important here. It is returned only when it is not null. In the case of null, it will enter the next cycle.

If (r! = null)

Return r

/ / running here indicates that the last task obtained from the task queue is null. Generally speaking, the workQueue.poll () method returns null when it times out.

TimedOut = true

} catch (InterruptedException retry) {

TimedOut = false

}

}

}

In this method, there are two very large if logic. For scenarios where the first if may cause the number of worker threads to subtract 1 and return null directly:

The thread pool state is SHUTDOWN, usually the shutdown () method is called, and the task queue is empty. The thread pool status is STOP.

For the second if, the logic is a little complicated, so disassemble it first:

/ / the total number of worker threads is larger than maximumPoolSize, indicating that the thread pool capacity is reduced through the setMaximumPoolSize () method.

Boolean b1 = wc > maximumPoolSize

/ / allow the thread to time out while pulling the task null from the task queue through the poll () method in the previous round

Boolean b2 = timed & & timedOut

/ / the total number of worker threads is greater than 1

Boolean b3 = wc > 1

/ / Task team is listed as empty

Boolean b4 = workQueue.isEmpty ()

Boolean r = (b1 | | b2) & & (b3 | | b4)

If (r) {

If (compareAndDecrementWorkerCount (c)) {

Return null

} else {

Continue

}

}

This logic is for non-core threads in most cases. In the execute () method, when the total number of thread pools has exceeded corePoolSize and is less than maximumPoolSize, non-core threads are added through addWorker (task,false) when the task queue is full. The logic here happens to be similar to the reverse operation of addWorker (task,false), which is used to reduce the number of non-core threads and make the total number of worker threads tend to corePoolSize. If, for a non-core thread, the last loop fetches the task object as null, this cycle can easily satisfy that timed & & timedOut is true. When getTask () returns null, it causes the Worker#runWorker () method to jump out of the endless loop, and then executes the processWorkerExit () method to handle the subsequent work, while the corresponding Worker of the non-core thread becomes a "free object", waiting to be reclaimed by JVM. When allowCoreThreadTimeOut is set to true, the life cycle termination logic of the non-core thread analyzed here also applies to the core thread. Then you can sum up the meaning of keepAliveTime:

When core threads are allowed to time out, that is, when allowCoreThreadTimeOut is set to true, keepAliveTime represents the lifetime of idle worker threads. Core threads are not allowed to time out by default, and keepAliveTime represents the survival period of idle non-core threads.

In some specific scenarios, a properly configured keepAliveTime can make better use of the worker thread resources of the thread pool.

Source code analysis of processWorkerExit method

The processWorkerExit () method is to clean up and record the data for the terminating Worker (because the processWorkerExit () method is also wrapped in the runWorker () method finally code block, but the worker thread doesn't really end until the worker has finished executing the processWorkerExit () method).

Private void processWorkerExit (Worker w, boolean completedAbruptly) {

/ / because the termination of the thread is caused by throwing a user exception, you can directly reduce the number of worker threads by 1

/ / if getTask () returns null to guide the thread to normally jump out of the while dead loop of the runWorker () method without any exception thrown, in this case, the number of threads has been subtracted by 1 in getTask ().

If (completedAbruptly) / / If abrupt, then workerCount wasn't adjusted

DecrementWorkerCount ()

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

/ / the number of completed task records globally plus the number of completed tasks in this Worker that will be terminated

CompletedTaskCount + = w.completedTasks

/ / remove this terminating Worker from the worker thread collection

Workers.remove (w)

} finally {

MainLock.unlock ()

}

/ / see the analysis in the next section to determine whether thread pool terminate processing is required based on the status of the current thread pool.

TryTerminate ()

Int c = ctl.get ()

/ / if the thread pool status is less than STOP, that is, if it is in RUNNING or SHUTDOWN status:

/ / 1. If the thread is not terminated by a user exception thrown, and if the core thread is allowed to time out, keep at least one worker thread in the thread pool

/ / 2. If the thread terminates abnormally due to throwing a user, or the current number of worker threads, add a new non-core thread directly

If (runStateLessThan (c, STOP)) {

If (! completedAbruptly) {

/ / if the core thread is allowed to time out, the minimum is 0, otherwise it is corePoolSize

Int min = allowCoreThreadTimeOut? 0: corePoolSize

/ / if the minimum value is 0 and the task queue is not empty, the minimum update value is 1

If (min = = 0 & &! WorkQueue.isEmpty ()

Min = 1

/ / the number of worker threads is greater than or equal to the minimum, and no new non-core threads are added.

If (workerCountOf (c) > = min)

Return; / / replacement not needed

}

AddWorker (null, false)

}

}

The later part of the code determines the state of the thread pool. If the thread pool is RUNNING or SHUTDOWN state, if the current worker thread is terminated because a user exception is thrown, a new non-core thread will be created. If the current worker thread is not terminating a user exception (normally terminated), it will be handled as follows:

AllowCoreThreadTimeOut is true, which allows core threads to time out, and if the task queue is empty, at least one worker thread is kept in the thread pool by creating a non-core thread. AllowCoreThreadTimeOut is false, return directly if the total number of worker threads is greater than corePoolSize, otherwise create a non-core thread, that is, it will tend to keep the number of worker threads in the thread pool tend to corePoolSize.

After the execution of processWorkerExit (), it means that the life cycle of the worker thread is over.

Source code analysis of tryTerminate method

The tryTerminate () method is called at the end of each worker thread:

Final void tryTerminate () {

For (;;) {

Int c = ctl.get ()

/ / determine the status of the thread pool, and return directly if it is any of the following three cases:

/ / 1. Thread pool is in RUNNING state

/ / 2. The thread pool is at least in the TIDYING state, that is, the TIDYING or TERMINATED state, which means that the thread pool is coming to an end.

/ / 3. The thread pool is at least STOP state and the task queue is not empty

If (isRunning (c) | |

RunStateAtLeast (c, TIDYING) | |

RunStateLessThan (c, STOP) & &! WorkQueue.isEmpty ())

Return

/ / if the number of worker threads is not 0, the first idle worker thread in the worker thread collection is interrupted

If (workerCountOf (c)! = 0) {/ / Eligible to terminate

InterruptIdleWorkers (ONLY_ONE)

Return

}

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

/ / CAS sets the thread pool status to TIDYING, and executes the hook method terminated () if the setting is successful

If (ctl.compareAndSet (c, ctlOf (TIDYING, 0)) {

Try {

Terminated ()

} finally {

/ / Last update thread pool status is TERMINATED

Ctl.set (ctlOf (TERMINATED, 0))

/ / Wake up all threads blocking the termination condition, and the await () method of this variable is called in awaitTermination ()

Termination.signalAll ()

}

Return

}

} finally {

MainLock.unlock ()

}

/ / else retry on failed CAS

}

}

/ / interrupt idle worker threads. When onlyOne is true, only one thread in the worker thread collection will be interrupted

Private void interruptIdleWorkers (boolean onlyOne) {

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

For (Worker w: workers) {

Thread t = w.thread

/ / Thread interruption occurs only when it is judged that the thread is not in an interrupted state and the attempt to acquire the lock is successful.

If (! t.isInterrupted () & & w.tryLock ()) {

Try {

T.interrupt ()

} catch (SecurityException ignore) {

} finally {

W.unlock ()

}

}

/ / the loop is jumped out here, that is, only the first worker thread in the collection is interrupted

If (onlyOne)

Break

}

} finally {

MainLock.unlock ()

}

}

The puzzle here is the second if code logic of the tryTerminate () method: if the number of worker threads is not zero, the first idle worker thread in the worker thread collection will be interrupted. There is a paragraph in the method API comment:

If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate. When the condition for terminating the thread pool is met but the number of worker threads is not zero, an idle worker thread needs to be interrupted to ensure that the signal of thread pool closure is propagated.

In the shutdown () method that will be analyzed below, all idle threads will be interrupted by interruptIdleWorkers (). At this time, there may be a non-idle thread executing a task. After the task is finished, if it happens to be the core thread, it will block in the take () method of the task queue in the next loop, or even permanently block in the take () method of the task queue after the thread pool is closed without additional intervention. To avoid this situation, when each worker thread exits, it attempts to interrupt one of the idle threads in the worker set to ensure that all idle threads can exit normally.

In the interruptIdleWorkers () method, each worker thread is judged by tryLock () first, and thread interruption is possible only if true is returned. We know that in the runWorker () method, every time the worker thread gets a non-null task from the task queue, it will first lock the Worker#lock () operation, which can prevent the thread from being interrupted during the execution of the task and ensure that the worker thread that is interrupted must be an idle worker thread.

Source code analysis of shutdown method

There are several variants of the thread pool close operation. Take a look at shutdown () first:

Public void shutdown () {

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

/ / permission verification, security policy related judgment

CheckShutdownAccess ()

/ / set SHUTDOWN status

AdvanceRunState (SHUTDOWN)

/ / interrupt all idle worker threads

InterruptIdleWorkers ()

/ / Hook method

OnShutdown (); / / hook for ScheduledThreadPoolExecutor

} finally {

MainLock.unlock ()

}

/ / call the bold attempt terminate method above to change the state to TIDYING. After executing the hook method terminated (), the final state is updated to TERMINATED.

TryTerminate ()

}

/ / lifting status

Private void advanceRunState (int targetState) {

/ / assert targetState = = SHUTDOWN | | targetState = = STOP

For (;;) {

Int c = ctl.get ()

/ / if the thread pool status is at least targetState or CAS is set to targetState, you will jump out of the loop.

If (runStateAtLeast (c, targetState) | |

Ctl.compareAndSet (c, ctlOf (targetState, workerCountOf (c)

Break

}

}

/ / interrupt all idle worker threads

Private void interruptIdleWorkers () {

InterruptIdleWorkers (false)

}

Then look at the shutdownNow () method:

Public List shutdownNow () {

List tasks

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

/ / permission verification, security policy related judgment

CheckShutdownAccess ()

/ / set STOP status

AdvanceRunState (STOP)

/ / interrupt all worker threads

InterruptWorkers ()

/ / clear the work queue and retrieve all outstanding tasks

Tasks = drainQueue ()

} finally {

MainLock.unlock ()

}

/ / call the bold attempt terminate method above to change the state to TIDYING. After executing the hook method terminated (), the final state is updated to TERMINATED.

TryTerminate ()

Return tasks

}

/ / traverses all worker threads and interrupts if state > 0 (startup status)

Private void interruptWorkers () {

/ / assert mainLock.isHeldByCurrentThread ()

For (Worker w: workers)

W.interruptIfStarted ()

}

The shutdownNow () method changes the thread pool state to STOP, interrupts all worker threads (Worker instances where the state value of AbstractQueuedSynchronizer is greater than 0, that is, Worker that is executing tasks and idle Worker), then iterates through the task queue, takes out (removes) all tasks and returns them in a list.

Finally, look at the awaitTermination () method:

Public boolean awaitTermination (long timeout, TimeUnit unit)

Throws InterruptedException {

/ / convert timeout in nanoseconds

Long nanos = unit.toNanos (timeout)

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

/ / Loop waits until the thread pool state changes to TERMINATED, and each loop waits for nanos nanoseconds

While (runStateLessThan (ctl.get (), TERMINATED)) {

If (nanos TERMINATED, hook method terminated () callback and condition variables wake up interruptIdleWorkers to protect worker thread interruptions serialization, avoid "interrupt storm" addWorker protect worker thread collection to avoid concurrent increase in worker thread, protection metric statistics change processWorkerExit protection metric statistics change shutdown, shutdownNow and awaitTermination see below to analyze getPoolSize, getActiveCount, getLargestPoolSize, getTaskCount and getCompletedTaskCount protection metric statistics read These statistics are generally derived from the attribute statistics of the Worker collection.

Here's a look at how thread pools can be closed relatively gracefully through reentrant locks and condition variables. Let's first look at the shutdown () method:

Public void shutdown () {

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

CheckShutdownAccess ()

AdvanceRunState (SHUTDOWN)

InterruptIdleWorkers ()

OnShutdown (); / / hook for ScheduledThreadPoolExecutor

} finally {

MainLock.unlock ()

}

TryTerminate ()

}

Here shutdown () in addition to tryTerminate (), other its methods are wrapped in the lock, "ensure the stability of the worker thread set and turn off permissions, ensure that the state change serialization, interrupt all worker threads and avoid worker thread" interruption storm "(multiple concurrent calls to shutdown () if not locked, will repeatedly interrupt the worker thread).

Public List shutdownNow () {

List tasks

Final ReentrantLock mainLock = this.mainLock

MainLock.lock ()

Try {

CheckShutdownAccess ()

AdvanceRunState (STOP)

InterruptWorkers ()

Tasks = drainQueue (); # 0 is not satisfied and will not enter a loop, so the delta worker thread that needs to be created should be lazy when submitting a new task.

Int k = Math.min (delta, workQueue.size ())

While (KMY-> 0 & & addWorker (null, true)) {

/ / if the task queue is empty, jump out of the loop

If (workQueue.isEmpty ())

Break

}

}

}

Here is a description in the code block after else if (delta > 0). We don't know how many new worker threads are really needed. As a heuristic, pre-start enough new worker threads (until the number is the core thread pool size) to handle the current task in the queue, but if the queue becomes empty when doing so, stop creating new worker threads.

❞after reading the above, do you know how to analyze the implementation principle of JUC thread pool ThreadPoolExecutor from the source code? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report