ZHANGYU.dev

October 14, 2023

RxJS学习笔记(三)合并类操作符

JavaScript5.5 min to read

合并类操作符

如果把Observable数据流比做一条河流,两条河流汇聚的时候将会合并成一条大江,但是在RxJS中,合并并不是这么简单,不同的Observable数据流合并有不同的规则,有些是相互交错合并,有些是像排队一样,一条数据流走完,另一条才能跟在后面

concat

Javascript中,数组有一个concat方法,这个方法可以把其他数组的值按照参数顺序合并到当前数组

const a = [1, 2, 3];const b = [4, 5, 6];const c = [7, 8, 9];const d = [].concat(a, b, c);// 1 2 3 4 5 6 7 8 9

RxJS中,concat操作符也有类似的功能,可以把多个Observable对象依次合并

import { of, concat } from "rxjs";const source$ = concat(    of(1, 2, 3),    of(4, 5, 6),    of(7, 8, 9));source$.subscribe(value => console.log(value));// 1 2 3 4 5 6 7 8 9

concat操作符的合并,必须等上一个Observable对象完结后,才会继续合并后面的Observable对象,如果一个Observable对象永不完结,那么concat操作符不会返回任何数据,这一点在合并异步数据时就能体现

合并异步Observable对象

const source$ = concat(  timer(2000).pipe(mapTo("a")),  timer(1000).pipe(mapTo("b")));source$.subscribe(value => console.log(value));// 2秒后输出 a// 1秒后输出 b

这里涉及到一个操作符mapTo,可以理解这个数据流只会返回传入参数的数据

merge

merge操作符在合并同步数据上,是和concat操作符一样的,但是在合并异步数据流时,就能展现出差异

import { merge, timer } from "rxjs";import { mapTo } from "rxjs/operators";const source$ = merge(  timer(2000).pipe(mapTo("a")),  timer(1000).pipe(mapTo("b")));source$.subscribe(value => console.log(value));// 1秒后输出 b// 2秒后输出 a

可以看出,merge操作符在合并异步数据流时,是不会按照参数顺序依次等待的,而是先完结,先合并

merge操作符还可以有一个number类型可选参数concurrent,这个参数的作用将限制merge的同时合并

不使用concurrent参数

import { interval, merge } from "rxjs";import { mapTo } from "rxjs/operators";const source$ = merge(  interval(1000).pipe(mapTo("a")),  interval(1000).pipe(mapTo("b")),  interval(1000).pipe(mapTo("c")));source$.subscribe(value => console.log(value));// 1秒后输出 a b c// 再1秒后输出 a b c// ...

使用concurrent参数

const source$ = merge(  interval(1000).pipe(mapTo("a")),  interval(1000).pipe(mapTo("b")),  interval(1000).pipe(mapTo("c")),  2);source$.subscribe(value => console.log(value));// 1秒后输出 a b// 再1秒后输出 a b// ,,,

使用concurrent参数后,同时进行的Observable数据流只会有限制的个数,后面限制住的Observable对象会在前面的Observable对象完结后,再加入merge合并

zip

zip的意思是拉链,拉链的链齿是一一对应的,zip就能将数据一一对应起来

import { zip, of } from "rxjs";const source$ = zip(    of(1, 2, 3),    of("a", "b", "c"));source$.subscribe(value => console.log(value));// [ 1, "a" ]// [ 2, "b" ]// [ 3, "c" ]

可以看见每次输出是一个数组,元素的值和Observable数据流是一一对应`,这里是同步的数据

如果是异步的数据呢?

import { zip, of, interval } from "rxjs";const source$ = zip(    of("a", "b", "c"),    interval(1000));source$.subscribe({  next: value => console.log(value),  complete: () => console.log("complete")});// [ "a", 0 ]// [ "b", 1 ]// [ "c", 2 ]// complete

如果是异步的数据,就算一个Observable对象先吐出数据,zip为了做到一一对应,会把这个数据暂存下来,等另一个Observable对象吐出数据后,一一对应后再输出

zip操作符会在任一Observable对象完结后,退订所有的Observable对象,所以在of("a", "b", "c")完结后,输出了complete

不过zip会有数据堆积的问题,如果一个Observable对象吐出数据很快,另一个很慢,那zip会把吐出快的数据堆积起来,等吐出慢点吐出数据后一一对应,这样会导致堆积的数据越来越多,内存也占用越多

上述的zip操作符例子,都只有2个Observable对象,其实zip操作符是可以处理个Observable对象的,数据也是一一对应,在理解了像拉链一一对应的两个Observable对象后,多个也不难理解了

combineLatest

combineLatestzip一样,也会将数据放入数组中输出,不同的是,只要有Observable吐出数据,combineLatest就会取当前数据流产生的最新数据输出,并且combineLatest的参数需要放入一个数组中

import { combineLatest, of, interval } from "rxjs";// 参数是一个数组const source$ = combineLatest([  interval(2000),  interval(1000)]);source$.subscribe({  next: value => console.log(value),  complete: () => console.log("complete")});// 等待两秒后// [ 0, 0 ] // 第一次输出// [ 0, 1 ] // 一个产生新数据(1),会使用另一个最新的数据(0)// [ 0, 2 ]// [ 1, 2 ]// [ 1, 3 ]// [ 1, 4 ]// ...

interval(1000)操作符产生数据时,interval(2000)还没有产生数据,所以不会有输出

2秒后,interval(2000)产生数据了,可以一一对应,所以输出[ 0, 0 ],同时interval(1000)输出了新值1,这时候interval(2000)没有产生新数据,所以会用之前产生的数据输出[ 0, 1 ]

combineLatest如果输出,会取所有数据流的最新数据,即使有一个数据流完结,另一个没有完结,combineLatest还是会使用完结的数据流的最新数据持续输出

combineLatest的第一次输出顺序也值得研究

同步数据流

const source$ = combineLatest([  of("a", "b", "c"),  of(1, 2, 3),]);// ["c", 1]// ["c", 2]// ["c", 3]

同步、异步数据流

const source$ = combineLatest([  of(1, 2, 3),  interval(10000),  interval(5000),]);// 10秒后// [3, 0, 0]// ...

我的理解是,如果是同步数据流,会根据顺序最后一个数据流吐出数据后,combineLatest开始输出,如果是异步数据流,则会等待最慢的数据流吐出数据后,combineLatest再输出

withLatestFrom

withLatestFrom的功能类似combineLatest,不过写法不同

import { interval } from "rxjs";import { withLatestFrom } from "rxjs/operators";const source$ = interval(1000).pipe(  withLatestFrom(interval(2000), interval(500)));source$.subscribe({  next: value => console.log(value),  complete: () => console.log("complete")});// [1, 0, 3]// [2, 0, 5]// [3, 1, 7]// ...

可以看到,输出是连接withLatestFromObservable对象主导的,只有interval(1000)输出的时候,withLatestFrom才会输出,这时候interval(500)的值已经是3了

race

race操作符就像是Javascript中的Promise.race只会输出最快完成的Promise一样,race操作符也只会输出最快的Observable对象产生的值

import { interval, race } from "rxjs";import { mapTo } from "rxjs/operators";const source$ = race(  interval(500).pipe(mapTo("a")),  interval(1000).pipe(mapTo("b")),  interval(2000).pipe(mapTo("c")));source$.subscribe({  next: value => console.log(value),  complete: () => console.log("complete")});// a// a// a// ...

interval(500)是最快输出的,race会退订其他的Observable对象,只输出interval(500)Observable对象

forkJoin

race像是Javascript中的Promise.raceforkJoin则像是Javascript中的Promise.all,它会等待所有的Observable对象完结后再输出

import { timer, forkJoin } from "rxjs";const source$ = forkJoin([  timer(1000),  timer(2000),  timer(3000)]);source$.subscribe(value => console.log(value));// 等待3秒// [ 0, 0, 0 ]

需要注意的是,Observable对象也得放在数组中

startWith

startWith操作符会在Observable对象被订阅的时候,先吐出若干条数据,相当于初始值或者是默认值

import { interval } from "rxjs";import { startWith } from "rxjs/operators";const source$ = interval(5000).pipe(startWith("a"));source$.subscribe(value => console.log(value));// a// 5秒后 0// 1// ...

与之对应的还有一个endWith