Shulou (Shulou.com), 06/02 report. Updated 2025-01-17. From: SLTechnology News&Howtos, Development.
This article explains how ThreadPoolExecutor works, a topic many developers have questions about in day-to-day work. It collects the key points from various sources into a simple walkthrough of the constructor parameters and of the source code behind thread reuse.
Why use thread pools?
Have you ever wondered why we use thread pools at all? You might answer: threads that have already been created can be reused; threads are heavyweight objects, so it is best to let a pool manage them and avoid creating and destroying them frequently.
That is correct, and it is well known.
However, there is another important reason to use a thread pool: it lets you cap the amount of concurrency. If concurrency grows unchecked, resource consumption grows with it, and in the worst case the server goes down.
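To make the concurrency cap concrete, here is a minimal sketch. The class name ConcurrencyCapDemo and the method peakConcurrency are hypothetical names chosen for illustration; the 20 ms sleep simply stands in for real work. It submits more tasks than there are threads and records the highest number of tasks that ran at the same moment.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: measures how many tasks run at the same time.
class ConcurrencyCapDemo {

    /** Runs `tasks` short jobs on `poolSize` threads and returns the peak number running at once. */
    static int peakConcurrency(int poolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(tasks)); // bounded queue, large enough for this demo
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                try {
                    Thread.sleep(20); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency = " + peakConcurrency(3, 12));
    }
}
```

However many tasks are submitted, the reported peak should never exceed the pool size passed in.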
The parameters you can't avoid
Mention ThreadPoolExecutor and a handful of parameters immediately come to mind. Let's look at the source code (only the seven-parameter constructor is shown):
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
Let's go through them one by one:
corePoolSize:
The number of core threads. A thread pool has two kinds of threads: core threads and non-core threads. A core thread stays in the pool even when it has nothing to do, unless allowCoreThreadTimeOut is set to true.
maximumPoolSize:
The maximum number of threads the pool may create. This value = number of core threads + number of non-core threads.
keepAliveTime & unit:
A thread pool can retire threads, so when does that happen? A thread that has not run a task for some time is idle. Can it be retired?
If a thread is not a core thread and has been idle for the period given by keepAliveTime and unit, it is asked to leave. A core thread, by contrast, is not removed from the pool even when it is idle; that is simply what being a core thread means. (The pool does not label individual threads: as the getTask source later shows, a worker is subject to the timeout whenever the current thread count exceeds corePoolSize or allowCoreThreadTimeOut is true.)
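The effect of keepAliveTime can be observed through the public getPoolSize method. The sketch below is an illustration under assumptions: KeepAliveDemo and poolSizeAfterIdle are hypothetical names, and the 100 ms keep-alive and 1 s wait are arbitrary values picked so the timeout has clearly elapsed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows which threads survive an idle period.
class KeepAliveDemo {

    /** Grows a pool to 1 core + 1 non-core thread, lets it idle, and returns the remaining pool size. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // starts the core thread
            pool.execute(blocker); // waits in the queue
            pool.execute(blocker); // queue is full, so a non-core thread is started
            release.countDown();   // let all three tasks finish
            Thread.sleep(1000);    // idle for much longer than keepAliveTime
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("core threads kept:      " + poolSizeAfterIdle(false));
        System.out.println("core threads timed out: " + poolSizeAfterIdle(true));
    }
}
```

With the default setting one thread remains after the idle period; with allowCoreThreadTimeOut(true) the pool shrinks to zero threads.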
workQueue:
The work queue, which holds the Runnable tasks waiting to be executed.
Commonly used queues: LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue, DelayQueue
As many of you know, the coding guidelines at large companies advise against using Executors. One of the most important reasons is that many of the factory methods in Executors use an unbounded LinkedBlockingQueue by default. Under high load, an unbounded queue can easily lead to an OutOfMemoryError (OOM), and once that happens no request can be processed. It is therefore strongly recommended to use a bounded queue: when the queue is full and the pool has already reached its maximum number of threads, further tasks go to the rejection policy instead of piling up in memory.
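A small experiment shows the bounded-queue behaviour. In this sketch (BoundedQueueDemo and countRejected are hypothetical names), the pool has 1 core thread, at most 2 threads, and a queue of capacity 1, so it can hold 3 blocking tasks at a time; anything beyond that is rejected by the default policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts rejections from a small bounded pool.
class BoundedQueueDemo {

    /** Submits `submitted` blocking tasks and returns how many were rejected. */
    static int countRejected(int submitted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the thread so the pool stays saturated
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    pool.execute(blocker);
                } catch (RejectedExecutionException e) {
                    rejected++; // thrown by the default AbortPolicy
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected out of 5: " + countRejected(5));
    }
}
```

The first task starts the core thread, the second waits in the queue, the third starts a non-core thread because the queue is full, and the fourth and fifth are rejected.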
threadFactory:
The factory through which the pool creates all of its threads, so they can be set up in a uniform way. If none is specified, a default thread factory is used.
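A common use of a custom factory is giving pool threads recognizable names, which helps when reading logs and thread dumps. The sketch below is one possible way to do it; NamedThreadFactory, its prefix parameter, and nameOfWorker are hypothetical names, not part of the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: names threads "<prefix>-1", "<prefix>-2", ...
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }

    /** Runs one task on a pool that uses this factory and returns the worker thread's name. */
    static String nameOfWorker(String prefix) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new NamedThreadFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> result = pool.submit(task);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorker("order-pool"));
    }
}
```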
handler:
The rejection policy. As mentioned under workQueue, with a bounded queue the rejection policy takes effect when the queue is full and the number of threads has already reached maximumPoolSize.
There are four built-in policies:
- AbortPolicy: the default policy; discards the task and throws a RejectedExecutionException
- CallerRunsPolicy: the thread that submitted the task runs the task itself
- DiscardPolicy: silently discards the new task; no exception is thrown
- DiscardOldestPolicy: discards the oldest task in the queue, then tries again to submit the new task
The default policy, AbortPolicy, throws a RejectedExecutionException. This is a runtime exception, and the compiler does not force you to catch runtime exceptions, so the failure is easy to overlook.
So if the tasks handled by the thread pool are important, consider writing your own rejection policy.
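As a starting point for a custom policy, the sketch below implements RejectedExecutionHandler so that every rejection is logged and counted rather than surfacing as an easily missed exception. CountingRejectHandler, rejectedCount, and demo are hypothetical names; a production handler would more likely persist the task or raise an alert, which is left out here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: logs and counts rejected tasks instead of throwing.
class CountingRejectHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
        System.err.println("Rejected " + r
                + " (poolSize=" + executor.getPoolSize()
                + ", queued=" + executor.getQueue().size() + ")");
    }

    int rejectedCount() {
        return rejected.get();
    }

    /** Saturates a 1-thread pool with a 1-slot queue using 4 tasks; returns the rejection count. */
    static int demo() {
        CountingRejectHandler handler = new CountingRejectHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(blocker); // tasks 3 and 4 go to the handler
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + demo());
    }
}
```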
Several states of the thread pool
In the source code, you can clearly see that the thread pool has five states:
    private static final int RUNNING = -1 << COUNT_BITS;
    ...
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS to increase workerCount; on success, break out of the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // check whether the pool state has changed, and if so, retry
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // workerCount was incremented successfully; continue with the code below
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // create a Worker object
        w = new Worker(firstTask);
        // the Thread object created for this worker
        final Thread t = w.thread;
        if (t != null) {
            // the following operations must hold the lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // add the worker to the thread pool
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // start the worker thread, which begins running the task
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the worker thread failed to start, call addWorkerFailed, which
        // removes the worker from the pool and decrements workerCount by 1
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
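The addWorker code relies on ctl, runStateOf, workerCountOf, and CAPACITY. In the JDK 8 source, ctl is a single int that packs the run state into the top 3 bits and the worker count into the lower 29 bits. The standalone sketch below is modeled on that layout; the class name CtlSketch is hypothetical, and the real fields are private to ThreadPoolExecutor and may be organized differently in other JDK versions.

```java
// Hypothetical standalone sketch of the ctl bit packing, modeled on the JDK 8 source.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // maximum worker count

    // The five run states live in the high 3 bits and are ordered numerically.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep only the high bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep only the low bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine state and count

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println("running?     " + (runStateOf(ctl) == RUNNING));
        System.out.println("worker count " + workerCountOf(ctl));
    }
}
```

Because the states are ordered, comparisons such as rs >= SHUTDOWN or rs < SHUTDOWN in the pool's source read as "at least shutting down" and "still running".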
Worker class
addWorker mostly deals with the Worker class. Worker extends AQS (AbstractQueuedSynchronizer) and implements the Runnable interface.
The thread pool keeps its Worker objects in a HashSet:
    private final HashSet<Worker> workers = new HashSet<Worker>();
Worker extends AQS mainly to use its exclusive-lock mechanism as a marker of whether the thread is idle. Worker also implements Runnable, so it is itself a task: its constructor creates a thread whose task is the worker itself, via this.thread = getThreadFactory().newThread(this).
Let's look at its source code:
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread;
        // the task passed in when the worker is created
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // set state to -1 so the worker cannot be interrupted before it starts running
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // create a thread to execute the task
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
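The lock methods in Worker form a simple non-reentrant exclusive lock. The sketch below reproduces just that part in a standalone class (NonReentrantLockSketch and demo are hypothetical names) to show the property the pool depends on: while the lock is held, tryLock fails even for the thread that holds it. As far as I can tell from the JDK 8 source, this is how idle workers are told apart from busy ones, since a worker holds its lock only while running a task.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical standalone sketch of the Worker lock: state 0 = unlocked, 1 = locked.
class NonReentrantLockSketch extends AbstractQueuedSynchronizer {

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Succeeds only when moving from 0 to 1, so it is never reentrant.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    /** Returns { tryLock result while idle, tryLock result while "running a task" }. */
    static boolean[] demo() {
        NonReentrantLockSketch worker = new NonReentrantLockSketch();
        boolean whileIdle = worker.tryLock(); // nobody holds the lock: succeeds
        if (whileIdle) {
            worker.unlock();
        }
        worker.lock();                        // simulate the worker starting a task
        boolean whileBusy = worker.tryLock(); // fails, even on the same thread
        worker.unlock();
        return new boolean[] { whileIdle, whileBusy };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("tryLock while idle: " + result[0]);
        System.out.println("tryLock while busy: " + result[1]);
    }
}
```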
runWorker
When a Worker's run method executes, it simply calls the runWorker method:
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // If the first task is not null, execute it directly;
            // otherwise call getTask() to take a new task from workQueue and execute it
            while (task != null || (task = getTask()) != null) {
                // lock so that other threads do not interrupt this worker mid-task
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // (check the pool state: if the pool is in the STOP state,
                // the current thread needs to be interrupted)
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // run the beforeExecute hook
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // execute the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // run the afterExecute hook
                        afterExecute(task, thrown);
                    }
                } finally {
                    // set task to null so the loop fetches the next task
                    task = null;
                    w.completedTasks++;
                    // release the lock
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
runWorker first executes the task the worker was created with. After that task finishes, the worker is not destroyed. Instead, inside the while loop, it keeps calling getTask to fetch the next task from the blocking queue and runs it with task.run(). The loop condition while (task != null || (task = getTask()) != null) shows that as long as getTask returns a non-null task, the loop continues and the same thread keeps executing tasks. This is how threads are reused.
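Thread reuse is easy to observe from the outside. In this sketch (ThreadReuseDemo and threadsUsed are hypothetical names), several tasks are submitted to a pool with a single thread, and each task records the name of the thread it ran on.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records which threads executed the submitted tasks.
class ThreadReuseDemo {

    /** Runs `tasks` jobs on a single-thread pool and returns the distinct thread names used. */
    static Set<String> threadsUsed(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(tasks)); // bounded: capacity equals the task count
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("threads used for 5 tasks: " + threadsUsed(5));
    }
}
```

All five tasks report the same thread name, because the one worker keeps looping and pulling tasks from the queue.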
getTask
Let's look at the implementation of the getTask method:
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // allowCoreThreadTimeOut defaults to false, so core threads are not destroyed even when idle;
            // if it is true, a core thread that stays idle for keepAliveTime is destroyed as well
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // Decrement the worker count and return null when either
            //   - the number of threads exceeds maximumPoolSize, or
            //   - this worker is subject to the timeout and its last poll timed out,
            // provided that another worker remains or the queue is empty
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // If timed is true, call workQueue.poll with a timeout of keepAliveTime;
                // once keepAliveTime has elapsed, poll returns null, the loop condition
                //   while (task != null || (task = getTask()) != null)
                // in runWorker is eventually broken, and the thread finishes.
                // If timed is false (allowCoreThreadTimeOut is false and wc > corePoolSize is false),
                // call workQueue.take, which blocks until a task is added to the queue;
                // the thread is then woken up, take returns the task, and execution starts
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
That covers the essentials of the source code.
Thread reuse comes down to the while loop in runWorker. Inside the loop, the worker keeps calling getTask. In getTask, if the task queue is empty and the worker is not subject to the timeout (in effect, a core thread), it blocks in workQueue.take(). A thread blocked this way is suspended and consumes no CPU. When a task arrives, take returns it, getTask hands it back to runWorker, and runWorker executes it, so the same thread is reused.
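The timed branch of getTask can be illustrated with a much-reduced worker loop. This is a simplified sketch rather than the pool's actual logic: MiniWorkerLoop, drainUntilIdle, and demo are hypothetical names, the loop runs on the calling thread, and pool state checks are omitted. It keeps only the core idea that a null from poll ends the loop, which is what lets an idle worker exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified worker loop built on poll-with-timeout.
class MiniWorkerLoop {

    /** Runs queued tasks until the queue stays empty for keepAliveMillis; returns tasks completed. */
    static int drainUntilIdle(BlockingQueue<Runnable> queue, long keepAliveMillis) {
        int completed = 0;
        try {
            Runnable task;
            // Like the timed branch of getTask: poll returns null once the wait times out,
            // which breaks the loop and lets the worker thread finish.
            while ((task = queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS)) != null) {
                task.run();
                completed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    /** Queues three counter increments and drains them with a 50 ms keep-alive. */
    static int demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            queue.add(() -> counter.incrementAndGet());
        }
        return drainUntilIdle(queue, 50);
    }

    public static void main(String[] args) {
        System.out.println("tasks completed before idling out: " + demo());
    }
}
```

Replacing poll with take would make the loop block indefinitely on an empty queue, which corresponds to the behaviour of a core thread when allowCoreThreadTimeOut is false.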
That concludes this look at how ThreadPoolExecutor works. Theory sticks best when paired with practice, so go and try it out yourself.