Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the RxJava thread switching process?

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

Today, I will talk to you about how the RxJava thread switching process is, many people may not know much about it. In order to make you understand better, the editor has summarized the following content for you. I hope you can get something according to this article.

Thread switching process

Let's take a look at another of its sharp tools, the scheduler Scheduler: as we know, Scheduler is prepared to add multithreading to the Observable data stream. Generally, we use the subscribeOn () and observeOn () methods to pass in the corresponding Scheduler to specify how each part of the data flow operation should run on which thread. For us, the most common scenario is to update the UI in the main thread after the non-main thread gets and processes the data:

This is a very common invocation method that handles all the processing between different threads in one go, and the structure is very clear because it is chained, so let's take a look at the thread switching process.

SubscribeOn ()

When we call subscribeOn ():

You can see that create () is also called here to generate an Observable, while OperatorSubscribeOn implements the OnSubscribe interface, passing in the original Observable and the scheduler we need:

As you can see, the processing of subscriber here is very similar to the treatment of subscriber by call () in OperatorMap earlier. Here we will also construct a new Subscribers based on the incoming subscriber, but most of this series of processes are executed by worker through schedule (). From the judgment of the thread in the later setProducer (), combined with the purpose of the subscribeOn () method, we can roughly infer that this worker is to some extent equivalent to the proxy executor of a new thread, and the implementation of schedule () should be very similar to that of run () in the Thread class. Let's now look at the implementation of this worker.

First enter from Schedulers.io ():

In the process of getting scheduler through hook, let's ignore it and go directly to CachedThreadScheduler to see its createWorker () method:

The pool here is an atomic variable reference AtomicReference, and what it holds is CachedWorkerPool, so the pool is the cache pool used to hold the worker. We get the required worker from the cache pool and encapsulate it into EventLoopWorker:

Here we finally find the target ThreadWorker, which inherits from NewThreadWorker, and the previous schedule () method ends up in this scheduleActual () method:

Here we see the executor thread pool, and that's the essence of the thread switch we finally achieved with Schedulers.io (). Now, combined with the previous process, let's sort it out from the beginning:

In subscribeOn (), we will generate a new Observable, and its member onSubscribe will use the worker of the incoming Scheduler as the thread scheduling executor when subscribing to the target Subscriber, and notify the original Observable in the corresponding thread to send a message to the temporarily generated Subscriber in the process, and the Subscriber will notify the target Subscriber, thus completing the process of subscribeOn ().

ObserveOn ()

Let's move on to observeOn ():

If we look directly at the part of the final call, we can see that here is another lift (), where OperatorObserveOn is passed. Unlike OperatorSubscribeOn, it is an Operator (we will not discuss the function of Operator above). It constructs a new observer ObserveOnSubscriber and implements the Action0 interface:

As you can see, all the messages sent by ObserveOnSubscriber to the target Subscriber child are switched to the thread of recursiveScheduler for processing, which achieves the purpose of switching the thread back.

The overall process of observeOn () is summarized as follows:

Comparing the two processes of subscribeOn () and observeOn (), it is not difficult to see the difference: subscribeOn () switches the subscription events of the initial Observable to another thread as a whole, while observeOn () switches the message sent by the initial Observable to another thread to notify the target Subscriber. The former switched "subscribe + send" to one thread, while the latter switched "send" to one thread. So, what we implement in our code is actually:

This makes it easy to achieve the functions of common scenarios such as time-consuming tasks operating in subthreads and updating operations in the main thread.

4. Other roles

Subject

Subject is a special role in Rx series, it inherits Observable and implements Observer interface at the same time, that is to say, it can be used as both an observer and an observer. It is generally used as a link between different Observable and Observer. You may wonder, don't we already have operators like map () and flatMap () to change the Observable data flow, so why introduce Subject? This is because the work undertaken by Subject is not the transformation connection for the content of the Observable data stream, but the scheduling of the data stream itself between Observable and Observer. That may still be vague. Let's take an example from "RxJava Essentials":

We created a PublishSubject through create (), and the observer successfully subscribed to the subject, but the subject did not have any data to send, we just knew that it would send string values in the future. Later, when we call subject.onNext (), the message is sent, and Observer's onNext () is triggered to output "Hello World".

Here we notice that when a subscription event occurs, our subject does not generate data flow, and the data flow does not start running until it launches "Hello World". Imagine that if we change the location of the subscription process and subject.onNext (), then Observer will definitely not accept "Hello World" (this is not nonsense-- |), so this fundamentally reflects the difference between hot and cold Observable.

Generally speaking, our Observable belongs to Cold Observables, just like watching video. Every time we click on a new video, we have to play it from scratch, while Subject belongs to Hot Observables by default, just like watching LVB. Video data is always new.

Based on this attribute, Subject naturally has the ability to select and schedule received data streams. Therefore, our use of Subject is usually based on the following ideas:

In the previous example, we used PublishSubject, which only sends data from the original Observable to the observer after the point in time when the subscription occurred. Wait a minute, does this function sound familiar?

Yes, it's EventBus and Otto. (the emergence of RxJava slowly took Otto off the stage. Now Otto's Repo is in Deprecated state, but EventBus is still strong.) based on the ability to observe subscription cancellation of RxJava and the function of PublishSubject, we can easily write a simple event bus framework that implements the most basic functions:

Of course, there are other types of Subject, such as BehaviorSubject, ReplaySubject, AsyncSubject and so on. You can take a look at the official documents, which are written in great detail and will not be introduced here.

three。 Postscript

I believe that in recent days, when you mention RxJava, you will think of Agera, which Google has just opened up recently. As a Reactive Programming framework specially built for Android, Agera will inevitably be compared with RxJava. The main process analysis of RxJava earlier in this article is drawing to a close, and now let's take a look at what Agera is all about.

First of all, come to a conclusion:

Agera, which was originally developed for Google Play Movies as an internal framework, is now open source, and although it appeared after RxJava, it is completely independent of RxJava and has nothing to do with it (it's just that the open source time is 233333). Compared to RxJava, Agera is more focused on the lifecycle of Android, while RxJava is more purely oriented towards the Java platform than Android.

You might ask, "what about RxAndroid? isn't it still there?" In fact, RxAndroid has done a lot of refactoring as early as version 1.0, many modules have been split into other projects, and some of the code has been deleted, and most of the remaining parts are related to Android threads, such as AndroidSchedulers, MainThreadSubscription and so on. In view of this situation, let's not pay attention to RxAndroid for the time being and focus on Agera first.

Also based on the observer pattern, the roles of Agera and RxJava are roughly similar. In Agera, there are two main roles: Observable (observed) and Updatable (observer).

Yes, compared with Observable in Observable,Agera in RxJava is a simple interface and there is no paradigm, so is Updatable, so how can we deliver messages? This requires another interface:

Finally seeing the generic T, our message delivery ability depends on this interface. So let's combine this interface with the basic Observable:

The Repository here is, to some extent, the Observable of the RxJava we want. Similarly, Repository has two types of implementations:

Direct-the data contained is always available or can be calculated synchronously; the Repository of a Direct is always active

Deferred-contains data that is calculated or pulled asynchronously; a Deffered Repository is inactive until Updatable is added

Do you feel deja vu? Yes, Repository also has a distinction between hot and cold, but let's not pay attention to that for the time being. Going back to the above, now that we have the role of sending data, how can we receive the data? The answer is Receiver:

I believe you should also feel vaguely that in the world of Agera, the transmission of data and events are isolated from each other, which is the biggest essential difference between Agera and Rx series at present. Agera uses a push event, pull data model, which means that event does not carry any data,Updatable and takes on the task of pulling data from the data source when it needs to be updated. In this way, the responsibility for providing data is split from the Observable to Repository, allowing it to focus on sending simple events such as button clicks, the trigger of a drop-down refresh, and so on.

So what are the benefits of such an implementation?

When the two processing distribution logic are separated, Updatable does not have to observe the history of complete data changes from Repository. After all, in most scenarios, especially those with updated UI, the latest data is often useful.

But I just need to see the changing historical data, what should I do?

Don't worry, here we have another character, Reservoir:

As the name implies, Reservoir is the place where we store changing data. It inherits Receiver and Repository, which is equivalent to the ability to receive and send data at the same time. By looking at its implementation, we can see that its essential operations are implemented using internal Queue: the data is received through accept () and entered into the column, and the data is obtained through get () and dequeued. If a Updatable observes this Reservoir, the next data to be dequeued after a scheduling change in its queue is available (not empty), the Updatable is notified, and the data is further pulled and sent to Receiver.

Now that we have a rough idea of the functional properties of these roles, let's take a look at an official sample code:

Is there a feeling in the clouds? Thanks to the comments, we can probably guess what's going on: splice it into url using the required image specifications as parameters, pull the corresponding image and display it in ImageView. Let's take a look at the whole process with API:

Repositories.repositoryWithInitialValue (Result.absent ())

Create a repository that can be run (or executed).

The initialization input value is Result, which is an immutable object that summarizes the results of operations such as apply () and merge (), and there are two states succeeded () and failed ().

Return to REventSource

Observe ()

Used to add a new Observable as an Event source to update our picture, which is not needed in this example.

Return to RFrequency

OnUpdatesPerLoop ()

If there is update () processing from multiple Event Source in each Looper Thread loop, only one data processing stream needs to be opened.

Return to RFlow

GetFrom (new Supplier (…))

Ignore the input value and use the newly obtained data from the given Supplier as the output value.

Return to RFlow

GoTo (executor)

Switch to the given executor to continue the data processing stream.

AttemptTransform (function ())

Using the given function () transform input value, if the transformation fails, the data flow is terminated; if successful, the new transformed value is taken as the output of the current stream instruction.

Return to RTermination

OrSkip ()

If the previous operation fails, the remaining data processing streams are skipped and all added Updatable are not notified.

ThenTransform (function ())

Similar to attemptTransform (function ()), except that notifications are issued when necessary.

Return to RConfig

OnDeactivation (SEND_INTERRUPT)

Used to clarify the behavior when repository is no longer active.

Return to RConfig

Compile ()

Execute the repository.

Return to Repository

At first glance, there is nothing special about the overall process, but the real mystery lies in the return value of performing each step:

The initial REventSource represents the beginning of the event source, receiving the T initialValue from the incoming value, where the first T is the data type of the current repository and the second T is the data type at the beginning of the data processing stream.

After that, when observe () is called, we pass the event source to REventSource, which is equivalent to setting the desired event source and the corresponding start. What is returned here is RFrequency, which inherits from REventSource and adds the attribute of the sending frequency of the event source.

After that, we come to onUpdatesPerLoop (), where we specify the number of data streams opened (that is, the frequency mentioned earlier), and return RFlow, which means that our data stream is officially generated. At the same time, this is also the starting point for streaming calls.

After we get our RFlow, we can provide it with the data source, that is, Supplier, so we call getFrom (), so that our data stream really has the "practical information" of data.

Once we have the data, we can convert the data according to the specific needs. Here, we can directly use transform () and return RFlow for further streaming calls; we can also call attemptTransform () to handle possible exceptions, such as orSkip () and orEnd (), and then continue the streaming calls.

After a series of streaming calls, we finally finished processing the data, and now we can choose to do the final wrapper thenTransform () on the molded data first, or merge thenMergeIn () with another Supplier. After these processing, our return value is also changed to RConfig, entering the final configuration and the end of the repository declaration.

During the final configuration, we call onDeactivation (), which defines the behavior of the repository when it finally enters the inactive state, and if we don't need any extra configuration, we can go to the final compile () method. When we call compile (), we execute and generate the repository according to all the processes and configurations we walked through earlier. It is only at this point that our repository is really created.

The above is the whole process of repository from scratch. When repository is born, we can also transfer the data we need. Go back to the sample code above:

We added and removed Updatable under the onResume () and onPause () lifecycles, respectively. Compared with the practice of unsubscribing through Subscription in RxJava, this way of writing Agera is obviously clearer and cleaner. Our Activity implements Updatable and Receiver interfaces, depending on how they are implemented:

You can see here that repository sends the data to receiver, that is, itself. After receiving the desired bitmap in the corresponding accept () method, this picture is displayed, and the complete process in the sample code is over.

Summarize the above process:

First Repositories.repositoryWithInitialValue () generates the origin REventSource.

After configuring Observable, enter the RFrequency state, and then configure the number of streams of the data flow.

After the previous configuration is completed, the data flow RFlow is generated, and then further streaming calls can be made through methods such as getFrom (), mergeIn (), transform (), etc.; you can also use the attemptXXX () method instead of the original method, followed by calls to orSkip () and orEnd () for error handling processing. When using the attemptXXX () method, the state of the data flow changes to RTermination, which means that the state at this time has the ability to terminate the data flow. Whether to terminate the data flow should be triggered according to failed check, combined with orSkip () and orEnd () followed by the call, our data stream will be switched from RTermination to RFlow again for subsequent streaming calls.

After the previous series of streaming processing, when we need to end the data flow, we can choose to call the thenXXX () method to carry out the final processing of the data flow. After processing, the data flow state will change to RConfig; or you can add error handling processing for this behavior. Select the thenAttemptXXX () method, followed by orSkip () and orEnd (), and the final data flow will also change to Rconfig state.

At this point, we can select the final configuration of the data flow as needed before the end, for example, call onDeactivation () to configure whether the process from "subscribe" to "unsubscribe" needs to continue to execute the data flow, and so on.

After everything is deployed, we compile () this RConfig to get the final formed Repository, which has the ability to add Updatable and send data notification Receiver.

We add Updatable,repository as needed and send an event notification Updatable via update () after the data stream processing is complete.

After receiving the notification, Updatable will pull the result data of repository and send the data to Receiver through accept (). Complete the process of Push event and pull data.

After reading the above, do you have any further understanding of the RxJava thread switching process? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report