
What is ThreadPoolExecutor?

Updated 2025-04-05. From: SLTechnology News&Howtos, Shulou (Shulou.com), 06/03 report.

This article explains what ThreadPoolExecutor is: what problems it solves, how it encodes its control state, and how its source code adds, runs, and retires worker threads. The source walkthrough is dense, so it is worth reading carefully.

ThreadPoolExecutor is an ExecutorService that executes each submitted task using one of possibly several pooled threads. It is normally configured through the Executors factory methods.
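As an illustration of the factory-method route, here is a minimal sketch that submits a few tasks to a pool obtained from Executors.newFixedThreadPool. The class name FactoryDemo, the method sumOfSquares, and the pool size of 4 are assumptions made for this example only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FactoryDemo {
    // Submits the squares of 1..n as separate tasks and sums the results.
    static int sumOfSquares(int n) throws Exception {
        // Factory method; the pool size of 4 is an arbitrary choice for this sketch.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                results.add(pool.submit(() -> x * x)); // runs on a pooled thread
            }
            int sum = 0;
            for (Future<Integer> f : results) {
                sum += f.get(); // blocks until that task has finished
            }
            return sum;
        } finally {
            pool.shutdown(); // lets the worker threads exit once they are idle
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

The caller only sees the ExecutorService interface. The thread creation, queueing, and reuse discussed in the rest of the article happen behind submit.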

Thread pools address two different problems:

1. They usually improve performance when executing large numbers of asynchronous tasks, because they reduce the per-task invocation overhead.

2. They provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.

Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
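The resource bounds and the statistics can both be seen by constructing a ThreadPoolExecutor directly. The following is a sketch under assumed settings: the class name PoolStatsDemo, the sizes 2 and 4, the queue capacity of 100, and the timeouts are illustrative choices, not values taken from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatsDemo {
    // Runs `tasks` no-op tasks and returns {completedTaskCount, largestPoolSize}.
    // Keep `tasks` at or below the queue capacity: the default handler rejects
    // work once both the queue and the pool are full.
    static long[] run(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                               // corePoolSize
                4,                               // maximumPoolSize: hard cap on threads
                30, TimeUnit.SECONDS,            // idle timeout for threads above core
                new ArrayBlockingQueue<>(100));  // bounded queue: cap on waiting tasks
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate in time");
        }
        return new long[] { pool.getCompletedTaskCount(), pool.getLargestPoolSize() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] stats = run(20);
        System.out.println("completed=" + stats[0] + " largestPoolSize=" + stats[1]);
    }
}
```

After termination the completed-task count is exact. While the pool is still running, getCompletedTaskCount returns only an approximation.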

The key member variable in ThreadPoolExecutor: AtomicInteger ctl

ctl, of type AtomicInteger, is the control state of the ThreadPoolExecutor. It is a single atomic integer that packs two conceptual fields into its high and low bits:

(1) workerCount: the effective number of worker threads in the pool, stored in the lowest 29 bits of ctl.

(2) runState: the running state of the pool, stored in the top 3 bits of ctl. There are five states: RUNNING, SHUTDOWN, STOP, TIDYING and TERMINATED.
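To make the packing concrete, here is a small standalone sketch of the bit arithmetic. The constant and helper names mirror those in the source quoted in this article, but the class CtlBits itself is a hypothetical wrapper written for this example, since the real fields are private to ThreadPoolExecutor.

```java
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    // Run states live in the top 3 bits, so they compare in lifecycle order.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep top 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack both fields

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));          // prints 5
        System.out.println(runStateOf(c) == RUNNING);  // prints true
        System.out.println(RUNNING < SHUTDOWN);        // prints true
    }
}
```

Because RUNNING is the only negative state, a check such as c < SHUTDOWN is enough to test whether the pool is running, and one compare-and-set on ctl updates state and count together.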

    // COUNT_BITS is the shift that splits the 32-bit value. Integer.SIZE is
    // the width of an int (32), so COUNT_BITS = 29: the high 3 bits hold the
    // state of the thread pool and the low 29 bits hold the number of worker
    // threads in the pool.
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int RUNNING = -1 << COUNT_BITS;

    // ... (the source text breaks off here and resumes inside the inner
    //      retry loop of addWorker)

                // If the worker count has reached CAPACITY, (1 << 29) - 1, or
                // the applicable bound, no worker thread may be created.
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS the current value to current value + 1. The return value
                // tells whether the swap succeeded; on success, leave both loops.
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // The CAS failed, so re-read ctl. If the run state is still the
                // one read earlier (rs), stay in the inner loop and retry the
                // increment. Otherwise go back to the outer loop, which decides
                // again whether a worker may be created.
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // Flags recording whether the worker was added to the set and whether
        // its thread was started.
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create the worker; see the Worker source below.
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // Take the exclusive lock that guards the workers set.
                mainLock.lock();
                try {
                    // Recheck the state while holding the lock. The worker may
                    // join the set and start only if the pool is RUNNING, or
                    // is SHUTDOWN and firstTask is null.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // Check that the thread is startable.
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        // Track the largest size the workers set has reached.
                        // By configuration, the number of threads alive at the
                        // same time is at most maximumPoolSize.
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // The worker has joined the set.
                        workerAdded = true;
                    }
                } finally {
                    // Release the lock on the workers set; several callers may
                    // be creating workers and adding them at the same time.
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // The worker is in the set, so start its thread.
                    t.start();
                    // The worker thread has started.
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                // Adding the worker failed; see addWorkerFailed below.
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    // Holding mainLock, remove the worker that failed to start from the
    // workers set, roll back the CAS increment of ctl made above, try to
    // terminate the pool, and finally release the lock.
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            // Loops until the CAS decrement succeeds.
            decrementWorkerCount();
            // Try to terminate the pool; tryTerminate is covered in the
            // runWorker section.
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    // Inner class of ThreadPoolExecutor
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // The thread this worker runs in. It executes the worker's first task
        // and then tasks taken from the queue. Null if the ThreadFactory fails.
        final Thread thread;
        // Initial task of the worker. May be null, in which case the worker
        // only takes tasks from the queue.
        Runnable firstTask;
        // Number of tasks this worker has completed.
        volatile long completedTasks;

        /*
         * The AQS state starts at -1, which suppresses interrupts until
         * runWorker calls unlock() and sets the state to 0.
         * See interruptIfStarted below.
         */
        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates the processing logic to ThreadPoolExecutor.runWorker. */
        public void run() {
            runWorker(this);
        }

        // Whether a thread currently holds this worker's lock.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // When the pool moves to the STOP state, an interrupt is sent to the
        // thread of every started worker in the workers set.
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    // Rolls back the worker count after a worker could not be created.
    private void decrementWorkerCount() {
        // Not recursion but a CAS retry loop: it does not exit until the
        // decrement succeeds.
        do {} while (!compareAndDecrementWorkerCount(ctl.get()));
    }
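The increment-with-limit loop in addWorker and the rollback in decrementWorkerCount are both instances of a compare-and-set retry loop. The following standalone sketch shows that pattern on a plain counter. The class BoundedCounter and its methods are hypothetical names for this example, and the sketch leaves out the run-state recheck that the real loop performs.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) { this.limit = limit; }

    // Like the inner loop of addWorker: refuse at the limit, otherwise try to
    // CAS count to count + 1 and retry if another thread changed it first.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit) return false;
            if (count.compareAndSet(c, c + 1)) return true;
        }
    }

    // Like decrementWorkerCount: keep retrying until the CAS succeeds.
    void decrement() {
        int c;
        do {
            c = count.get();
        } while (!count.compareAndSet(c, c - 1));
    }

    int get() { return count.get(); }

    // Has several threads race on one counter and returns the final value.
    static int hammer(int limit, int threads, int attemptsPerThread)
            throws InterruptedException {
        BoundedCounter counter = new BoundedCounter(limit);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) counter.tryIncrement();
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(hammer(10, 8, 100)); // prints 10: the limit is never exceeded
    }
}
```

The limit check and the increment both act on the same snapshot c, so a successful CAS proves the count was still below the limit at the moment of the update. This is why addWorker can reserve a worker slot without taking mainLock.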


3. How a worker thread executes tasks (runWorker)

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // Take the worker's initial task...
        Runnable task = w.firstTask;
        // ...and clear the field so that it is not executed twice.
        w.firstTask = null;
        // This does not release a held lock. It changes the initial state (-1)
        // to 0 so that the worker can be interrupted.
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // If there is no initial task, take tasks from the queue. On an
            // empty queue getTask blocks, with or without a timeout.
            while (task != null || (task = getTask()) != null) {
                w.lock();
                /*
                 * Double check.
                 * First check whether the pool state is >= STOP.
                 * Otherwise read the current thread's interrupt flag (the
                 * static method also clears it) and check the state again.
                 * If the pool is stopping and the thread is not interrupted,
                 * interrupt it.
                 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // Hook that subclasses can override; runs before the task.
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Run the task passed to execute, or the task that
                        // execute put on the queue.
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Hook that subclasses can override; runs after the task.
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // Each worker counts the tasks its thread has handled,
                    // whether or not they threw. If a task throws, the worker
                    // thread exits. For example, with a queue capacity of 80
                    // and a maximum of 20 worker threads, once the queue is
                    // full it is quite likely that every thread is running
                    // its initial task.
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // Not reached if a task threw an exception.
            completedAbruptly = false;
        } finally {
            // Exit processing for the worker thread: pass the worker and the
            // abrupt-completion flag on.
            processWorkerExit(w, completedAbruptly);
        }
    }

    // Hook called by runWorker before each task; for subclasses to override.
    protected void beforeExecute(Thread t, Runnable r) { }

    // Hook called by runWorker after each task; for subclasses to override.
    protected void afterExecute(Runnable r, Throwable t) { }

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // If the worker exited abruptly (a task threw), the worker count has
        // not been adjusted yet, so roll it back first. On a normal exit, for
        // example when getTask timed out waiting on the queue, getTask has
        // already decremented the count.
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Holding the lock, add this worker's completed-task count to the
            // pool-wide total.
            completedTaskCount += w.completedTasks;
            // Remove the worker from the set.
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // Try to terminate the pool. Nothing happens if the pool is RUNNING,
        // or is SHUTDOWN with tasks still queued. Described below.
        tryTerminate();

        int c = ctl.get();
        /*
         * 1. If the state is STOP or later, skip the rest.
         * 2. If the worker exited abruptly, create a replacement worker with
         *    no initial task.
         * 3. If the worker exited normally and allowCoreThreadTimeOut is true,
         *    the worker probably timed out without getting a task. With an
         *    empty queue, return without creating a thread. If the queue still
         *    holds tasks, at least one worker is required, so when this was
         *    the last worker, create a replacement with no initial task to
         *    drain the queue.
         *    If allowCoreThreadTimeOut is false (the default), return when the
         *    worker count is at least corePoolSize. Otherwise create a
         *    replacement worker with no initial task.
         */
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return;
            }
            // Create a non-core worker with no initial task to process the
            // tasks in the queue.
            addWorker(null, false);
        }
    }

    final void tryTerminate() {
        // Spin trying to terminate the pool. Return at once if the pool is
        // RUNNING, is already TIDYING or TERMINATED, or is SHUTDOWN with a
        // non-empty queue.
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // If workers remain, interrupt at most one idle worker so that
            // the shutdown signal propagates, then return.
            if (workerCountOf(c) != 0) {
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Holding the lock, try to move from SHUTDOWN or STOP to TIDYING.
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        // Wake the threads waiting on the termination
                        // condition (callers of awaitTermination).
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // The CAS failed; spin and try again.
        }
    }
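The hooks that runWorker and tryTerminate call can be observed by subclassing. The following is a minimal sketch: the class HookedPool, its counters, and the silent uncaught-exception handler are all assumptions made for this example. It submits five tasks that succeed and one that throws, then shuts the pool down.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();
    volatile boolean terminatedCalled = false;

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(),
              r -> {
                  Thread t = new Thread(r);
                  // Keep the demo quiet: a task that throws kills its worker
                  // thread, which would otherwise print a stack trace.
                  t.setUncaughtExceptionHandler((th, ex) -> { });
                  return t;
              });
    }

    @Override protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        before.incrementAndGet();
    }

    @Override protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        after.incrementAndGet();
        if (t != null) failures.incrementAndGet(); // the task threw
    }

    @Override protected void terminated() {
        super.terminated();
        terminatedCalled = true; // called once, between TIDYING and TERMINATED
    }

    static HookedPool runDemo() throws InterruptedException {
        HookedPool pool = new HookedPool(2);
        for (int i = 0; i < 5; i++) pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate in time");
        }
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        HookedPool p = runDemo();
        System.out.println(p.before + " " + p.after + " " + p.failures
                + " " + p.terminatedCalled); // prints 6 6 1 true
    }
}
```

afterExecute runs even for the task that throws, and that task still counts as completed, which matches the finally blocks in runWorker. The exception reaches the hook only because the task was passed to execute. With submit, the exception would instead be captured in the returned Future.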
