Updated 2025-02-23. Source: SLTechnology News&Howtos (Shulou.com)
This article explains how to understand Java 8 asynchronous programming with CompletableFuture. Many people run into this topic in real projects, so the sections below walk through the problem it solves, how its API is organized, and how to use it.
I. Asynchronous programming
Programs usually execute sequentially, doing only one thing at a time. If a function depends on the result of another function, it has to wait for that function to finish before it can continue, and from the user's point of view the whole program is effectively stopped in the meantime. But today's computers generally have multicore CPUs, so there is no point in idly waiting. The time-consuming task can run on another processor core while you do other work, and you are notified when it finishes. This is the starting point of asynchronous programming: make full use of multicore CPUs to maximize program performance. In a word, asynchronous programming lets the caller continue without waiting for the return value of the called function.
II. A motivating problem: how to implement the procedure of boiling water and making tea
Boiling the water (washing the kettle, then boiling) and preparing the tea things (washing the teapot, washing the cups, taking the tea) can proceed in parallel, and the tea is made once both are done. At the end of the article we implement this both the traditional way and with Java 8 asynchronous programming, and compare the complexity of the two implementations.
III. Asynchronous programming with the Java 5 Future
Future is an interface added in Java 5 to describe the result of an asynchronous computation. You can use the isDone() method to check whether the computation is complete, the get() method to block the calling thread until the computation returns a result, or the cancel() method to attempt to cancel the task.
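    // Requires java.util.concurrent.ExecutionException, ExecutorService, Executors and Future.
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(5);
        // Submit a task that returns 100.
        Future<Integer> f = es.submit(() -> 100);
        // get() blocks the calling thread until the result is ready.
        System.out.println(f.get());
        es.shutdown();
    }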
Although Future makes it possible to execute tasks asynchronously, obtaining the result is inconvenient: it can only be obtained by blocking or by polling. Blocking contradicts the purpose of asynchronous programming, and polling wastes CPU resources and still cannot deliver the result promptly.
Many other languages use callbacks to implement asynchronous programming, Node.js for example. Some Java frameworks, such as Netty and Google Guava, also extend the Future interface with callback mechanisms and utility classes to assist asynchronous development.
Java, as an established programming language, naturally did not fall behind. Java 8 added a new class with more than 50 methods, CompletableFuture. It is a powerful extension of Future that reduces the complexity of asynchronous programming and supports a functional programming style.
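To make the cost of polling concrete, here is a minimal sketch of the pattern described above, using only the plain Future API. The class name, method name, and sleep durations are illustrative assumptions, not part of the original article.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FuturePollingSketch {

    // Submits a slow task, then polls isDone() instead of blocking in get().
    static int computeWithPolling() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> f = es.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(300); // simulate a slow computation
                return 100;
            });
            int polls = 0;
            while (!f.isDone()) {                 // polling: repeated wake-ups just to check
                polls++;
                TimeUnit.MILLISECONDS.sleep(50);
            }
            System.out.println("polled " + polls + " times");
            return f.get();                       // returns at once, the task is already done
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeWithPolling());
    }
}
```

The caller either sleeps too long between checks and learns about the result late, or checks too often and burns CPU. A callback-style API avoids having to choose.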
IV. Overview of CompletableFuture class functions
CompletableFuture implements two interfaces, Future and CompletionStage.
Because it implements the Future interface, it has all the features of Future; for example, you can use the get() method to obtain the return value. It also implements the CompletionStage interface, which has about 40 methods and is mainly used to orchestrate the workflow of tasks.
The relationships between tasks in a workflow can be classified into three types: serial, parallel, and aggregation.
Serial relationship
The following API implements it (simplified signatures; take a quick look first):
    CompletionStage<R> thenApply(fn);
    CompletionStage<R> thenApplyAsync(fn);
    CompletionStage<Void> thenAccept(consumer);
    CompletionStage<Void> thenAcceptAsync(consumer);
    CompletionStage<Void> thenRun(action);
    CompletionStage<Void> thenRunAsync(action);
    CompletionStage<R> thenCompose(fn);
    CompletionStage<R> thenComposeAsync(fn);
Parallel relationship
Tasks executed asynchronously on multiple threads are in a parallel relationship.
Aggregation relationship
The aggregation relationship is divided into AND aggregation and OR aggregation.
With AND aggregation, the current task starts only after all the tasks it depends on have completed. With OR aggregation, it starts as soon as any one of them has completed.
AND aggregation is expressed by these methods:
    CompletionStage<R> thenCombine(other, fn);
    CompletionStage<R> thenCombineAsync(other, fn);
    CompletionStage<Void> thenAcceptBoth(other, consumer);
    CompletionStage<Void> thenAcceptBothAsync(other, consumer);
    CompletionStage<Void> runAfterBoth(other, action);
    CompletionStage<Void> runAfterBothAsync(other, action);
OR aggregation is expressed by these methods:
    CompletionStage<R> applyToEither(other, fn);
    CompletionStage<R> applyToEitherAsync(other, fn);
    CompletionStage<Void> acceptEither(other, consumer);
    CompletionStage<Void> acceptEitherAsync(other, consumer);
    CompletionStage<Void> runAfterEither(other, action);
    CompletionStage<Void> runAfterEitherAsync(other, action);
V. Detailed description of the CompletableFuture API
1. Static methods for submitting tasks
Method name | Description
runAsync(Runnable runnable) | Executes code asynchronously, using ForkJoinPool.commonPool() as its thread pool
runAsync(Runnable runnable, Executor executor) | Executes code asynchronously, using the specified thread pool
supplyAsync(Supplier supplier) | Executes code asynchronously with a return value, using ForkJoinPool.commonPool() as its thread pool
supplyAsync(Supplier supplier, Executor executor) | Executes code asynchronously with a return value, using the specified thread pool
All four methods submit a task. runAsync takes a Runnable; supplyAsync takes a Supplier, whose get method returns a value.
(1) The difference between run and supply
runAsync executes a task with no return value; supplyAsync executes a task with a return value.
(2) The difference between one parameter and two parameters
The second parameter is the thread pool. If it is omitted, the built-in ForkJoinPool.commonPool() is used. By default the parallelism of this pool is the number of available CPU cores minus one (it can also be set with the JVM option -Djava.util.concurrent.ForkJoinPool.common.parallelism).
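The following sketch exercises both forms of submission and prints which thread ran each task. The class name, the helper threadNameOn, and the thread name "io-pool" are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class SubmitSketch {

    // Runs a supplier on the given pool and reports which thread executed it.
    static String threadNameOn(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        // Parallelism of the shared pool that is used when no Executor is passed.
        System.out.println("commonPool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());

        // runAsync: no result, so the future is a CompletableFuture<Void>.
        CompletableFuture<Void> run = CompletableFuture.runAsync(
                () -> System.out.println("runAsync on " + Thread.currentThread().getName()));
        run.join();

        // supplyAsync with an explicit pool; the thread name is hypothetical.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "io-pool");
            t.setDaemon(true);
            return t;
        });
        try {
            System.out.println("supplyAsync on " + threadNameOn(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing your own pool is usually the safer choice for blocking or long-running work, since every caller in the JVM shares the common pool.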
2. Serial relationship API
The main differences between these methods are whether the callback can get the return value of the previous task and whether it returns a value itself.
API | Can get the previous task's return value | Has a return value
thenApply | yes | yes
thenAccept | yes | no
thenRun | no | no
thenCompose | yes | yes
(1) Use of thenApply and thenApplyAsync
thenApply and thenApplyAsync chain two tasks serially: the second task processes and transforms the return value of the first, and itself returns a value.
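    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.ToString;

    public class BasicFuture4 {

        // Lombok generates the constructor, accessors and toString().
        @Data
        @AllArgsConstructor
        @ToString
        static class Student {
            private String name;
        }

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Student> future = CompletableFuture
                    .supplyAsync(() -> "Jack")
                    .thenApply(s -> s + " Smith")      // append a surname
                    .thenApply(String::toUpperCase)    // convert to upper case
                    .thenApplyAsync(Student::new);     // wrap the result in a Student
            System.out.println(future.get());
        }
    }
As you can see, the input is a string; a second string is appended to it, the result is converted to upper case, and finally a new Student object is created from it and returned.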
BasicFuture4.Student (name=JACK SMITH)
thenAccept and thenRun are used in the same way as thenApply. thenAccept can get the return value of the previous task but returns nothing itself; thenRun cannot get the return value of the previous task and returns nothing itself.
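As a small illustration of these differences, the sketch below chains thenApply, thenAccept, and thenRun and records what each stage sees. The class name, method name, and log strings are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class AcceptRunSketch {

    // Records what each stage observes so the order can be inspected afterwards.
    static List<String> runChain() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> "Jack")
                .thenApply(String::toUpperCase)           // sees "Jack", returns "JACK"
                .thenAccept(s -> log.add("accept:" + s))  // sees "JACK", returns nothing
                .thenRun(() -> log.add("run"));           // sees nothing, returns nothing
        done.join(); // wait for the whole chain to finish
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // prints [accept:JACK, run]
    }
}
```

Because thenAccept and thenRun produce no value, the resulting future is a CompletableFuture<Void>; it is still useful for waiting on the chain or attaching further steps.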
(2) The difference between thenApply and thenApplyAsync
The two methods differ in which thread runs the task. thenApplyAsync hands the task to a thread from ForkJoinPool.commonPool() or from a thread pool you supply. With thenApply there are two cases: if the supplyAsync task has already completed when thenApply is called, the thenApply task runs on the calling thread (the main thread here); if the supplyAsync task is still running, the thenApply task runs later on the same thread that executed supplyAsync.
The following example demonstrates this:
    package com.dsj361.future;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    /**
     * @author wangkai
     */
    public class BasicFuture8 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("- supplyAsync execution is fast");
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                return "1";
            }).thenApply(s -> {
                // Usually runs on main, because the first stage has already completed.
                System.out.println(Thread.currentThread().getName());
                return "2";
            });
            System.out.println(future1.get());

            System.out.println("- supplyAsync execution is slow");
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignored in this demo
                }
                System.out.println(Thread.currentThread().getName());
                return "1";
            }).thenApply(s -> {
                // Runs on the worker thread that completes the first stage.
                System.out.println(Thread.currentThread().getName());
                return "2";
            });
            System.out.println(future2.get());
        }
    }
Execution result:
    - supplyAsync execution is fast
    ForkJoinPool.commonPool-worker-1
    main
    2
    - supplyAsync execution is slow
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-1
    2
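For comparison, here is a sketch of thenApplyAsync with a self-defined thread pool, where the transformation stage runs on the supplied executor. The class name, the helper stageThread, and the thread name "transform-pool" are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ApplyAsyncSketch {

    // Returns the name of the thread that ran the thenApplyAsync stage.
    static String stageThread(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> "1")
                .thenApplyAsync(s -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "transform-pool")); // hypothetical thread name
        try {
            System.out.println(stageThread(pool)); // prints transform-pool
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike thenApply, the thread here does not depend on how quickly the first stage finishes, which makes the behavior easier to reason about.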
(3) Use of thenCompose
Suppose there are two asynchronous tasks, and the second needs the return value of the first in order to do its work. We can use thenCompose for this. thenApply also works here, and the following code shows the difference:
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class BasicFuture9 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // thenCompose flattens the nested future into a single CompletableFuture<String>.
            CompletableFuture<String> future = getLastOne().thenCompose(BasicFuture9::getLastTwo);
            System.out.println(future.get());

            // thenApply keeps the nesting: a future of a future.
            CompletableFuture<CompletableFuture<String>> future2 =
                    getLastOne().thenApply(s -> getLastTwo(s));
            System.out.println(future2.get().get());
        }

        public static CompletableFuture<String> getLastOne() {
            return CompletableFuture.supplyAsync(() -> "topOne");
        }

        public static CompletableFuture<String> getLastTwo(String s) {
            return CompletableFuture.supplyAsync(() -> s + "topTwo");
        }
    }
As you can see, with thenApply you need two get() calls to obtain the final return value, while with thenCompose you need only one.
3. AND aggregation relationship API
(1) Use of thenCombine
Suppose we want to compute the sum of the return values of two asynchronous methods. We have to wait until both asynchronous tasks have finished before the sum can be computed, and thenCombine does exactly this.
    // Requires java.util.concurrent.CompletableFuture and ExecutionException.
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 1992);
        CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);

        // Combine both results once both tasks have completed.
        CompletableFuture<Integer> thenComposeCount = thenComposeOne
                .thenCombine(thenComposeTwo, (s, y) -> s + y);

        // Consume both results without returning a value.
        thenComposeOne.thenAcceptBoth(thenComposeTwo,
                (s, y) -> System.out.println("thenAcceptBoth"));

        // Run an action after both complete, ignoring the results.
        thenComposeOne.runAfterBoth(thenComposeTwo,
                () -> System.out.println("runAfterBoth"));

        System.out.println(thenComposeCount.get());
    }
As you can see, the second argument to thenCombine is a BiFunction, which is applied to the results of the two asynchronous tasks after both have completed.
(2) thenAcceptBoth
Receives the results of the two asynchronous tasks and executes a callback function, but the callback function does not return a value.
(3) runAfterBoth
Waits for the two asynchronous tasks to complete; the callback function receives no parameters and does not return a value.
4. OR aggregation relationship API
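    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class BasicFuture11 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
            CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);

            // Transform the result of whichever task completes first.
            CompletableFuture<Integer> thenComposeCount = thenComposeOne
                    .applyToEither(thenComposeTwo, s -> s + 1);

            // Consume the first result without returning a value.
            thenComposeOne.acceptEither(thenComposeTwo, s -> {});

            // Run an action after either task completes, ignoring the result.
            thenComposeOne.runAfterEither(thenComposeTwo, () -> {});

            System.out.println(thenComposeCount.get());
        }
    }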
(1) applyToEither
The callback runs as soon as either task completes; it receives that task's result as a parameter and returns a value.
(2) acceptEither
The callback runs as soon as either task completes; it receives that task's result as a parameter and returns no value.
(3) runAfterEither
The callback runs as soon as either task completes; it receives no parameters and returns no value.
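To see which result an OR aggregation actually picks, the following sketch races a slow task against a fast one. The class name, the helpers delayed and firstOf, the labels, and the delays are assumptions chosen for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class EitherSketch {

    // A task that sleeps for the given time before returning its label.
    static CompletableFuture<String> delayed(String label, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return label;
        });
    }

    // Applies the callback to the result of whichever task finishes first.
    static String firstOf(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, s -> s + " finished first").join();
    }

    public static void main(String[] args) {
        // The second task sleeps far less, so its result is the one passed on.
        System.out.println(firstOf(delayed("slow", 1000), delayed("fast", 10)));
    }
}
```

The slower task is not cancelled; it keeps running in the background and its result is simply ignored by this stage.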
5. Exception handling
Above we talked about how to orchestrate several asynchronous tasks to perform some serial or aggregation operations. Another important thing is exception handling.
Let's take a look at the following example:
    // Requires java.util.concurrent.CompletableFuture and ExecutionException.
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
                    System.out.println("execute one");
                    return 100;
                })
                .thenApply(s -> 10 / 0)                               // throws ArithmeticException
                .thenRun(() -> System.out.println("thenRun"))         // skipped
                .thenAccept(s -> System.out.println("thenAccept"));   // skipped

        CompletableFuture.runAsync(() -> System.out.println("other"));
    }
Results:
    execute one
    other
As you can see, once one task in the chain throws an exception, the downstream tasks in that chain are no longer executed.
The rest of the code in the main method, however, is still executed.
So exceptions need to be handled properly at this point, so that any cleanup work can still be done.
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class BasicFuture12 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture.supplyAsync(() -> {
                        System.out.println("execute one");
                        return 100;
                    })
                    .thenApply(s -> 10 / 0)                              // throws ArithmeticException
                    .thenRun(() -> System.out.println("thenRun"))        // skipped
                    .thenAccept(s -> System.out.println("thenAccept"))   // skipped
                    .exceptionally(s -> {
                        // Runs because an upstream stage failed.
                        System.out.println("exception handling");
                        return null;
                    });

            CompletableFuture.runAsync(() -> System.out.println("other"));
        }
    }
You can use exceptionally to handle exceptions; its callback runs only when an upstream task has failed.
You can also handle exceptions with the handle() method. The difference is that the handle() callback executes even if no exception occurs, and it receives both the result and the exception.
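The difference between the two can be sketched as follows: each method is run once with a failing task and once with a successful one. The class and method names and the fallback values are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

public class HandleSketch {

    // exceptionally: the fallback runs only when an upstream stage failed.
    static int withExceptionally(int divisor) {
        return CompletableFuture
                .supplyAsync(() -> 100 / divisor)
                .exceptionally(ex -> -1)
                .join();
    }

    // handle: always runs; ex is null on success and non-null on failure.
    static String withHandle(int divisor) {
        return CompletableFuture
                .supplyAsync(() -> 100 / divisor)
                .handle((result, ex) -> ex == null ? "ok:" + result : "failed")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally(0)); // -1
        System.out.println(withExceptionally(5)); // 20
        System.out.println(withHandle(0));        // failed
        System.out.println(withHandle(5));        // ok:20
    }
}
```

In both cases the returned value replaces the failure, so stages chained after the handler continue normally.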
VI. Implementing the boiling water and making tea program
1. Use Thread multithreading and CountDownLatch to implement
    import java.util.concurrent.CountDownLatch;

    public class MakeTee {

        // Two parallel tasks must finish before the tea can be made.
        private static CountDownLatch countDownLatch = new CountDownLatch(2);

        static class HeatUpWater implements Runnable {
            private CountDownLatch countDownLatch;

            public HeatUpWater(CountDownLatch countDownLatch) {
                this.countDownLatch = countDownLatch;
            }

            @Override
            public void run() {
                try {
                    System.out.println("washing kettle");
                    Thread.sleep(1000);
                    System.out.println("boiling water");
                    Thread.sleep(5000);
                    countDownLatch.countDown(); // signal that this task is done
                } catch (InterruptedException e) {
                    // ignored in this demo
                }
            }
        }

        static class PrepareTee implements Runnable {
            private CountDownLatch countDownLatch;

            public PrepareTee(CountDownLatch countDownLatch) {
                this.countDownLatch = countDownLatch;
            }

            @Override
            public void run() {
                try {
                    System.out.println("washing teapots");
                    Thread.sleep(1000);
                    System.out.println("washing cups");
                    Thread.sleep(1000);
                    System.out.println("taking tea");
                    Thread.sleep(1000);
                    countDownLatch.countDown(); // signal that this task is done
                } catch (InterruptedException e) {
                    // ignored in this demo
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            new Thread(new HeatUpWater(countDownLatch)).start();
            new Thread(new PrepareTee(countDownLatch)).start();
            // Block until both tasks have counted down.
            countDownLatch.await();
            System.out.println("ready, start making tea");
        }
    }
Here two threads carry out boiling the water and preparing the tea things respectively, and a CountDownLatch coordinates their progress so that the tea is made only after both have finished.
This approach carries a lot of code unrelated to the business logic: creating threads with new Thread and maintaining the CountDownLatch count by hand.
2. Use CompletableFuture to implement
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class MakeTeeFuture {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // Task 1: wash the kettle and boil the water.
            CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("washing kettle");
                    Thread.sleep(1000);
                    System.out.println("boiled water");
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            // Task 2: wash the teapot and cups, then take the tea.
            CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("teapot");
                    Thread.sleep(1000);
                    System.out.println("teacup");
                    Thread.sleep(1000);
                    System.out.println("take tea");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            // Task 3: runs only after both task 1 and task 2 have completed.
            CompletableFuture<Void> finish = future1.runAfterBoth(future2, () -> {
                System.out.println("ready, start making tea");
            });

            // get() waits for task 3; the value is null because the future is Void.
            System.out.println(finish.get());
        }
    }
This program is much simpler: there is no need to maintain threads by hand or to worry about assigning threads to tasks.
The semantics are also clearer: future1.runAfterBoth(future2, ...) states plainly that task 3 cannot start until task 1 and task 2 are both completed.
Finally, the code is more concise and focused: almost every line is related to the business logic.
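If the final step needs a value from one of the earlier tasks, thenCombine can be used in place of runAfterBoth. The following is only a sketch under assumptions: the class name, the pause helper, the shortened delays, and the tea name are all invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class MakeTeaCombineSketch {

    // Sleeps for the given time to simulate one step of the procedure.
    static void pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String makeTea() {
        // Task 1: wash the kettle, then boil the water (no result needed).
        CompletableFuture<Void> water = CompletableFuture.runAsync(() -> {
            pause(100); // washing kettle
            pause(500); // boiling water
        });
        // Task 2: wash the teapot and cups, then fetch the tea and return its name.
        CompletableFuture<String> tea = CompletableFuture.supplyAsync(() -> {
            pause(300); // washing teapot, washing cups, taking tea
            return "Longjing"; // hypothetical tea name
        });
        // Task 3: runs only after both tasks complete and receives both results.
        return water.thenCombine(tea, (ignored, name) -> "making tea: " + name).join();
    }

    public static void main(String[] args) {
        System.out.println(makeTea()); // prints making tea: Longjing
    }
}
```

The dependency between the three tasks is still expressed in a single line, and the result of task 2 flows into task 3 without any shared variable.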