2025-04-10 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how to understand CompletionService in Java multithreaded programming. It covers what CompletionService is for, how ExecutorCompletionService is implemented in the JDK source, and how to use it to run a batch of tasks.
1 CompletionService introduction
CompletionService is used to submit a set of Callable tasks; its take method returns the Future of a task that has already completed.
Suppose you submit a batch of tasks to an Executor and want each result once its task completes. One way is to save the Future of each task in a collection, then loop over the collection and call get() on each Future. The drawback is that get() blocks on whichever Future you ask first, even when other tasks have already finished. Luckily, CompletionService handles this for you.
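For comparison, the manual approach described above might look like the following minimal sketch. The class name ManualFutureCollection, the method squares, and the squaring workload are made up for illustration; only the java.util.concurrent calls are real.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManualFutureCollection {

    // Submits n tasks and collects the results by walking the Futures in submission order.
    static List<Integer> squares(int n) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                Callable<Integer> task = () -> x * x; // hypothetical workload
                futures.add(pool.submit(task));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                // get() blocks on this particular Future, even if later ones are already done.
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squares(5)); // prints [1, 4, 9, 16, 25]
    }
}
```

The results always come back in submission order here, which is exactly the limitation: a slow first task delays every result behind it.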
CompletionService combines the functions of Executor and BlockingQueue. You submit Callable tasks to it for execution, then use the queue-like take and poll methods to retrieve each result, wrapped in a Future, as soon as it becomes available.
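The difference between the queue-like retrieval methods can be seen in a small sketch. The class name PollVersusTake and the latch-controlled task are assumptions made for the demo; poll(), poll(timeout, unit) and submit() are the real CompletionService methods.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollVersusTake {

    // Describes what poll() returned before and after the task was allowed to finish.
    static String demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        CountDownLatch release = new CountDownLatch(1);
        try {
            service.submit(() -> {
                release.await(); // hold the task until the caller lets it finish
                return "done";
            });

            // Nothing has completed yet, so the non-blocking poll() returns null.
            Future<String> early = service.poll();

            release.countDown();

            // The timed poll waits up to the timeout for a completed Future.
            Future<String> late = service.poll(5, TimeUnit.SECONDS);

            return (early == null ? "null" : "future") + " then "
                    + (late == null ? "null" : late.get());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints "null then done"
    }
}
```

take() would block at the first call instead of returning null, so poll() is the better fit when the caller has other work to do in the meantime.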
take returns the Future of whichever task completes first, not the Future of whichever task was submitted first.
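A short sketch makes the ordering visible. The class name CompletionOrderDemo and the "slow" and "fast" tasks are hypothetical; a CountDownLatch holds the first task back so the outcome does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Returns the task results in the order in which take() handed them out.
    static List<String> completionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        CountDownLatch releaseSlow = new CountDownLatch(1);
        try {
            // Submitted first, but blocked until the latch opens.
            service.submit(() -> {
                releaseSlow.await();
                return "slow";
            });
            // Submitted second, finishes right away.
            service.submit(() -> "fast");

            List<String> order = new ArrayList<>();
            order.add(service.take().get()); // only "fast" can be complete at this point
            releaseSlow.countDown();         // now let the first task finish
            order.add(service.take().get());
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder()); // prints [fast, slow]
    }
}
```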
2 CompletionService source code analysis
First, take a look at the constructor:

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

The constructor mainly initializes a blocking queue that will hold the completed tasks.
Then take a look at the completionService.submit method:
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
As you can see, the Callable task is wrapped in a QueueingFuture. QueueingFuture is a subclass of FutureTask, so it is the run() method of FutureTask that is ultimately executed.
Take a look at FutureTask's run() method:
    public void run() {
        // Check the execution state to ensure the callable task is run only once
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // Call back the call() method of the Callable object we created
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // Process the execution result
                    set(result);
            }
        } finally {
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
You can see that FutureTask's run method calls back into the call method of our custom Callable. Once the call returns, the execution result is processed by set(result):
    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
Following the finishCompletion() method, we find that it calls done(), which QueueingFuture overrides:

    protected void done() {
        completionQueue.add(task);
    }
You can see that this method does only one thing: it adds the finished task to the completion queue. As soon as the queue holds an element, a call to take() can return it and we can read the execution result.
At this point the principle is clear: results are collected asynchronously, without waiting on any one particular task, by means of a queue. Each task is placed in the queue when it finishes, and the queue is first-in, first-out, so the order in which tasks finish is the order in which their results are obtained.
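The mechanism can be reproduced in a few lines. The following is a simplified, hypothetical stand-in (MiniCompletionService is not a JDK class, and it omits everything the real ExecutorCompletionService does beyond the core idea): a FutureTask whose done() hook enqueues the task, plus a take() that reads from the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for ExecutorCompletionService; not the JDK class.
class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> f = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // Invoked by FutureTask once the task has finished, failed, or been cancelled.
                completed.add(this);
            }
        };
        executor.execute(f);
        return f;
    }

    // Blocks until some task has finished, then returns its Future.
    Future<V> take() throws InterruptedException {
        return completed.take();
    }

    public static void main(String[] args) throws Exception {
        // Runnable::run executes each task on the calling thread, which keeps the demo deterministic.
        MiniCompletionService<Integer> service = new MiniCompletionService<>(Runnable::run);
        service.submit(() -> 6 * 7);
        System.out.println(service.take().get()); // prints 42
    }
}
```

The sketch overrides done() directly on the task it creates, whereas the JDK wraps the task in a separate QueueingFuture; the effect on the queue is the same.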
CompletionService can be seen as a combination of Executor and BlockingQueue. Tasks are handed to it with submit, and their results are retrieved with take and poll, much like reading from a BlockingQueue. One implementation of CompletionService is ExecutorCompletionService, which delegates the actual computation to an Executor.
In its implementation, ExecutorCompletionService creates a BlockingQueue in the constructor (a LinkedBlockingQueue, an unbounded queue based on a linked list) to hold the completed tasks. When a task is submitted to ExecutorCompletionService, it is first wrapped in a QueueingFuture, a subclass of FutureTask that overrides the done method. When the computation is complete, FutureTask calls done, and the override puts the finished task into the BlockingQueue.
The source code of QueueingFuture is as follows:
    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

3 Running tasks with CompletionService

    public class CompletionServiceTest {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            CompletionService completionService = new ExecutorCompletionService(threadPool);
            for (int i = 1; i
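The example above is cut off in the source, so the original loop body is not known. The following is an independent sketch of the same kind of program, not a reconstruction of the author's code: the class name CompletionServiceSketch, the method runTasks, and the random-sleep workload are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

class CompletionServiceSketch {

    // Submits taskCount tasks and returns their results in completion order.
    static List<Integer> runTasks(int taskCount) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPool);
        try {
            for (int i = 1; i <= taskCount; i++) {
                final int id = i;
                completionService.submit(() -> {
                    // Assumed workload: a short random pause standing in for real work.
                    Thread.sleep(ThreadLocalRandom.current().nextInt(50));
                    return id;
                });
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                // take() blocks until the next task finishes, whichever one that is.
                results.add(completionService.take().get());
            }
            return results;
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The order of the printed ids varies from run to run.
        System.out.println(runTasks(10));
    }
}
```

Calling take() exactly as many times as tasks were submitted matters: one call too many would block forever, because no further task will ever be added to the queue.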