RxJS

学习抄录加强记忆…

RxJS

入门

RxJS 是一个库,通过使用 observable 序列来编写异步和基于事件的程序,提供一个核心的类型 Observable,附属类型(Observer/Schedulers/Subjects)和受[Array#extras]启发的操作符(map,filter,reduce,every等等),这些数组操作符可以把异步事件作为集合来处理。

可以把 Rxjs当做是用来处理事件的 Lodash

ReactiveX 结合了观察者模式、迭代器模式和使用集合的函数式编程,以满足以一种理想方式来管理序列所需要的一切。

在 RxJS中用来解决异步事件管理的基本概念是:

  • Observable(可观察对象):表示一个概念,这个概念是一个可调用的未来值或事件的集合
  • Observer(观察者):一个回调函数的集合,它知道如何去监听由 Observable提供的值
  • Subscription(订阅):表示 Observable的执行,主要用于取消 Observable的执行。
  • Operators(操作符):采用函数式编程风格的纯函数(pure function),使用像map/filter/concat/flatMap等这样的操作符来处理集合
  • Subject(主体):相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式
  • Schedulers(调度器):用来控制并发并且是中央集权的调度员,运行我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame或其他。

例子

注册事件监听器常规写法与 RxJS写法

1
2
3
4
5
const button = document.querySelector('button');
button.addEventListener('click',()=>console.log('Clicked!'))

// RxJS
Rx.Observable.fromEvent(button,'click').subscribe(()=>console.log('Clicked!'))

纯净性(Purity)

RxJS 强大的正是它使用纯函数来产生值的能力,这意味这代码不容易出错。

通常会创建一个非纯函数,在这个函数之外也使用了共享变量的代码,使得应用状态一团糟

1
2
3
4
5
let count = 0;
button.addEventListener('click',()=>console.log(`Clicked ${++count} times`))

// RxJS
Rx.Observable.fromEvent(button,'click').scan(count=>count+1,0).subscribe(count=>console.log(`Clicked ${count} times`))

scan操作符的工作原理与数组的 reduce类似,它需要一个暴露给回调函数当参数的初始值,每次回调函数运行后的返回值会作为下次回调函数运行时的参数。

流动性(Flow)

RxJS 提供一整套操作符来帮助控制事件如何流经 observables.

下面的代码展示的是如何控制一秒钟内最多点击一次:

1
2
3
4
5
6
7
8
9
10
11
12
const count = 0;
const rate = 1000;
const lastClick = Date.now() - rate;
const button = document.querySelector('button');
button.addEventListener('click',()=>{
if(Date.now() - lastClick >= rate){
console.log(`Clicked ${++count} times`)
lastClick = Date.now();
}
})
// RxJS
Rx.Observable.fromEvent(button,'click').throttleTime(1000).scan(count=>count+1,0).subscribe(count=>console.log(`Clicked ${count} times`))

值(Values)

对于流经 observables 的值,可以对其进行转换

下面代码展示的是如何累加每次点击的鼠标x坐标

1
2
3
4
5
6
7
8
button.addEventListener('click',(event)=>{
if(Date.now() - lastClick >= rate){
count += event.clientX;
lastClick = Date.now();
}
})
// RxJS
Rx.Observable.FromEvent(button,'click').throttleTime(1000).map(event=>event.clickX).scan((count,clientX)=>count+clientX,0).subscribe(count=>console.log(count));

Observable(可观察对象)

Observable 是多个值的惰性推送集合,填补了下面表格的空白:

单个值 多个值
拉取 Function Iterator
推送 Promise Observable

实例:当订阅下面代码中的 Observable的时候会立即(同步地)推送值1,2,3,然后1秒后会推送值4,再然后是完成流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
const { Observable } = require('rxjs');

const observable = new Observable(obeserver => {
obeserver.next(1);
obeserver.next(2);
obeserver.next(3);
setTimeout(() => {
obeserver.next(4);
obeserver.complete();
}, 1000)
})

// 要调用 Observable 并看到这些值,我们需要订阅 Observable
console.log('just before subscribe');

observable.subscribe({
next: x => console.log(`got value ${x}`),
error: err => console.error(`something wrong occurred: ${err}`),
complete: () => console.log('done')
})

console.log('just after subscribe')

拉取(Pull)vs. 推送(Push)

拉取和推送是两种不同的协议,用来描述数据生产者(Producer)如何与数据消费者(Consumer)进行通信的。

拉取-在拉取体系中,由消费者来决定何时从生产者那里接收数据。生产者本身不知道数据是何时交付到消费者手中的。

每个JS函数都是拉取体系,函数是数据的生产者,调用该函数的代码通过从函数调用中取出一个单个返回值来对该函数进行消费。

ES2015引入了 generator 函数和 iterators(function*) 这是另外一种类型的拉取体系。调用 iterator.next() 的代码是消费者,它会从 iterator(生产者)取出多个值

生产者 消费者
拉取 被动的:当被请求时产生数据 主动的:决定何时请求数据
推送 主动的:按自己的节奏产生数据 被动的:对收到的数据做出反应

推送-在推送体系中,由生产者来决定何时把数据发送给消费者,消费者本身不知道何时会接受到数据

Promise是最常见的推送体系类型。由Promise(生产者)将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由Promise来决定何时把值推送给回调函数。

RxJS引入了 Observables 一个新的推送体系。Observable 是多个值的生存者,并将值推送给观察者(消费者)。

  • Function 是惰性的评估运算,调用时会同步地返回一个单一值
  • Generator 是惰性的评估运算,调用时会同步地返回零到(有可能的)无限多个值
  • Promise 是最终可能(或可能不)返回单个值的运算
  • Observable 是惰性的评估运算,它可以从它被调用的时刻起同步或异步地返回零到(有可能的)无限多个值

Observable作为函数的泛化

与流行的说法相反,Observable 既不像 EventEmitters,也不像多个值的Promises,在某些情况下,即当使用RxJS的Subjects进行多播时,Observables的行为可能会比较像EventEmitters,但通常情况下Observables的行为并不像EventEmitters.

Observables 像是没有参数,但可以泛化为多个值的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
function foo(){
console.log('Hello');
return 42;
}

const x = foo.call(); // 相当于 foo()
console.log(x)

const y = foo.call(); // 等同于 foo()
console.log(y)

// 输出 "Hello" 42 "Hello" 42
// Observables
const { Observable } = require('rxjs');

const foo = new Observable(observer => {
console.log('Hello');
observer.next(42)
})

foo.subscribe(x => console.log(x))
foo.subscribe(y => console.log(y))

因为函数和 Observables 都是惰性运算,如果不调用函数,不会执行console.log('Hello')。Observable 也是如此,不调用(使用 subscribe),console.log('Hello')也不会执行。此外,调用或订阅是独立的操作;两个函数调用会触发两个单独的副作用,两个Observable订阅同样也是触发两个单独的副作用。EventEmitters共享副作用并且无论是否存在订阅者都会尽早执行,Observables与之相反,不会共享副作用并且是延迟执行。

订阅 Observable 类似于调用函数

一些人声称Observables是异步的,那不是真的:

1
2
3
4
5
6
7
8
9
10
console.log('before');
foo.subscribe(x=>console.log(x))
console.log('after');
/**
before
Hello
42
after
*/
// 这说明了foo订阅是完全同步的,就像函数一样

那么 Observable 和函数的区别是什么?Observable可以随着时间的推移返回多个值,这是函数所做不到的

1
2
3
4
5
function foo(){
console.log('Hello');
return 45;
return 100; // 死代码,不会执行
}

函数只能返回一个值,但 Observables 可以这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const foo = new Observable(observer => {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
})

console.log('before')
foo.subscribe(x => console.log(x))
console.log('after')
/**
before
Hello
42
100
200
after
*/

也可以异步地返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
const foo = new Observable(observer => {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300);
}, 1000)
})

console.log('before')
foo.subscribe(x => console.log(x))
console.log('after')
/**
before
Hello
42
100
200
after
300
*/

结论:

  • func.call() 意思是同步地给一个值
  • observable.subscribe() 意思是给我任意数量的值,无论是同步还是异步的

Observable剖析

Observables使用观察者来订阅,然后执行并发送 next/error/complete给观察者,而且执行可能会被清理。这四个方面全部编码在Observables实例中,但某些方面是与其他类型相关的,像 Observer观察者和Subscription订阅

Observable的核心关注点:

  • 创建 Observables
  • 订阅 Observables
  • 执行 Observables
  • 清理 Observables
创建
1
2
3
const observable = new Observable(observer => {
const id = setInterval(() => observer.next('hi'), 1000)
})
订阅
1
2
3
4
5
const observable = new Observable(observer => {
const id = setInterval(() => observer.next('hi'), 1000)
})

observable.subscribe(x => console.log(x))

subscribe调用在同一 Observable 的多个观察者之间是不共享的,当使用一个观察者调用 observable.subscribe时,new Observable(subscrive)中的 subscribe函数只服务给定的观察者,对于observable.subscribe的每次调用都会触发针对给定观察者的独立设置

订阅 Observable 像是调用函数,并提供接收数据的回调函数

这与像 addEventListener/removeEventListener 这样的事件处理方法API是完全不同的。使用 observable.subscribe 在 Observable中不会将给定的 观察者注册为监听器。Observable甚至不会去维护一个附加的观察者列表

subscribe 调用是启用Observable执行的一种简单方式,并将值或事件传递给本次执行的观察者

执行

Observable 是惰性运算,只有在每个观察者订阅后才会执行,随着时间的推移,执行会以同步或异步的方式产生值

Observable执行可以传递三种类型的值:

  • Next通知:发送一个值,比如数字、字符串、对象等等
  • Error通知:发送一个JS错误或异常
  • Complete通知:不再发送任何值

Next通知是最重要,也是最常见的类型,它们表示传递给观察者的实际数据,Error和Complete通知只会在Observable执行期间发生一次,并且只会执行其中的一个

这些约束用所谓的 Observable语法或合约表达最好,写为正则是这样的:

1
next*(error|complete)?

三个next然后是complete

1
2
3
4
5
6
7
const observable = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
// Observable严格遵守自身的规约,complete之后不会再发送

try/catch可以捕获异常

1
2
3
4
5
6
7
8
9
10
const observable = new Observable(observer => {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err)
}
})
清理

因为Observable执行可能是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个API来取消执行,因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力和内存资源。

当调用了 observable.subcribe 观察者会被附加到新创建的 Observable 执行中,这个调用还返回了一个对象,即 Subscription(订阅):

1
const subscription = observable.subscribe(x=>console.log(x))

Subscription表示进行中的执行,有最小化的API以允许你取消执行

1
2
3
4
5
const { from } = require('rxjs');

const observable = from([10,20,30])
const subscription = observable.subscribe(x=>console.log(x));
subscription.unsubscribe();

当订阅了 Observable会得到一个 Subscription,它表示进行中的执行,只要调用 unsubscribe方法就可以取消执行

Observable可以通过返回一个自定义 unsubscribe函数

1
2
3
4
5
6
7
8
9
10
11
12
13
const observable = new Observable(observer => {
const intervalID = setInterval(() => {
observer.next('hi')
}, 1000)
return function unsubscribe() {
clearInterval(intervalID)
}
})

const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => {
subscription.unsubscribe();
}, 2000)

Observer(观察者)

观察者是由 Observable发送的值的消费者,观察者只是一组回调函数的集合,每个回调函数对应一种 Observable发送的通知类型:next/error和complete,下面是一个典型的观察者对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const observable = new Observable(observer => {
const intervalID = setInterval(() => {
observer.next('hi')
}, 1000)
return function unsubscribe() {
clearInterval(intervalID)
}
})

const observer = {
next: x => console.log(`Observer got a next value: ${x}`),
error: err => console.log(`Observer got a error: ${err}`),
complete: () => console.log(`Observer got a complete notification`),
}

const subscription = observable.subscribe(observer);
setTimeout(() => {
subscription.unsubscribe();
}, 2000)

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable发送的通知类型

RxJS的观察者也可能是部分的,如果没有提供某个回调函数,Observable的执行也会正常执行,只是某些通知类型可能会被忽略,因为观察者中没有相对应的回调函数

Subscription(订阅)

Subscription是表示可清理资源的对象,通常是 Observable的执行,Subscription中有一个重要的方法,即 unsubscribe,不需要任何参数,只是用来清理 Subscription占用的资源,上个版本又叫做 Disposable(可清理对象)

Subscription基本上只有一个 unsubscribe() 函数,这个函数用来释放资源或去取消 Observable执行

Subscription还可以合在一起,这样一个 Subscription调用 unsubscribe 方法,可能会有多个 Subscription取消订阅,可以通过把一个Subscription添加到另外一个来做这件事

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const observable1 = interval(400);
const observable2 = interval(300);

const subscription = observable1.subscribe(x => console.log('first:', x))
const childSubscription = observable2.subscribe(x => console.log('second:', x))

subscription.add(childSubscription);

setTimeout(() => {
subscription.unsubscribe();
}, 1000)
/*
second: 0
first: 0
second: 1
first: 1
second: 2
*/

// Subscriptions 还有一个 remove(otherSubscription)方法,用来撤销一个已添加的子 Subscription

Subject(主体)

Subject是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以Subject是多播,而普通的 Observable是单播的(每个已订阅的观察者都拥有 Observable的独立执行)

Subject 像是 Observable,但是可以多播给多个观察者,Subject还像是 EventEmitters,维护着多个监听器的注册表

每个 Subject都是 Observable,对于 Subject,可以提供一个观察者并使用 subscribe方法,就可以开始正常接收值,从观察者角度,无法判断 Observable执行是普通的 Observable还是 Subject

在Subject内部,subscribe 不会调用发送值的新执行,它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener的工作方式

每个 Subject都是观察者,Subject是一个有如下方法的对象:next(v)/error(e)/complete(),要给 Subject提供新值,只要调用 next(theValue),它会将值多播给已注册监听该Subject的观察者们。

下面的例子给 Subject添加两个观察者,然后给 Subject提供一些值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const subject = new Subject();

subject.subscribe({
next: (v) => console.log(`ObserverA: `, v)
})

subject.subscribe({
next: (v) => console.log(`ObserverB: `, v)
})

subject.next(1);
subject.next(2);
/*
ObserverA: 1
ObserverB: 1
ObserverA: 2
ObserverB: 2
*/

因为 Subject是观察者,这也就意味着可以把 Subject作为参数传给任何 Observable的 subscribe方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const subject = new Subject();

subject.subscribe({
next: (v) => console.log(`ObserverA: `, v)
})

subject.subscribe({
next: (v) => console.log(`ObserverB: `, v)
})

const observable = from([1, 2, 3])
observable.subscribe(subject)
/*
ObserverA: 1
ObserverB: 1
ObserverA: 2
ObserverB: 2
ObserverA: 3
ObserverB: 3
*/

使用上面的方法基本上只是通过 Subject将单播的 Observable执行转换为多播的,也说明了Subjects是将任意的 Observable 执行共享给多个观察者的唯一方式。

还有一些特殊的类型的Subject: BehaviorSubject/ReplaySubject和AsyncSubject

多播的Observables

多播的Observable通过Subject来发送通知,这个Subject可能有多个订阅者,然后普通的单播 Observable值发送通知给单个观察者。

多播 Observable在底层是通过使用 Subject使得多个观察者可以看到同一个 Observable执行

在低层,这是 multicast操作符的工作原理:观察者订阅一个基础的Subject,然后Subject订阅源Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.multicast(subject);

// 在底层使用了 `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log(`ObserverA: `, v)
})
multicasted.subscribe({
next: (v) => console.log(`ObserverB: `, v)
})

// 在底层使用了 `source.subscribe(subject)`:
multicasted.connect();

multicast操作符返回一个Observable,看起来和普通的Observable没有什么区别,但当订阅就像是Subject,multicast返回的是ConnectableObservable,它只是一个有connect方法的 Observable.

connect决定了何时启动共享的Observable执行,因为connect在底层执行了source.subscribe(subject)。它返回的是Subscription,可以取消订阅以取消共享的Observable执行

引用计数

手动调用 connect并处理 Subscription过于笨重,当第一个观察者到达时我们要自动连接,而当最后一个观察者取消订阅时我们想要自动地取消共享执行。

  1. 第一个观察者订阅了多播的Observable
  2. 多播Observable已连接
  3. next值0发送给第一个观察者
  4. 第二个观察者订阅了多播Observable
  5. next值1发送给第一个观察者
  6. next值1发送给第二个观察者
  7. 第一个观察者取消了多播的Observable订阅
  8. next值2发送给第二个观察者
  9. 第二个观察者取消了多播 Observable 的订阅
  10. 多播 Observable的连接已中断(底层进行的操作是取消订阅)

要实现这点,需要显式调用 connect:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
const { interval, Subject  } = require('rxjs');
const { multicast } = require('rxjs/operators');

const source = interval(500);
const multicasted = source.pipe(multicast(() => new Subject()));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
next: v => console.log(`ObserverA: ${v}`)
})

subscriptionConnect = multicasted.connect();

setTimeout(() => {
subscription2 = multicasted.subscribe({
next: v => console.log(`ObserverB: ${v}`)
})
}, 600)


setTimeout(() => {
subscription1.unsubscribe();
}, 1200)

// 取消共享的 observable执行的订阅,因此此后 multicasted将不再有订阅者
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe();
}, 2000)

/*
ObserverA: 0
ObserverA: 1
ObserverB: 1
ObserverB: 2
*/

如果不想显式调用 connect ,可以使用 ConnectableObservable的 refCount 方法引用计数,这个方法返回 Observable,这个 Observable会追踪多个订阅者。当订阅者的数量从0变成1,它会调用connect以开启共享的执行,当订阅者数量从1变成0时,它会完全取消订阅,停止进一步的执行。

refCount的作用是,当有第一个订阅者时,多播 Observable会自动地启动执行,当最后一个订阅者离开时,多播 Observable会自定地停止执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
const { interval, Subject } = require('rxjs');
const { multicast, refCount } = require('rxjs/operators');

const source = interval(500);
const subject = new Subject();
/*
const refCounted = source.pipe(
multicast(subject),
refCount()
)
*/
const refCounted = source.pipe(multicast(subject)).pipe(refCount())

let subscription1, subscription2;

// 这里其实调用了 `connect()`,
// 因为 `refCounted` 有了第一个订阅者
console.log('observerA subscribed');

subscription1 = refCounted.subscribe({
next: v => console.log(`ObserverA: ${v}`)
})


setTimeout(() => {
subscription2 = refCounted.subscribe({
next: v => console.log(`ObserverB: ${v}`)
})
}, 600)


setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200)

// 这里共享的 Observable 执行会停止,
// 因为此后 `refCounted` 将不再有订阅者
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000)

/*
observerA subscribed
ObserverA: 0
ObserverA: 1
ObserverB: 1
observerA unsubscribed
ObserverB: 2
observerB unsubscribed
*/

BehaviorSubject

Subject的其中一个变体就是 BehaviorSubject,它有一个当前值的概念,它保存了发送给消费者的最新值,并且当有新的观察者订阅时,会立即从 BehaviorSubject那接收到当前值

BehaviorSubjects适合用来表示随着时间推移的值,例如生日的流是一个Subject,但年龄的流应该是一个BehaviorSubject

下面例子中,BehaviorSubject使用值0进行初始化,当第一个观察者订阅时会得到0,第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
const beSubject = new BehaviorSubject(0);

beSubject.subscribe({
next: v => console.log(`observerA: ${v}`)
})

beSubject.next(1);
beSubject.next(2);

beSubject.subscribe({
next: v => console.log(`observerB: ${v}`)
})

beSubject.next(3)
/*
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
*/

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录Observable执行的一部分

ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者

当创建 ReplaySubject时,可以指定回放多个值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const reSubject = new ReplaySubject(3);

reSubject.subscribe({
next: v => console.log(`observerA: ${v}`)
})

reSubject.next(1);
reSubject.next(2);
reSubject.next(3)
reSubject.next(4)

reSubject.subscribe({
next: v => console.log(`observerB: ${v}`)
})

reSubject.next(5)
/*
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
*/

除了缓冲数量,还可以指定 window time(以毫秒为单位)来确定多久之前的值可以记录,在下面的示例中,使用了较大的缓存数量100,但 window time 参数只设置了500毫秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const reSubject = new ReplaySubject(100, 500);

reSubject.subscribe({
next: v => console.log(`observerA: ${v}`)
})

let i = 1;

setInterval(() => reSubject.next(i++), 200)

setTimeout(() => {
reSubject.subscribe({
next: v => console.log(`observerB: ${v}`)
})
}, 1000)

从下面的输出可以看出,第二个观察者得到的值是34,这两个值是订阅发生前的500毫秒内发生的:

1
2
3
4
5
6
7
8
9
10
11
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 3
observerB: 4
observerA: 5
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

只当 Observable 执行完成时(执行complete)它才会将执行的最后一个值发送给观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const asSubject = new AsyncSubject(100, 500);

asSubject.subscribe({
next: v => console.log(`observerA: ${v}`)
})

asSubject.next(1);
asSubject.next(2);
asSubject.next(3);
asSubject.next(4);

asSubject.subscribe({
next: v => console.log(`observerB: ${v}`)
})

asSubject.next(5);
asSubject.complete();
/*
observerA: 5
observerB: 5
*/

AsyncSubject和last操作符类似,因为它也是等待complete通知,一发送一个单一值

Operators(操作符)

操作符是运行复杂的异步代码以声明的方式进行轻松组合的基础代码单元

什么是操作符?

操作符是Observable类似上的方法。当操作符被调用时,它们不会改变已经存在的 Observable实例。相反,它们返回一个新的 observable,它的 subscription逻辑基于第一个 Observable.

操作符是函数,它基于当前的 Observable 创建一个新的 Observable,这是一个无副作用的操作:前面的 Observable保持不变

操作符本质上是一个纯函数,它接收一个 observable 作为输入,并生成一个新的 observable 作为输出。订阅输出 Observable 同样会订阅输入 Observable。下面实例中,我们创建一个自定义操作符函数,它将输入 Observable 接受的每个值都乘以10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function multiplyByTen(input) {
return new Observable(observer => {
input.subscribe({
next: v => observer.next(10 * v),
error: err => observer.error(err),
complete: () => observer.complete(),
})
})
}

const input = from([1, 2, 3, 4]);
const output = multiplyByTen(input);
output.subscribe(x => console.log(x))

/*
10
20
30
40
*/

注意,订阅 output 会导致 input Observable 也被订阅。我们称之为“操作符订阅链”。

实例操作符 vs. 静态操作符

通常提到的操作符是实例操作符,是 Observable 实例上的方法,如果上面的 multiplyByTen是官方提供的实例操作符,看起来大概是这样的

1
2
3
4
5
6
7
8
9
10
Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
const input = this;
return Rx.Observable.create(function subscribe(observer) {
input.subscribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
}

实例运算是使用 this 关键字来指代输入的 Observable 函数

注意,这里的 input Observable 不再是一个函数参数,它现在是 this 对象。下面是我们如何使用这样的实例运算符:

1
2
const observable = from([1, 2, 3, 4]).multiplyByTen();
observable.subscribe(x => console.log(x));

而静态操作符是直接附加到 Observable类上的。静态操作符在内部不使用 this关键字,而是完全依赖于它的参数

静态操作符是附加到 Observable 类上的纯函数,通常用来从头开始创建 Observable

最常见的 静态操作符是所谓的创建操作符,它们只接收非 Observable参数,比如数字,然后创建一个新的Observable,而不是将一个输入 Observable转换为输出 Observable

一个典型的静态操作符例子就是 interval函数,它接收一个数字非Observable作为参数,并生产一个 Observable作为输出:

1
const observable = interval(1000);

然而,有些静态操作符可能不同于简单的创建。一些组合操作符可能是静态的,比如 mergecombineLatestconcat,等等。这些作为静态运算符是有道理的,因为它们将多个 Observables 作为输入,而不仅仅是一个,例如:

1
2
3
4
5
const { interval, merge } = require('rxjs');

const observable1 = interval(1000);
const observable2 = interval(400);
const merged = merge(observable1, observable2);
0%