How to understand the state of the thread pool and the number of worker threads

2025-04-05 Update From: SLTechnology News&Howtos shulou


This article explains how a Java thread pool represents its state and its number of worker threads, and how worker threads and the pool itself are shut down.

Java provides several convenient built-in APIs for creating thread pools. The Executors class in the java.util.concurrent package offers these static factory methods:

newFixedThreadPool(): creates a thread pool with a fixed number of threads. Idle threads are not shut down after all the tasks in the pool have been executed.

newSingleThreadExecutor(): creates a thread pool with only one thread, which is not shut down when idle.

newCachedThreadPool(): creates a cached thread pool whose maximum number of threads is Integer.MAX_VALUE. Idle threads are kept for reuse, and a thread that receives no task for 60 s is shut down.

The Executors class has some other ways to create thread pools (new in JDK 8), but I am not familiar with them yet, so they are not covered here.

The methods above all create a ThreadPoolExecutor under the hood (newSingleThreadExecutor() wraps it in a delegating ExecutorService), so understanding how the thread pool works mainly means analyzing the ThreadPoolExecutor class.
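As a quick check of that claim, the sketch below inspects the runtime type of two factory-created pools. The class and method names are made up for this illustration, and the cast to ThreadPoolExecutor relies on how the JDK versions I know of implement these factories, not on anything the ExecutorService interface promises.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FactoryReturnTypeDemo {
    // True if newFixedThreadPool() hands back a ThreadPoolExecutor instance.
    static boolean fixedPoolIsThreadPoolExecutor() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return pool instanceof ThreadPoolExecutor;
        } finally {
            pool.shutdown();
        }
    }

    // Reads the maximumPoolSize that newCachedThreadPool() configured.
    // Assumption: the returned object can be cast to ThreadPoolExecutor.
    static int cachedPoolMaximumSize() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolIsThreadPoolExecutor());
        System.out.println(cachedPoolMaximumSize() == Integer.MAX_VALUE);
    }
}
```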

The constructor of ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {...}

The constructor takes the following parameters:

corePoolSize: the number of core threads. These threads stay resident in the thread pool and are not shut down even when the pool has no tasks to execute.

maximumPoolSize: the maximum number of threads.

keepAliveTime: how long an idle thread is kept alive.

unit: the time unit of keepAliveTime.

workQueue: the work queue. When the thread pool is too busy to run a new task immediately, the task has to wait in workQueue.

threadFactory: the factory used to create threads.

handler: a RejectedExecutionHandler instance. It defines the policy applied when a task has to be rejected because the pool has no idle thread to run it and workQueue has no room for it.

All threads in a ThreadPoolExecutor are called worker threads, but there is also the narrower concept of core threads, whose number is set by the corePoolSize parameter. For example, with corePoolSize set to 5, five threads stay resident in the pool and are not reclaimed. There is one exception: if allowCoreThreadTimeOut is true, core threads that stay idle for the keep-alive time are shut down as well.
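To make the parameter list concrete, here is a minimal sketch that builds a pool with all seven arguments and then turns on core-thread timeout. The sizes, the queue capacity, and the class name ConstructorDemo are arbitrary choices for the example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(10),   // workQueue, bounded to 10 tasks
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler: throw on rejection
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        // Let idle core threads time out too; requires keepAliveTime > 0.
        pool.allowCoreThreadTimeOut(true);
        System.out.println(pool.getCorePoolSize() + " core, "
                + pool.getMaximumPoolSize() + " max, core timeout: "
                + pool.allowsCoreThreadTimeOut());
        pool.shutdown();
    }
}
```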

The state of the thread pool and the number of worker threads

The state of the thread pool and its number of worker threads are both stored in ctl, a field of type AtomicInteger:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

The top three bits of ctl hold the state of the thread pool and the remaining 29 bits hold the number of worker threads, so the maximum number of worker threads in the pool is (2^29) - 1.
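The bit layout can be tried out in isolation. The sketch below is a standalone class, not the JDK code itself: the constant and helper names mirror those used by ThreadPoolExecutor as far as I know them, and the exact values should be treated as an assumption to check against your JDK's source.

```java
class CtlLayoutSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 low bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // (2^29) - 1, also the count bit mask

    // Run states occupy the high bits; their numeric order matters for comparisons.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }    // keep only the high bits
    static int workerCountOf(int c) { return c & CAPACITY; }  // keep only the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }      // pack both into one int

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);  // state survives the round trip
        System.out.println(workerCountOf(c));          // worker count survives too
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED);
    }
}
```

Packing both values into one int means a single compare-and-set can change the state and the worker count together, which is why checks such as "state is at least STOP" can be written as plain integer comparisons.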

There are five states in the thread pool:

RUNNING: accepts new tasks and processes tasks in the queue

SHUTDOWN: accepts no new tasks, but continues to process tasks that have already been added

STOP: accepts no new tasks and does not process tasks that have already been added

TIDYING: all tasks have terminated and the number of worker threads is 0

TERMINATED: the terminated() method has finished executing
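The internal state constants are private, but part of the lifecycle is visible through the public API. The following sketch (class name invented for the example) records isShutdown() before and after shutdown() and isTerminated() after waiting for termination. It assumes the trivial task finishes well within the 5-second timeout.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolStateDemo {
    // Returns "isShutdown before shutdown(),isShutdown after,isTerminated after waiting".
    static String observe() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        boolean before = pool.isShutdown();   // pool is RUNNING here
        pool.execute(() -> { });              // a trivial task
        pool.shutdown();                      // RUNNING -> SHUTDOWN
        boolean after = pool.isShutdown();
        try {
            // Blocks until the pool reaches TERMINATED or the timeout elapses.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return before + "," + after + "," + pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```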

The status code is defined as follows:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
...

... if wc > corePoolSize is false, the current thread is treated as a core thread and calls workQueue.take() to wait for a task.

Shutdown of worker threads

Also in the runWorker method:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            task.run();
            task = null; // clear it so the next iteration fetches a new task
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

The completedAbruptly variable marks whether the current worker thread finished normally or abnormally. It is false only when the loop ended because the thread pool had no more tasks for this worker; if a task threw an exception, it stays true.

The code above is concise. The while loop keeps fetching tasks; when there is no task left to execute (getTask() returns null), the loop exits and processWorkerExit is called in the finally block:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

processWorkerExit receives a Worker instance and the completedAbruptly flag. Its general workflow is:

Check whether the worker thread finished abnormally. If so, decrement the number of worker threads directly, because the count was not adjusted on that path.

Add the worker's completed tasks to the pool's completed task count and remove the Worker from workers.

Call tryTerminate() to check the state of the thread pool. The pool may terminate late: it does not close immediately when shutdown() is called, because it has to wait for all tasks to finish. Each exiting worker therefore calls tryTerminate(), which tries to move the pool forward and invoke the terminated() method.
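The terminated() method mentioned in the last step is a protected hook that subclasses can override. The sketch below uses a hypothetical subclass, HookedPool, to record that the hook ran once the pool wound down after shutdown(). It is an illustration of the hook only and says nothing about tryTerminate()'s internals.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {
    // Hypothetical subclass that records when the terminated() hook is invoked.
    static class HookedPool extends ThreadPoolExecutor {
        final AtomicBoolean hookCalled = new AtomicBoolean(false);

        HookedPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void terminated() {
            hookCalled.set(true);
        }
    }

    static boolean hookRunsAfterShutdown() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.shutdown();
        try {
            // Returns true only once the pool has fully terminated.
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return done && pool.hookCalled.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(hookRunsAfterShutdown());
    }
}
```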

Worker thread completion strategy

If a worker thread completes, the thread pool determines whether one needs to be restarted:

// check the thread pool state
if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
        // get the minimum number of worker threads
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        // if the minimum is 0 but workQueue still holds tasks, raise the minimum to 1
        if (min == 0 && !workQueue.isEmpty())
            min = 1;
        // if the current number of worker threads is at least the minimum,
        // there is no need to start a new worker thread
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    // start a new worker thread
    addWorker(null, false);
}

There are two processing strategies after the worker thread completes:

For a worker thread that finished abnormally, start a replacement directly.

For a worker thread that finished normally, check whether the current number of worker threads is sufficient; if it is, there is no need to start a new worker thread.

Note: "finished" here means that the worker thread has completed its task and there are no more tasks to fetch from workQueue.
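The first strategy can be observed from outside the pool. In the sketch below, a task submitted with execute() throws, which ends its worker abnormally. A second task still runs, and it runs on a different thread. The class name, the quiet uncaught-exception handler, and the 5-second timeout are choices made for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WorkerReplacementDemo {
    // Returns the names of the threads that ran the failing task and the follow-up task.
    static List<String> run() {
        List<String> threadNames = new CopyOnWriteArrayList<>();
        CountDownLatch secondDone = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            // Swallow the simulated failure so it does not print a stack trace.
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        });
        try {
            // With execute(), the exception escapes the task and kills the worker thread.
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                throw new IllegalStateException("simulated task failure");
            });
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                secondDone.countDown();
            });
            if (!secondDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("second task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Tasks passed to submit() behave differently: the exception is captured in the returned Future, so the worker does not finish abnormally.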

Shutting down the thread pool

You can shut down the thread pool with the shutdown method:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

The shutdown method first changes the state of the thread pool: it calls advanceRunState(SHUTDOWN), which sets the current state to SHUTDOWN. The code of advanceRunState is as follows:

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

It then immediately calls interruptIdleWorkers(), which delegates to its overload interruptIdleWorkers(boolean onlyOne), passing false for onlyOne, to interrupt all idle threads:

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

The code above iterates over the Worker instances in workers and calls interrupt() on the thread of each worker whose lock it can acquire with tryLock().

Which worker threads count as idle?

As mentioned earlier, in getTask() a thread blocks while fetching a task from workQueue. A thread blocked there is idle.
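One way to see this blocking from the outside is to look at the worker's Thread.State once it has nothing left to do. The sketch below captures the worker thread through a custom ThreadFactory (an arrangement made up for this example) and polls until the thread reports WAITING, which is what I would expect for a core thread parked in an untimed take() on the queue.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class IdleWorkerStateDemo {
    // Returns the state of the pool's only worker after it has run out of tasks.
    static Thread.State idleState() {
        AtomicReference<Thread> worker = new AtomicReference<>();
        ExecutorService pool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            worker.set(t);   // remember the worker thread so it can be inspected
            return t;
        });
        try {
            pool.execute(() -> { });   // a trivial task; afterwards the worker goes idle
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Poll until the worker is parked waiting for the next task (or give up).
            while (worker.get().getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return worker.get().getState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(idleState());
    }
}
```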

Go back to the code in getTask () again:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        ...
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        ...
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

Looking at it again, getTask() contains a block that catches InterruptedException. After interruptIdleWorkers interrupts the thread, getTask() catches the exception, and because the code sits inside a for loop, execution goes back to the place where the thread pool state is checked:

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
}

The code above checks the current thread pool state. If the state is at least STOP, or the state is SHUTDOWN and workQueue is empty, getTask() returns null. The loop in runWorker then exits, and the worker thread has finished its work and can exit:
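Seen from the caller's side, this means shutdown() is graceful: tasks already queued still run, while new submissions are rejected. The sketch below demonstrates both effects. The class name and the latch that holds the single worker busy are devices of the example, and it relies on the default rejection policy throwing RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownDemo {
    // Returns {number of tasks completed, 1 if the post-shutdown task was rejected else 0}.
    static int[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);
        // The first task occupies the only worker until the gate opens,
        // so the next two tasks have to wait in the queue.
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        });
        pool.execute(() -> completed.incrementAndGet());
        pool.execute(() -> completed.incrementAndGet());

        pool.shutdown();   // state becomes SHUTDOWN; queued tasks are kept
        int rejected = 0;
        try {
            pool.execute(() -> completed.incrementAndGet());
        } catch (RejectedExecutionException e) {
            rejected = 1;  // new tasks are refused after shutdown()
        }
        gate.countDown();  // let the first task finish so the queue can drain
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] { completed.get(), rejected };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("completed=" + result[0] + ", rejected=" + result[1]);
    }
}
```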

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            task.run();
            task = null; // clear it so the next iteration fetches a new task
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

shutdownNow

Besides shutdown, the shutdownNow method can also shut down the thread pool. The differences between the two are:

shutdownNow empties the tasks waiting in workQueue.

shutdownNow also interrupts the worker threads that are currently running tasks.

shutdownNow puts the thread pool into the STOP state, while shutdown() puts it into the SHUTDOWN state.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

The basic flow of the above code is:

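advanceRunState(STOP): puts the thread pool into the STOP state. This is the same call as in shutdown(), except that the target state is STOP.

interruptWorkers(): interrupts the worker threads. Unlike the interruptIdleWorkers() call in shutdown(), it is not limited to idle threads.

drainQueue(): empties the queue and returns the tasks that were still waiting in it.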
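The effect of these three steps is visible in the return value of shutdownNow(). In this sketch one task blocks the single worker while two more wait in the queue; shutdownNow() interrupts the blocked task and hands back the two that never started. The class name and the latch used for blocking are inventions of the example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowDemo {
    // Returns how many queued tasks shutdownNow() handed back without running them.
    static int pendingAfterShutdownNow() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch neverOpened = new CountDownLatch(1);
        // Occupies the only worker; it ends only when the thread is interrupted.
        pool.execute(() -> {
            try {
                neverOpened.await();
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): just let the task end.
            }
        });
        // These two wait in the queue because the single worker is taken.
        pool.execute(() -> { });
        pool.execute(() -> { });

        List<Runnable> pending = pool.shutdownNow();   // STOP, interrupt, drain
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterShutdownNow());
    }
}
```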

Is the task aborted or does it continue?

After shutdownNow() is called, the thread pool is in the STOP state and interrupt() has been called on all worker threads. What happens if runWorker is still running?

runWorker contains a piece of code that is key to how worker threads terminate:

final void runWorker(Worker w) {
    ...
    while (task != null || (task = getTask()) != null) {
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();
        task.run();
    }
    ...
}

Focus on:

if ((runStateAtLeast(ctl.get(), STOP) ||
     (Thread.interrupted() &&
      runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();

This if statement looks a bit hard to read. Roughly, it means: if the thread pool state is at least STOP, make sure the thread is interrupted; otherwise, clear the thread's interrupt flag. In other words, when the thread pool state is RUNNING or SHUTDOWN, the thread's interrupt flag is cleared (the code that interrupts the thread is in the interruptWorkers method) and the thread can go on to execute the task.

Right after this check, task.run() is called. Inside the task we can inspect the thread's interrupt flag to decide whether the task should stop.
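A task that wants to stop when the pool is shut down with shutdownNow() therefore has to cooperate. The sketch below shows one common pattern, a loop that polls the interrupt flag. The class name, the latch that signals the task has started, and the use of Thread.yield() in the loop body are choices made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptAwareTaskDemo {
    // True if the running task noticed the interrupt and the pool terminated.
    static boolean taskStopsOnShutdownNow() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();
            // Keep "working" until the worker thread's interrupt flag is set.
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield();
            }
            sawInterrupt.set(true);
        });
        try {
            started.await();       // make sure the task is really running
            pool.shutdownNow();    // sets STOP and interrupts the worker
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return terminated && sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(taskStopsOnShutdownNow());
    }
}
```

A task that never checks the flag and never calls an interruptible blocking method keeps running after shutdownNow(), and the pool cannot reach TERMINATED until it returns.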
