What Reactive Programming Is and How to Use It

2025-01-17 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article introduces what reactive programming is and how to use it. The explanation aims to be detailed yet easy to follow, and should serve as a useful reference. Let's take a look.

A brief introduction to reactive programming

Reactor is an implementation of the Reactive Programming paradigm, which can be summarized as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (such as arrays) or dynamic (such as event emitters) data streams with ease via the employed programming language.

As a first step toward reactive programming, Microsoft created the Reactive Extensions (Rx) library in the .NET ecosystem. Then RxJava implemented reactive programming on the JVM. Over time, a standardization for Java emerged through the Reactive Streams effort, which defined a set of interfaces and interaction rules for reactive libraries on the JVM. Its interfaces have been integrated into Java 9 under the parent Flow class.

The reactive programming paradigm is often presented in object-oriented languages as an extension of the Observer design pattern. One can also compare the main reactive streams pattern with the familiar Iterator design pattern, as there is a duality between the Iterable-Iterator pair in all of these libraries. One major difference is that, while an Iterator is pull-based, reactive streams are push-based.

Using an iterator is an imperative programming pattern, even though the method of accessing values is solely the responsibility of the Iterable. Indeed, it is up to the developer to choose when to access the next() item in the sequence. In reactive streams, the equivalent of the above pair is Publisher-Subscriber. But it is the Publisher that notifies the Subscriber of newly available values as they come, and this push aspect is the key to being reactive. Also, operations applied to pushed values are expressed declaratively rather than imperatively: the programmer expresses the logic of the computation rather than describing its exact control flow.
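The pull versus push duality can be seen with plain JDK collections. A minimal illustrative sketch (the class name PullVsPush is invented for this example): the Iterator loop pulls each value on demand, while the callback-style forEach has the source push values into the consumer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PullVsPush {
    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3);

        // Pull-based: the consumer decides when to ask for the next value.
        List<Integer> pulled = new ArrayList<>();
        Iterator<Integer> it = source.iterator();
        while (it.hasNext()) {
            pulled.add(it.next()); // imperative: we call next() ourselves
        }

        // Push-based: the source invokes our callback with each value.
        List<Integer> pushed = new ArrayList<>();
        source.forEach(pushed::add); // declarative: we only supply the logic

        System.out.println(pulled.equals(pushed)); // → true
    }
}
```

Both loops see the same values; the difference is purely in who drives the iteration.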

In addition to pushing values, the error-handling and completion aspects are also covered in a well-defined manner. A Publisher can push new values to its Subscriber (by calling onNext) but can also signal an error (by calling onError) or completion (by calling onComplete). Both errors and completion terminate the sequence. This can be summed up as:

onNext x 0..N [onError | onComplete]

This approach is very flexible. The pattern supports use cases where there is no value, one value, or n values (including an infinite sequence of values, such as the continuing ticks of a clock).
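Java 9's Flow API (mentioned above) makes this contract concrete with standard classes. A minimal sketch using SubmissionPublisher (class and variable names are illustrative): the subscriber observes zero or more onNext signals followed by exactly one terminal signal, here onComplete.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OnNextThenComplete {

    static List<String> run() throws InterruptedException {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(Integer item) { signals.add("onNext(" + item + ")"); }
            public void onError(Throwable t) { signals.add("onError"); terminated.countDown(); }
            public void onComplete() { signals.add("onComplete"); terminated.countDown(); }
        });

        publisher.submit(1);
        publisher.submit(2);
        publisher.submit(3);
        publisher.close(); // terminal signal: triggers onComplete downstream

        terminated.await();
        return signals;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // → [onNext(1), onNext(2), onNext(3), onComplete]
    }
}
```

Signals are delivered to each subscriber in order, so the terminal onComplete always comes after the last onNext.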

But before diving in, let's consider: why do we need such an asynchronous reactive library in the first place?

Blocking may waste resources

Modern applications can reach huge numbers of concurrent users, and even though the capabilities of modern hardware have continued to improve, the performance of modern software is still a key concern.

People can improve program performance in two ways:

Parallelization: use more threads and more hardware resources.

Seek higher efficiency in the way existing resources are used.

Usually, Java developers write programs using blocking code. This practice is fine until there is a performance bottleneck, at which point it is time to introduce additional threads running similar blocking code. But this scaling in resource utilization can quickly introduce contention and concurrency problems.

Worse still, blocking wastes resources. If you look closely, as soon as a program involves some latency (notably I/O, such as a database request or a network call), resources are wasted because threads (possibly many threads) now sit idle, waiting for data.

So the parallelization approach is not a silver bullet. It is necessary in order to access the full power of the hardware, but it is also complex to reason about and susceptible to resource wasting.

Can asynchronicity solve the problem?

The second approach mentioned earlier, seeking more efficiency, can be a solution to the resource-wasting problem. By writing asynchronous, non-blocking code, you let the execution switch to another active task that uses the same underlying resources and later come back to the current process when the asynchronous processing has completed.

But how can you produce asynchronous code on the JVM? Java offers two models of asynchronous programming:

Callbacks: asynchronous methods do not have a return value but take an extra callback parameter (a lambda or anonymous class) that gets called when the result is available. A well-known example is Swing's EventListener hierarchy.

Futures: asynchronous methods immediately return a Future. The asynchronous process computes a T value, and the Future object wraps access to it. The value is not immediately available, and the object can be polled until the value is ready. For example, an ExecutorService running Callable tasks uses Future objects.
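The Future model can be sketched with plain JDK classes (the class name FutureSketch and the toy computation are illustrative): ExecutorService hands back a Future immediately, and get() blocks until the value is ready.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // submit() returns a Future immediately; the work runs on another thread.
        Callable<Integer> task = () -> 6 * 7; // stands in for a slow computation
        Future<Integer> future = executor.submit(task);

        // get() blocks the calling thread until the value is available --
        // exactly the kind of hidden blocking discussed below.
        int value = future.get();
        System.out.println(value); // → 42

        executor.shutdown();
    }
}
```

Note how easily the caller ends up blocked again: the asynchrony is only preserved if you never call get() on the calling thread.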

Are these techniques good enough? Not for every use case; both approaches have limitations.

Callbacks are hard to compose together, quickly leading to code that is difficult to read and maintain (known as "Callback Hell").

Consider an example: showing a user's top five favorites in the UI, or suggestions if she does not have any favorites. This goes through three services (one gives favorite IDs, the second fetches favorite details, and the third offers suggestions with details):

Example of Callback Hell:

```java
userService.getFavorites(userId, new Callback<List<String>>() {
    public void onSuccess(List<String> list) {
        if (list.isEmpty()) {
            suggestionService.getSuggestions(new Callback<List<Favorite>>() {
                public void onSuccess(List<Favorite> list) {
                    UiUtils.submitOnUiThread(() ->
                        list.stream()
                            .limit(5)
                            .forEach(uiList::show));
                }

                public void onError(Throwable error) {
                    UiUtils.errorPopup(error);
                }
            });
        } else {
            list.stream()
                .limit(5)
                .forEach(favId -> favoriteService.getDetails(favId,
                    new Callback<Favorite>() {
                        public void onSuccess(Favorite details) {
                            UiUtils.submitOnUiThread(() -> uiList.show(details));
                        }

                        public void onError(Throwable error) {
                            UiUtils.errorPopup(error);
                        }
                    }));
        }
    }

    public void onError(Throwable error) {
        UiUtils.errorPopup(error);
    }
});
```

We have callback-based services: a Callback interface with a method invoked when the asynchronous process succeeds and one invoked when an error occurs.

The first service invokes its callback with a favorite ID list.

If the list is empty, we must go to suggestionService.

The suggestionService passes a List of Favorite to a second callback.

Since we are dealing with a UI, we need to ensure that our consuming code runs in the UI thread.

We use a Java 8 Stream to limit the number of suggestions processed to five, and we show them in a graphical list in the UI.

At each level, we handle errors in the same way: display them in a pop-up window.

Back at the favorite ID level, if the service returned a full list, we need to go to the favoriteService to get detailed Favorite objects. Since we want only five, we first stream the list of IDs to limit it to five.

One more time, a callback. This time we get a fully-fledged Favorite object that we push to the UI inside the UI thread.

That is a lot of code, it is a bit hard to follow, and it has repetitive parts. Consider its equivalent in Reactor:

Example of Reactor code equivalent to the callback code:

```java
userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
```

Let's start with our favorite ID stream.

We asynchronously transform them into detailed Favorite objects (flatMap). We now have a flow of Favorite.

If the flow of Favorite is empty, we switch to a fallback through the suggestionService.

We are only interested in, at most, five elements from the resulting flow.

At the end, we want to process each piece of data in the UI thread.

We trigger the flow by describing what to do with the final form of the data (show it in a UI list) and what to do in case of an error (show a popup).

What if you want to ensure the favorite IDs are retrieved in less than 800 ms or, if it takes longer, get them from a cache? In the callback-based code, that is a complicated task. In Reactor it becomes as easy as adding a timeout operator to the chain:

Example of Reactor code with timeout and fallback:

```java
userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800))
           .onErrorResume(cacheService.cachedFavoritesFor(userId))
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
```

If the part above emits nothing within 800 ms, an error is propagated.

In case of an error, fall back to the cacheService.

The rest of the chain is similar to the previous example.

Futures are a bit better than callbacks, but they still do not do well at composition, despite the improvements brought in Java 8 by CompletableFuture. Orchestrating multiple Future objects together is doable but not easy. Also, Future has other problems: it is easy to end up in another blocking situation by calling the get() method of a Future object, Future does not support lazy computation, and it lacks support for multiple values and advanced error handling.

Consider another example: we get a list of IDs from which we want to fetch a name and a statistic and combine these pair-wise, all of it asynchronously.

Example of CompletableFuture combination:

```java
CompletableFuture<List<String>> ids = ifhIds();

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> {
                CompletableFuture<String> nameTask = ifhName(i);
                CompletableFuture<Integer> statTask = ifhStat(i);

                return nameTask.thenCombineAsync(statTask,
                        (name, stat) -> "Name " + name + " has stats " + stat);
            });
    List<CompletableFuture<String>> combinationList =
            zip.collect(Collectors.toList());
    CompletableFuture<String>[] combinationArray =
            combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
    return allDone.thenApply(v -> combinationList.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
});

List<String> results = result.join();
assertThat(results).contains(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
```

We start with a future that provides us with a list of values to be processed by id.

Once we have the list, we want to start some more in-depth asynchronous processing.

For each element in the list:

Gets the associated name asynchronously.

Gets the associated statistic asynchronously.

Combine the two results.

We now have a list of futures representing all the combination tasks. To execute these tasks, we need to convert the list into an array.

Pass the array to CompletableFuture.allOf, which outputs a Future that completes when all the tasks have completed.

The tricky bit is that allOf returns CompletableFuture<Void>, so we reiterate over the list of futures, collecting their results with join() (which, here, does not block, since allOf ensures the futures are all done).

Once the entire asynchronous pipeline is triggered, we wait for it to be processed and return a list of results that we can assert.

Because Reactor has more out-of-the-box combination operators, this process can be simplified:

Example of Reactor code equivalent to the future code:

```java
Flux<String> ids = ifhrIds();

Flux<String> combinations =
        ids.flatMap(id -> {
            Mono<String> nameTask = ifhrName(id);
            Mono<Integer> statTask = ifhrStat(id);

            return nameTask.zipWith(statTask,
                    (name, stat) -> "Name " + name + " has stats " + stat);
        });

Mono<List<String>> result = combinations.collectList();

List<String> results = result.block();

assertThat(results).containsExactly(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
```

This time, we start from an asynchronously provided sequence of ids (a Flux).

For each element in the sequence, we asynchronously process it (inside the function passed to flatMap) two times.

Get the relevant name.

Get the relevant statistics.

Asynchronously combine the two values.

Aggregate the values into a List as they become available.

In production, we would continue working with the Flux asynchronously by further combining it or subscribing to it. Most probably, we would return the result Mono. Since we are in a test, we block instead, waiting for the processing to finish, and then directly return the aggregated list of values.

Assert the result.

The perils of Callback and Future are similar and are what reactive programming addresses with the Publisher-Subscriber pair.

From imperative to reactive programming

Reactive libraries, such as Reactor, aim to address these drawbacks of "classic" asynchronous approaches on the JVM while also focusing on a few additional aspects:

Composability and readability

Data as a flow manipulated with a rich vocabulary of operators

Nothing happens until you subscribe

Backpressure, or the ability for the consumer to signal the producer that the rate of emission is too high

High-level but high-value abstraction that is concurrency-agnostic

Composability and readability

By composability, we mean the ability to orchestrate multiple asynchronous tasks, in which we use results from previous tasks to feed input to subsequent ones or execute several tasks in a fork-join style, as well as the ability to reuse asynchronous tasks as discrete components in a higher-level system.

The ability to orchestrate tasks is tightly coupled to the readability and maintainability of code. As the layers of asynchronous processes increase in both number and complexity, being able to compose and read code becomes increasingly difficult. As we saw, the callback model is simple, but one of its main drawbacks is that, for complex processes, you need to have a callback executed from a callback, itself nested inside another callback, and so on. That mess is known as Callback Hell. As you can guess (or know from experience), such code is pretty hard to go back to and reason about.

Reactor offers rich composition options, wherein code mirrors the organization of the abstract process, and everything is generally kept at the same level (nesting is minimized).

The assembly line analogy

You can think of data processed by a reactive application as moving through an assembly line. Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher) and ends up as a finished product ready to be pushed to the consumer (the Subscriber).

The raw material can go through various transformations and other intermediary steps, or be part of a larger assembly line that aggregates intermediate pieces together. If there is a glitch or clogging at one point (perhaps boxing the products takes a disproportionately long time), the afflicted workstation can signal upstream to limit the flow of raw material.

Operators

In Reactor, operators are the workstations in our assembly analogy. Each operator adds behavior to a Publisher and wraps the previous step's Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher, as we will see shortly.

Understanding that operators create new instances can help you avoid a common mistake that would lead you to believe that an operator you used in your chain is not being applied.

While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries such as Reactor is the rich vocabulary of operators they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.

Nothing will happen until you subscribe.

In Reactor, when you write a Publisher chain, data does not start pumping into it by default. Instead, you create an abstract description of your asynchronous process (which can help with reusability and composition).

By the act of subscribing, you tie the Publisher to a Subscriber, which triggers the flow of data in the whole chain. This is achieved internally by a single request signal from the Subscriber that is propagated upstream, all the way back to the source Publisher.
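Reactor is not needed to see this "describe first, run on demand" principle: the JDK's own Stream API behaves analogously, with the terminal operation playing the role of subscribe. A small illustrative sketch (not Reactor code; the class name LazyUntilTerminal is invented):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyUntilTerminal {
    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();

        // Assembling the pipeline executes nothing: peek() has not fired yet.
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .peek(seen::add)
                .map(i -> i * 10);
        System.out.println(seen.isEmpty()); // → true

        // The terminal operation (the "subscribe" of this analogy) triggers the flow.
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println(seen);   // → [1, 2, 3]
        System.out.println(result); // → [10, 20, 30]
    }
}
```

Until collect runs, the pipeline is just a reusable description, which is exactly the property Reactor chains have before subscription.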

Back pressure

Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.

The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: a subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate, or it can use the request mechanism to signal the source that it is ready to process at most n elements.

Intermediate operators can also change the request in transit. Imagine a buffer operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1) round-trips and are beneficial when producing the elements before they are requested is not too costly.

This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But when the elements are not ready, they get pushed by the upstream whenever they are produced.
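With the java.util.concurrent.Flow API from Java 9, this request mechanism is directly visible. A minimal sketch (class and variable names are illustrative) of a subscriber that signals it is ready for at most one element at a time instead of running unbounded:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OneByOneSubscriber {
    static List<Integer> received = new CopyOnWriteArrayList<>();

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // signal: ready for at most one element
            }
            public void onNext(Integer item) {
                received.add(item);
                subscription.request(1); // pull the next element when ready
            }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() { done.countDown(); }
        });

        for (int i = 1; i <= 5; i++) publisher.submit(i);
        publisher.close();
        done.await();
        System.out.println(received); // → [1, 2, 3, 4, 5]
    }
}
```

The publisher never delivers more than the subscriber has requested; the remaining elements sit buffered until the next request(1).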

Hot and cold

In the Rx family of reactive libraries, one can distinguish two broad categories of reactive sequences: hot and cold. This distinction mainly has to do with how the reactive stream reacts to subscribers:

A cold sequence means that, no matter when a subscriber subscribes, it receives all the messages produced in the sequence from the start.

A hot sequence, by contrast, produces messages continuously, and a subscriber only receives the messages emitted after it subscribed.
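The JDK's SubmissionPublisher illustrates hot behavior: elements submitted before a subscriber arrives are simply missed by it. A small sketch (class and variable names are illustrative):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class HotSequence {
    static List<Integer> received = new CopyOnWriteArrayList<>();

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<Integer> hot = new SubmissionPublisher<>();

        hot.submit(1); // emitted before anyone subscribes: lost to late subscribers

        hot.subscribe(new Flow.Subscriber<Integer>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(Integer item) { received.add(item); }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() { done.countDown(); }
        });

        hot.submit(2); // only elements submitted after subscription are seen
        hot.submit(3);
        hot.close();

        done.await();
        System.out.println(received); // → [2, 3]
    }
}
```

A cold source would instead replay the element 1 for the late subscriber; the hot source keeps no history for newcomers.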

This concludes the article on what reactive programming is and how to use it. Thank you for reading! If you want to learn more, you are welcome to follow the industry information channel.
