Updated 2025-04-04, SLTechnology News&Howtos (shulou.com)
Many newcomers are unsure how to use Java thread pools correctly. This article walks through the details in the hope that you will find something useful in it.
In day-to-day development, thread pools often carry the most important business logic in an application, so it pays to watch how they execute, including how exceptions are handled and analyzed. This article focuses on using thread pools correctly and offers some practical advice.
Exception handling of thread pool
UncaughtExceptionHandler
The run method of the Runnable interface declares no checked exceptions, and an unchecked exception thrown inside it does not propagate to the thread that started it. As a result, the main thread that spawned a thread cannot directly obtain exceptions raised while that thread runs. Consider the following example:
// Requires Guava (com.google.common.util.concurrent.Uninterruptibles) and java.util.concurrent.TimeUnit.
public static void main(String[] args) throws Exception {
    Thread thread = new Thread(() -> {
        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
        System.out.println(1 / 0); // this line throws ArithmeticException
    });
    thread.setUncaughtExceptionHandler((t, e) -> {
        e.printStackTrace(); // if you comment out this line, the program prints nothing about the exception
    });
    thread.start();
}
What is going on? The source of Thread shows that when a thread dies from an uncaught exception, it first checks whether an UncaughtExceptionHandler has been set on that thread. If not, it falls back to the ThreadGroup the thread belongs to. In the example, the handler we set replaces that fallback, so if it does nothing, nothing is reported.
Note: every thread belongs to a ThreadGroup, even if you do not specify one, and ThreadGroup implements the Thread.UncaughtExceptionHandler interface.
Here is ThreadGroup's default implementation of uncaughtException:
public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
        parent.uncaughtException(t, e);
    } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            System.err.print("Exception in thread \""
                             + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}
If the ThreadGroup has a parent, it delegates to the parent's uncaughtException. Otherwise it calls the global default handler returned by Thread.getDefaultUncaughtExceptionHandler(). If no global handler is set, it simply prints the exception to System.err. This is why it is worth setting an UncaughtExceptionHandler on the threads you create: it makes troubleshooting much easier.
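The lookup order described above can be observed with a small experiment. The following is only a sketch: the class name UncaughtHandlerSketch and the method runAndCapture are made up for illustration, and it assumes the thread is created in an ordinary ThreadGroup that does not override uncaughtException.

```java
import java.util.concurrent.atomic.AtomicReference;

class UncaughtHandlerSketch {

    /** Runs a failing task on a new thread and reports which handler saw the exception. */
    static String runAndCapture(boolean setPerThreadHandler) {
        AtomicReference<String> seen = new AtomicReference<>("none");
        Thread.UncaughtExceptionHandler previous = Thread.getDefaultUncaughtExceptionHandler();
        // Global fallback: ThreadGroup consults it when the thread has no handler of its own.
        Thread.setDefaultUncaughtExceptionHandler(
                (t, e) -> seen.set("default:" + e.getClass().getSimpleName()));
        try {
            Thread thread = new Thread(() -> {
                throw new IllegalStateException("boom");
            }, "demo-thread");
            if (setPerThreadHandler) {
                // A per-thread handler takes precedence over the ThreadGroup fallback.
                thread.setUncaughtExceptionHandler(
                        (t, e) -> seen.set("thread:" + e.getClass().getSimpleName()));
            }
            thread.start();
            thread.join(); // the handler runs on the dying thread before it terminates
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            Thread.setDefaultUncaughtExceptionHandler(previous); // restore global state
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCapture(true));
        System.out.println(runAndCapture(false));
    }
}
```

In a real application the handler would typically log the exception rather than store a string; the string is used here only to make the dispatch order visible.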
Submit tasks to the thread pool through execute
Back to thread pools. What happens to the pool if a task we submit throws an exception at run time and we did not wrap it in try...catch? The pool itself keeps working, but the code that submitted the task never sees the exception: unless something is set up to capture it, it is effectively swallowed. This is usually bad, because we need the original exception object to analyze the problem.
So how do we get the original exception object? Start with the thread pool source code. Many articles already analyze it in full, so for reasons of space here is only the most relevant part:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
This is the method that actually runs the tasks submitted to the thread pool.
Ignore the unrelated logic and focus on the inner try block. The call task.run() is where the submitted task actually executes. The beforeExecute call just before it is a hook for work that should happen before the task runs, and the afterExecute call in the finally block is a hook for work that should happen after the task has run.
Let's set beforeExecute aside and focus on afterExecute. Whatever type of exception the task throws, it is passed to afterExecute. The default afterExecute in ThreadPoolExecutor is empty, however, so to make use of it we need to subclass ThreadPoolExecutor and override it.
afterExecute is declared protected, and its Javadoc makes clear that it is meant to be overridden by subclasses.
The override should follow a certain pattern, which the Javadoc also describes. The relevant excerpt:
class ExtendedExecutor extends ThreadPoolExecutor {
    // ...
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
            System.out.println(t);
    }
}
This way, exceptions that the thread pool would otherwise have swallowed are caught, which makes troubleshooting easier.
One question remains. In runWorker, exceptions thrown by task.run() are recorded and then rethrown, so where do they end up? The exception is eventually passed to Thread's dispatchUncaughtException method, whose source is:
private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}
It obtains the thread's UncaughtExceptionHandler and calls its uncaughtException method, which brings us back to the handler lookup analyzed in the previous section. So, to get the original exception object, you can either provide an UncaughtExceptionHandler or override afterExecute.
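To see both paths at work for a task submitted through execute, here is a minimal sketch. The class name ExecuteFailureSketch and the thread name are illustrative assumptions; the pool sizes and queue capacity are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExecuteFailureSketch {

    /** Returns {seen by afterExecute, seen by the worker's UncaughtExceptionHandler}. */
    static Throwable[] runFailingTask() {
        AtomicReference<Throwable> fromAfterExecute = new AtomicReference<>();
        AtomicReference<Throwable> fromHandler = new AtomicReference<>();
        CountDownLatch bothSeen = new CountDownLatch(2);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4),
                r -> {
                    Thread t = new Thread(r, "sketch-worker");
                    // Path 1: the rethrown exception kills the worker and reaches its handler.
                    t.setUncaughtExceptionHandler((thread, e) -> {
                        fromHandler.set(e);
                        bothSeen.countDown();
                    });
                    return t;
                }) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // Path 2: runWorker hands the same exception to this hook.
                if (t != null) {
                    fromAfterExecute.set(t);
                    bothSeen.countDown();
                }
            }
        };
        try {
            pool.execute(() -> {
                throw new IllegalArgumentException("bad input");
            });
            bothSeen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return new Throwable[] {fromAfterExecute.get(), fromHandler.get()};
    }

    public static void main(String[] args) {
        Throwable[] seen = runFailingTask();
        System.out.println("afterExecute saw: " + seen[0]);
        System.out.println("handler saw:      " + seen[1]);
    }
}
```

Since both hooks receive the exception for execute, implementing one of them is usually enough; implementing both would report each failure twice.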
Submit tasks to the thread pool through submit
This case is also simple. Start with the source of the submit method:
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
The execute call here is ThreadPoolExecutor's execute, so from this point the task follows the same path as one submitted through execute. First look at newTaskFor, whose source is:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
The submitted Callable is wrapped in a FutureTask. It is eventually executed in runWorker above, where the core step is the call task.run(). Since task is actually a FutureTask here, we need to look at FutureTask's run method:
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
The key line for exception handling is the setException(ex) call in the catch block. Its implementation is:
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
The key point is that the exception object is assigned to outcome, a member variable of FutureTask. When we call submit, we get back a Future, and when we call its get method, the core of the work is done by the report method. The source of both methods follows.
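First, the get method:
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
Then the report method it delegates to:
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}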
These are status checks. If the task neither completed normally nor was cancelled, x is the original exception object, and it is wrapped in an ExecutionException. So calling get may throw an ExecutionException, and the original exception is available through its getCause method. Note that because FutureTask.run catches the exception itself, runWorker never sees it: afterExecute receives a null Throwable and the UncaughtExceptionHandler is not invoked. This is why the afterExecute pattern from the Javadoc calls get on the Future.
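As a quick illustration of the submit path, the sketch below submits a Callable that fails and recovers the original exception through getCause. The class name SubmitFailureSketch and the pool parameters are assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitFailureSketch {

    /** Submits a Callable that throws and returns the original exception, or null if none. */
    static Throwable originalCause() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
        try {
            // The NumberFormatException is stored in the FutureTask, not thrown to the worker.
            Future<Integer> future = pool.submit(() -> Integer.parseInt("not a number"));
            future.get(); // rethrows the stored failure wrapped in ExecutionException
            return null;
        } catch (ExecutionException ee) {
            return ee.getCause(); // the exception the task actually threw
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return ie;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("original exception: " + originalCause());
    }
}
```

If nobody ever calls get on the returned Future, the failure stays hidden inside it, which is the situation the afterExecute override is meant to cover.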
To sum up, there are two main ways to handle exceptions thrown by tasks submitted to a thread pool:
First, use try...catch inside the submitted task. The drawback is that if you submit several kinds of tasks to the pool, each kind has to catch exceptions on its own, which is tedious. Also, catch (Exception e) still misses Error and its subclasses, so to be on the safe side consider catch (Throwable t).
Second, override the pool's afterExecute method, or set an UncaughtExceptionHandler on its threads.
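One way to reduce the tedium of the first approach is to wrap tasks in a small decorator before submitting them. This is a sketch only: CatchAllSketch and guarded are hypothetical names, and whether catching Error (for example OutOfMemoryError) is appropriate depends on the application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CatchAllSketch {

    /** Wraps a task so that every Throwable, including Error, is passed to onFailure. */
    static Runnable guarded(Runnable task, Consumer<Throwable> onFailure) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) { // deliberately broader than catch (Exception e)
                onFailure.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        List<Throwable> failures = new ArrayList<>();
        Runnable failing = () -> {
            throw new AssertionError("an Error, not an Exception");
        };
        guarded(failing, failures::add).run();
        System.out.println("captured: " + failures);
    }
}
```

The wrapped Runnable can be passed to execute as usual, so each kind of task no longer needs its own try...catch.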
Here is how I typically create a thread pool, for reference:
// ThreadFactoryBuilder and Uninterruptibles come from Guava; logger is assumed to be an SLF4J-style Logger.
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
statisticsThreadPool = new ThreadPoolExecutor(
        DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, 60, TimeUnit.SECONDS, queue,
        new ThreadFactoryBuilder()
                .setThreadFactory(new ThreadFactory() {
                    private int count = 0;
                    private String prefix = "StatisticsTask";

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, prefix + "-" + count++);
                    }
                })
                .setUncaughtExceptionHandler((t, e) -> {
                    String threadName = t.getName();
                    logger.error("statisticsThreadPool error occurred! ThreadName: {}, error msg: {}",
                            threadName, e.getMessage(), e);
                })
                .build(),
        (r, executor) -> {
            // Rejection policy: block the submitter until the queue has room.
            if (!executor.isShutdown()) {
                logger.warn("statisticsThreadPool is too busy! Waiting to insert task to queue!");
                Uninterruptibles.putUninterruptibly(executor.getQueue(), r);
            }
        }) {
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                future.get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null) {
            logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);
        }
    }
};
statisticsThreadPool.prestartAllCoreThreads();

Setting the number of threads
Tasks are generally either CPU-intensive or IO-intensive. For CPU-intensive tasks the thread count should stay low: the number of CPU cores + 1, or 2 times the number of cores, is usually a reasonable value. So consider setting corePoolSize to the number of CPU cores + 1 and maximumPoolSize to twice the number of cores.
For IO-intensive tasks, consider setting the core thread count to 4 times the number of cores and the maximum thread count to 5 times the number of cores. Deriving the numbers from the core count is more reasonable than picking a value off the top of your head.
The total number of threads should still not be too large. Keeping it under about 100 is reasonable; beyond that, frequent context switching may degrade system performance.
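The rules of thumb above are easy to write down as a helper. The sketch below is one possible encoding: PoolSizingSketch, sizesFor and the MAX_THREADS constant are illustrative names, and the multipliers are simply the ones suggested in this section, not universal constants.

```java
class PoolSizingSketch {

    /** Upper bound on threads, following the "keep it within 100" advice. */
    static final int MAX_THREADS = 100;

    /** Returns {corePoolSize, maximumPoolSize} for the given workload type and core count. */
    static int[] sizesFor(boolean ioBound, int cores) {
        int core = ioBound ? cores * 4 : cores + 1; // IO-bound: 4x cores, CPU-bound: cores + 1
        int max = ioBound ? cores * 5 : cores * 2;  // IO-bound: 5x cores, CPU-bound: 2x cores
        return new int[] {Math.min(core, MAX_THREADS), Math.min(max, MAX_THREADS)};
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int[] cpu = sizesFor(false, cores);
        int[] io = sizesFor(true, cores);
        System.out.println("cores=" + cores
                + " cpu-bound=" + cpu[0] + "/" + cpu[1]
                + " io-bound=" + io[0] + "/" + io[1]);
    }
}
```

These values are starting points; measuring the real workload is still the only reliable way to settle on final numbers.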
How to close a thread pool correctly
Closing a thread pool correctly takes some care. For a graceful shutdown, first call shutdown. From then on the pool accepts no new tasks, but tasks already submitted, including those waiting in the queue, continue to run. Next call awaitTermination, which blocks for up to the given timeout waiting for the pool to terminate. It returns true if the pool terminates before the timeout elapses and false otherwise. Since we generally cannot wait indefinitely, estimate a reasonable timeout in advance.
If awaitTermination returns false and you still want to reclaim other resources only after the pool has closed, consider calling shutdownNow. It removes all tasks still waiting in the queue (returning them as a list) and interrupts every thread in the pool. shutdownNow does not guarantee that a running thread stops: that happens only if the task responds correctly to interruption. After that, either call awaitTermination again or give up waiting and move on.
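The shutdown sequence described above can be sketched as a small helper. GracefulShutdownSketch, shutdownGracefully and the demo method are hypothetical names, and the timeouts are placeholders to be replaced with values estimated for the actual workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class GracefulShutdownSketch {

    /** Returns true if the pool terminated, either gracefully or after shutdownNow. */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // drop queued tasks and interrupt running workers
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    /** Demo: the task sleeps far longer than the timeout but honors interruption. */
    static boolean demoWithSleepingTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
        pool.execute(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // exit promptly when interrupted
            }
        });
        return shutdownGracefully(pool, 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + demoWithSleepingTask());
    }
}
```

In the demo the first wait times out, shutdownNow interrupts the sleeping task, and the second wait succeeds. A task that ignores interruption would make the helper return false instead.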
Other useful methods in the thread pool
You may have noticed that I called prestartAllCoreThreads after creating the pool. What is it for? A newly created thread pool has 0 threads until tasks are submitted to it. Sometimes we know in advance that many tasks will arrive, and creating threads one by one as they arrive costs time just when the load hits. In that case, consider creating all core threads up front so that they are ready as soon as the system starts.
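The effect is easy to check with getPoolSize. The following sketch uses an arbitrary pool of 3 core threads; the class and method names are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartSketch {

    /** Returns {pool size before prestart, threads started, pool size after prestart}. */
    static int[] poolSizesAroundPrestart() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16));
        try {
            int before = pool.getPoolSize();             // no tasks yet, so no threads yet
            int started = pool.prestartAllCoreThreads(); // returns how many threads it started
            int after = pool.getPoolSize();
            return new int[] {before, started, after};
        } finally {
            pool.shutdown(); // let the idle core threads exit
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizesAroundPrestart();
        System.out.println("before=" + sizes[0] + " started=" + sizes[1] + " after=" + sizes[2]);
    }
}
```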
The thread pool offers other interesting methods as well. Imagine a pool under such heavy load that the rejection policy is about to be triggered. Is there a way to relieve it? There is: ThreadPoolExecutor provides setCorePoolSize and setMaximumPoolSize, so the number of core threads and the maximum number of threads can be changed after the pool is created. When the pool is running under heavy load, we can handle it as follows:
Use a scheduled polling thread (a daemon thread) to check the number of active threads in the pool by calling getActiveCount.
When the number of active threads exceeds the number of core threads, consider multiplying both corePoolSize and maximumPoolSize by 2. More threads is not always better, so set an upper limit, such as 50 or 100.
At the same time, get the number of queued tasks by calling getQueue() and then size(). When the queue is less than half full, the load is probably no longer high, so if the pool was expanded earlier, restore corePoolSize and maximumPoolSize by dividing them by 2.
[Figure: flow of the dynamic resizing procedure described above (image not available)]
The above is one way of using thread pools that I personally recommend.
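The polling procedure can be sketched roughly as follows. This is an interpretation of the steps, not a definitive implementation: PoolResizerSketch, nextSizes, apply and watch are hypothetical names, and the order of the setter calls is chosen so that corePoolSize never exceeds maximumPoolSize at any point, which newer JDK versions reject.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolResizerSketch {

    /** Pure decision step: returns the {core, max} sizes the pool should have next. */
    static int[] nextSizes(int active, int core, int max, int queued, int queueCapacity,
                           int baseCore, int cap) {
        if (active > core && max * 2 <= cap) {
            return new int[] {core * 2, max * 2};   // busy: double both, within the cap
        }
        boolean expanded = core > baseCore;
        if (expanded && queued < queueCapacity / 2) {
            return new int[] {core / 2, max / 2};   // calm again: undo one doubling
        }
        return new int[] {core, max};
    }

    /** Applies new sizes in an order that keeps core <= max at every step. */
    static void apply(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax);        // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);          // shrinking: lower the core first
            pool.setMaximumPoolSize(newMax);
        }
    }

    /** Starts a daemon poller that re-evaluates the pool every periodMillis. */
    static ScheduledExecutorService watch(ThreadPoolExecutor pool, int queueCapacity,
                                          int cap, long periodMillis) {
        final int baseCore = pool.getCorePoolSize();
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "pool-resizer");
            t.setDaemon(true);
            return t;
        });
        poller.scheduleAtFixedRate(() -> {
            int[] next = nextSizes(pool.getActiveCount(), pool.getCorePoolSize(),
                    pool.getMaximumPoolSize(), pool.getQueue().size(),
                    queueCapacity, baseCore, cap);
            apply(pool, next[0], next[1]);
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return poller;
    }

    /** Demo: grows a (2, 4) pool to (4, 8) and shrinks it back; returns the observed sizes. */
    static int[] applyDemo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        try {
            apply(pool, 4, 8);
            int grownCore = pool.getCorePoolSize();
            int grownMax = pool.getMaximumPoolSize();
            apply(pool, 2, 4);
            return new int[] {grownCore, grownMax, pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping the decision in a pure function makes the thresholds easy to test and tune separately from the polling thread.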
Is thread pool necessarily the best solution?
A thread pool is not the best-performing option in every case. For scenarios that demand extreme performance, consider Disruptor, a high-performance queue. Setting Disruptor aside, is there a better option based on the JDK alone? Yes.
In a thread pool, multiple threads share one queue, so under heavy load the queue is read and written frequently, and locks are needed to prevent conflicts. The thread pool source code is indeed full of locking. Is there a better way?
One option is to create a list of single-thread thread pools, each with its own bounded queue, and spread tasks across them to get multithreading. The advantage is that each queue is consumed by only one thread, so the worker threads do not contend with each other for tasks.
This idea of trading space for time borrows from the EventLoop mechanism in Netty. If shared thread pools really performed that well, why would Netty not simply use one?
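A minimal sketch of the "list of single-thread pools" idea might look like this. PartitionedExecutorSketch and its methods are hypothetical, and routing by the hash of a key is an assumption made for the example; round-robin dispatch would work as well when ordering per key does not matter.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PartitionedExecutorSketch {

    private final ThreadPoolExecutor[] lanes;

    PartitionedExecutorSketch(int laneCount, int queueCapacity) {
        lanes = new ThreadPoolExecutor[laneCount];
        for (int i = 0; i < laneCount; i++) {
            final int id = i;
            // One thread and one bounded queue per lane: the queue has a single consumer.
            lanes[i] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(queueCapacity),
                    r -> new Thread(r, "lane-" + id));
        }
    }

    /** Tasks with the same key always land on the same single-thread lane. */
    void execute(Object key, Runnable task) {
        int index = Math.floorMod(key.hashCode(), lanes.length);
        lanes[index].execute(task);
    }

    void shutdown() {
        for (ThreadPoolExecutor lane : lanes) {
            lane.shutdown();
        }
    }

    /** Demo: runs taskCount tasks for one key and returns the names of the threads used. */
    static Set<String> threadsUsedForKey(String key, int taskCount) {
        PartitionedExecutorSketch router = new PartitionedExecutorSketch(4, 64);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                router.execute(key, () -> {
                    names.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            router.shutdown();
        }
        return names;
    }
}
```

Producers still share each lane's queue when submitting, so this removes contention between workers rather than all locking. A side benefit of keyed routing is that tasks for the same key run in submission order.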
Other things to note
Do not use scalable (elastic) thread pools under any circumstances: creating and destroying threads is expensive.
Do not use unbounded queues under any circumstances, except in one-off tests. The commonly used bounded queues are ArrayBlockingQueue and LinkedBlockingQueue; the former is backed by an array, the latter by a linked list. In terms of performance, LinkedBlockingQueue has higher throughput but is less stable. Which one to use is best decided by testing it yourself. Note that LinkedBlockingQueue is bounded only when constructed with a capacity; the one created by Executors.newFixedThreadPool has none, so it is effectively unbounded.
Implement your own RejectedExecutionHandler. The ones that ship with the JDK are not very convenient, and a custom one lets you add your own logic. If you need specific context information, add fields of your own to the Runnable implementation class so that the RejectedExecutionHandler can use them directly.
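As an illustration of the last point, the sketch below attaches a name to each task and reads it back in a custom RejectedExecutionHandler. NamedTask and ContextAwareRejectionSketch are hypothetical names. The approach assumes tasks are submitted through execute, because submit wraps the Runnable in a FutureTask and the handler would then no longer see the NamedTask type.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** A Runnable that carries context (here just a name) for use by the rejection handler. */
class NamedTask implements Runnable {
    final String name;
    private final Runnable body;

    NamedTask(String name, Runnable body) {
        this.name = name;
        this.body = body;
    }

    @Override
    public void run() {
        body.run();
    }
}

class ContextAwareRejectionSketch {

    /** Overloads a pool of 1 thread and a queue of 1 slot; returns the names of rejected tasks. */
    static List<String> rejectedNames() {
        List<String> rejected = new CopyOnWriteArrayList<>();
        RejectedExecutionHandler handler = (r, executor) -> {
            // Use the task's own context instead of an opaque toString().
            String name = (r instanceof NamedTask) ? ((NamedTask) r).name : r.toString();
            rejected.add(name);
        };
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        try {
            Runnable blocker = () -> {
                try {
                    release.await(); // keep the worker busy until the demo is over
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            };
            pool.execute(new NamedTask("task-1", blocker)); // taken by the single worker
            pool.execute(new NamedTask("task-2", blocker)); // fills the only queue slot
            pool.execute(new NamedTask("task-3", blocker)); // no room left: rejected
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedNames());
    }
}
```

A real handler would more likely log the context, raise an alarm, or persist the task than collect names in a list.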
How not to lose a task?
This refers to a special situation: a sudden traffic spike pushes the pool load so high that the rejection policy is about to be triggered. What can we do to keep submitted tasks from being lost as far as possible? Generally, an alarm should fire as soon as possible so that developers can respond. They can then apply rate limiting, add machines, or temporarily store task data in Kafka, Redis, or even a database. But distant water cannot put out a nearby fire. What can we do to ease the problem before the real fix arrives?
The first thing to consider is dynamically increasing the number of threads, as described earlier. If the pool has already been expanded, however, do not expand it further, or system throughput may drop. In that case, implement your own RejectedExecutionHandler: inside it, use a separate single-thread pool that calls put on the queue returned by the original pool's getQueue method, to retry inserting the tasks that could not be accepted. While the queue is full the task still cannot be inserted, but put only blocks that separate thread and does not affect the main flow.
This is a stopgap, and the industry has many mature practices for handling traffic surges, but from the thread pool's point of view it is a workable temporary measure.
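The re-queuing handler described above could be sketched like this. RequeueOnRejectSketch, requeueVia and runBurst are hypothetical names. For brevity the helper thread comes from Executors.newSingleThreadExecutor, whose own queue is unbounded; that is an assumption of this sketch and sits uneasily with the earlier advice on bounded queues, so a production version would need to bound it or shed load.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RequeueOnRejectSketch {

    /** Builds a handler that re-inserts rejected tasks from a separate thread. */
    static RejectedExecutionHandler requeueVia(ExecutorService requeuer) {
        return (task, pool) -> {
            if (pool.isShutdown()) {
                return; // the pool is closing; re-queuing would only strand the task
            }
            requeuer.execute(() -> {
                try {
                    pool.getQueue().put(task); // blocks only the requeuer thread
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
        };
    }

    /** Demo: floods a tiny pool and returns how many tasks eventually completed. */
    static int runBurst(int tasks) {
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        ExecutorService requeuer = Executors.newSingleThreadExecutor();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                requeueVia(requeuer));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    completed.incrementAndGet();
                    done.countDown();
                });
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            requeuer.shutdownNow();
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runBurst(20));
    }
}
```

Because put bypasses execute, this relies on the pool keeping at least one live worker to drain the queue, which holds here since the core thread never times out.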