Example Analysis of Java Thread Pool

2025-01-28 Update From: SLTechnology News&Howtos shulou

This article analyzes the Java thread pool with examples: why it is needed, how it is designed, what its parameters mean, and how ThreadPoolExecutor creates, schedules and reclaims its worker threads.

Why do you need a thread pool

The usual way to create a thread is new Thread(), and every call to new Thread() creates a brand-new thread. Creating and destroying threads takes time, consumes system resources, and can reduce the stability of the system. Since JDK 1.5 the JUC package (java.util.concurrent) has provided Executors, which lets the threads we create be reused instead of being created and destroyed over and over.

A thread pool maintains a set of reusable threads, and that set is what we call the pool. Used properly, it improves performance considerably. The program hands a task to the thread pool, and the pool assigns a thread to execute it. After execution, the thread does not die; it returns to the pool, becomes idle again, and waits for the next task. (Some pool implementations create their idle threads when the system starts; as noted later, ThreadPoolExecutor creates them lazily by default.)
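The reuse described above can be observed directly. The following is a minimal sketch, not part of the original article: the class name ThreadReuseDemo and the helper distinctThreadsUsed are made up for illustration. It runs many tasks on a small fixed pool and counts how many distinct threads actually executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {

    /** Runs taskCount tasks on a fixed pool and returns how many distinct threads ran them. */
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread that executes it
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 1,000 tasks are executed, yet the pool never creates more than 4 threads
        System.out.println(distinctThreadsUsed(4, 1000));
    }
}
```

The count stays at the pool size no matter how many tasks are submitted, which is exactly the saving compared with one new Thread() per task.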

Either way, let's compare the time taken with a thread pool and with new Thread():

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadPoolTest {
        static CountDownLatch latch = new CountDownLatch(100000);
        static ExecutorService es = Executors.newFixedThreadPool(4);

        public static void main(String[] args) throws InterruptedException {
            long timeStart = System.currentTimeMillis();
            for (int i = 0; i < 100000; i++) {
                newThread();
                // executors();
            }
            latch.await();
            System.out.println(System.currentTimeMillis() - timeStart);
            es.shutdown();
        }

        /** Use the thread pool. */
        public static void executors() {
            es.submit(() -> {
                latch.countDown();
            });
        }

        /** Create a new thread directly. */
        public static void newThread() {
            new Thread(() -> {
                latch.countDown();
            }).start();
        }
    }

Elapsed time for the 100,000 tasks with new Thread(): [result figure not preserved]

Elapsed time for the 100,000 tasks with the thread pool: [result figure not preserved]

In general, the proper use of thread pools can bring the following benefits:

1. Reduce resource consumption. Reduce the consumption caused by thread creation and destruction by reusing created threads.

2. Improve response speed. When a task arrives, it can be executed without waiting for a thread to be created.

3. Increase thread manageability. Threads are scarce resources, and thread pools can be used for unified allocation, tuning and monitoring.

Design idea of thread pool

Let's look at the idea behind the thread pool first, so that it makes sense even if you do not yet know what a thread pool is, rather than starting with a pile of parameters. I tried several ways of explaining the design, but they all came out too formal. While looking for material, I found a very easy-to-understand description on a blog. It goes like this. Imagine the production process of a factory:

The factory has a fixed group of workers, called regular workers, who complete the orders the factory receives. When orders increase and the regular workers are too busy, the factory temporarily piles up the raw materials in the warehouse and deals with them when a worker becomes free (workers do not take the initiative to process the tasks in the warehouse when they are free, so a dispatcher is needed to schedule them in real time). What if orders keep increasing after the warehouse is full? The factory can only recruit an extra group of workers to cope with the production peak. These workers have to leave after the peak, so they are called temporary workers. Once the quota of temporary workers is also full (the number of work stations puts an upper limit on how many can be hired), any further orders have to be turned down, however painful that is.

The factory maps onto the thread pool as follows:

Factory-thread pool

Order-Task (Runnable)

Regular worker-core thread

Temporary worker-non-core thread

Warehouse-task queue

Dispatcher-getTask()

getTask() is the method that hands tasks in the task queue to idle threads; the source code analysis later in this article explains how it works.

After mapping, the thread pool flow is as follows: [flow chart not preserved]

Working mechanism of thread pool

After understanding the thread pool design idea, we can summarize the working mechanism of the thread pool:

In the thread pool programming model, a task is submitted to the pool as a whole rather than directly to a particular thread. When a task arrives, the pool first checks whether the number of threads is below corePoolSize; if so, it creates a new core thread to execute the task. Otherwise it tries to add the task to the task queue workQueue, where an idle thread will pick it up. If the task queue is bounded and full, the pool checks whether the number of threads is below the maximum number of threads maximumPoolSize. If so, a new non-core thread is created to execute the task; otherwise the rejection policy handler is executed.

Note: there are no threads in the thread pool at first; a thread is created only when a task is submitted to the pool. A thread can execute only one task at a time, but multiple tasks can be submitted to a thread pool at the same time.
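The order "core thread, then queue, then non-core thread, then rejection" can be traced with a deliberately tiny pool. This is a sketch under stated assumptions: the class SubmissionFlowDemo and its trace() helper are invented for illustration, and the sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1) are chosen only so that every branch is hit. Each task blocks on a latch so that no thread becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionFlowDemo {

    /** Submits four blocking tasks and records the pool state after each submission. */
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the trace is complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // corePoolSize = 1, maximumPoolSize = 2, bounded queue with capacity 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> log = new ArrayList<>();
        log.add("start: threads=" + pool.getPoolSize());
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task " + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                // Default AbortPolicy: queue full and maximumPoolSize reached
                log.add("task " + i + ": rejected");
            }
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 waits in the queue, task 3 forces a non-core thread because the queue is full, and task 4 is rejected.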

Parameters and usage of thread pool

The actual implementation class of the thread pool is ThreadPoolExecutor. Its inheritance relationship is: ThreadPoolExecutor extends AbstractExecutorService, which implements ExecutorService, which in turn extends Executor.

ThreadPoolExecutor has several constructors. It is enough to master the most important one, which takes 7 parameters:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize (required), the number of core threads in the thread pool.

When a task is submitted, the thread pool creates a new thread to execute the task until the current number of threads equals corePoolSize.

If the current number of threads is less than corePoolSize, a submitted task causes a new thread to be created even if some existing threads are idle.

If the current number of threads is equal to corePoolSize, further submitted tasks are saved to the blocking queue, waiting to be executed.

If the thread pool's prestartAllCoreThreads() method is called, the thread pool creates and starts all core threads ahead of time.
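A short sketch of the lazy-versus-prestarted behavior follows. The class name PrestartDemo and the helper sizes() are illustrative only; getPoolSize() and prestartAllCoreThreads() are the real ThreadPoolExecutor methods.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrestartDemo {

    /** Returns {pool size before prestart, threads started, pool size after prestart}. */
    static int[] sizes(int corePoolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, corePoolSize + 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10));
        int before = pool.getPoolSize();              // no task yet, so no thread yet
        int started = pool.prestartAllCoreThreads();  // returns the number of threads started
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, started, after};
    }

    public static void main(String[] args) {
        int[] s = sizes(3);
        System.out.println("before=" + s[0] + " started=" + s[1] + " after=" + s[2]);
    }
}
```

Prestarting is mainly useful when the first requests must not pay the thread-creation cost.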

maximumPoolSize (required), the maximum number of threads allowed in the thread pool.

When the queue is full and the number of threads created is less than maximumPoolSize, the thread pool creates new threads to execute the task. With an unbounded queue the queue never fills up, so this parameter has no effect.

keepAliveTime (required), the thread keep-alive time.

This is how long a thread stays alive when it has no task to execute. By default, the parameter only matters when the number of threads is greater than corePoolSize: when a non-core thread has been idle for longer than this time, it is reclaimed. After allowCoreThreadTimeOut(true) has been called, core threads are reclaimed in the same way.
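The effect of keepAliveTime can be watched by letting a pool go idle. The sketch below is an illustration, not the article's code: KeepAliveDemo, awaitPoolSize and run are hypothetical names, and the 100 ms keep-alive and 5 s polling timeout are arbitrary values picked to keep the demo fast.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class KeepAliveDemo {

    /** Polls until the pool has exactly target threads or the timeout elapses. */
    static boolean awaitPoolSize(ThreadPoolExecutor pool, int target, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (pool.getPoolSize() != target && System.nanoTime() < deadline) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        }
        return pool.getPoolSize() == target;
    }

    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // core = 1, max = 2, queue capacity 1, idle threads live for 100 ms
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker); // the third task forces a non-core thread
        }
        int peak = pool.getPoolSize();
        release.countDown();
        // The extra thread times out in poll(); one thread stays blocked in take()
        boolean shrankToCore = awaitPoolSize(pool, 1, 5000);
        // Now let core threads time out as well
        pool.allowCoreThreadTimeOut(true);
        boolean shrankToZero = awaitPoolSize(pool, 0, 5000);
        pool.shutdown();
        return "peak=" + peak + " shrankToCore=" + shrankToCore + " shrankToZero=" + shrankToZero;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that allowCoreThreadTimeOut(true) requires a keepAliveTime greater than zero, otherwise it throws IllegalArgumentException.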

unit (required), the time unit of keepAliveTime.

workQueue (required), the task queue.

A blocking queue used to hold tasks waiting to be executed. workQueue must be a BlockingQueue. Once the number of threads in the pool has reached corePoolSize, newly submitted tasks enter the blocking queue and wait.

In general, we should prefer bounded queues, because using an unbounded queue as the work queue affects the thread pool in the following ways.

1. When the number of threads in the thread pool reaches corePoolSize, new tasks wait in the unbounded queue, so the number of threads in the pool never exceeds corePoolSize.

2. Because of 1, maximumPoolSize has no effect when an unbounded queue is used.

3. Because of 1 and 2, keepAliveTime has no effect when an unbounded queue is used (unless core threads are allowed to time out via allowCoreThreadTimeOut).

More importantly, an unbounded queue may exhaust system resources, whereas a bounded queue helps prevent resource exhaustion. Even with a bounded queue, try to keep the queue size within a reasonable range. Commonly used queues include ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue and PriorityBlockingQueue.
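The first two effects of an unbounded queue are easy to reproduce. In this minimal sketch (UnboundedQueueDemo and snapshot() are illustrative names), maximumPoolSize is 4, yet the pool never grows past its single core thread because the default LinkedBlockingQueue never reports itself as full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    /** Returns {pool size, queued tasks} after submitting taskCount blocking tasks. */
    static int[] snapshot(int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // core = 1, max = 4, but the queue is unbounded
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < taskCount; i++) {
            pool.execute(blocker);
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] s = snapshot(10);
        System.out.println("threads=" + s[0] + " queued=" + s[1]);
    }
}
```

With ten submitted tasks, one runs on the core thread and nine pile up in the queue; if tasks arrive faster than they complete, that backlog grows without limit.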

threadFactory (optional), the factory that creates threads.

With a custom thread factory, you can give every newly created thread an identifiable name. The default thread factory also creates threads with new Thread(), and the names it assigns all follow the same pattern: pool-m-thread-n (m is the number of the thread pool, n is the number of the thread within that pool).
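A custom factory is only a few lines. The following sketch assumes a made-up naming scheme (the prefix "order-worker" and the class NamedThreadFactoryDemo are illustrative); the ThreadFactory interface itself is the standard one from java.util.concurrent.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactoryDemo {

    /** Builds a factory that names threads prefix-1, prefix-2, ... */
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    /** Returns the name of the pool thread that executes the first submitted task. */
    static String firstWorkerName(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(2, named(prefix));
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName("order-worker"));
    }
}
```

Meaningful names make thread dumps and log lines much easier to attribute to a specific pool.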

handler (optional), the saturation (rejection) policy.

When the blocking queue is full and there are no idle worker threads, a further submitted task must be handled by some policy. The thread pool provides four policies:

AbortPolicy, which throws a RejectedExecutionException directly. This is the default policy.

CallerRunsPolicy, which executes the task on the caller's thread.

DiscardOldestPolicy, which discards the oldest task at the head of the blocking queue and then retries the current task.

DiscardPolicy, which silently discards the task.

Of course, you can also implement the RejectedExecutionHandler interface to suit your application scenario and define your own saturation policy, for example logging the tasks that cannot be handled or persisting them to storage.
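As an illustration of a custom policy, the sketch below records rejected tasks in a list instead of throwing. The class CustomRejectionDemo and the in-memory list are assumptions made for the example; a real application might write to a log or a durable store instead.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomRejectionDemo {

    /** Submits taskCount blocking tasks to a saturated pool and returns how many were rejected. */
    static int rejectedCount(int taskCount) {
        List<Runnable> rejected = new CopyOnWriteArrayList<>();
        // Hypothetical policy: remember the task so it could be logged or persisted
        RejectedExecutionHandler recordingHandler = (task, executor) -> rejected.add(task);

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // One thread plus one queue slot: at most two tasks are accepted at a time
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), recordingHandler);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(blocker);
        }
        release.countDown();
        pool.shutdown();
        return rejected.size();
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + rejectedCount(5));
    }
}
```

Because the handler runs on the submitting thread, it should return quickly or it will slow down the producers.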

Status of the thread pool

ThreadPoolExecutor packs two values into a single int (the ctl field): the high 3 bits represent the thread pool state, and the low 29 bits hold the number of worker threads.
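The packing can be reproduced outside the JDK. The following standalone sketch mirrors the constant and helper names used in the JDK 8 source (COUNT_BITS, CAPACITY, ctlOf, runStateOf, workerCountOf), but it is a re-implementation for illustration in a made-up class CtlBitsDemo, not the library code itself.

```java
public class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;    // mask with the low 29 bits set

    // The run state lives in the high 3 bits
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    /** Combines a run state and a worker count into one int. */
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    /** Extracts the run state (high 3 bits). */
    static int runStateOf(int ctl) {
        return ctl & ~CAPACITY;
    }

    /** Extracts the worker count (low 29 bits). */
    static int workerCountOf(int ctl) {
        return ctl & CAPACITY;
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(ctl) == RUNNING); // state survives the round trip
        System.out.println(workerCountOf(ctl));         // so does the count
        // States are ordered, so comparisons such as "rs >= SHUTDOWN" work on the raw value
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP && STOP < TIDYING);
    }
}
```

Keeping both values in one int lets the pool update state and count together with a single atomic compare-and-set.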

The source code is as follows:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; // 29
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

[Part of the original listing is missing here; it resumes inside the constructor of the Worker class.]

        // state >= 0 means the thread may be interrupted;
        // see interruptIfStarted() for the details
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // other code omitted
    }

As the Worker constructor shows, the thread field is created by passing this to the thread factory, so the Worker itself is the Runnable that the thread executes. When the thread starts, it calls Worker.run(), which delegates to runWorker():

runWorker

runWorker() contains the main execution logic of a worker thread.

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // call unlock() so that the worker can be interrupted from outside
        w.unlock(); // allow interrupts
        // true if the thread exits because a task threw an exception,
        // false if the thread exits normally
        boolean completedAbruptly = true;
        try {
            // 1. If firstTask is not null, execute firstTask
            // 2. If firstTask is null, call getTask() to take a task from the queue
            // 3. A blocking queue blocks the current thread while the queue is empty
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // Check the pool state: if the pool is stopping,
                // make sure the current thread is interrupted
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // Empty by default; override it to extend the pool,
                    // for example for thread pool monitoring
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // execute the actual task
                        task.run();
                    } catch (RuntimeException x) {
                        // record the exception and rethrow it
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // same idea as beforeExecute()
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null; // help GC
                    // count the tasks completed by this worker
                    w.completedTasks++;
                    // release the lock
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // Handle thread exit. completedAbruptly == true means the thread
            // is exiting abnormally because a task threw an exception
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask

The getTask() method hands tasks in the task queue to idle threads. Why is this method so important? Because it answers a question interviewers often ask: how does the thread pool ensure that core threads are not destroyed while idle non-core threads are?

    private Runnable getTask() {
        // Whether the most recent poll() timed out
        // (poll takes the object at the head of the BlockingQueue)
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /*
             * Condition 1: the pool state is SHUTDOWN, STOP, TIDYING or TERMINATED
             * Condition 2: the pool state is STOP or later, or workQueue is empty
             * If both conditions hold, decrement workerCount and return null.
             * Note: condition 2 covers a pool in SHUTDOWN state, which accepts
             * no new tasks but still processes the queued ones (as mentioned earlier).
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            /*
             * timed decides whether the current thread is allowed to time out:
             * 1. allowCoreThreadTimeOut
             *    false (default): core threads stay alive even when idle.
             *    true: core threads also use keepAliveTime while waiting for work.
             * 2. wc > corePoolSize
             *    whether the number of threads exceeds the number of core threads.
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /*
             * Decide whether the current thread may exit:
             * 1. wc > maximumPoolSize || (timed && timedOut)
             *    wc > maximumPoolSize: there are more worker threads than the maximum.
             *    timed && timedOut: the thread is allowed to time out and has timed out.
             * 2. wc > 1 || workQueue.isEmpty()
             *    there is more than one worker thread, or the task queue is empty.
             * If both hold, decrement the thread count by 1 via CAS and return null.
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                /*
                 * 1. poll(long timeout, TimeUnit unit): takes the head of the
                 *    BlockingQueue. If an element becomes available within the
                 *    given time it is returned immediately; otherwise null is
                 *    returned when the time expires.
                 * 2. take(): takes the head of the BlockingQueue. If the queue is
                 *    empty, the caller blocks until an element is added.
                 *
                 * If timed is true, poll() is used; if no task arrives within
                 * keepAliveTime, null is returned.
                 * If timed is false, take() is used and blocks until the next
                 * task is available (normally never null).
                 */
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // poll() returned null: no task arrived before the timeout
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
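The difference between the two queue calls at the end of getTask() is what separates a thread that can time out from one that waits forever. A minimal sketch follows; PollVersusTakeDemo and fetch() are illustrative names, while poll(timeout, unit) and take() are the real BlockingQueue methods.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollVersusTakeDemo {

    /** Mirrors the ternary in getTask(): timed threads poll, the others block in take(). */
    static Runnable fetch(BlockingQueue<Runnable> queue, boolean timed, long keepAliveMillis) {
        try {
            return timed
                    ? queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS) // null after the timeout
                    : queue.take();                                       // blocks until a task arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        // Empty queue + timed: gives up after 50 ms and returns null
        System.out.println(fetch(queue, true, 50) == null);

        // Non-empty queue: both variants return the queued task immediately
        Runnable job = () -> { };
        queue.add(job);
        System.out.println(fetch(queue, false, 0) == job);
    }
}
```

A null result from the timed variant is the signal that eventually lets the worker exit; a thread using take() never sees that signal while the pool is running.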

① The following fragment of getTask() mostly concerns non-core threads:

    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }

② Read it like this. When the number of worker threads has reached corePoolSize, execute() enters its second if statement:

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

At this point the number of threads in the pool has reached corePoolSize but is still below maximumPoolSize. When the task queue is full, non-core threads are added through addWorker(task, false).

Under high concurrency there will certainly be extra threads beyond corePoolSize, that is, wc > corePoolSize in ① (wc > maximumPoolSize can only occur if maximumPoolSize is lowered at runtime). What happens to these extra threads? Will they be reclaimed? If workQueue.poll() does not obtain a task, the logic in ① does exactly the opposite of addWorker(task, false): it removes non-core threads through CAS, so that the total number of worker threads tends toward corePoolSize.

For a non-core thread whose previous loop iteration obtained null: allowCoreThreadTimeOut is false by default, but wc > corePoolSize, so timed = true in getTask(); and because no task was obtained, timedOut = true. In the next iteration timed && timedOut is therefore easily satisfied. getTask() then returns null, which makes Worker#runWorker() leave its loop and call processWorkerExit() to finish up. The Worker that belonged to the non-core thread is no longer referenced and waits to be garbage collected by the JVM.

When allowCoreThreadTimeOut is set to true, the life cycle termination logic of the non-core thread analyzed here also applies to the core thread.

This leads to an interview question: if there are multiple threads in the thread pool that do not get the task at the same time, will they all be recycled?

For example: the number of core threads in the pool is 5, the maximum number of threads is 5, and the current number of worker threads is 6 (6 > 5, so thread reclamation can be triggered). If three threads time out at the same moment without obtaining a task, will all three be reclaimed and destroyed?

Idea: the heart of the question is that several threads fail to obtain a task and time out simultaneously, which would normally trigger reclamation for each of them. But we know that unless allowCoreThreadTimeOut has been set, the pool keeps its core threads even when there is nothing to do. If all 3 threads were reclaimed, only 3 threads would remain, which contradicts the core thread count of 5. So we can give the answer first: not all of them will be reclaimed. The interviewer is then certain to ask why.

From that answer it is not hard to guess that the reclamation path must have concurrency control to prevent exactly this problem. compareAndDecrementWorkerCount(c) uses CAS; if the CAS fails, the thread executes continue, enters the next iteration and re-evaluates the conditions.

In the example above, only one of the three threads succeeds in its CAS. The threads whose CAS fails re-enter the loop and find that the number of worker threads is now 5, so timed = false. These threads are not destroyed: they call workQueue.take() and block until the next task arrives. In other words, the core threads do not die.

This also shows that although we speak of core threads, the pool does not actually distinguish between core and non-core threads. Threads created first are not inherently core threads, and threads created after corePoolSize was reached are not inherently non-core. Which threads are ultimately retained is arbitrary.
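The CAS guard can be simulated without a real pool. The sketch below is a simplified model, not the JDK code: CasRetireDemo and simulate() are invented names, an AtomicInteger stands in for the worker count part of ctl, and allowCoreThreadTimeOut is assumed to be false. Each simulated thread behaves as if its poll() had just timed out.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasRetireDemo {

    /** Returns how many of the timed-out threads actually retire. */
    static int simulate(int workers, int corePoolSize, int timedOutThreads) {
        AtomicInteger workerCount = new AtomicInteger(workers);
        AtomicInteger retired = new AtomicInteger();
        Thread[] threads = new Thread[timedOutThreads];
        for (int i = 0; i < timedOutThreads; i++) {
            threads[i] = new Thread(() -> {
                for (;;) {
                    int c = workerCount.get();
                    boolean timed = c > corePoolSize;
                    if (!timed) {
                        return; // would block in take(): this thread survives
                    }
                    if (workerCount.compareAndSet(c, c - 1)) {
                        retired.incrementAndGet(); // won the CAS: this thread exits
                        return;
                    }
                    // CAS failed because another thread changed the count: re-check
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return retired.get();
    }

    public static void main(String[] args) {
        // 6 workers, corePoolSize 5, 3 threads time out at once
        System.out.println("retired=" + simulate(6, 5, 3));
    }
}
```

However the three threads interleave, only one decrement can take the count from 6 to 5; the others then see a count that no longer exceeds corePoolSize and stay alive.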

Now we can answer the earlier question: how does the thread pool ensure that core threads are not destroyed while idle threads are?

Core threads are not destroyed because they call the blocking method take(); idle threads call the timed method poll() and exit on the next iteration if no task was obtained.

This answer is acceptable, but it may sound like a memorized stock answer. A better answer adds that core threads survive not only because the blocking method is called, but also because CAS guarantees that the thread count is not decremented too far.

We can also summarize when getTask() returns null:

1. The state of the thread pool is already STOP, TIDYING or TERMINATED, or it is SHUTDOWN and the task queue is empty.

2. The number of worker threads is greater than the maximum number of threads, or the current worker thread has timed out; and in addition either other worker threads exist (wc > 1) or the task queue is empty.

The process of runWorker: [flow chart not preserved]
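runWorker() calls beforeExecute() and afterExecute() around every task, and both are empty protected methods meant to be overridden. The following sketch shows one way to use them for simple monitoring; the class MonitoredPoolDemo, its nested MonitoredPool and the two counters are assumptions made for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MonitoredPoolDemo {

    /** A pool that counts how many tasks were started and finished. */
    static class MonitoredPool extends ThreadPoolExecutor {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger finished = new AtomicInteger();

        MonitoredPool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            started.incrementAndGet();   // runs on the worker thread, before task.run()
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            finished.incrementAndGet();  // runs even if the task threw; t holds the exception
        }
    }

    /** Runs the given number of no-op tasks and returns {started, finished}. */
    static int[] run(int tasks) {
        MonitoredPool pool = new MonitoredPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.started.get(), pool.finished.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(10);
        System.out.println("started=" + counts[0] + " finished=" + counts[1]);
    }
}
```

The same hooks are a natural place to record task latency or to log the exception passed to afterExecute().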

processWorkerExit

The finally block of runWorker() performs the cleanup after the worker has finished running tasks. The thread does not really end until it has executed processWorkerExit(), which looks like this:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // If the thread is terminating because user code threw an exception,
        // the worker count has not been adjusted yet, so decrement it here.
        // If no exception was thrown, the thread left the while loop of
        // runWorker() because getTask() returned null, and getTask() has
        // already decremented the count.
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // add this Worker's completed tasks to the global total
            completedTaskCount += w.completedTasks;
            // remove this terminating Worker from the set of workers
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // check whether the thread pool itself needs to terminate
        tryTerminate();

        int c = ctl.get();
        // If the pool state is below STOP, that is, RUNNING or SHUTDOWN:
        // 1. If the thread did not terminate because of a user exception and
        //    core threads may time out, keep at least one worker thread while
        //    the queue is not empty
        // 2. If the thread terminated because of a user exception, or the
        //    number of worker threads is below the minimum, add a new
        //    non-core thread directly
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // if core threads may time out the minimum is 0, otherwise corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // if the minimum is 0 and the task queue is not empty, raise it to 1
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // enough worker threads remain: return without adding a new one
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

The later part of the code checks the state of the thread pool. If the pool is in the RUNNING or SHUTDOWN state and the current worker thread terminated because a task threw an exception, a new non-core thread is created directly. If the worker thread terminated normally, it is handled as follows:

allowCoreThreadTimeOut is true: core threads may time out, so the minimum is 0. But if the task queue is not empty, at least one worker thread is kept in the pool, by creating a non-core thread if necessary.

allowCoreThreadTimeOut is false: return directly if the number of worker threads is greater than or equal to corePoolSize; otherwise create a non-core thread, so that the number of worker threads in the pool tends toward corePoolSize.

After the execution of processWorkerExit (), it means that the life cycle of the worker thread is over.
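The replacement branch of processWorkerExit() can be observed from the outside: when a task submitted with execute() throws, its worker thread ends and the pool creates another one. This is a sketch with illustrative names (WorkerReplacementDemo, the "worker-n" naming scheme); the empty uncaught-exception handler is there only to keep the demo output free of a stack trace.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerReplacementDemo {

    /** Returns the names of the threads that ran a failing task and the task after it. */
    static List<String> threadNames() {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + seq.incrementAndGet());
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // silence the demo failure
            return t;
        };
        // A single-thread pool makes the replacement easy to see
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        List<String> names = new CopyOnWriteArrayList<>();

        pool.execute(() -> {
            names.add(Thread.currentThread().getName());
            throw new IllegalStateException("simulated task failure");
        });
        pool.execute(() -> names.add(Thread.currentThread().getName()));

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadNames());
    }
}
```

Tasks submitted with submit() behave differently: the exception is captured in the returned Future, so the worker thread is not terminated.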
