Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the principle and function of Java thread pool?

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article mainly explains "what is the principle and function of Java thread pool". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "what is the principle and function of Java thread pool".

I. reuse of threads

"the first problem is how to reuse threads."

/ * Created by Anur IjuoKaruKas on 2019-7-16 * / public class ThreadPoolExecutor {private final BlockingQueue workQueue = new LinkedBlockingQueue (); private final Runnable runnable = ()-> {try {while (true) {Runnable take = workQueue.poll (); if (take = = null) {Thread.sleep } else {take.run ();} catch (InterruptedException e) {e.printStackTrace ();}}; public ThreadPoolExecutor () {new Thread (runnable). Start ();} public void execute (Runnable command) {workQueue.offer (command);}}

After a glance at the fat house, we soon found the mystery: in Xiao Nai's ThreadPoolExecutor, we customized a set of runnable process, which is responsible for constantly pulling tasks submitted by the # execute method from the queue of workQueue and executing its run () method. In this way, no matter how many tasks are submitted, it is always the thread built into this thread pool that is executing the task. When a task is not available, the thread pool goes to sleep by itself.

2. Automatic creation and destruction of worker threads and the maximum number of worker

"although this achieves thread reuse, there is no way for your thread to be created and destroyed automatically? even the number of thread pools is uncontrollable." Although the fat house buried lamented that the other side could achieve thread reuse so quickly, it continued to carry out the offensive.

"since we want to achieve thread pool control, the most straightforward idea is to encapsulate the original runnable process into an object, and we only need to control the creation, destruction, and reuse of this object." As a female programmer who has been immersed in OOP thinking for a long time, this kind of problem is not difficult for Xiao Nai. She quickly wrote an inner class called Worker, where # runWorker (this); the runnable process is responsible for constantly fetching tasks from the queue and calling its # run () method.

Private final class Worker implements Runnable {final Thread thread; Runnable firstTask; Worker (Runnable firstTask) {this.firstTask = firstTask; this.thread = threadFactory.newThread (this);} @ Override public void run () {runWorker (this);}}

Xiao Nai laid the foundation for the subsequent control of the number of worker threads: a hash set was added to ThreadPoolExecutor for storing worker, and a ThreadFactory was added for users to customize the creation of worker threads.

The core method is called # addWorker (), which is responsible for creating and initializing worker threads and bringing them into hash centralized management. Of course, this thread pool cannot be created automatically, but it can be destroyed automatically. As you can see, when the task is not pulled, # getTask () returns empty, jumps out of the while loop of # runWorker (), and then calls # processWorkerExit (); to remove the worker thread from the hash set.

/ * Created by Anur IjuoKaruKas on 2019-7-16 * / public class ThreadPoolExecutor {private final HashSet workers = new HashSet (); private volatile ThreadFactory threadFactory; private final BlockingQueue workQueue; public ThreadPoolExecutor (BlockingQueue workQueue, ThreadFactory threadFactory) {this.threadFactory = threadFactory; this.workQueue = workQueue;} public void execute (Runnable command) {workQueue.offer (command) } / * create a new worker thread, start and incorporate workers * / private boolean addWorker (Runnable firstTask) {Worker w = new Worker (firstTask); final Thread t = w. Thread; if (t! = null) {workers.add (w); t.start ();} return true } / * worker thread pool continues to pull task from workQueue for consumption * / private void runWorker (Worker w) {Runnable task = w.firstTask; w.firstTask = null; while (task! = null | | (task = getTask ())! = null) {task.run ();} processWorkerExit (w) } / * remove a thread from workers * / private void processWorkerExit (Worker w) {workers.remove (w);} private Runnable getTask () {return workQueue.poll ();}} before it finishes execution

See here, fat mansion buried has been able to predict the next train of thought.

A variable maximumPoolSize needs to be added to the thread pool to prevent unlimited thread creation. Each time you perform # addWorker (), you need to determine whether you can continue to add worker, and if so, add a new worker, otherwise the task will be thrown into the queue:

# addWorker () adds the logic of rejection to ensure that the worker cannot be created indefinitely.

Modify the # execute () method to create a worker first, and if you fail to create a worker (workers.size () > = maximumPoolSize), throw the task directly into the queue.

Public void execute (Runnable command) {if (addWorker (command)) {return;} workQueue.offer (command);} / * create a new worker thread, start and incorporate workers * / private boolean addWorker (Runnable firstTask) {int ws = workers.size (); if (ws > = maximumPoolSize) {return false } Worker w = new Worker (firstTask); final Thread t = w. Thread; if (t! = null) {workers.add (w); t.start ();} return true;} III, core thread, maximum thread and keepAliveTime

It has been written here that Xiao Nai is arrogant, as if to implement a thread pool.

"does this seem to be a bit of a problem? although you have implemented the dynamic creation and destruction of threads, when the task is not so compact, basically every task needs to create a thread and destroy it again. Where is the reuse?" The fat house was buried by the inflated Xiao Nai on the head.

"ahem. Well, just make a judgment when destroying. Let's add a new variable called keepAliveTime. When we can't get the task, we hibernate. For example, 20ms, subtract 20ms from keepAliveTime every time until it is less than or equal to 0, and then destroy the thread." Xiao Nai reacted quickly, gave the answer quickly, and was ready to make changes to the thread pool.

Fat House sighed, "I see you are blinded by inflation. Now that we have used blocking queues, we can make full use of the features of blocking queues! blocking queues have an explicit lock built in, and thread scheduling can be implemented directly and accurately using the locked condition object and its # awaitNanos () and # notify () methods." After all, the fat house is also a high achiever, after hearing Xiao Nai's idea, he put forward a more constructive design.

Xiao Nai also quickly realized that the blocking queue has a # poll () method, and the bottom layer is LockSupport.parkNanos (this, nanosTimeout) encapsulated by the condition object. To achieve this, it will block until a new element is added, and when a new element is added, the condition will be awakened, so that when # poll () of the blocking queue is called, if the blocking queue is listed as empty, it will sleep for a period of time until it is awakened or the sleep timeout.

The fat house took over the power to transform the thread pool and immediately changed it drastically.

The change is simple: the original # getTask () called directly the # take () method of the blocking queue, and returned directly if the queue was empty, as long as it was changed to the # poll method.

/ * when runWorker cannot get the task within a certain period of time, processWorkerExit will destroy * / private Runnable getTask () {boolean timedOut = false; while (true) {try {if (timedOut) {return null;} Runnable r = workQueue.poll (keepAliveTime, unit) If (r! = null) {return r;} else {timedOut = true;}} catch (InterruptedException e) {timedOut = false;}

"generally speaking, our task submission is not too uniform. If we usually do not need so many threads to consume, but want to avoid tasks being piled up all the time, some tasks are not consumed for a long time. We need to introduce the concepts of * * core thread corePoolSize * * and * * maximum thread maximumPoolSize * *." Fat House thought of a simple point that could be optimized, and analyzed it eloquently: "We don't have to do such a complex dynamic worker consumption pool, and the simplest thing is to continue to create more thread pools if our blocking queue is full, so that the stacked tasks can fall faster than ever before."

It seems complicated, but in fact the code is very simple. Xiao Nai saw that the fat house buried modified the # addWorker () method by adding a parameter core, which has only one effect. If it is a core thread, the number must be less than or equal to corePoolSize, otherwise the number must be less than or equal to maximumPoolSize.

In addition, the change to the # execute () method is very simple, not much of the previous change, mainly to create a non-core worker thread when task # offer () fails.

/ * priority is given to the creation of core threads. When the core threads are full, priority is given to putting tasks into the queue * * when the queue is full, the non-core thread pool is enabled to prevent task accumulation * / public void execute (Runnable command) {if (getPoolSize ())

< corePoolSize) { if (addWorker(command, true)) { return; } } if (!workQueue.offer(command)) { addWorker(command, false); } } /** * 新建一个 worker 线程、启动并纳入 workers */ private boolean addWorker(Runnable firstTask, boolean core) { int ws = workers.size(); if (ws >

= (core? CorePoolSize: maximumPoolSize) {return false;} Worker w = new Worker (firstTask); final Thread t = w. Thread; if (t! = null) {workers.add (w); t.start ();} return true;} IV. Reject policy

"now this version of thread pool looks really good. Threads can be dynamically created and destroyed, threads can be reused, and more threads can be dynamically added to consume stacked threads!" Fat House buried looked at their masterpiece with satisfaction, "in fact, I also found that there is a place that is not very friendly. When pushing a task, the caller may not know whether his task has failed or not."

"for this simple duck, you just need to return flase when calling # execute () to represent the add failure, or throw the corresponding exception." Xiao Nai gives a very intuitive design.

"this is a good idea, but it would be too troublesome for the caller to make this judgment in all places that use the thread pool!" Fat House burial complements the solution: "this is an idea of aspect-oriented programming. We can provide a faceted entry where these queues can no longer fit and cannot create more consumer threads. Call it AbortPolicy!"

Fat House has modified the # execute () method to reject the task directly if it fails to create a non-core thread pool.

/ * priority is given to the creation of core threads. When the core threads are full, tasks will be queued first * * when the queue is full, the non-core thread pool will be enabled to prevent task accumulation * * if the creation of the non-core thread pool fails Then reject this task * / public void execute (Runnable command) {if (getPoolSize () < corePoolSize) {if (addWorker (command, true)) {return }} if (! workQueue.offer (command)) {if (! addWorker (command, false)) {reject (command);}

How to reject the task, give it to the caller to implement, the implementation of # reject () is very simple, is to call BiConsumer, which can be freely customized by the caller.

Private void reject (Runnable command) {abortPolicy.accept (command, this);} V. Executive thread pool

Xiao Nai and Feizhai have completed their thread pool, and now you need to test whether the thread pool can be used properly. The more careful Feizhai has written the test case as follows:

The number of core threads is 5, the maximum number of threads is 10, and then each thread survives for one minute when it cannot pull the task, there is a concurrent blocking queue of length 5, using the default ThreadFactory, and finally, using DiscardPolicy, when the task is rejected, the task is discarded directly and the log is printed.

They run the code, and the log is printed as follows. As expected, only five core threads are consuming tasks before the blocking queue is full. When the blocking queue is full, more threads will be created gradually, and when more threads cannot be created, the discard policy will be triggered.

Thank you for your reading, the above is the content of "what is the principle and function of Java thread pool". After the study of this article, I believe you have a deeper understanding of the principle and function of Java thread pool. the specific use also needs to be verified by practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report