What is the principle of JUC-Future and FutureTask

Updated 2025-04-03. From: SLTechnology News&Howtos

This article explains the principles behind Future and FutureTask in java.util.concurrent (JUC): what the Future interface promises, and how FutureTask implements it.

1 Future

Future represents the life cycle of a task and models a cancellable asynchronous operation. It provides methods to check the status of the task (completed or cancelled), to obtain the result of the task, and to cancel the task. It suits asynchronous tasks that produce a result and take a long time to execute.

Many asynchronous task classes in the concurrent package implement Future; the most typical of them is FutureTask.

1.1 Introduction

Future represents the result of an asynchronous computation. It provides methods to check whether the computation is complete, to wait for its completion, and to retrieve its result. The result can only be retrieved with the get method, which blocks if necessary until the computation has completed. Cancellation is performed by the cancel method. Additional methods determine whether the task completed normally or was cancelled. Once a computation has completed, it can no longer be cancelled. If you want to use a Future for the sake of cancellability but do not need a usable result, you can declare the type as Future<?> and return null as the result of the underlying task.

In other words, Future has the following characteristics.

The task executes asynchronously, and its result is obtained through the get method.

If the computation has not completed, get blocks. Once it has completed, get can be called any number of times and returns the result immediately.

If the computation has not completed, it can be cancelled.

The execution status of the computation can be queried.
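The characteristics above can be seen in a short usage sketch. The class name FutureBasicsDemo and the 100 ms sleep are illustrative assumptions, not part of the original article; the calls themselves (submit, get, isDone, isCancelled) are the standard java.util.concurrent API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureBasicsDemo {
    // Submits a slow computation and reads its result twice through the same Future.
    static int[] computeTwice() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> {
                Thread.sleep(100);   // simulate a long-running task
                return 6 * 7;
            });
            int first = future.get();    // blocks until the task completes
            int second = future.get();   // the task is done, so this returns immediately
            if (!future.isDone() || future.isCancelled()) {
                throw new IllegalStateException("unexpected Future state");
            }
            return new int[] { first, second };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] results = computeTwice();
        System.out.println(results[0] + " " + results[1]);
    }
}
```

Both calls to get return the same value; the second one does not rerun the task.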

2 FutureTask

FutureTask provides a basic implementation of Future, including retrieving the result of the task (get) and cancelling the task (cancel). While the task is unfinished, get blocks. Once the computation has completed, it cannot be restarted or cancelled (unless the computation is invoked using runAndReset).

FutureTask can wrap a Callable or a Runnable, and because it implements Runnable it can be submitted to a thread pool for execution. Besides serving as a stand-alone class, it provides protected functionality that is useful when creating custom task classes. The thread safety of FutureTask is based on CAS.

FutureTask maintains a volatile int field named state, which represents the run state of the current task.

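NEW (0): the initial state; the task has not finished yet (it may or may not be running)

COMPLETING (1): a transient state while the result or exception is being stored

NORMAL (2): completed normally

EXCEPTIONAL (3): completed with an exception

CANCELLED (4): cancelled without interrupting the running thread

INTERRUPTING (5): a transient state while cancel(true) is interrupting the running thread

INTERRUPTED (6): cancelled, and the running thread has been interrupted

Of the seven states, four are terminal: NORMAL, EXCEPTIONAL, CANCELLED, and INTERRUPTED. The possible state transitions are:

NEW -> COMPLETING -> NORMAL

NEW -> COMPLETING -> EXCEPTIONAL

NEW -> CANCELLED

NEW -> INTERRUPTING -> INTERRUPTED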

Data structure and core parameters

    // The underlying callable; nulled out after running
    private Callable<V> callable;
    // The result to return or the exception to throw from get()
    private Object outcome; // non-volatile, protected by state reads/writes
    // The thread running the callable; set by CAS during run()
    private volatile Thread runner;
    // Treiber stack of waiting threads
    private volatile WaitNode waiters;

FutureTask implements RunnableFuture, which extends both Runnable and Future, so a FutureTask can be run directly by a thread or submitted to a thread pool for execution. In the implementation shown above, the inner class WaitNode forms a simple Treiber stack (a lock-free concurrent stack) that stores the threads waiting in get(). Older JDK versions (JDK 6, for example) took a different approach: FutureTask held a single custom synchronizer field, Sync, and delegated every method to it, which is the general pattern for using AQS in JUC. Both implementations are covered in the sections that follow.
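For readers unfamiliar with the Treiber stack, here is a minimal sketch of the technique using AtomicReference. It is not the JDK code: FutureTask keeps its stack in the waiters field and updates it through Unsafe, and the class and method names below are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class TreiberStackSketch<T> {
    private static final class Node<T> {
        final T value;
        Node<T> next;
        Node(T value) { this.value = value; }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    // Push: point the new node at the current head, then CAS the head to the new node.
    // If another thread changed the head in between, the CAS fails and we retry.
    void push(T value) {
        Node<T> node = new Node<>(value);
        do {
            node.next = head.get();
        } while (!head.compareAndSet(node.next, node));
    }

    // Pop: CAS the head to its successor; returns null when the stack is empty.
    T pop() {
        Node<T> current;
        do {
            current = head.get();
            if (current == null) {
                return null;
            }
        } while (!head.compareAndSet(current, current.next));
        return current.value;
    }

    public static void main(String[] args) {
        TreiberStackSketch<String> stack = new TreiberStackSketch<>();
        stack.push("waiter-1");
        stack.push("waiter-2");
        System.out.println(stack.pop());
        System.out.println(stack.pop());
    }
}
```

The stack never takes a lock; every update is a single CAS on the head, which is the same idea FutureTask applies to its list of waiting threads.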

Source code analysis

FutureTask's synchronizer (AQS-based implementation)

Because a Future lets any number of threads get the result after the task has completed, the AQS synchronizer that controls access works in shared mode.

The execution state of the underlying task is stored in the AQS state. Whether AQS lets a thread through get or blocks it depends only on whether the task has finished, not on the specific state value.

    private final class Sync extends AbstractQueuedSynchronizer {
        // Task states. The non-zero values are distinct bits so that they can be tested with a bit mask.
        private static final int READY     = 0; // ready to run (the default AQS state)
        private static final int RUNNING   = 1; // running
        private static final int RAN       = 2; // finished
        private static final int CANCELLED = 4; // cancelled

        private final Callable<V> callable; // the underlying task
        private V result;                   // result returned by get()
        private Throwable exception;        // exception thrown from get()

        /*
         * Thread running the task; nulled after set/cancel to signal that the
         * result is available. Must be volatile so that result and exception
         * are visible after completion (otherwise both would need to be volatile).
         */
        private volatile Thread runner;

        // Acquire succeeds only once the task is done.
        protected int tryAcquireShared(int ignore) {
            return innerIsDone() ? 1 : -1;
        }

        // Always lets AQS signal waiters once the final state is set. It does not
        // update the AQS state; the volatile write to runner provides visibility.
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }

        void innerRun() {
            // The CAS ensures the task does not run twice.
            if (!compareAndSetState(READY, RUNNING))
                return;
            // A Future usually runs asynchronously, so runner is usually a pool thread.
            runner = Thread.currentThread();
            // Recheck after setting the thread: the task may have been cancelled meanwhile.
            if (getState() == RUNNING) {
                V result;
                try {
                    result = callable.call();
                } catch (Throwable ex) {
                    setException(ex); // record any exception thrown by the task
                    return;
                }
                set(result);
            } else {
                releaseShared(0); // cancelled: release the waiting threads
            }
        }

        void innerSet(V v) {
            for (;;) { // retry if the CAS fails
                int s = getState();
                if (s == RAN)
                    return; // a completed task cannot be set again
                if (s == CANCELLED) {
                    // releaseShared nulls runner, in case this races with a
                    // cancel request that is about to interrupt the runner
                    releaseShared(0);
                    return;
                }
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0); // the write to runner publishes result
                    done();
                    return;
                }
            }
        }

        V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0); // blocks until the task is done
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null) // the task threw an exception
                throw new ExecutionException(exception);
            return result;
        }

        boolean innerCancel(boolean mayInterruptIfRunning) {
            for (;;) {
                int s = getState();
                if (ranOrCancelled(s))
                    return false; // already completed or cancelled
                if (compareAndSetState(s, CANCELLED))
                    break; // the task was READY or RUNNING
            }
            // After the task is cancelled, optionally interrupt the running thread.
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
            releaseShared(0); // release the threads waiting for the result
            done();
            return true;
        }

        // Checks whether the task has completed or been cancelled.
        private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }

        // other methods omitted
    }

As the innerCancel method shows, cancel only changes the state of the task object and, optionally, interrupts the running thread. If the task's code does not respond to interruption, it keeps running asynchronously until it finishes. Its result is never returned through get, but the computing resources are still consumed.
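The behaviour described above can be reproduced with a task that deliberately swallows interrupts. This is an illustrative sketch: the class name, the latches, and the AtomicBoolean flag are assumptions added only to make the sequence observable and deterministic.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelIgnoredDemo {
    // Returns true if cancel succeeded, get() failed, and the task body still ran to its end.
    static boolean bodyFinishesAfterCancel() throws Exception {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);

        FutureTask<String> task = new FutureTask<>(() -> {
            started.countDown();
            boolean released = false;
            while (!released) {
                try {
                    release.await();
                    released = true;
                } catch (InterruptedException ignored) {
                    // Deliberately ignore the interrupt and keep waiting.
                }
            }
            finished.set(true);
            return "a result nobody will see";
        });

        Thread worker = new Thread(task);
        worker.start();
        started.await();                          // the task body is now running

        boolean cancelled = task.cancel(true);    // changes the state and interrupts the worker
        boolean getFailed = false;
        try {
            task.get();
        } catch (CancellationException expected) {
            getFailed = true;                     // the result is no longer reachable
        }

        release.countDown();                      // only now can the task body finish
        worker.join();
        return cancelled && getFailed && finished.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(bodyFinishesAfterCancel());
    }
}
```

If a cancelled task should actually stop, its body has to check for interruption (or let InterruptedException propagate) instead of swallowing it.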

In general, Future is a tool for inter-thread coordination.

AbstractExecutorService.submit(Callable task)

The current implementation of FutureTask is easiest to follow starting from the thread pool's submit method. AbstractExecutorService provides the default implementations of submit. The source of the overloads is as follows:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW; // ensure visibility of callable
    }

submit first calls newTaskFor to construct a FutureTask, then calls execute to hand the task to the thread pool, and finally returns the FutureTask.
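The same two steps can be done by hand, which makes the role of FutureTask visible. The sketch below is an illustration under assumed names (ManualSubmitDemo and its methods); it wraps a Callable, and a Runnable with a fixed result, and passes the wrappers to execute.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class ManualSubmitDemo {
    // Wraps a Callable in a FutureTask and runs it through execute(), as submit() does.
    static String viaCallable() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            FutureTask<String> ftask = new FutureTask<String>(() -> "hello");
            pool.execute(ftask);
            return ftask.get();
        } finally {
            pool.shutdown();
        }
    }

    // Wraps a Runnable plus a fixed result, mirroring submit(Runnable, T).
    static String viaRunnable() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Runnable body = () -> { /* side effects only */ };
            FutureTask<String> ftask = new FutureTask<String>(body, "fixed");
            pool.execute(ftask);
            return ftask.get();   // returns the value given to the constructor
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(viaCallable() + " " + viaRunnable());
    }
}
```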

FutureTask.run()

    public void run() {
        // Proceed only if the task is NEW and the CAS sets runner to the current thread.
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result); // store the result
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s); // handle a pending interrupt
        }
    }

run() executes the task only if state is NEW and the CAS that changes runner from null to the current thread succeeds. After the task body returns, it calls set(result) to store the result. set(result) works as follows:

First, use CAS to change state from NEW to COMPLETING.

Store the result in outcome, then set state to NORMAL with a lazy set (UNSAFE.putOrderedInt).

After the result is set, call finishCompletion() to wake up the waiting threads.
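The three steps can be sketched with AtomicInteger standing in for the Unsafe calls. This is an approximation for illustration, not the JDK source: the class SetSketch, its boolean return value, and the accessor methods are assumptions, and the wake-up step is only indicated by a comment.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SetSketch<V> {
    static final int NEW = 0;
    static final int COMPLETING = 1;
    static final int NORMAL = 2;

    private final AtomicInteger state = new AtomicInteger(NEW);
    private V outcome;   // non-volatile; ordered by the state writes around it

    // Returns true if this call stored the result, false if the task was already settled.
    boolean set(V v) {
        if (state.compareAndSet(NEW, COMPLETING)) {   // step 1: NEW -> COMPLETING
            outcome = v;                              // step 2: store the result
            state.lazySet(NORMAL);                    // step 2: ordered write of the final state
            // step 3 (omitted here): wake the waiting threads, as finishCompletion() does
            return true;
        }
        return false;
    }

    int currentState() { return state.get(); }

    V currentOutcome() { return outcome; }

    public static void main(String[] args) {
        SetSketch<String> sketch = new SetSketch<>();
        System.out.println(sketch.set("first") + " " + sketch.set("second"));
        System.out.println(sketch.currentOutcome());
    }
}
```

Only the caller that wins the CAS may write outcome, which is why the field itself does not need to be volatile.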

    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the whole stack of waiting threads with one CAS.
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) { // walk the detached waiters
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t); // wake up the waiting thread
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        // Hook called after the task completes; subclasses can override it.
        done();
        callable = null; // to reduce footprint
    }

Back in the run method: if the task was cancelled with cancel(true) while it was running, the finally block calls handlePossibleCancellationInterrupt. This ensures that any interrupt from cancel(true) is delivered only while the task is still inside run or runAndReset, and does not leak into whatever the thread executes next.

    private void handlePossibleCancellationInterrupt(int s) {
        // The canceller may not have interrupted this thread yet,
        // so yield the CPU and spin until it has finished.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
    }

FutureTask.runAndReset()

runAndReset is the other way to execute a FutureTask. It runs the task without storing the result, so state stays NEW after a successful run and the task can be executed again. A typical use of runAndReset is running periodic tasks in ScheduledThreadPoolExecutor.
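The difference between run and runAndReset can be observed with a small subclass. runAndReset is protected, so the sketch exposes it through a hypothetical runOnce method; the class names and counters are assumptions made for the illustration.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunAndResetDemo {
    // Hypothetical subclass: exposes the protected runAndReset() as runOnce().
    static final class RepeatableTask extends FutureTask<Void> {
        RepeatableTask(Runnable body) {
            super(body, null);
        }

        boolean runOnce() {
            return runAndReset();
        }
    }

    // Returns {executions through runAndReset, executions through run} after three calls each.
    static int[] demo() {
        AtomicInteger repeatableRuns = new AtomicInteger();
        RepeatableTask repeatable = new RepeatableTask(repeatableRuns::incrementAndGet);
        for (int i = 0; i < 3; i++) {
            repeatable.runOnce();   // state stays NEW, so the body runs every time
        }

        AtomicInteger ordinaryRuns = new AtomicInteger();
        FutureTask<Void> ordinary = new FutureTask<Void>(ordinaryRuns::incrementAndGet, null);
        for (int i = 0; i < 3; i++) {
            ordinary.run();         // only the first call finds state == NEW
        }
        return new int[] { repeatableRuns.get(), ordinaryRuns.get() };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```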

FutureTask.get()

FutureTask returns the result of the task through get(). If the task has not finished (state <= COMPLETING), get() calls awaitDone to wait for completion. The core loop of awaitDone is:

    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null; // clear the thread of the wait node
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued) // CAS the node onto the waiters stack
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }

The loop works as follows:

1. If the current thread has been interrupted, remove its wait node and throw InterruptedException.

2. If the task has reached a final state (state > COMPLETING), clear the thread of the wait node if there is one and return the state.

3. If the task is completing (COMPLETING), the wait cannot time out yet, so the thread yields its CPU time slice and checks again.

4. If state is NEW, first create a WaitNode, and on the next iteration push it onto waiters with CAS.

5. If a timeout was set and it has expired, call removeWaiter to remove the wait node and return the current state; if it has not expired, park the current thread for the remaining time.

6. Otherwise, park the current thread until it is woken up.
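Step 5 is what callers see as a TimeoutException from the timed get. A minimal sketch, assuming a task that nobody has run yet (the class name and the 50 ms timeout are illustrative):

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    static String demo() throws Exception {
        FutureTask<String> task = new FutureTask<String>(() -> "ready");

        String first;
        try {
            // The task has not been run, so state is NEW and the timed wait expires.
            first = task.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            first = "timeout";
        }

        task.run();   // complete the task on the calling thread

        // The task is now in a final state, so get returns without waiting.
        String second = task.get(50, TimeUnit.MILLISECONDS);
        return first + "," + second;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

A timed-out get does not cancel the task; the task can still complete and be read later.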

FutureTask.cancel(boolean mayInterruptIfRunning)

    public boolean cancel(boolean mayInterruptIfRunning) {
        // If the state is NEW, change it to INTERRUPTING or CANCELLED depending on the parameter.
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try { // in case call to interrupt throws exception
            if (mayInterruptIfRunning) { // the running thread may be interrupted
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion(); // remove and wake up all waiting threads
        }
        return true;
    }

Description: cancel attempts to cancel the task. It returns false if the task has already completed or been cancelled, that is, if state is not NEW. If state is NEW (the task has not started or is still running), the state is changed by CAS to INTERRUPTING or CANCELLED depending on mayInterruptIfRunning. When mayInterruptIfRunning is true, the running thread, if there is one, is interrupted and the state is then set to INTERRUPTED. Finally, finishCompletion removes and wakes up all waiting threads.
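These rules can be checked through the public API. The following sketch uses an assumed class name and runs everything on one thread so that the outcome does not depend on timing.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.FutureTask;

class CancelDemo {
    // Returns {first cancel, second cancel, get threw CancellationException, cancel after completion}.
    static boolean[] demo() throws Exception {
        FutureTask<String> pending = new FutureTask<String>(() -> "never returned");
        boolean firstCancel = pending.cancel(false);    // NEW -> CANCELLED
        boolean secondCancel = pending.cancel(false);   // state is no longer NEW
        pending.run();                                  // returns at once because state != NEW

        boolean getThrew = false;
        try {
            pending.get();
        } catch (CancellationException expected) {
            getThrew = true;
        }

        FutureTask<String> finished = new FutureTask<String>(() -> "value");
        finished.run();                                     // NEW -> COMPLETING -> NORMAL
        boolean cancelAfterDone = finished.cancel(true);    // too late to cancel

        return new boolean[] { firstCancel, secondCancel, getThrew, cancelAfterDone };
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```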

Example
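As a closing illustration, the sketch below runs a FutureTask on a plain thread and overrides the done() hook that finishCompletion() calls. The class name DoneHookDemo, the thread name, and the StringBuilder log are assumptions made for this example.

```java
import java.util.concurrent.FutureTask;

class DoneHookDemo {
    static String demo() throws Exception {
        StringBuilder log = new StringBuilder();

        FutureTask<Integer> task = new FutureTask<Integer>(() -> 1 + 2) {
            @Override
            protected void done() {
                // Invoked on the worker thread by finishCompletion() once the task is settled.
                log.append("done;");
            }
        };

        Thread worker = new Thread(task, "worker");
        worker.start();

        int value = task.get();   // blocks until the worker has stored the result
        worker.join();            // also wait for done(), which runs after waiters are woken

        return log + "result=" + value;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The join matters: get() can return before done() has run, because finishCompletion() wakes the waiting threads first and calls done() afterwards.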
