In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)05/31 Report--
This article introduces the knowledge of "how to achieve asynchronous callback in Java8 through CompletableFuture". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
1 what is CompletableFuture?
CompletableFuture, a new class in Java 8, is an extension to the Future interface. From the class inheritance diagram below, we can see that it implements not only the Future interface, but also the CompletionStage interface. When the Future needs to be completed explicitly, we can use the CompletionStage interface to support the functions and operations triggered when it is completed. When more than two threads attempt to complete, exception complete, or cancel a CompletableFuture at the same time, only one can succeed.
The main function of CompletableFuture is to simplify the complexity of asynchronous programming, support functional programming, and process the calculation results through callback.
2 Why is there CompletableFuture?
In java5, JDK provides us with Callable and Future, so that we can easily obtain the results of asynchronous tasks, but obtaining the results of asynchronous tasks through Future's get will lead to blocking of the main thread, which is very CPU-consuming in some scenarios, so Java8 provides us with CompletableFuture, so that we do not have to block waiting, but through callbacks to process the results It also supports streaming, combining asynchronous tasks and other operations.
If you are not familiar with Callable and Future, you can read this article updated by the editor earlier. Java looks at asynchronous task calculation FutureTask from source code.
3 CompletableFuture is easy to use
Let's make a simple classification of the use of CompletableFuture:
Create a task:
SupplyAsync/runAsync
Asynchronous callback:
ThenApply/thenAccept/thenRun
ThenApplyAsync/thenAcceptAsync/thenRunAsync
Exceptionally
Handle/whenComplete
Combined processing:
ThenCombine / thenAcceptBoth / runAfterBoth
ApplyToEither / acceptEither / runAfterEither
ThenCompose
AllOf / anyOf
Please refer to the following case for details:
Public static void main (String [] args) throws Exception {/ / 1. Asynchronous task with return value (no thread pool specified, default ForkJoinPool.commonPool (), single-core ThreadPerTaskExecutor) CompletableFuture cf1 = CompletableFuture.supplyAsync (()-> {return 1 + 1;}); System.out.println ("cf1 result:" + cf1.get ()); / / 2. Asynchronous task with no return value (no thread pool specified, default ForkJoinPool.commonPool (), single-core ThreadPerTaskExecutor) CompletableFuture cf2 = CompletableFuture.runAsync (()-> {int a = 1 + 1;}); System.out.println ("cf2 result:" + cf2.get ()); / / 3. Specify the asynchronous task with return value of the thread pool, runAsync is the same as CompletableFuture cf3 = CompletableFuture.supplyAsync (()-> {return 1 + 1;}, Executors.newCachedThreadPool ()); System.out.println ("cf3 result:" + cf3.get ()); / / 4. Callback: CompletableFuture cf4 = cf1.thenApply ((result)-> {System.out.println ("cf4 callback gets the result of cf1 result:" + result); return result + 1;}); System.out.println ("cf4 result:" + cf4.get ()); / / 5. Asynchronous callback (submit the callback task to the thread pool). The actions executed after the completion of the task are executed asynchronously CompletableFuture cf5 = cf1.thenApplyAsync ((result)-> {System.out.println ("cf5 callback gets the result of cf1 result:" + result); return result + 1;}); System.out.println ("cf5 result:" + cf5.get ()) / / 6. Callback (same as thenApply but no result returned): CompletableFuture cf6 = cf1.thenAccept ((result)-> {System.out.println ("cf6 callback gets the result of cf1 result:" + result);}); System.out.println ("cf6 result:" + cf6.get ()); / / 7. Callback (same as thenAccept but no input parameters): CompletableFuture cf7 = cf1.thenRun (()-> {}); System.out.println ("cf7 result:" + cf7.get ()); / / 8. Exception callback: CompletableFuture cf = CompletableFuture.supplyAsync (()-> {throw new RuntimeException ("exception occurs"); CompletableFuture cf8 = cf.exceptionally ((result)-> {return-1;}); System.out.println ("cf8 result:" + cf8.get ()); / / 9. The callback method executed after the completion of a task will pass the execution result or the exception thrown during execution to the callback method / / if it is normal, the exception is null, and the result of the CompletableFuture corresponding to the callback method is the same as the task; / / if the task is executed normally, the get method returns the execution result, and if it is an execution exception, the get method throws an exception. CompletableFuture cf9 = cf1.handle ((a, b)-> {if (b! = null) {b.printStackTrace ();} return a;}); System.out.println ("cf9 result:" + cf9.get ()) / / 10 similar to handle, no return value try {CompletableFuture cf10 = cf.whenComplete ((a, b)-> {if (b! = null) {b.printStackTrace ();}}); System.out.println ("cf10 result:" + cf10.get ()) } catch (Exception e) {System.out.println ("cf10 has an exception!!") ;} / / 11 combined processing (both completed and then executed) have input parameters and return values CompletableFuture cf11 = cf1.thenCombine (cf3, (R1, R2)-> {return R1 + R2;}); System.out.println ("cf11 result:" + cf11.get ()) / / 12 combined processing (both completed and then executed) has input parameters, and no return value CompletableFuture cf12 = cf1.thenAcceptBoth (cf3, (R1, R2)-> {}); System.out.println ("cf12 result:" + cf12.get ()) / / 13 combined processing (both completed and then executed) No input parameter, no return value CompletableFuture cf13 = cf1.runAfterBoth (cf3, ()-> {}); System.out.println ("cf13 result:" + cf13.get ()) / / 14 combined processing (one is completed and then executed) has input parameters and the return value CompletableFuture cf14 = cf1.applyToEither (cf3, (r)-> {return r;}); System.out.println ("cf14 result:" + cf14.get ()) / / 15 combined processing (one is completed and then executed) has input parameters and no return value CompletableFuture cf15 = cf1.acceptEither (cf3, (r)-> {}); System.out.println ("cf15 result:" + cf15.get ()) / / 16 combined processing (one is completed and then executed) No input parameter, no return value CompletableFuture cf16 = cf1.runAfterEither (cf3, ()-> {}); System.out.println ("cf16 result:" + cf16.get ()) The / / 17 method returns a new CompletableFuture CompletableFuture cf17 = cf1.thenCompose ((r)-> {return CompletableFuture.supplyAsync (()-> {return 1 + 1;});}); System.out.println ("cf17 result:" + cf17.get ()) / / more than 18 tasks will be executed successfully before CompletableFuture.allOf (cf1,cf2,cf3) .whenComplete ((r, t)-> {System.out.println (r);}) / / any one of the 18 tasks will continue to execute CompletableFuture.anyOf (cf1,cf2,cf3). WhenComplete ((r, t)-> {System.out.println (r);});} 4 CompletableFuture source code analysis
First of all, we can see from the comments that it describes some of the CompletionStage and Future interface extensions, which are also some of its key points.
In addition to the relevant methods for directly manipulating the status and results, CompletableFuture also implements the following policies for the CompletionStage interface:
(1) the operations provided for dependent completion of non-asynchronous methods can be performed by the thread that completes the current CompletableFuture or by any other caller who completes the method.
(2) all asynchronous methods without explicit Executor parameters are executed using ForkJoinPool.commonPool () (unless it does not support at least two levels of parallelism, in which case a new thread is created to run each task). To simplify monitoring, debugging, and tracing, all generated asynchronous tasks are instances of CompletableFuture that complete tasks asynchronously.
If you don't know anything about ForkJoinPool, you can read this article updated by the editor to take you to know the ForkJoin in Java.
(3) all CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not affected by other method overrides in the subclass.
CompletableFuture implements the following policies for the Future interface:
Because (unlike FutureTask) this class has no direct control over the calculation that causes it to complete, canceling is considered another form of exception completion, so the cancel operation is considered another form of exception completion (new CancellationException () has the same effect.) . The method isCompletedExceptionally () can be used to determine whether a CompletableFuture is completed in any abnormal way.
If CompletionException occurs when the exception completes, the methods get () and get (long,TimeUnit) throw an ExecutionException for the same reason as in the corresponding CompletionException. To simplify use in most contexts, the class also defines join () and getNow () methods, in which case CompletionException is thrown directly.
4.1 create an asynchronous task
Let's take a look at how CompletableFuture creates asynchronous tasks. We can see that the core implementation of creating asynchronous tasks is two input parameters, one is Executor, and the other is Supplier (functional programming interface). It also provides an overload of the input parameter, and the overload method of the input parameter will get the default Executor. It will use ThreadPerTaskExecutor when the system is single-core and ForkJoinPool.commonPool () when the system is multi-core.
Note: ForkJoinPool.commonPool () thread pool is the default here. If all asynchronous tasks use this thread pool, it is not easy to locate the problem. If the thread pool is occupied for a long time, it may affect the normal operation of other businesses. The parallel flow of stream also uses this thread pool.
It also encapsulates the static inner class AsyncSupply, which represents the asynchronous task, implements Runnable, and rewrites the run method.
Private static final Executor asyncPool = useCommonPool? ForkJoinPool.commonPool (): new ThreadPerTaskExecutor (); private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism () > 1); public static CompletableFuture supplyAsync (Supplier supplier) {return asyncSupplyStage (asyncPool, supplier);} static CompletableFuture asyncSupplyStage (Executor e, Supplier f) {if (f = = null) throw new NullPointerException (); CompletableFuture d = new CompletableFuture () E.execute (new AsyncSupply (d, f)); return d;} / * static inner class, inheriting ForkJoinTask, implementing Runnable, AsynchronousCompletionTask * / static final class AsyncSupply extends ForkJoinTask implements Runnable, AsynchronousCompletionTask {CompletableFuture dep; Supplier fn; AsyncSupply (CompletableFuture dep, Supplier fn) {this.dep = dep; this.fn = fn } public final Void getRawResult () {return null;} public final void setRawResult (Void v) {} public final boolean exec () {run (); return true;} public void run () {CompletableFuture d; Supplier f; if ((d = dep)! = null & & (f = fn)! = null) {dep = null; fn = null If (d.result = = null) {try {d.completeValue (f.get ());} catch (Throwable ex) {d.completeThrowable (ex);}} d.postComplete () }}}
The Supplier class is a functional interface, and the @ FunctionalInterface annotation is the tag for functional programming.
Package java.util.function;@FunctionalInterfacepublic interface Supplier {T get ();} 4.2 Asynchronous Task callback
Asynchronous task callback. Let's take thenApply/thenApplyAsync as an example to see how it works. A method whose name contains Async will be passed into asyncPool. The uniApplyStage method distinguishes which method comes in by determining whether e has a value or not. ThenApply does not pass in Executor, it gives priority to the current thread to perform subsequent stage tasks.
When it is found that the previous stage has been executed, let the current thread execute the task of the subsequent stage directly.
When it is found that the previous stage is not finished, the current stage is wrapped as a UniApply object and placed on the stack of the previous stage. The thread that executes the previous stage, after execution, then executes the task of the subsequent stage.
ThenApplyAsync passes in an Executor, which always lets threads in the Executor thread pool perform subsequent stage tasks.
Wrap the current stage as a UniApply object, put it on the stack of the previous stage, and let Executor execute it directly.
Public CompletableFuture thenApply (Function c) {if (c! = null) {while (result = = null & &! tryPushStack (c)) lazySetNext (c, null); / / clear on failure}} final boolean completeValue (T t) {return UNSAFE.compareAndSwapObject (this, RESULT, null, (t = null)? NIL: t);} 4.3 Asynchronous Task combination
Let's take a look at the thenCombine method as an example to see how CompletableFuture handles the combination task. We can see that the source code of thenCombine and the source code of thenApply are basically the same, but the combination is not just to judge one, but to gather specific scenarios and judge multiple CompletableFuture.
Public CompletableFuture thenCombine (CompletionStage
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.