2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article walks through the source code of Future and FutureTask, using a thread pool as the entry point. It starts from a small example, follows a task submitted through ExecutorService.submit, and then reads the relevant FutureTask fields and methods one by one.
1. FutureTask inheritance architecture
FutureTask implements the RunnableFuture interface, which in turn extends both Runnable and Future.
2. Start with a simple example
I won't go into the Runnable and Callable interfaces in detail here. The key difference is that Callable.call returns a value (and may throw a checked exception), while Runnable.run returns nothing.
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;

    public class FutureTaskTest {

        public static void main(String[] args) {
            ExecutorService executor = null;
            try {
                // Create the thread pool before submitting anything to it
                executor = Executors.newFixedThreadPool(2);
                // Thread pool submits a Runnable task
                executor.execute(new MyRunnable());
                // Thread pool submits a Callable task
                Future<Object> f = executor.submit(new MyCallLable());
                System.out.println(f.get());
                // A single thread runs a FutureTask directly
                FutureTask<Object> ft = new FutureTask<>(new MyCallLable());
                Thread t = new Thread(ft);
                t.start();
                System.out.println(ft.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                if (executor != null) {
                    executor.shutdown();
                }
            }
        }

        static class MyCallLable implements Callable<Object> {
            @Override
            public Object call() throws Exception {
                return 1;
            }
        }

        static class MyRunnable implements Runnable {
            @Override
            public void run() {
                System.out.println(2);
            }
        }
    }

3. Analyze FutureTask source code by thread pool submission
3.1. Executors.newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
This method creates a thread pool whose core size and maximum size are both nThreads, and it stores pending tasks in a LinkedBlockingQueue with no capacity limit. In other words, if we submit tasks through this JDK-provided pool faster than they are processed, the queue keeps growing and can eventually cause an OutOfMemoryError. The ThreadPoolExecutor code is not analyzed here; if you are interested, see my other blog post dedicated to the ThreadPoolExecutor source code. The method returns an ExecutorService.
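One common way to avoid the unbounded queue is to build the ThreadPoolExecutor directly with a bounded queue. The following is only a minimal sketch: newBoundedFixedPool and countRejected are hypothetical helper names (not JDK methods), and ArrayBlockingQueue with AbortPolicy is just one possible choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    // Hypothetical helper: same shape as newFixedThreadPool, but with a bounded queue.
    static ThreadPoolExecutor newBoundedFixedPool(int nThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Submits `tasks` tasks that all block on a latch and counts the rejections.
    static int countRejected(int nThreads, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = newBoundedFixedPool(nThreads, queueCapacity);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // queue full and no spare thread
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 tasks occupy the threads, 3 wait in the queue, the rest are rejected
        System.out.println("rejected = " + countRejected(2, 3, 10));
    }
}
```

With a bounded queue the pool pushes back on the submitter instead of accumulating tasks in memory; how to react to a rejection (retry, drop, run in the caller) depends on the application.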
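The ThreadPoolExecutor inheritance hierarchy is as follows: ThreadPoolExecutor extends AbstractExecutorService, which implements ExecutorService, which in turn extends Executor.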
3.2. ExecutorService.submit method analysis
The call is actually handled by the submit method of the implementing class AbstractExecutorService.
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
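From the caller's point of view, submit is therefore roughly the same as wrapping the Callable in a FutureTask yourself and passing it to execute. The sketch below illustrates that equivalence; SubmitByHandSketch and its method names are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class SubmitByHandSketch {

    // The usual way: let the executor create the FutureTask.
    static int viaSubmit(ExecutorService executor, Callable<Integer> task) {
        try {
            Future<Integer> f = executor.submit(task);
            return f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // The same thing by hand: wrap the Callable, then call execute.
    static int viaExecute(ExecutorService executor, Callable<Integer> task) {
        try {
            FutureTask<Integer> ftask = new FutureTask<>(task);
            executor.execute(ftask); // FutureTask is a Runnable
            return ftask.get();      // ...and a Future
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static int[] demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> 1;
            return new int[] { viaSubmit(executor, task), viaExecute(executor, task) };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```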
The newTaskFor method wraps the Callable task in a FutureTask, which stores it in its callable field.
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

3.3. FutureTask attribute analysis

    /*
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    // Current task state
    private volatile int state;
    // Newly created
    private static final int NEW = 0;
    // About to finish, but the outcome has not been published yet
    private static final int COMPLETING = 1;
    // Finished normally
    private static final int NORMAL = 2;
    // Finished abnormally: the business logic in Callable.call threw an exception
    private static final int EXCEPTIONAL = 3;
    // Task was cancelled
    private static final int CANCELLED = 4;
    // Task is being interrupted
    private static final int INTERRUPTING = 5;
    // Task has been interrupted
    private static final int INTERRUPTED = 6;

    // The Callable passed in on submission; its call method is what gets executed
    private Callable<V> callable;
    // Result of the call method:
    // 1. if the task ends normally, the return value of call
    // 2. if call throws, the exception that was thrown
    private Object outcome;
    // The thread currently executing the task
    private volatile Thread runner;
    // A stack of nodes that reference the threads blocked in get
    private volatile WaitNode waiters;

3.4. FutureTask constructor analysis

    public FutureTask(Callable<V> callable) {
        // The caller must pass in a Callable
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        // Set the task state to NEW
        this.state = NEW;
    }

3.5. FutureTask execution process analysis
3.5.1. Call process when a Callable is submitted through a thread pool
Step by step, starting from the example in which the thread pool submits a Callable:
1. executor.submit(new MyCallLable()) submits a Callable implementation.
2. That call actually lands in AbstractExecutorService.submit.
3. AbstractExecutorService.submit calls newTaskFor to create a FutureTask object and stores the MyCallLable task in its callable field.
4. AbstractExecutorService.submit then calls ThreadPoolExecutor.execute to hand the FutureTask object to the thread pool.
5-8. These steps are the thread pool's normal task execution path. It is fairly involved, so it is only summarized here; see my other blog post for the source-level details.
9-10. The pool's worker thread ends up running FutureTask.run, which in turn calls Callable.call. That is the whole path from submitting a Callable to executing it.

3.5.2. FutureTask.run method analysis

    public void run() {
        // Condition 1: the task is no longer in the NEW state
        // Condition 2: the CAS that installs the current thread as runner failed,
        //              i.e. another thread is already running this task
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            // In either case, do not execute
            return;
        try {
            // The Callable held by this FutureTask
            Callable<V> c = callable;
            // Condition 1: the submitted Callable must not be null
            // Condition 2: the task state is still NEW
            if (c != null && state == NEW) {
                // Return value of the Callable
                V result;
                // Whether the task ran successfully
                boolean ran;
                try {
                    // Invoke the user-defined call method
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // call threw an exception
                    result = null;
                    ran = false;
                    setException(ex);
                }
                // On success, publish the return value
                if (ran)
                    set(result);
            }
        } finally {
            // Clear runner only now: until the state is settled, a non-null
            // runner prevents concurrent calls to run. This point is reached
            // both on normal completion and after an exception.
            runner = null;
            // Re-read the state after clearing runner
            int s = state;
            // If cancel(true) is in progress or done, wait for the interrupt
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

3.5.3. FutureTask.set method analysis
This method records the result after the task has run successfully: it stores the return value in the outcome field, moves the state to NORMAL, and wakes up the threads that are blocked in get.
    protected void set(V v) {
        // Move the state from NEW to COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // Store the return value
            outcome = v;
            // Move the state to NORMAL (final state)
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            // Wake up the threads blocked in get
            finishCompletion();
        }
    }

3.5.6. FutureTask static inner class WaitNode analysis
Before analyzing finishCompletion, let's introduce the WaitNode class. Why does it exist? FutureTask.get blocks. If get were only ever called from a single thread, WaitNode would in theory be unnecessary. But what if several other threads call get on the same task? Inside get, the calling thread is blocked with LockSupport.park(this) or LockSupport.parkNanos(this, nanos) and has to be woken up later, and LockSupport.unpark(Thread) needs to be told which thread to wake. So a data structure is required that stores a reference to every waiting thread. That is WaitNode: a singly linked list that is built by head insertion and traversed from the head, which makes it a typical stack (last in, first out). Why a linked list? So that all waiters can be traversed and woken when the task finishes.
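To make the head-insertion idea concrete, here is a minimal sketch of such a stack. It is not the FutureTask code: it uses AtomicReference instead of Unsafe, stores a label instead of a Thread, and the names WaiterStackSketch, Node, push and drain are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class WaiterStackSketch {

    // Simplified stand-in for WaitNode; holds a label instead of a Thread.
    static final class Node {
        final String label;
        volatile Node next;
        Node(String label) { this.label = label; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();

    // Head insertion: the new node points at the old head, then a CAS swings head to it.
    void push(String label) {
        Node node = new Node(label);
        for (;;) {
            Node old = head.get();
            node.next = old;
            if (head.compareAndSet(old, node)) return;
        }
    }

    // Detaches the whole list with one CAS and walks it from the head.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        for (Node q; (q = head.get()) != null;) {
            if (head.compareAndSet(q, null)) {
                for (; q != null; q = q.next) order.add(q.label);
                break;
            }
        }
        return order;
    }

    static List<String> demo() {
        WaiterStackSketch s = new WaiterStackSketch();
        s.push("t1");
        s.push("t2");
        s.push("t3");
        return s.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [t3, t2, t1]: last in, first out
    }
}
```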
    static final class WaitNode {
        // Reference to the waiting thread
        volatile Thread thread;
        // Points to the next node
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

3.5.7. FutureTask.finishCompletion method analysis
This method wakes up the threads that are blocked in get.
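Seen from outside, this means that every thread waiting in get is released once the task completes. The following sketch shows that behaviour with a few waiting threads; the class name, the value 42 and the thread count are arbitrary choices for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class WakeAllWaitersSketch {

    // Starts `waiters` threads that call get, runs the task, and
    // returns how many of them received the result.
    static int demo(int waiters) {
        FutureTask<Integer> ft = new FutureTask<>(() -> 42);
        AtomicInteger received = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(waiters);
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                started.countDown();
                try {
                    // Blocks if the task has not completed yet
                    if (ft.get() == 42) received.incrementAndGet();
                } catch (Exception e) {
                    // not expected in this sketch
                }
            });
            threads[i].start();
        }
        try {
            started.await(); // all waiter threads are running
            ft.run();        // completes the task and releases the waiters
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("threads that got the result: " + demo(3));
    }
}
```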
    private void finishCompletion() {
        // assert state > COMPLETING;
        // Start from the head of the waiters list
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the whole list with a CAS so that a concurrent caller
            // (for example cancel from another thread) cannot process it twice
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    // Thread referenced by the current node
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null; // help gc
                        // Wake up the thread of the current node
                        LockSupport.unpark(t);
                    }
                    // Next node after the current one
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // help gc
                    // Advance q to the next node
                    q = next;
                }
                break;
            }
        }
        done();
        // Clear callable, help gc
        callable = null;
    }

3.5.8. FutureTask.setException method analysis
This method stores the thrown exception in outcome, moves the task state to EXCEPTIONAL, and calls finishCompletion to wake up the threads blocked in get.
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
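For the caller, the stored exception surfaces when get is called: get throws an ExecutionException whose cause is the original exception. A small sketch of that behaviour follows (class and method names are invented for the example):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class ExceptionOutcomeSketch {

    // Runs a failing task and returns the cause that get reports.
    static Throwable causeSeenByGet() {
        IllegalStateException boom = new IllegalStateException("boom");
        Callable<Integer> failing = () -> { throw boom; };
        FutureTask<Integer> ft = new FutureTask<>(failing);
        ft.run(); // run catches the exception and records it via setException
        try {
            ft.get();
            return null; // not reached for a failing task
        } catch (ExecutionException e) {
            return e.getCause(); // the exception thrown inside call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(causeSeenByGet());
    }
}
```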
3.5.9. FutureTask.handlePossibleCancellationInterrupt method analysis

    private void handlePossibleCancellationInterrupt(int s) {
        // While another thread is still in the middle of cancel(true),
        // yield the CPU until the interrupt has been delivered
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    }

3.5.10. FutureTask.get and FutureTask.get(long timeout, TimeUnit unit) method analysis
The two methods differ very little. The only real difference is how the calling thread is blocked: the untimed version uses LockSupport.park(this), while the timed version uses LockSupport.parkNanos(this, nanos). With a timeout, the thread wakes up by itself once the specified time has elapsed, and get then throws a TimeoutException if the task has still not completed.
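The timed variant can be tried out with a task that is never run. This is a minimal sketch; the 50 ms timeout and the class and method names are arbitrary.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetSketch {

    // The task is never run, so the timed get has to give up.
    static boolean timesOut() {
        FutureTask<Integer> ft = new FutureTask<>(() -> 1);
        try {
            ft.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // the task did not complete within the timeout
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // The task has already completed, so get returns without blocking.
    static int completesInTime() {
        FutureTask<Integer> ft = new FutureTask<>(() -> 1);
        ft.run();
        try {
            return ft.get(50, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut() + " " + completesInTime());
    }
}
```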
A note on the difference between Thread.sleep and LockSupport.parkNanos: sleep responds to an interrupt by throwing InterruptedException and clearing the interrupt status, whereas parkNanos simply returns without throwing anything. The caller of parkNanos has to check the interrupt status itself and decide how to react.

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s)
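The different reactions to an interrupt can be observed with a thread that interrupts itself before blocking. The sketch below is only an illustration; the class and method names and the 5 second durations are arbitrary, and both calls return immediately because the interrupt status is already set.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class InterruptResponseSketch {

    // parkNanos returns without throwing and leaves the interrupt status set.
    static boolean parkKeepsInterruptStatus() {
        Thread.currentThread().interrupt();
        LockSupport.parkNanos(InterruptResponseSketch.class, TimeUnit.SECONDS.toNanos(5));
        // Thread.interrupted() reads the status and clears it again
        return Thread.interrupted();
    }

    // sleep throws InterruptedException and clears the interrupt status.
    static boolean sleepThrowsAndClears() {
        Thread.currentThread().interrupt();
        try {
            Thread.sleep(5000);
            return false; // not reached when the thread is interrupted
        } catch (InterruptedException e) {
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(parkKeepsInterruptStatus() + " " + sleepThrowsAndClears());
    }
}
```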