
How to use Subject in RxJS

Updated 2025-02-14. From: SLTechnology News&Howtos (shulou)


This article explains in detail how to use Subject in RxJS. It is intended as a practical reference, and I hope you take something useful away from it.

Observer Pattern

Observer pattern definition

The Observer pattern, often also called the publish-subscribe pattern (Publish/Subscribe), defines a one-to-many relationship in which multiple observer objects listen to a single subject object. When the state of the subject changes, it notifies all of its observers so that they can update themselves automatically.

A periodical subscription is a familiar everyday example. It involves two main roles, the periodical publisher and the subscriber, related as follows:

Periodical publisher - responsible for publishing and distributing the periodical

Subscriber - only needs to subscribe once; whenever a new issue is published, the publisher notifies the subscriber proactively. After cancelling the subscription, the subscriber receives no further notifications.

The Observer pattern likewise has two main roles: Subject and Observer. They correspond to the periodical publisher and the subscriber in the example, respectively.

Observer pattern structure

(Figure: structure diagram of the Observer pattern; image not available.)

Observer pattern in practice

Subject class definition

    class Subject {
        constructor() {
            this.observerCollection = [];
        }

        addObserver(observer) { // add an observer
            this.observerCollection.push(observer);
        }

        deleteObserver(observer) { // remove an observer
            let index = this.observerCollection.indexOf(observer);
            if (index >= 0) this.observerCollection.splice(index, 1);
        }

        notifyObservers() { // notify every observer
            this.observerCollection.forEach((observer) => observer.notify());
        }
    }

Observer class definition

    class Observer {
        constructor(name) {
            this.name = name;
        }

        notify() {
            console.log(`${this.name} has been notified.`);
        }
    }

Usage example

    let subject = new Subject(); // create the subject object
    let observer1 = new Observer('semlinker'); // create observer A - 'semlinker'
    let observer2 = new Observer('lolo'); // create observer B - 'lolo'

    subject.addObserver(observer1); // register observer A
    subject.addObserver(observer2); // register observer B

    subject.notifyObservers(); // notify the observers

    subject.deleteObserver(observer1); // remove observer A

    subject.notifyObservers(); // verify that observer A was removed

After the code above runs, the console shows:

semlinker has been notified.

lolo has been notified.

lolo has been notified.

Observable subscribe

Before introducing the RxJS Subject, let's look at an example:

    const interval$ = Rx.Observable.interval(1000).take(3);

    interval$.subscribe({
        next: value => console.log('Observer A get value: ' + value)
    });

    setTimeout(() => {
        interval$.subscribe({
            next: value => console.log('Observer B get value: ' + value)
        });
    }, 1000);

After the code above runs, the console shows:

Observer A get value: 0

Observer A get value: 1

Observer B get value: 0

Observer A get value: 2

Observer B get value: 1

Observer B get value: 2

From this example we can draw the following conclusions:

An Observable object can be subscribed to more than once

Each subscription re-executes the Observable object from the start

In other words, the example behaves like calling an ordinary function twice, as in the following code:

    function interval() {
        setInterval(() => console.log('..'), 1000);
    }

    interval();

    setTimeout(() => {
        interval();
    }, 1000);
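
The "re-executed on every subscription" behavior can also be sketched without RxJS and without timers. The helper below, createObservable, is a hypothetical stand-in and not the RxJS API: it stores a producer function and runs it once per subscribe call, so each observer gets its own independent sequence.

```javascript
// Hypothetical helper, not part of RxJS: wraps a producer function and
// runs it again for every subscriber.
function createObservable(producer) {
  return {
    subscribe(observer) {
      producer(observer);
    },
  };
}

let executions = 0;

// The producer emits 0, 1, 2 synchronously, then completes.
const numbers$ = createObservable((observer) => {
  executions += 1;
  for (let i = 0; i < 3; i++) {
    observer.next(i);
  }
  observer.complete();
});

const receivedByA = [];
const receivedByB = [];

numbers$.subscribe({ next: (v) => receivedByA.push(v), complete: () => {} });
numbers$.subscribe({ next: (v) => receivedByB.push(v), complete: () => {} });

console.log(receivedByA); // [ 0, 1, 2 ]
console.log(receivedByB); // [ 0, 1, 2 ]
console.log(executions);  // 2
```

Both observers see the full sequence from the start, and the producer ran twice, once per subscription.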

This default behavior of Observable objects suits most scenarios. Sometimes, however, we want a second subscriber not to receive the values from the beginning, but to join at the value the first subscription is currently processing. This way of delivering one execution to several observers is called multicast. How can we implement it? Recalling the Observer pattern introduced above, you may already have an idea: we can do it with a custom Subject.

Custom Subject

Subject class definition

    class Subject {
        constructor() {
            this.observers = [];
        }

        addObserver(observer) {
            this.observers.push(observer);
        }

        next(value) {
            this.observers.forEach(o => o.next(value));
        }

        error(error) {
            this.observers.forEach(o => o.error(error));
        }

        complete() {
            this.observers.forEach(o => o.complete());
        }
    }

Usage example

    const interval$ = Rx.Observable.interval(1000).take(3);
    let subject = new Subject();

    let observerA = {
        next: value => console.log('Observer A get value: ' + value),
        error: error => console.log('Observer A error: ' + error),
        complete: () => console.log('Observer A complete!')
    };

    var observerB = {
        next: value => console.log('Observer B get value: ' + value),
        error: error => console.log('Observer B error: ' + error),
        complete: () => console.log('Observer B complete!')
    };

    subject.addObserver(observerA); // add observer A
    interval$.subscribe(subject); // subscribe the subject to interval$
    setTimeout(() => {
        subject.addObserver(observerB); // add observer B
    }, 1000);

After the code above runs, the console shows:

Observer A get value: 0

Observer A get value: 1

Observer B get value: 1

Observer A get value: 2

Observer B get value: 2

Observer A complete!

Observer B complete!
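
The same multicast effect can be reproduced synchronously, which makes the ordering easier to follow. This is a minimal sketch that reuses the custom Subject idea under a hypothetical name, SimpleSubject, and pushes values by hand instead of using interval$; the log array exists only to record what each observer sees.

```javascript
// Hypothetical stand-in for the custom Subject described above.
class SimpleSubject {
  constructor() {
    this.observers = [];
  }
  addObserver(observer) {
    this.observers.push(observer);
  }
  next(value) {
    this.observers.forEach((o) => o.next(value));
  }
  complete() {
    this.observers.forEach((o) => o.complete());
  }
}

const log = [];
const makeObserver = (name) => ({
  next: (value) => log.push(`Observer ${name} get value: ${value}`),
  complete: () => log.push(`Observer ${name} complete!`),
});

const subject = new SimpleSubject();
subject.addObserver(makeObserver('A'));
subject.next(0);                        // only A is registered
subject.addObserver(makeObserver('B')); // B joins late and misses 0
subject.next(1);
subject.next(2);
subject.complete();

console.log(log.join('\n'));
```

Because there is a single source of values and the subject fans them out, Observer B starts at 1 rather than at 0.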

With the custom Subject we have implemented the multicast behavior described earlier. Now let's turn to the main topic: the RxJS Subject.

RxJS Subject

First, let's rewrite the above example through RxJS Subject:

    const interval$ = Rx.Observable.interval(1000).take(3);
    let subject = new Rx.Subject();

    let observerA = {
        next: value => console.log('Observer A get value: ' + value),
        error: error => console.log('Observer A error: ' + error),
        complete: () => console.log('Observer A complete!')
    };

    var observerB = {
        next: value => console.log('Observer B get value: ' + value),
        error: error => console.log('Observer B error: ' + error),
        complete: () => console.log('Observer B complete!')
    };

    subject.subscribe(observerA); // add observer A
    interval$.subscribe(subject); // subscribe the subject to interval$
    setTimeout(() => {
        subject.subscribe(observerB); // add observer B
    }, 1000);

RxJS Subject source code fragment

    /* Subject inherits from Observable */
    export class Subject extends Observable {
        constructor() {
            super();
            this.observers = []; // observer list
            this.closed = false;
            this.isStopped = false;
            this.hasError = false;
            this.thrownError = null;
        }

        next(value) {
            if (this.closed) {
                throw new ObjectUnsubscribedError();
            }
            if (!this.isStopped) {
                const { observers } = this;
                const len = observers.length;
                const copy = observers.slice();
                for (let i = 0; i < len; i++) { // call next on each observer in turn to notify it
                    copy[i].next(value);
                }
            }
        }

        error(err) {
            if (this.closed) {
                throw new ObjectUnsubscribedError();
            }
            this.hasError = true;
            this.thrownError = err;
            this.isStopped = true;
            const { observers } = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) { // call error on each observer in turn
                copy[i].error(err);
            }
            this.observers.length = 0;
        }

        complete() {
            if (this.closed) {
                throw new ObjectUnsubscribedError();
            }
            this.isStopped = true;
            const { observers } = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) { // call complete on each observer in turn
                copy[i].complete();
            }
            this.observers.length = 0; // clear the internal observer list
        }
    }

From the RxJS Subject example and the source code fragment, we can draw the following conclusions about Subject:

Subject is both an Observable object and an Observer object

When a new message arrives, Subject multicasts it to its internal observers list

Using RxJS Subject in Angular 2

In Angular 2, we can use RxJS Subject to implement communication between components, as in the following example:

message.service.ts

    import { Injectable } from '@angular/core';
    import { Observable } from 'rxjs/Observable';
    import { Subject } from 'rxjs/Subject';

    @Injectable()
    export class MessageService {
        private subject = new Subject<any>();

        sendMessage(message: string) {
            this.subject.next({ text: message });
        }

        clearMessage() {
            this.subject.next();
        }

        getMessage(): Observable<any> {
            return this.subject.asObservable();
        }
    }

home.component.ts

    import { Component } from '@angular/core';
    import { MessageService } from '../_services/index';

    @Component({
        moduleId: module.id,
        templateUrl: 'home.component.html'
    })
    export class HomeComponent {
        constructor(private messageService: MessageService) {}

        sendMessage(): void { // send a message
            this.messageService.sendMessage('Message from Home Component to App Component!');
        }

        clearMessage(): void { // clear the message
            this.messageService.clearMessage();
        }
    }

app.component.ts

    import { Component, OnDestroy } from '@angular/core';
    import { Subscription } from 'rxjs/Subscription';
    import { MessageService } from './_services/index';

    @Component({
        moduleId: module.id,
        selector: 'app',
        templateUrl: 'app.component.html'
    })
    export class AppComponent implements OnDestroy {
        message: any;
        subscription: Subscription;

        constructor(private messageService: MessageService) {
            this.subscription = this.messageService.getMessage()
                .subscribe(message => {
                    this.message = message;
                });
        }

        ngOnDestroy() {
            this.subscription.unsubscribe();
        }
    }

The example above passes messages between components: HomeComponent, the child component, sends messages to AppComponent, the parent component. After the code runs, the browser displays the result (screenshot not available).
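
Stripped of Angular, the pattern is a service that keeps a Subject private and hands out only a subscribe-only view of it. The sketch below assumes nothing from Angular or RxJS: TinySubject and MessageBus are hypothetical names, and asObservable here simply returns an object that exposes subscribe but not next.

```javascript
// Hypothetical minimal subject: subscribe returns an object with unsubscribe().
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(next) {
    this.observers.push(next);
    return {
      unsubscribe: () => {
        const index = this.observers.indexOf(next);
        if (index >= 0) this.observers.splice(index, 1);
      },
    };
  }
  next(value) {
    // Iterate over a copy so unsubscribing during delivery is safe.
    this.observers.slice().forEach((observer) => observer(value));
  }
  asObservable() {
    // Expose subscribe only, so consumers cannot push values themselves.
    return { subscribe: (next) => this.subscribe(next) };
  }
}

// Hypothetical counterpart of MessageService without Angular decorators.
class MessageBus {
  constructor() {
    this.subject = new TinySubject();
  }
  sendMessage(text) {
    this.subject.next({ text });
  }
  clearMessage() {
    this.subject.next(undefined);
  }
  getMessage() {
    return this.subject.asObservable();
  }
}

const bus = new MessageBus();
const seen = [];

// The "parent" listens; the "child" sends.
const subscription = bus.getMessage().subscribe((message) => seen.push(message));
bus.sendMessage('Message from Home Component to App Component!');
bus.clearMessage();
subscription.unsubscribe(); // the role ngOnDestroy plays in the Angular version
bus.sendMessage('ignored after unsubscribe');

console.log(seen);
```

The listener records two deliveries (the message object, then undefined from clearMessage) and nothing after it unsubscribes.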

Problems in Subject

When an observer subscribes, Subject stores it in its observer list. When a new value arrives, Subject iterates over that list and calls the next method on each observer, as shown below:

    next(value) {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        if (!this.isStopped) {
            const { observers } = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) { // call next on each observer in turn to notify it
                copy[i].next(value);
            }
        }
    }

This creates a significant problem: if one observer throws during execution and the exception is not handled, the other subscribers are affected too, as in the following example:

    const source = Rx.Observable.interval(1000);
    const subject = new Rx.Subject();

    const example = subject.map(x => {
        if (x === 1) {
            throw new Error('oops');
        }
        return x;
    });

    subject.subscribe(x => console.log('A', x));
    example.subscribe(x => console.log('B', x));
    subject.subscribe(x => console.log('C', x));

    source.subscribe(subject);

After the code above runs, the console shows:

A 0

B 0

C 0

A 1

Rx.min.js:74 Uncaught Error: oops

JSBin-Subject Problem Demo

Before running the code, you might expect that Observer B throws when it receives the value 1 while Observers A and C keep working normally. In fact, in the version of RxJS used here (the RxJS 5 style API, as in Rx.Observable.interval), an error in Observer B also stops Observers A and C. How can this be solved? The simplest way at present is to add error handling to every observer. The updated code is as follows:

    const source = Rx.Observable.interval(1000);
    const subject = new Rx.Subject();

    const example = subject.map(x => {
        if (x === 1) {
            throw new Error('oops');
        }
        return x;
    });

    subject.subscribe(
        x => console.log('A', x),
        error => console.log('A Error:' + error)
    );
    example.subscribe(
        x => console.log('B', x),
        error => console.log('B Error:' + error)
    );
    subject.subscribe(
        x => console.log('C', x),
        error => console.log('C Error:' + error)
    );

    source.subscribe(subject);

JSBin-RxJS Subject Problem Solved Demo
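
Why one failing observer can silence the others is easy to see in a plain loop: if next throws, the loop ends before the remaining observers are called. The sketch below is illustrative only and does not reproduce RxJS internals; notifyNaive and notifyGuarded are hypothetical helpers, and the guarded version routes a thrown exception to that observer's own error callback, which mirrors the idea of giving every observer an error handler.

```javascript
// Naive delivery: an exception from one observer aborts the whole loop.
function notifyNaive(observers, value) {
  observers.forEach((observer) => observer.next(value));
}

// Guarded delivery: each observer's failure is reported to that observer only.
function notifyGuarded(observers, value) {
  observers.forEach((observer) => {
    try {
      observer.next(value);
    } catch (err) {
      if (observer.error) observer.error(err);
    }
  });
}

// Builds observers A, B, C that write to the given log; B throws on the value 1.
function makeObservers(log) {
  return [
    { next: (x) => log.push('A ' + x), error: (e) => log.push('A Error: ' + e.message) },
    {
      next: (x) => {
        if (x === 1) throw new Error('oops');
        log.push('B ' + x);
      },
      error: (e) => log.push('B Error: ' + e.message),
    },
    { next: (x) => log.push('C ' + x), error: (e) => log.push('C Error: ' + e.message) },
  ];
}

const naiveLog = [];
try {
  notifyNaive(makeObservers(naiveLog), 1);
} catch (err) {
  naiveLog.push('Uncaught: ' + err.message);
}

const guardedLog = [];
notifyGuarded(makeObservers(guardedLog), 1);

console.log(naiveLog);   // C never receives the value
console.log(guardedLog); // A and C are unaffected by B's failure
```

In the naive run, C is skipped because B's exception escapes the loop; in the guarded run, B's failure stays with B.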

RxJS Subject & Observable

Subject is in fact an implementation of the Observer pattern: when an observer subscribes to a Subject object, the Subject adds it to its observer list. Whenever the Subject receives a new value, it iterates over the observer list and calls each observer's next() method in turn, passing the value along.

Subject has all the methods of Observable because the Subject class inherits from the Observable class. In addition, five methods of the Subject class are particularly important:

next - called whenever the Subject object receives a new value

error - called if an exception occurs while running

complete - called when the Observable object the Subject is subscribed to ends

subscribe - adds an observer

unsubscribe - cancels the subscription (sets the termination flag and empties the observer list)
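
The five methods can be put together in a compact sketch. MiniSubject below is a hypothetical teaching model, not the RxJS source: it omits ObjectUnsubscribedError, schedulers, and operator support, it simply ignores observers that subscribe after the subject has stopped, and its subscribe returns a plain object with an unsubscribe method.

```javascript
class MiniSubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
  }
  // subscribe: add an observer and return a handle that removes it again.
  subscribe(observer) {
    if (this.isStopped) return { unsubscribe() {} }; // simplification: late subscribers are ignored
    this.observers.push(observer);
    return {
      unsubscribe: () => {
        const index = this.observers.indexOf(observer);
        if (index >= 0) this.observers.splice(index, 1);
      },
    };
  }
  // next: forward a new value to every registered observer.
  next(value) {
    if (this.isStopped) return;
    this.observers.slice().forEach((o) => o.next && o.next(value));
  }
  // error: forward the error, then stop and drop all observers.
  error(err) {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.slice().forEach((o) => o.error && o.error(err));
    this.observers.length = 0;
  }
  // complete: signal completion, then stop and drop all observers.
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.slice().forEach((o) => o.complete && o.complete());
    this.observers.length = 0;
  }
  // unsubscribe: set the termination flag and empty the observer list.
  unsubscribe() {
    this.isStopped = true;
    this.observers.length = 0;
  }
}

const events = [];
const subject = new MiniSubject();
const subscriptionA = subject.subscribe({
  next: (v) => events.push('A:' + v),
  complete: () => events.push('A done'),
});
subject.subscribe({
  next: (v) => events.push('B:' + v),
  complete: () => events.push('B done'),
});

subject.next(1);
subscriptionA.unsubscribe(); // A stops listening, B continues
subject.next(2);
subject.complete();
subject.next(3);             // ignored: the subject has stopped

console.log(events);
```

Iterating over a copy of the list (slice) follows the source fragment shown earlier, so an observer that unsubscribes during delivery does not disturb the loop.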

BehaviorSubject

BehaviorSubject definition

BehaviorSubject source code fragment

    export class BehaviorSubject extends Subject {
        constructor(_value) { // set the initial value
            super();
            this._value = _value;
        }

        get value() { // get the current value
            return this.getValue();
        }

        _subscribe(subscriber) {
            const subscription = super._subscribe(subscriber);
            if (subscription && !subscription.closed) {
                subscriber.next(this._value); // send the latest value to the new subscriber
            }
            return subscription;
        }

        getValue() {
            if (this.hasError) {
                throw this.thrownError;
            } else if (this.closed) {
                throw new ObjectUnsubscribedError();
            } else {
                return this._value;
            }
        }

        next(value) { // call the parent Subject's next method and update the current value
            super.next(this._value = value);
        }
    }

BehaviorSubject application

Sometimes we want a Subject to keep the current state rather than simply relay events. That is, whenever an observer is added, we want the Subject to emit the latest value immediately instead of staying silent. Let's look at an example first:

    var subject = new Rx.Subject();

    var observerA = {
        next: value => console.log('Observer A get value: ' + value),
        error: error => console.log('Observer A error: ' + error),
        complete: () => console.log('Observer A complete!')
    };

    var observerB = {
        next: value => console.log('Observer B get value: ' + value),
        error: error => console.log('Observer B error: ' + error),
        complete: () => console.log('Observer B complete!')
    };

    subject.subscribe(observerA);

    subject.next(1);
    subject.next(2);
    subject.next(3);

    setTimeout(() => {
        subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);

After the code above runs, the console shows:

Observer A get value: 1

Observer A get value: 2

Observer A get value: 3

The output shows that observerB receives no values after subscribing to the Subject object, because by then all three next() calls have already happened and the Subject does not call next() again. Often, though, we want the Subject to keep the current state and automatically send the latest value to a new subscriber. This is what BehaviorSubject is for.

The biggest difference between BehaviorSubject and Subject is that BehaviorSubject holds the latest value rather than simply relaying events. It remembers the last value sent and stores it as the current value in an internal property. Let's rewrite the example above with BehaviorSubject:

    var subject = new Rx.BehaviorSubject(0); // set the initial value

    var observerA = {
        next: value => console.log('Observer A get value: ' + value),
        error: error => console.log('Observer A error: ' + error),
        complete: () => console.log('Observer A complete!')
    };

    var observerB = {
        next: value => console.log('Observer B get value: ' + value),
        error: error => console.log('Observer B error: ' + error),
        complete: () => console.log('Observer B complete!')
    };

    subject.subscribe(observerA);

    subject.next(1);
    subject.next(2);
    subject.next(3);

    setTimeout(() => {
        subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);

After the code above runs, the console shows:

Observer A get value: 0

Observer A get value: 1

Observer A get value: 2

Observer A get value: 3

Observer B get value: 3

JSBin-BehaviorSubject
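
The "remember the latest value" idea fits in a few lines. The class below, MiniBehaviorSubject, is a hypothetical model written for this illustration (no error or completion handling, no unsubscribe); it only shows the two steps that matter: store the value on next, and hand the stored value to each new subscriber.

```javascript
// Hypothetical minimal model of a subject that remembers its latest value.
class MiniBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
    observer.next(this.value); // send the current value immediately
  }
  next(value) {
    this.value = value; // remember it for future subscribers
    this.observers.slice().forEach((o) => o.next(value));
  }
}

const output = [];
const state$ = new MiniBehaviorSubject(0);

state$.subscribe({ next: (v) => output.push('Observer A get value: ' + v) });
state$.next(1);
state$.next(2);
state$.next(3);
// B subscribes after all three values were sent and still receives the latest one.
state$.subscribe({ next: (v) => output.push('Observer B get value: ' + v) });

console.log(output.join('\n'));
```

The recorded lines follow the same order as the console output above: A receives 0, 1, 2, 3, and B receives 3 on subscription.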

ReplaySubject

ReplaySubject definition

ReplaySubject source code fragment

    export class ReplaySubject extends Subject {
        constructor(bufferSize = Number.POSITIVE_INFINITY,
                    windowTime = Number.POSITIVE_INFINITY,
                    scheduler) {
            super();
            this.scheduler = scheduler;
            this._events = []; // list of ReplayEvent objects
            this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // set the buffer size
            this._windowTime = windowTime < 1 ? 1 : windowTime;
        }

        next(value) {
            const now = this._getNow();
            this._events.push(new ReplayEvent(now, value));
            this._trimBufferThenGetEvents();
            super.next(value);
        }

        _subscribe(subscriber) {
            const _events = this._trimBufferThenGetEvents(); // filter the ReplayEvent list
            let subscription;
            if (this.closed) {
                throw new ObjectUnsubscribedError();
            }
            ...
            else {
                this.observers.push(subscriber);
                subscription = new SubjectSubscription(this, subscriber);
            }
            ...
            const len = _events.length;
            // re-send the last bufferSize values
            for (let i = 0; i < len && !subscriber.closed; i++) {
                subscriber.next(_events[i].value);
            }
            ...
            return subscription;
        }
    }

    class ReplayEvent {
        constructor(time, value) {
            this.time = time;
            this.value = value;
        }
    }

ReplaySubject application

Sometimes, after a new subscriber is added to a Subject, we want to re-send the last few values to that subscriber. In this case we can use ReplaySubject, as in the following example:

    var subject = new Rx.ReplaySubject(2); // re-send the last 2 values

    var observerA = {
        next: value => console.log('Observer A get value: ' + value),
        error: error => console.log('Observer A error: ' + error),
        complete: () => console.log('Observer A complete!')
    };

    var observerB = {
        next: value => console.log('Observer B get value: ' + value),
        error: error => console.log('Observer B error: ' + error),
        complete: () => console.log('Observer B complete!')
    };

    subject.subscribe(observerA);

    subject.next(1);
    subject.next(2);
    subject.next(3);

    setTimeout(() => {
        subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);

After the code above runs, the console shows:

Observer A get value: 1

Observer A get value: 2

Observer A get value: 3

Observer B get value: 2

Observer B get value: 3

Some people may think that ReplaySubject(1) is equivalent to BehaviorSubject, but the two are different. A BehaviorSubject is created with an initial value, which represents the current state of the Subject object, whereas a ReplaySubject only replays events that have already been sent.
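
The difference shows up most clearly before any value has been sent. The following sketch models both ideas with hypothetical classes (ReplayBuffer and CurrentValue), ignoring the windowTime and scheduler options of the real ReplaySubject: a replay buffer of size 1 has nothing to give a subscriber until the first next call, while a behavior-style subject always has its initial value.

```javascript
// Hypothetical model of replay semantics: keep the last `bufferSize` values.
class ReplayBuffer {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.events = [];
    this.observers = [];
  }
  subscribe(next) {
    this.observers.push(next);
    this.events.forEach((value) => next(value)); // replay buffered values
  }
  next(value) {
    this.events.push(value);
    if (this.events.length > this.bufferSize) this.events.shift();
    this.observers.slice().forEach((observer) => observer(value));
  }
}

// Hypothetical model of behavior semantics: always holds a current value.
class CurrentValue {
  constructor(initialValue) {
    this.value = initialValue;
    this.observers = [];
  }
  subscribe(next) {
    this.observers.push(next);
    next(this.value);
  }
  next(value) {
    this.value = value;
    this.observers.slice().forEach((observer) => observer(value));
  }
}

const fromReplay = [];
const fromBehavior = [];

const replay = new ReplayBuffer(1);
const behavior = new CurrentValue(0);

replay.subscribe((v) => fromReplay.push(v));     // nothing buffered yet
behavior.subscribe((v) => fromBehavior.push(v)); // receives the initial value 0

replay.next(1);
behavior.next(1);

console.log(fromReplay);   // [ 1 ]
console.log(fromBehavior); // [ 0, 1 ]
```

The early subscriber to the replay buffer sees only the value sent afterwards, while the behavior-style subscriber also sees the initial state.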

JSBin-ReplaySubject

AsyncSubject

AsyncSubject definition

AsyncSubject source code fragment

    export class AsyncSubject extends Subject {
        constructor() {
            super(...arguments);
            this.value = null;
            this.hasNext = false;
            this.hasCompleted = false; // marks whether the subject has completed
        }

        _subscribe(subscriber) {
            if (this.hasError) {
                subscriber.error(this.thrownError);
                return Subscription.EMPTY;
            } else if (this.hasCompleted && this.hasNext) { // once completed, send the final value
                subscriber.next(this.value);
                subscriber.complete();
                return Subscription.EMPTY;
            }
            return super._subscribe(subscriber);
        }

        next(value) {
            if (!this.hasCompleted) { // if not completed, save the current value
                this.value = value;
                this.hasNext = true;
            }
        }
    }

AsyncSubject application

AsyncSubject is similar to the last operator: it emits only the last value, and only after the Subject completes, as in the following example:

    var subject = new Rx.AsyncSubject();

    var observerA = {
        next: value => console.log('Observer A get value: ' + value),
        error: error => console.log('Observer A error: ' + error),
        complete: () => console.log('Observer A complete!')
    };

    var observerB = {
        next: value => console.log('Observer B get value: ' + value),
        error: error => console.log('Observer B error: ' + error),
        complete: () => console.log('Observer B complete!')
    };

    subject.subscribe(observerA);

    subject.next(1);
    subject.next(2);
    subject.next(3);

    subject.complete();

    setTimeout(() => {
        subject.subscribe(observerB); // subscribe after 1 second
    }, 1000);

After the code above runs, the console shows:

Observer A get value: 3

Observer A complete!

Observer B get value: 3

Observer B complete!

JSBin-AsyncSubject
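
The "hold everything back until completion" behavior can be modelled in a short sketch. MiniAsyncSubject is a hypothetical class written for this illustration, with no error handling or unsubscribe support: next only stores the value, complete releases the stored value to current observers, and an observer that arrives after completion is served at once.

```javascript
// Hypothetical minimal model: remember only the last value and release it on complete.
class MiniAsyncSubject {
  constructor() {
    this.observers = [];
    this.value = null;
    this.hasNext = false;
    this.hasCompleted = false;
  }
  subscribe(observer) {
    if (this.hasCompleted) {
      // Late subscribers get the final value (if any) and the completion at once.
      if (this.hasNext) observer.next(this.value);
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }
  next(value) {
    if (!this.hasCompleted) {
      this.value = value; // stored, not emitted yet
      this.hasNext = true;
    }
  }
  complete() {
    if (this.hasCompleted) return;
    this.hasCompleted = true;
    this.observers.slice().forEach((o) => {
      if (this.hasNext) o.next(this.value);
      o.complete();
    });
    this.observers.length = 0;
  }
}

const lines = [];
const makeObserver = (name) => ({
  next: (v) => lines.push(`Observer ${name} get value: ${v}`),
  complete: () => lines.push(`Observer ${name} complete!`),
});

const subject = new MiniAsyncSubject();
subject.subscribe(makeObserver('A'));
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();                   // A now receives 3 and the completion
subject.subscribe(makeObserver('B')); // B subscribes late and gets the same

console.log(lines.join('\n'));
```

Values 1 and 2 never reach any observer; only the value held at completion time is delivered.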
