In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/01 Report--
This article introduces the knowledge of "how Java thread pool works". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
How thread pools work
First, let's take a look at how thread pools are handled when a new task is submitted to the thread pool.
1. The thread pool determines whether all the threads in the core thread pool are performing tasks. If not, create a new worker thread to execute the task. If all the threads in the core thread pool are performing tasks, perform the second step.
2. The thread pool determines whether the work queue is full. If the work queue is not full, the newly submitted task is stored in this work queue to wait. If the work queue is full, perform the third step
3. The thread pool determines whether all the threads in the thread pool are working. If not, a new worker thread is created to execute the task. If it is full, leave it to the saturation strategy to handle the task
Thread pool saturation strategy
The saturation strategy for thread pools is mentioned here, so let's briefly introduce what saturation strategies are available:
AbortPolicy
The default blocking policy for the Java thread pool, do not perform this task, and directly throw a runtime exception, keep in mind that ThreadPoolExecutor.execute requires try catch, otherwise the program will exit directly.
DiscardPolicy
Direct abandonment, task not executed, empty method
DiscardOldestPolicy
Discard one of head's tasks from the queue and execute the task again.
CallerRunsPolicy
Executing this command in the thread calling execute will block the entry
User-defined rejection policy (most commonly used)
Implement RejectedExecutionHandler and define the policy pattern yourself
Let's take ThreadPoolExecutor as an example to show the workflow flow chart of the downline pool.
1. If there are fewer threads currently running than corePoolSize, create a new thread to execute the task (note that performing this step requires a global lock).
2. If the running thread is equal to or more than corePoolSize, the task is added to BlockingQueue.
3. If the task cannot be added to the BlockingQueue (the queue is full), create a new thread in the non-corePool to process the task (note that performing this step requires a global lock).
4. If creating a new thread will cause the currently running thread to exceed the maximumPoolSize, the task will be rejected and the RejectedExecutionHandler.rejectedExecution () method will be called.
The overall design idea for ThreadPoolExecutor to take the above steps is to avoid acquiring global locks as much as possible when executing the execute () method (which would be a serious scalable bottleneck). After ThreadPoolExecutor finishes warm-up (the number of threads currently running is greater than or equal to corePoolSize), almost all execute () method calls execute step 2, which does not need to acquire a global lock.
Source code analysis of key methods
Let's take a look at the source code that the core method adds to the thread pool method execute:
/ Executes the given task sometime in the future. The task / / may execute in a new thread or in an existing pooled thread. / If the task cannot be submitted for execution, either because this / / executor has been shutdown or because its capacity has been reached, / / the task is handled by the current {@ code RejectedExecutionHandler}. / @ param command the task to execute / / @ throws RejectedExecutionException at discretion of / / {@ code RejectedExecutionHandler}, if the task / / cannot be accepted for execution / / @ throws NullPointerException if {@ code command} is null / / public void execute (Runnable command) {if (command = = null) throw new NullPointerException () / Proceed in 3 steps: / 1. If fewer than corePoolSize threads are running, try to / / start a new thread with the given command as its first / / task. The call to addWorker atomically checks runState and / / workerCount, and so prevents false alarms that would add / / threads when it shouldn't, by returning false. / / determine whether the current number of threads is less than corePoolSize. If so, use the input task to create a new thread through the addWord method. / / if the new thread can be completed, the exexute method ends. Successfully submitted the task / / 2. If a task can be successfully queued, then we still need / / to double-check whether we should have added a thread / / (because existing ones died since last checking) or that / / the pool shut down since entry into this method. So we / / recheck state and if necessary roll back the enqueuing if / / stopped, or start a new thread if there are none. / / the task submission is not completed in the first step; after the status is running and whether the task can be successfully joined to the work queue, check is performed again. If the status / / becomes non-running after the task is queued (it is possible that the thread pool shutdown is executed here), of course / / reject is required in the non-running state. Then determine whether the current number of threads is 0 (it is possible that the number of threads becomes 0). If so, add a new thread; / / 3. If we cannot queue task, then we try to add a new / / thread. If it fails, we know we are shut down or saturated / / and so reject the task. / / if you cannot join the task to the work queue, you will try to add a new thread using the task. If it fails, the thread pool is already shutdown or the thread pool / / has reached saturation, so reject this other task / / int c = ctl.get (); / / the number of working threads is less than the number of core threads if (workerCountOf (c))
Let's move on to how addWorker is implemented:
Private boolean addWorker (Runnable firstTask, boolean core) {/ / java tag retry: / / endless loop for (;;) {int c = ctl.get (); / / get the current thread state int rs = runStateOf (c); / / Check if queue empty only if necessary. / / this logical judgment can be changed to / / rs > = shutdown & & (rs! = shutdown | | firstTask! = null | | workQueue.isEmpty ()) / / logical judgment can be divided into the following situations: do not accept new tasks / / 1, rs > shutdown:-- do not accept new tasks / / 2, Rs > = shutdown&&firstTask! = null:-- does not accept new tasks / / 3, rs > = shutdown&& workQueue.isEmppty:-- does not accept new tasks / / logical judgment is not valid / / 1, rs==shutdown&&firstTask! = null: do not accept new tasks at this time However, tasks in the queue are still executed / / 2, rs==shotdown&&firstTask = = null: addWork (null,false) / / prevents there are no active threads in the SHUTDOWN state, but there are still tasks in the queue that are not executed. / / add a null task because in the SHUTDOWN state, the thread pool no longer accepts new tasks if (rs > = SHUTDOWN & &! (rs = = SHUTDOWN & & firstTask = = null &! WorkQueue.isEmpty ()) return false; / / endless loop / / if the thread pool state is RUNNING and there are tasks to be executed in the queue for (;) {/ / get the number of threads in the thread pool int wc = workerCountOf (c) / / if the capacity is exceeded or the maximum thread pool capacity is no longer accepting a new task if (wc > = CAPACITY | | wc > = (core? CorePoolSize: maximumPoolSize) return false; / / increase the number of worker threads if (compareAndIncrementWorkerCount (c)) / / Pop out retry break retry; c = ctl.get () / / Re-read ctl / / if the state of the thread pool changes, re-cycle if (runStateOf (c)! = rs) continue retry; / / else CAS failed due to workerCount change; retry inner loop}} / / to indicate that the number of worker threads increased successfully boolean workerStarted = false Boolean workerAdded = false; Worker w = null; try {final ReentrantLock mainLock = this.mainLock; w = new Worker (firstTask); final Thread t = w. Thread; if (t! = null) {/ / locked mainLock.lock (); try {/ / Recheck while holding lock. / / Back out on ThreadFactory failure or if / / shut down before lock acquired. Int c = ctl.get (); int rs = runStateOf (c); / / RUNNING status | | Clean the remaining tasks in the queue if (rs if (t.isAlive ()) / / precheck that t is startable throw new IllegalThreadStateException ()) in SHUTDONW status / / add newly started threads to the thread pool workers.add (w); / / update the number of thread pool threads and do not exceed the maximum value int s = workers.size (); if (s > largestPoolSize) largestPoolSize = s WorkerAdded = true;}} finally {mainLock.unlock () } / / start the newly added thread, which first executes firstTask, and then constantly fetches tasks from the queue to execute if (workerAdded) {/ / execute ThreadPoolExecutor's runWoker method t.start (); workerStarted = true } finally {/ / thread startup fails, remove w from wokers and decrement wokerCount if (! WorkerStarted) / / decreasing wokerCount triggers the tryTerminate method addWorkerFailed (w);} return workerStarted;}
AddWorker is followed by runWorker. The first startup will execute the task firstTask; passed in by initialization, and then fetch the task from workQueue for execution. If the queue is empty, wait for keepAliveTime for such a long time.
Final void runWorker (Worker w) {Thread wt = Thread.currentThread (); Runnable task = W. firstTask; w.firstTask = null; / / allow interrupt w.unlock (); / / allow interrupts boolean completedAbruptly = true Try {/ / if getTask returns null, workerCount will be decremented in getTask. If there is an exception, the decrement operation will process while in processWorkerExit (task! = null | | (task = getTask ())! = null) {w.lock (); / / If pool is stopping, ensure thread is interrupted; / / if not, ensure thread is not interrupted. This / / requires a recheck in second case to deal with / / shutdownNow race while clearing interrupt if ((runStateAtLeast (ctl.get (), STOP) | | (Thread.interrupted () & & runStateAtLeast (ctl.get (), STOP) & &! wt.isInterrupted () wt.interrupt () Try {beforeExecute (wt, task); Throwable thrown = null; try {task.run ();} catch (RuntimeException x) {thrown = x; throw x } catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error (x);} finally {afterExecute (task, thrown) }} finally {task = null; W.completedTaskscomplete; w.unlock ();}} completedAbruptly = false;} finally {processWorkerExit (w, completedAbruptly);}}
Let's take a look at how getTask works.
Private Runnable getTask () {boolean timedOut = false; / / Did the last poll () time out? / / endless loop retry: for (;;) {/ / get thread pool status int c = ctl.get (); int rs = runStateOf (c); / / Check if queue empty only if necessary. / / 1.rs > SHUTDOWN, so rs is at least equal to STOP. At this time, tasks in the queue are no longer processed / / 2.rs = SHUTDOWN, so rs > = STOP is definitely not true. In this case, tasks in the queue need to be processed unless the queue is listed as empty / / both cases will return null to let runWoker exit the while loop, that is, the current thread ends. So you must have decrement if (rs > = SHUTDOWN & & (rs > = STOP | | workQueue.isEmpty ()) {/ / decreasing workercount value decrementWorkerCount () Return null;} / / whether to set the timeout boolean timed; / / Are workers subject to culling? / / 1.RUNING status / / 2.SHUTDOWN status when fetching tasks from the queue, but there are still tasks in the queue that need to be executed for (;;) {int wc = workerCountOf (c) / / 1.core thread is allowed to time out, then threads exceeding corePoolSize must have timeouts / / 2.allowCoreThreadTimeOut = = false & & wc > / / corePoolSize. Core thread will not be reclaimed even if it is idle, and only threads exceeding it will timed = allowCoreThreadTimeOut | | wc > corePoolSize | / / We can see from addWorker that the general wc will not be greater than maximumPoolSize, so we are more concerned about the situation in the second half of the sentence: / / 1. TimedOut = = false executes the loop for the first time The task taken from the queue does not return for the null method or / / poll has an exception. Retry / / 2.timeOut = = true & & timed = = / / false: look at the following code workerQueue.poll timeout timeOut is true, / / and timed is false These two conditions cannot be true at the same time (since there is a timeout, the timed must be true) / / so the timeout does not continue but ends the thread by return null. If (wc break; / / workerCount decrement, ending the current thread if (compareAndDecrementWorkerCount (c)) return null; c = ctl.get () / / Re-read ctl / / the thread pool status needs to be rechecked, because during the above operation, the thread pool may be SHUTDOWN if (runStateOf (c)! = rs) continue retry; / / else CAS failed due to workerCount change; retry inner loop} try {/ / 1. Fetching tasks from the queue with the specified timeout / / 2.core thread No timeout Runnable r = timed? WorkQueue.poll (keepAliveTime, TimeUnit.NANOSECONDS): workQueue.take (); if (r! = null) return r; timedOut = true;// timeout} catch (InterruptedException retry) {timedOut = false;// thread interrupted retry}
Let's take a look at how processWorkerExit works
Private void processWorkerExit (Worker w, boolean completedAbruptly) {/ / normal getTask method workerCount of runWorker has been reduced by one if (completedAbruptly) decrementWorkerCount (); final ReentrantLock mainLock = this.mainLock; mainLock.lock (); try {/ / completedTasks completedTaskCount + = w.completedTasks of cumulative threads / / remove the thread workers.remove (w) that timed out or encountered an exception from the thread pool;} finally {mainLock.unlock ();} / / attempt to stop the thread pool tryTerminate (); int c = ctl.get () / / runState is RUNNING or SHUTDOWN if (runStateLessThan (c, STOP)) {/ / Thread is not abnormally terminated if (! completedAbruptly) {/ / minimum idle number of thread pool, core thread timeout is allowed to be 0, otherwise it is corePoolSize int min = allowCoreThreadTimeOut? 0: corePoolSize / / if min = = 0 but the queue is not empty, make sure there is 1 thread to execute the task in the queue if (min = 0 & &! workQueue.isEmpty ()) min = 1; / / if the thread pool is not empty, then don't worry about if (workerCountOf (c) > = min) return. / / replacement not needed} / / 1. Thread exited abnormally / / 2. The thread pool is empty, but there are still tasks in the queue that have not been executed. See how the addWoker method handles this situation: addWorker (null, false);}}
TryTerminate
An attempt is made in the processWorkerExit method to call tryTerminate to terminate the thread pool. This method is executed after any action that may cause the thread pool to terminate, such as removing tasks from the queue in a reduced wokerCount or SHUTDOWN state.
Final void tryTerminate () {for (;;) {int c = ctl.get (); / / the following status is returned directly: / / 1. The thread pool is still in RUNNING state / / 2.SHUTDOWN but the task queue is not empty / / 3.runState > = TIDYING thread pool has stopped or is stopping if (isRunning (c) | | runStateAtLeast (c, TIDYING) | | (runStateOf (c) = = SHUTDOWN & &! workQueue.isEmpty ()) return / / the following logic can only be continued in the following cases: end the thread pool. / / 1.SHUTDOWN status, when new tasks are no longer accepted and the task queue is empty / / 2.STOP status, when the shutdownNow method / / workerCount is not equal to 0, the thread pool cannot be stopped, and the thread is in an idle waiting state / / the thread needs to be interrupted to "wake up" so that the awakened thread can continue to process the shutdown signal. If (workerCountOf (c)! = 0) {/ / Eligible to terminate / / runWoker method w.unlock is so that it can be interrupted, and the getTask method also handles interrupts. / / ONLY_ONE: only one thread needs to be interrupted to process the shutdown signal. InterruptIdleWorkers (ONLY_ONE); return;} final ReentrantLock mainLock = this.mainLock; mainLock.lock () Try {/ / enter TIDYING status if (ctl.compareAndSet (c, ctlOf (TIDYING, 0)) {try {/ / subclass overload: some resource cleanup work terminated () } finally {/ / TERMINATED status ctl.set (ctlOf (TERMINATED, 0)); / / continue awaitTermination termination.signalAll ();} return }} finally {mainLock.unlock (); / / else retry on failed CAS}}
The shutdown method sets runState to SHUTDOWN and terminates all idle threads. The shutdownNow method sets runState to STOP. Unlike the shutdown method, this method terminates all threads. The main difference is that shutdown calls the interruptIdleWorkers method, while shutdownNow actually calls the interruptIfStarted method of the Worker class:
Their implementation is as follows:
Public void shutdown () {final ReentrantLock mainLock = this.mainLock; mainLock.lock (); try {checkShutdownAccess (); / / Thread pool status is set to SHUTDOWN. If it is at least this state, then return advanceRunState (SHUTDOWN) directly. / / notice here that all idle threads are interrupted: the thread waiting in runWorker is interrupted by → to enter the processWorkerExit → / / tryTerminate method to ensure that the remaining tasks in the queue are executed. InterruptIdleWorkers (); onShutdown (); / / hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock ();} tryTerminate ();} public List shutdownNow () {List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock (); try {checkShutdownAccess (); / / STOP status: new tasks are no longer accepted and tasks in the queue are no longer executed. AdvanceRunState (STOP); / / interrupts all threads interruptWorkers (); / / returns tasks in the queue that have not yet been executed. Tasks = drainQueue ();} finally {mainLock.unlock ();} tryTerminate (); return tasks;} private void interruptIdleWorkers (boolean onlyOne) {final ReentrantLock mainLock = this.mainLock; mainLock.lock (); try {for (Worker w: workers) {Thread t = w.thread / / w.tryLock can acquire the lock, which means that the thread is not running, because the execution of the task in runWorker will lock first, / / thus ensuring that the interrupted thread must be an idle thread. If (! t.isInterrupted () & & w.tryLock ()) {try {t.interrupt ();} catch (SecurityException ignore) {} finally {w.unlock ();}} if (onlyOne) break }} finally {mainLock.unlock ();}} void interruptIfStarted () {Thread t; / / initialization state = =-1 if (getState () > = 0 & & (t = thread)! = null & &! t.isInterrupted ()) {try {t.interrupt ();} catch (SecurityException ignore) {}} thread pool creation of thread pool
We can create a thread pool through ThreadPoolExecutor
/ * * @ param corePoolSize thread pool basic size, core thread pool size. If the active thread is less than corePoolSize, it will be created directly. If it is greater than or equal to, it will be added to the workQueue first. * create new threads only when the queue is full. When a task is submitted to the thread pool, the thread pool creates a thread to execute the task. Even if other idle basic threads are able to perform new tasks, threads are created, * when the number of tasks that need to be executed is greater than the basic size of the thread pool, it is no longer created. If the prestartAllCoreThreads () method of the thread pool is called, * the thread pool creates and starts all basic threads in advance. * @ param maximumPoolSize maximum number of threads, exceeding the maximum number of threads allowed to be created on the reject; thread pool. If the queue is full, * and the number of threads created is less than the maximum number of threads, the thread pool will create new threads to execute tasks * @ param keepAliveTime * the time for which the worker thread of the thread pool is idle. Therefore, if there are many tasks and the execution time of each task is relatively short, you can increase the utilization of threads * @ param unit units of thread activity hold time): optional units are days (DAYS), hours (HOURS), minutes (MINUTES), * milliseconds (MILLISECONDS), microseconds (MICROSECONDS, 1/1000 milliseconds) and nanoseconds (NANOSECONDS, 1/1000 microseconds) * @ param workQueue work queue The worker threads in the thread pool are constantly getting tasks from this work queue to execute * / public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {/ / threadFactory is used to set up the factory where the thread is created You can set a more meaningful name this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory (), defaultHandler) for each created thread through the thread factory. } submit tasks to the thread pool
You can use two methods to submit tasks to the thread pool: the execute () and submit () methods. The execute () method is used to submit a task that does not require a return value, so it is not possible to determine whether the task was successfully executed by the thread pool. You can see from the following code that the task entered by the execute () method is an instance of the Runnable class.
ThreadsPool.execute (new Runnable () {@ Override public void run () {}})
The submit () method is used to submit tasks that require a return value. The thread pool returns an object of type future, which can be used to determine whether the task was executed successfully, and the return value can be obtained through the get () method of future. The get () method blocks the current thread until the task is completed, while using the get (long timeout,TimeUnit unit) method blocks the current thread for a period of time and then returns immediately, which may not finish the task.
Future future = executor.submit (harReturnValuetask); try {Object s = future.get ();} catch (InterruptedException e) {/ / handle interrupt exception} catch (ExecutionException e) {/ / handle task exception} finally {/ / close thread pool executor.shutdown ();} close thread pool
You can close the thread pool by calling the thread pool's shutdown or shutdownNow method. Their principle is to traverse the worker threads in the thread pool and then call the thread's interrupt method one by one to interrupt the thread, so a task that cannot respond to the interrupt may never be terminated. But there are some differences between them. ShutdownNow first sets the state of the thread pool to STOP, then attempts to stop all threads that are executing or pausing tasks, and returns the list of tasks waiting for execution, while shutdown simply sets the state of the thread pool to SHUTDOWN state, and then interrupts all threads that are not executing tasks.
Whenever either of these two close methods is called, the isShutdown method returns true. When all tasks are closed, the thread pool is closed successfully, and calling the isTerminaed method returns true. Which method should be called to close the thread pool should be determined by the nature of the task submitted to the thread pool. The shutdown method is usually called to close the thread pool, or the shutdownNow method can be called if the task does not have to be finished.
Reasonable configuration of thread pool
If you want to configure the thread pool reasonably, you must first analyze the characteristics of the task, which can be analyzed from the following angles.
1. The nature of tasks: CPU-intensive tasks, IO-intensive tasks and mixed tasks.
2. The priority of the task: high, medium and low.
3. The execution time of the task: long, medium and short.
4. Task dependence: whether it depends on other system resources, such as database connections.
Tasks of different nature can be handled separately with thread pools of different sizes. CPU-intensive tasks should be configured with as small threads as possible, such as configuring a thread pool of Ncpu+1 threads. Because IO-intensive task threads are not always executing tasks, you should configure as many threads as possible, such as 2*Ncpu. If a mixed task can be split into a CPU-intensive task and an IO-intensive task, as long as the execution time of the two tasks is not too different, then the throughput of the decomposed task will be higher than that of serial execution. If the execution time of the two tasks is too different, there is no need to decompose it. You can get the number of CPU of the current device through the Runtime.getRuntime (). AvailableProcessors () method. Tasks with different priorities can be handled using the priority queue PriorityBlockingQueue. It allows high-priority tasks to be executed first.
If high-priority tasks are submitted to the queue all the time, low-priority tasks may never be executed. Tasks with different execution times can be handed over to thread pools of different sizes, or priority queues can be used to allow tasks with short execution times to be executed first. Tasks that rely on database connection pooling, because threads need to wait for the database to return results after submitting the SQL, and the longer the wait, the longer the CPU idle time, so the larger the number of threads should be set to make better use of CPU.
It is recommended to use bounded queues. Bounded queues can increase the stability and early warning ability of the system, which can be larger as needed, such as thousands. Sometimes the queue and thread pool of the background task thread pool in our system are full, and we constantly throw exceptions that abandon the task. Through troubleshooting, we find that there is a problem with the database, which causes the execution of SQL to become very slow. Because all the tasks in the background task thread pool need to query and insert data from the database, all the worker threads in the thread pool are blocked and the tasks are overstocked in the thread pool. If we had set up an unbounded queue at that time, there would be more and more queues in the thread pool, which might fill up memory and make the whole system unavailable, not just background tasks. Of course, all tasks in our system are deployed with separate servers, and we use thread pools of different sizes to complete different types of tasks, but such problems can also affect other tasks.
Monitoring of thread pool
If the thread pool is widely used in the system, it is necessary to monitor the thread pool so that when a problem occurs, it can quickly locate the problem according to the usage of the thread pool. It can be monitored through the parameters provided by the thread pool, and the following properties can be used when monitoring the thread pool
TaskCount: the number of tasks that the thread pool needs to perform. CompletedTaskCount: the number of tasks completed by the thread pool during operation, less than or equal to taskCount. LargestPoolSize: the maximum number of threads ever created in the thread pool. From this data, you can know whether the thread pool has ever been full. If this value is equal to the maximum size of the thread pool, it indicates that the thread pool has been full. GetPoolSize: the number of threads in the thread pool. If the thread pool is not destroyed, the threads in the thread pool will not be destroyed automatically, so this size will only increase. GetActiveCount: gets the number of active threads.
Monitor by extending the thread pool. You can customize the thread pool by inheriting the thread pool, override the beforeExecute, afterExecute, and terminated methods of the thread pool, or execute some code before, after, and before the thread pool is closed to monitor. For example, monitor the average execution time, maximum execution time, and minimum execution time of a task.
This is the end of how Java Thread Pool works. Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.