2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article analyzes, with examples, how thread pools work in Java concurrent programming. Many newcomers find the topic confusing, so the key mechanisms are summarized below; hopefully it helps you work through such problems.
Thread pool inheritance relationship
The top-level interface implemented by ThreadPoolExecutor is Executor. With Executor, users need not care how threads are created or how they are scheduled to run tasks. Users only provide Runnable objects and submit the task logic to the Executor; the Executor framework handles thread management and task execution.
The ExecutorService interface adds some capabilities:
It extends task execution with methods that return a Future for a single asynchronous task or a batch of them.
It provides methods to control the thread pool, such as shutting it down.
AbstractExecutorService is the abstract class beneath it:
It strings together the steps of executing a task, so that lower-level implementations only need to focus on a single task-execution method.
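To make the layering concrete, here is a minimal sketch of the caller's view: the user supplies only the task logic and gets a Future back, never touching thread creation. The class name ExecutorDemo and the squaring task are illustrative, not from the original text.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {
    // Submits a Callable to an ExecutorService and retrieves its result via Future.
    public static int square(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> f = pool.submit(() -> n * n); // caller never creates a thread
            return f.get();                               // blocks until the task completes
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(square(7)); // prints 49
    }
}
```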
ThreadPoolExecutor is the most complex part of the implementation:
It automatically creates, manages, and reuses a specified number of threads; callers only need to submit tasks, and thread safety is handled internally. ThreadPoolExecutor tracks properties such as pool state and the counts of core and non-core threads, and it makes extensive use of CAS and AQS locking to avoid conflicts caused by concurrency.
It introduces the concepts of core threads, a buffering blocking queue, non-core threads, and rejection policies, which can be combined to suit the actual application scenario.
beforeExecute() and afterExecute() hooks are provided so the thread pool's behavior can be extended.
Advantages of thread pools
Reduce the overhead of thread creation and destruction.
Improve response speed: when a task arrives, taking a thread from the pool is much faster than creating one manually.
Improve thread manageability: threads are a scarce resource; created without limit, they consume system resources and reduce system stability. A thread pool allows unified allocation, tuning, and monitoring.
Constructor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || ...
}
Cached thread pool (new threads are created continuously when tasks arrive faster than existing threads can process them)
Every submitted task is picked up by a thread immediately, so CachedThreadPool is suitable for handling a large number of short-lived tasks.
Single-thread thread pool (only one thread runs; excess tasks pile up in the blocking queue)
It suits applications where tasks must execute sequentially, with no more than one thread active at any point in time. SingleThreadExecutor sets both corePoolSize and maximumPoolSize to 1 and uses the unbounded LinkedBlockingQueue as its work queue.
newSingleThreadExecutor creates a single-thread pool: only one thread works in the pool, and all tasks execute serially. If the only thread dies from an exception, a new thread replaces it. This pool guarantees that all tasks run in submission order.
When there are no threads in the thread pool, a new thread is created to execute the task.
Once the pool already has a thread, new tasks are added to the LinkedBlockingQueue.
After finishing its first task, the thread loops indefinitely, repeatedly taking tasks from the LinkedBlockingQueue and executing them.
Usage scenario: suitable for serial task execution.
Excessive accumulation in the blocking queue can cause an OOM.
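The serial-ordering guarantee described above can be demonstrated with a short sketch. The class name SingleThreadOrderDemo is illustrative; the point is that tasks drain from the queue strictly in submission order, so the unsynchronized list stays consistent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderDemo {
    // Tasks submitted to a single-thread pool run strictly in submission order,
    // so the single worker is the only thread ever touching the list.
    public static List<Integer> runInOrder(int n) throws InterruptedException {
        List<Integer> order = new ArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```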
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}
Fixed-size thread pool (runs a specified number of threads; excess tasks pile up in the blocking queue)
corePoolSize equals maximumPoolSize, so the pool contains only core threads and uses the unbounded LinkedBlockingQueue as its work queue.
Usage scenario:
Suitable for CPU-intensive tasks: it caps the number of threads even while worker threads occupy the CPU for long stretches, i.e. it fits long-running tasks.
newFixedThreadPool creates a fixed-size thread pool. Each submitted task creates a thread until the count reaches the pool's maximum size, after which the count stays constant. If a thread ends because of an execution exception, the pool adds a replacement. Idle threads are not reclaimed unless the pool is shut down. When all threads are busy, new tasks wait until a thread becomes idle.
Excessive accumulation in the blocking queue can cause an OOM.
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(), threadFactory);
}
Scheduled-task thread pool (the task queue and the maximum pool size are effectively unbounded; tasks accumulate in the blocking queue)
newScheduledThreadPool creates a pool of effectively unlimited size that supports timed and periodic task execution.
The upper bound on the total thread count is Integer.MAX_VALUE; the work queue is a DelayedWorkQueue, and non-core threads have a keep-alive time of 0, so in practice the pool holds only the fixed number of core threads.
Excessive accumulation in the blocking queue can cause an OOM.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
As shown above, these factory methods use DelayedWorkQueue, LinkedBlockingQueue, and SynchronousQueue respectively. The blocking queue is one of the core components of a thread pool.
There are two ways to submit a periodic task:
scheduleAtFixedRate: executes on a fixed-rate cycle, measured from the start of the previous run.
scheduleWithFixedDelay: executes after a fixed delay measured from the end of the previous run.
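The two submission styles can be sketched as follows. The class name ScheduleDemo and the period/delay values are illustrative choices, not from the original text; the latch just counts firings so the demo can terminate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    // Fires a fixed-rate task 'times' times, then cancels it.
    // Returns true if all firings happened within the timeout.
    public static boolean fireAtFixedRate(int times) throws InterruptedException {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(times);
        // fixed rate: the next run is scheduled relative to the *start* of the previous run
        ScheduledFuture<?> f = ses.scheduleAtFixedRate(latch::countDown, 0, 10, TimeUnit.MILLISECONDS);
        boolean done = latch.await(5, TimeUnit.SECONDS);
        f.cancel(false);
        // fixed delay: the next run starts a fixed time after the *end* of the previous run
        ses.scheduleWithFixedDelay(() -> {}, 10, 10, TimeUnit.MILLISECONDS);
        ses.shutdownNow();
        return done;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fireAtFixedRate(3)); // normally prints true
    }
}
```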
Task blocking queue
Task blocking queues generally fall into four kinds: direct hand-off queues, bounded task queues, unbounded task queues, and priority task queues.
SynchronousQueue
Direct hand-off queue: set the work queue to SynchronousQueue, a special BlockingQueue with no capacity. Each insert operation blocks until a matching remove wakes it, and likewise each remove must wait for a corresponding insert.
It is a blocking queue that stores no elements: every insert must wait until another thread performs a remove, otherwise the insert blocks forever.
With SynchronousQueue, submitted tasks are never buffered; they are always handed off for immediate execution. If fewer than maximumPoolSize threads are executing tasks, the pool tries to create a new thread; once maximumPoolSize is reached, the rejection policy you configured runs. Tasks submitted this way are therefore never cached but executed at once, so you need an accurate estimate of your program's concurrency to set an appropriate maximumPoolSize; otherwise the rejection policy triggers easily.
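The hand-off behavior can be demonstrated with a small sketch, assuming a pool of at most two threads and the default AbortPolicy (the class name DirectHandoffDemo is mine): once both threads are busy, the next execute() is rejected immediately because nothing can be buffered.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectHandoffDemo {
    // With a SynchronousQueue nothing is buffered: once maximumPoolSize threads
    // are busy, the next submission is rejected immediately.
    public static boolean overflowIsRejected() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); // max 2 threads, no buffer
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> { try { release.await(); } catch (InterruptedException ignored) {} };
        pool.execute(blocker);  // occupies thread 1
        pool.execute(blocker);  // occupies thread 2
        boolean rejected = false;
        try {
            pool.execute(() -> {}); // no idle thread, no queue capacity -> default AbortPolicy fires
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(overflowIsRejected()); // prints true
    }
}
```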
ArrayBlockingQueue bounded task queue: a bounded task queue can be created with ArrayBlockingQueue, for example:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy())
With a bounded ArrayBlockingQueue, the pool creates a new thread for each arriving task until the thread count reaches corePoolSize, then adds further tasks to the waiting queue. If the waiting queue fills up, i.e. exceeds the capacity ArrayBlockingQueue was initialized with, the pool continues creating threads until the count reaches maximumPoolSize; beyond that, the rejection policy runs.
Here the upper limit on the thread count is directly tied to the state of the bounded queue: while the queue has a large initial capacity or is not yet overloaded, the thread count stays at or below corePoolSize; only when the task queue is full does the pool grow toward maximumPoolSize.
LinkedBlockingQueue unbounded task queue: an unbounded task queue can be created with LinkedBlockingQueue, for example:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy())
With an unbounded task queue, the pool's queue can grow without limit, and the pool never creates more than corePoolSize threads; the maximumPoolSize parameter is effectively ignored. Even with many unexecuted tasks cached in the queue, once the thread count reaches corePoolSize it does not grow; later tasks simply enter the queue and wait. When using this mode, be sure to keep task submission and processing coordinated, or the queue will keep growing because tasks cannot be processed in time, until resources are exhausted.
PriorityBlockingQueue priority task queue: the priority task queue is implemented with PriorityBlockingQueue, an unbounded priority blocking queue backed by a balanced binary heap.
Tasks are reordered and executed by priority, and the pool's thread count stays at corePoolSize.
PriorityBlockingQueue is really a special unbounded queue: no matter how many tasks are added, the pool never creates more than corePoolSize threads. Ordinary queues generally process tasks first-in-first-out, whereas PriorityBlockingQueue executes them in priority order.
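A sketch of priority ordering, assuming a hypothetical Job wrapper where a lower number means higher priority (the class names PriorityPoolDemo and Job are mine). The first task blocks the single worker so the rest pile up in the queue and are then drained in priority order rather than submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PriorityPoolDemo {
    // A Runnable with an explicit priority; lower value runs first (illustrative scheme).
    static class Job implements Runnable, Comparable<Job> {
        final int priority; final Runnable body;
        Job(int priority, Runnable body) { this.priority = priority; this.body = body; }
        public void run() { body.run(); }
        public int compareTo(Job o) { return Integer.compare(priority, o.priority); }
    }

    public static List<Integer> runByPriority() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new PriorityBlockingQueue<>()); // single worker
        List<Integer> seen = new ArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        // the first task goes straight to the worker and blocks it,
        // so the remaining tasks accumulate in the priority queue
        pool.execute(new Job(0, () -> { try { gate.await(); } catch (InterruptedException ignored) {} }));
        for (int p : new int[]{3, 1, 2}) {
            final int pri = p;
            pool.execute(new Job(pri, () -> seen.add(pri)));
        }
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return seen; // queued jobs drain in priority order, not submission order
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runByPriority()); // prints [1, 2, 3]
    }
}
```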
Note that LinkedBlockingQueue can also be bounded: its default capacity is Integer.MAX_VALUE, and a queue size can be passed to its constructor.
DelayQueue
An unbounded blocking delay queue: each element carries an expiration time, and taking from the queue yields only expired elements. The head of the queue is the element whose delay expired first.
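A minimal sketch of DelayQueue, with a hypothetical Expiring element type (both class names are mine): regardless of insertion order, take() blocks until the head has expired and returns the element with the shortest delay first.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // An element that "expires" a fixed number of milliseconds after creation.
    static class Expiring implements Delayed {
        final String name; final long deadline;
        Expiring(String name, long delayMillis) {
            this.name = name;
            this.deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // take() blocks until the head element has expired, so the element with the
    // shortest delay comes out first regardless of insertion order.
    public static String firstOut() throws InterruptedException {
        DelayQueue<Expiring> q = new DelayQueue<>();
        q.put(new Expiring("late", 200));
        q.put(new Expiring("early", 20));
        return q.take().name;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstOut()); // prints early
    }
}
```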
Rejection policies
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
When the Executor has been shut down, i.e. after executorService.shutdown() has been called, or when it uses finite bounds for both the maximum thread count and the work-queue capacity and is saturated, new tasks submitted via execute() are rejected. In either case, execute() calls the rejectedExecution() method of its RejectedExecutionHandler:
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
AbortPolicy (the default rejection policy)
Also known as the termination policy: rejection throws a runtime RejectedExecutionException, so the submitter gets prompt feedback about the failed submission by catching the exception.
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() {}
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}
CallerRunsPolicy
The submitting thread runs the rejected task itself, which provides a natural feedback mechanism that slows the rate of new submissions. Use it when every task must eventually complete.
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() {}
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
DiscardPolicy
A handler that silently discards the rejected task. With this policy you may never notice that the system is in an abnormal state, so use it with caution!
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() {}
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}
DiscardOldestPolicy
Discards the oldest task at the head of the queue and resubmits the rejected task. Whether to use it depends on whether the business can tolerate new tasks replacing old ones; use with caution. (LRU-like behavior)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() {}
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
Execution flow
Determine whether the number of core threads in the thread pool has reached the threshold corePoolSize. If not, create a new core thread to execute the task.
If the number of core threads has reached the threshold corePoolSize, determine whether the blocking queue workQueue is full. If not, add the new task to the blocking queue.
If the queue is full, check whether the thread count has reached the maximumPoolSize threshold; if not, create a new non-core thread to execute the task. If it has, execute the pool's saturation policy.
There are several thread pool saturation strategies:
AbortPolicy: throw an exception directly (the default policy).
DiscardPolicy: silently discard the task.
DiscardOldestPolicy: discard the oldest task at the head of the queue, then retry the new one.
CallerRunsPolicy: run the task in the calling thread.
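The decision steps above can be sketched as a simplified model (not the JDK source; the class ExecuteFlow and its return strings are illustrative): the pool prefers core threads, then the queue, then non-core threads, and rejects only as a last resort.

```java
public class ExecuteFlow {
    // Simplified decision model of ThreadPoolExecutor.execute():
    // returns which action the pool takes for the next submitted task.
    public static String decide(int activeThreads, int corePoolSize,
                                int queueSize, int queueCapacity, int maximumPoolSize) {
        if (activeThreads < corePoolSize)    return "start core thread";     // step 1
        if (queueSize < queueCapacity)       return "enqueue";               // step 2
        if (activeThreads < maximumPoolSize) return "start non-core thread"; // step 3
        return "reject";                                                     // saturation policy
    }

    public static void main(String[] args) {
        System.out.println(decide(0, 2, 0, 10, 4));  // start core thread
        System.out.println(decide(2, 2, 3, 10, 4));  // enqueue
        System.out.println(decide(2, 2, 10, 10, 4)); // start non-core thread
        System.out.println(decide(4, 2, 10, 10, 4)); // reject
    }
}
```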
Configuring the thread pool size
To configure a thread pool sensibly, first analyze the task characteristics, which can be examined from the following angles:
The nature of tasks: CPU-intensive tasks, IO-intensive tasks and mixed tasks.
Priority of the task: high, medium and low.
The execution time of the task: long, medium and short.
Task dependency: whether to rely on other system resources, such as database connections.
By the amount of CPU and IO resources a task requires, tasks divide into:
CPU-intensive tasks: mainly computation; response is fast and the CPU stays busy, so CPU utilization is very high.
IO-intensive tasks: mainly IO operations; IO takes a long time while the CPU sits idle, so CPU utilization is low.
To make full use of system resources while keeping performance high, thread counts can be configured differently for CPU-intensive and IO-intensive tasks.
CPU-intensive: set the thread count to the number of CPU cores. These threads run in parallel with no thread-switching overhead, which raises CPU utilization while avoiding the performance loss of context switches.
IO-intensive: set the thread count to twice the number of CPU cores. While some threads are blocked on IO, the others can keep using the CPU, improving utilization.
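The two rules of thumb above can be expressed directly in code (the class name PoolSizing is mine; these are the text's heuristics, not universal constants):

```java
public class PoolSizing {
    // Rules of thumb from the text: CPU-bound pools use about one thread per core,
    // IO-bound pools about twice the number of cores.
    public static int cpuBoundThreads(int cores) { return cores; }
    public static int ioBoundThreads(int cores)  { return 2 * cores; }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBoundThreads(cores));
        System.out.println("IO-bound pool size:  " + ioBoundThreads(cores));
    }
}
```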