
How to Use the Reactive MongoDB Asynchronous Java Driver

Updated 2025-01-17. Source: SLTechnology News&Howtos (Shulou.com), Internet Technology.

Many newcomers are unsure how to use MongoDB's reactive (asynchronous) Java driver. This article explains the concepts behind it and walks through a working example.

I. About asynchronous drivers

Starting with version 3.0, MongoDB has provided an asynchronous driver (the Java Async Driver), which gives applications a higher-performance option.

In practice, however, many projects still use the synchronous driver (the Java Sync Driver), perhaps out of habit (the synchronous driver's documentation is better) or for compatibility with older versions of MongoDB.

Even so, with the rise of Reactive programming, asynchronous drivers are likely to become the trend.

Before using the Async Driver, you need some familiarity with the concept of Reactive.

II. Understanding Reactive

Reactive is an asynchronous, data-stream-oriented style of development. It originated in the Reactive Extensions library on the .NET platform and was later implemented in many other programming languages.

The well-known Reactive Manifesto defines four traits of a Reactive system:

Responsive: the system responds to requests in a timely manner.

Resilient: the system stays responsive when failures occur, that is, it is fault tolerant.

Elastic: the system scales up or down under varying load so that it keeps running.

Message Driven: components interact through asynchronous message passing, which keeps them loosely coupled and isolated from one another.

All of the traits in the Reactive Manifesto relate in some way to reactive streams, which led to the Reactive Streams specification, launched in 2013.

https://www.reactive-streams.org/

The specification defines the processing of a reactive stream as follows:

Handle a potentially unbounded number of elements, that is, allow the stream to never end

Process elements in sequence

Pass elements asynchronously

Apply non-blocking backpressure (back-pressure)

The Java platform added support for Reactive Streams in JDK 9, through the java.util.concurrent.Flow class.

The key interfaces of Reactive Streams are:

Publisher

Publisher is the producer of the data. The Publisher interface has a single method, subscribe, which registers a consumer of the data, that is, a Subscriber.

Subscriber

Subscriber is the consumer of the data. The Subscriber interface has four methods, each handling a different event. Once the subscriber has successfully subscribed to a publisher, its onSubscribe(Subscription s) method is called.

Subscription represents the current subscription relationship.

After subscribing, the subscriber can call Subscription's request(long n) method to ask the publisher for n items. The publisher may then send three kinds of notification, corresponding to the other three callback methods of Subscriber.

Data notification: corresponds to the onNext method and carries an item produced by the publisher.

Error notification: corresponds to the onError method and indicates that the publisher encountered an error.

Completion notification: corresponds to the onComplete method and indicates that the publisher has finished publishing all of its data.

Of these three, the error and completion notifications are terminal: once either has been sent, no further notifications follow.
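The order of these notifications can be observed with a small sketch. It does not use the MongoDB driver: it assumes JDK 9 or later and relies on java.util.concurrent.Flow and SubmissionPublisher from the JDK, whose interfaces mirror the org.reactivestreams ones. The class name SignalOrderDemo and the event labels are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses the JDK Flow API as a stand-in for org.reactivestreams.
class SignalOrderDemo {

    /** Publishes the items and returns the signals the subscriber observed, in order. */
    static List<String> run(List<String> items) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    events.add("onSubscribe");
                    s.request(Long.MAX_VALUE); // effectively unbounded demand
                }

                @Override
                public void onNext(String item) {
                    events.add("onNext:" + item); // data notification
                }

                @Override
                public void onError(Throwable t) {
                    events.add("onError"); // terminal notification
                    terminated.countDown();
                }

                @Override
                public void onComplete() {
                    events.add("onComplete"); // terminal notification
                    terminated.countDown();
                }
            });
            items.forEach(publisher::submit);
        } // close() issues onComplete once the buffered items have been delivered

        try {
            if (!terminated.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        run(List.of("a", "b")).forEach(System.out::println);
    }
}
```

Every item arrives through onNext after onSubscribe, and the single onComplete is the last event recorded.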

Subscription

Subscription represents a subscription relationship. Besides the request method described above, it has a cancel method for unsubscribing. Note that after cancel is called, the publisher may still deliver some notifications, but the subscription will eventually be cancelled.
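A minimal sketch of cancel, again assuming JDK 9 or later and using the JDK Flow API rather than the MongoDB driver. The subscriber pulls one item at a time and cancels after a limit. Because it never requests more than it has consumed, nothing is in flight when it cancels. CancelDemo and takeThenCancel are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses the JDK Flow API as a stand-in for org.reactivestreams.
class CancelDemo {

    /** Takes at most limit items (limit >= 1), then cancels the subscription. */
    static List<Integer> takeThenCancel(List<Integer> items, int limit) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // pull one item at a time
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    if (received.size() >= limit) {
                        subscription.cancel(); // no outstanding demand, so nothing more arrives
                        finished.countDown();
                    } else {
                        subscription.request(1);
                    }
                }

                @Override
                public void onError(Throwable t) {
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    finished.countDown(); // fewer items than the limit were published
                }
            });
            items.forEach(publisher::submit);
        }

        try {
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not finish within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeThenCancel(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

A subscriber that requests a large batch and then cancels midway is the case where late notifications can still show up, so onNext should tolerate calls after cancel.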

[Figure: the relationship between the Publisher, Subscriber, and Subscription interfaces. Image source: http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html]

MongoDB's asynchronous driver is the mongo-java-driver-reactivestreams component, which implements the Reactive Streams interfaces described above.

Besides the Reactive Streams version, MongoDB's asynchronous drivers also come in RxJava and other styles, which interested readers can explore further.

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

III. Usage example

The following simple example demonstrates Reactive-style code:

a. Add the dependency

    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-reactivestreams</artifactId>
        <version>1.11.0</version>
    </dependency>

> Adding mongodb-driver-reactivestreams automatically pulls in the reactive-streams, bson, and mongodb-driver-async components.

b. Connect to the database

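    import java.util.ArrayList;
    import java.util.List;

    import com.mongodb.MongoClientSettings;
    import com.mongodb.ServerAddress;
    import com.mongodb.reactivestreams.client.MongoClient;
    import com.mongodb.reactivestreams.client.MongoClients;

    // List of server instances
    List<ServerAddress> servers = new ArrayList<>();
    servers.add(new ServerAddress("localhost", 27018));

    // Settings builder
    MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

    // Pass in the server instances
    settingsBuilder.applyToClusterSettings(builder -> builder.hosts(servers));

    // Build the client instance
    MongoClient mongoClient = MongoClients.create(settingsBuilder.build());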

c. Query documents

    import org.bson.Document;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    import com.mongodb.reactivestreams.client.FindPublisher;
    import com.mongodb.reactivestreams.client.MongoCollection;
    import com.mongodb.reactivestreams.client.MongoDatabase;

    // Get the database object
    MongoDatabase database = mongoClient.getDatabase(databaseName);

    // Get the collection
    MongoCollection<Document> collection = database.getCollection(collectionName);

    // find() returns a Publisher; nothing is executed yet
    FindPublisher<Document> publisher = collection.find();

    // Subscribe to the publisher
    publisher.subscribe(new Subscriber<Document>() {
        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("start...");
            // Execute the request
            s.request(Integer.MAX_VALUE);
        }

        @Override
        public void onNext(Document document) {
            // A document has arrived
            System.out.println("Document:" + document.toJson());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error occurs.");
        }

        @Override
        public void onComplete() {
            System.out.println("finished.");
        }
    });

Notice that, unlike with the synchronous driver, collection.find() does not return a cursor but a FindPublisher object, which extends the Publisher interface.

Also, returning the Publisher object does not generate any real database I/O. The request is only initiated by a call to the Subscription.request() method.

To read the results produced by the Publisher, the code above defines a custom Subscriber: it executes the database request when the onSubscribe event fires, and then handles onNext, onError, and onComplete separately.

Although this implementation is purely asynchronous, it is cumbersome to use. If every database operation required its own Subscriber logic, the development workload would be huge.
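The idea that work starts only on request() can be illustrated without a database. The sketch below is not the driver's implementation. It is a hypothetical, single-threaded publisher built on the JDK Flow API (JDK 9 or later assumed), in which a Supplier plays the role of the query and is invoked on the first request(n). The names LazyPublisherDemo, lazy, and queryRunsBeforeAndAfterRequest are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical demo class; single-threaded sketch, not the MongoDB driver's implementation.
class LazyPublisherDemo {

    /** Stand-in for a driver publisher: the "query" runs on the first request(n). */
    static <T> Flow.Publisher<T> lazy(Supplier<List<T>> query) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private Iterator<T> results; // stays null until demand is signalled
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                emitting = true;
                if (results == null) results = query.get().iterator(); // the "I/O" happens here
                while (!done && demand > 0 && results.hasNext()) {
                    demand--;
                    subscriber.onNext(results.next());
                }
                if (!done && !results.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Returns {query runs before request, query runs after request, items received}. */
    static int[] queryRunsBeforeAndAfterRequest() {
        AtomicInteger queryRuns = new AtomicInteger();
        List<String> received = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];

        Supplier<List<String>> query = () -> {
            queryRuns.incrementAndGet();
            return List.of("doc1", "doc2");
        };
        Flow.Publisher<String> publisher = lazy(query);

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { held[0] = s; } // no request yet
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });

        int before = queryRuns.get();       // subscribed, but nothing requested
        held[0].request(Long.MAX_VALUE);    // demand triggers the query
        int after = queryRuns.get();
        return new int[] {before, after, received.size()};
    }

    public static void main(String[] args) {
        int[] r = queryRunsBeforeAndAfterRequest();
        System.out.println("before=" + r[0] + ", after=" + r[1] + ", received=" + r[2]);
    }
}
```

Subscribing alone leaves the supplier untouched, which is the same pattern the article describes for collection.find(): the call is cheap, and the cost is paid when demand is signalled.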

To reuse the repetitive logic as much as possible, the Subscriber logic can be wrapped in a helper class that provides the following:

A List container that caches the results of the request

A blocking method that waits for the results, with a configurable timeout

Capture of any exception, which is rethrown while waiting for the results

The code is as follows:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    import com.mongodb.MongoTimeoutException;

    public class ObservableSubscriber<T> implements Subscriber<T> {

        // Response data
        private final List<T> received;
        // Errors
        private final List<Throwable> errors;
        // Latch used to wait for completion
        private final CountDownLatch latch;
        // Subscription
        private volatile Subscription subscription;
        // Whether the stream has completed
        private volatile boolean completed;

        public ObservableSubscriber() {
            this.received = new ArrayList<T>();
            this.errors = new ArrayList<Throwable>();
            this.latch = new CountDownLatch(1);
        }

        @Override
        public void onSubscribe(final Subscription s) {
            subscription = s;
        }

        @Override
        public void onNext(final T t) {
            received.add(t);
        }

        @Override
        public void onError(final Throwable t) {
            errors.add(t);
            onComplete();
        }

        @Override
        public void onComplete() {
            completed = true;
            latch.countDown();
        }

        public Subscription getSubscription() {
            return subscription;
        }

        public List<T> getReceived() {
            return received;
        }

        public Throwable getError() {
            if (errors.size() > 0) {
                return errors.get(0);
            }
            return null;
        }

        public boolean isCompleted() {
            return completed;
        }

        /**
         * Blocks for up to the given time waiting for the results.
         *
         * @param timeout how long to wait
         * @param unit    the unit of timeout
         * @return the received items
         * @throws Throwable the first error reported by the publisher, or a timeout
         */
        public List<T> get(final long timeout, final TimeUnit unit) throws Throwable {
            return await(timeout, unit).getReceived();
        }

        /**
         * Blocks until the request completes.
         *
         * @return this subscriber
         * @throws Throwable the first error reported by the publisher
         */
        public ObservableSubscriber<T> await() throws Throwable {
            return await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }

        /**
         * Blocks for up to the given time waiting for completion.
         *
         * @param timeout how long to wait
         * @param unit    the unit of timeout
         * @return this subscriber
         * @throws Throwable the first error reported by the publisher, or a timeout
         */
        public ObservableSubscriber<T> await(final long timeout, final TimeUnit unit) throws Throwable {
            // Request everything, then wait for onComplete or onError
            subscription.request(Integer.MAX_VALUE);
            if (!latch.await(timeout, unit)) {
                throw new MongoTimeoutException("Publisher onComplete timed out");
            }
            if (!errors.isEmpty()) {
                throw errors.get(0);
            }
            return this;
        }
    }

With this basic utility class, asynchronous operations on documents become much simpler.

For example, the document query can be rewritten as follows:

    ObservableSubscriber<Document> subscriber = new ObservableSubscriber<>();
    collection.find().subscribe(subscriber);

    // Process the results, waiting up to 15 seconds
    subscriber.get(15, TimeUnit.SECONDS).forEach(d -> {
        System.out.println("Document:" + d.toJson());
    });

Of course, this example can be improved further. For instance, because a List is used as the cache, the amount of data should be taken into account so that all (or too many) documents are not loaded into memory at once.
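One possible way to bound memory is to request documents in fixed-size batches and process each batch before asking for more. The sketch below is an assumption about how that could look, not code from the driver. It uses the JDK Flow API (JDK 9 or later) so that it runs on its own, and the same shape should carry over to org.reactivestreams.Subscriber. BatchingDemo, BatchingSubscriber, and batches are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demo class; uses the JDK Flow API as a stand-in for org.reactivestreams.
class BatchingDemo {

    /** Requests batchSize items at a time and hands each batch to the handler. */
    static final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        private final int batchSize;
        private final Consumer<List<T>> handler;
        private final List<T> buffer = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile Throwable error;
        private Flow.Subscription subscription;

        BatchingSubscriber(int batchSize, Consumer<List<T>> handler) {
            this.batchSize = batchSize;
            this.handler = handler;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batchSize); // never hold more than one batch in memory
        }

        @Override
        public void onNext(T item) {
            buffer.add(item);
            if (buffer.size() == batchSize) {
                flush();
                subscription.request(batchSize); // ask for the next batch
            }
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            done.countDown();
        }

        @Override
        public void onComplete() {
            flush(); // hand over the final, possibly partial batch
            done.countDown();
        }

        private void flush() {
            if (!buffer.isEmpty()) {
                handler.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }

        /** Blocks until the stream terminates; rethrows a publisher error or a timeout. */
        void await(long timeout, TimeUnit unit) {
            try {
                if (!done.await(timeout, unit)) {
                    throw new IllegalStateException("timed out waiting for completion");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            if (error != null) {
                throw new IllegalStateException(error);
            }
        }
    }

    /** Publishes the items and returns the batches the handler received. */
    static List<List<Integer>> batches(List<Integer> items, int batchSize) {
        List<List<Integer>> seen = new CopyOnWriteArrayList<>();
        BatchingSubscriber<Integer> subscriber = new BatchingSubscriber<>(batchSize, seen::add);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }
        subscriber.await(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(batches(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

The trade-off is that the handler runs on the publisher's delivery thread, so slow processing delays the next request. That delay is the backpressure doing its job.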
