ZHANGYU.dev

October 14, 2023

RxJS学习笔记(七)回压控制

JavaScript6.1 min to read

RxJS,数据流可以理解为经过一条一条的管道,如果这个时候,某一处的管道处理速度跟不上数据产生的速度,就会造成在内存中积压数据,相当于对上游施加了压力,在RxJS中称之为“回压”

throttle和debounce

这两个操作符是常见的,称为“节流”和”防抖“

在浏览器中,比如resize事件后对元素进行重新布局,但是resize触发很频繁的话,每次都会重新布局,是十分消耗性能的,这时候就可以用这两个操作符进行处理

throttle称为节流,可以理解为每隔一段时间内只能进行一次操作,debounce称为“防抖”,可以理解为一段时间只能进行一次操作

比如移动鼠标,如果是throttle的话,也许移动过程中每5秒只会调用一次回调函数,而debounce则可以理解为如果鼠标一直在移动,则在鼠标停止的时候调用回调函数

RxJS里,这两个操作符分为throttledebouncethrottleTimedebounceTime,但是只有包含Time的操作符功能对应的是lodash里的throttledebounce

throttleTime和debounceTime

这两个操作符都接受必选参数dueTime

throttleTime

import { interval } from "rxjs";import { throttleTime } from "rxjs/operators";const source$ = interval(500).pipe(throttleTime(2000));source$.subscribe(console.log);// 0// 两秒后 5// 又两秒后 10// ...

使用了throttleTime操作符后,虽然interval是每500毫秒吐出一条数据,但是经过了throttleTime,在2秒的区间内,只会有一条数据吐出

debounceTime

import { fromEvent } from "rxjs";import { debounceTime } from "rxjs/operators";const source$ = fromEvent(document,"mousemove").pipe(debounceTime(200));source$.subscribe(console.log);

在上例中,只有鼠标停止移动后的200毫秒后,才会吐出数据

用数据流来控制

了解了throttleTimedebounceTime后,了解throttledebounce就不难了

throttledebounce都接受一个函数作为参数,这个函数的参数为当前的value值,需要返回一个可被订阅的对象或者是Promise

import { interval, timer } from "rxjs";import { throttle } from "rxjs/operators";const source$ = interval(1000).pipe(  throttle(value => timer(value % 3 === 0 ? 3000 : 1000)));source$.subscribe(console.log);// 0// 4// 6// 10// 12// ...

在当前值能被3整除时,下一段3秒内经过throttle只会吐出一个值

同样可以返回Promise

const source$ = interval(1000).pipe(  throttle(    value =>      new Promise(resolve => setTimeout(resolve, value % 3 === 0 ? 3000 : 1000))  ));source$.subscribe(console.log);// 和上例输出相同

auditTime和audit

aduit操作符和throttle操作符类似,不过throttle是吐出时间段内的第一条数据,aduti是吐出时间段最后一条数据

auditTime

import { interval } from "rxjs";import { auditTime } from "rxjs/operators";const source$ = interval(1000).pipe(auditTime(2000));source$.subscribe(console.log);// 2// 5// 8

audit

import { interval, timer } from "rxjs";import { audit } from "rxjs/operators";const source$ = interval(1000).pipe(audit(() => timer(2000)));source$.subscribe(console.log);// 和上例相同

sampleTime和sample

sample就是采样的意思,sample操作符需要从一段时间采样一个数据抛弃掉其他数据

sampleTime

sampleTime操作符实际的表现看上去似乎和auditTime类似,但是auditTime操作符是上游吐出数据后,接下来的时间段吐出上游在这段时间产生的最后一个数据

sampleTime是按照自己的节奏吐出数据

import { interval } from "rxjs";import { sampleTime } from "rxjs/operators";const source$ = interval(300).pipe(sampleTime(500));source$.subscribe(console.log);// 0 2 3 5 7 8 10 12 ...

不管上游吐出数据有多快,sampleTime操作符总会按照自己的时间吐出数据

sample

sample操作符和之前的throttle或者是audit都不同,它接受的参数是一个Observable对象,当这个对象产生一个数据的时候,sample就会从上游拿最后一个产生的数据传递给下游

import { fromEvent, interval } from "rxjs";import { map, sample } from "rxjs/operators";const source$ = interval(10).pipe(  map(value => value * 10),  sample(fromEvent(document, "click")));source$.subscribe(console.log);

上例在点击后,sample操作符从上游取最后一个数据,这个数据就是页面加载完成后,到点击时过去的时间

根据数据序列做回压控制

比如上游数据是0、0、1、1、1、2、2,如果说需要去掉重复的数据,只处理0、1、2,这时候,前面介绍的回压控制操作符就没用了

distinct

distinct操作符会对上游吐出的数据去重

import { of } from "rxjs";import { distinct } from "rxjs/operators";const source$ = of(0, 0, 1, 1, 2, 1, 2, 2, 0).pipe(distinct());source$.subscribe(console.log);// 0 1 2

普通情况下distinct操作符是使用===比较,如果是对象的话就比较不出来了,好在distinct操作符接收一个keySelector函数来比较复杂类型

import { of } from "rxjs";import { distinct } from "rxjs/operators";const source$ = of(  { count: 0 },  { count: 1 },  { count: 1 },  { count: 2 },  { count: 0 }).pipe(distinct(value => value.count));source$.subscribe(console.log);//  { count: 0 }//  { count: 1 }//  { count: 2 }

distinct操作符的问题在于,它会存储一个唯一数据集合,如果不同的数据越来越多,那么这个集合也就会越来越大,所以distinct提供了第二个参数,用于刷新内部的唯一数据集合

import { interval } from "rxjs";import { distinct, map } from "rxjs/operators";const source$ = interval(10).pipe(  map(value => (value % 2 === 0 ? 1 : 2)),  distinct(undefined, interval(2000)));source$.subscribe(console.log)// 1 2// 两秒后// 1 2// ...

第二个参数interval操作符每2秒吐出一个数据,通知distinct刷新内部唯一数据集合,所以两秒后又会输出与之前相同的数据

distinctUntilChanged

distinctUntilChanged操作符只会吐出与之前数据不同的数据,所以不需要维护一个内部唯一集合

import { of } from "rxjs";import { distinctUntilChanged } from "rxjs/operators";const source$ = of(1, 1, 1, 2, 2, 2, 1, 1, 2, 2).pipe(distinctUntilChanged());source$.subscribe(console.log);// 1 2 1 2

当前值与上一次吐出的值不同的时候,就会吐出当前的值

distinctUntilChanged操作符可以接收2个可选参数,第一个参数是compare函数,第二个参数是keySelector函数

import { of } from "rxjs";import { distinctUntilChanged } from "rxjs/operators";const source$ = of(  { age: 4, name: "Foo" },  { age: 7, name: "Bar" },  { age: 5, name: "Foo" },  { age: 6, name: "Foo" }).pipe(distinctUntilChanged((p, q) => p.name === q.name));source$.subscribe(x => console.log(x));// { age: 4, name: 'Foo' }// { age: 7, name: 'Bar' }// { age: 5, name: 'Foo' }

distinctUntilKeyChanged

distinctUntilKeyChanged操作符可以理解为distinctUntilChanged操作符的简写形式

import { of } from "rxjs";import { distinctUntilKeyChanged } from "rxjs/operators";const source$ = of(  { name: "Foo1" },  { name: "Bar" },  { name: "Foo2" },  { name: "Foo3" }).pipe(  distinctUntilKeyChanged(    "name",    (x, y) => x.substring(0, 3) === y.substring(0, 3)  ));source$.subscribe(x => console.log(x));// { age: 4, name: 'Foo1' }// { age: 7, name: 'Bar' }// { age: 5, name: 'Foo2' }

上例比较name字段前三个字符

其他操作符

ignoreElements

ignoreElements操作符会忽略上游所有数据,只吐出complete或者error

import { of } from "rxjs";import { ignoreElements } from "rxjs/operators";const source$ = of(1, 2, 3).pipe(ignoreElements());source$.subscribe({  next: console.log,  complete: () => console.log("complete")});// complete

elementAt

elementAt操作符只会取参数索引的哪一条,不过可以指定默认值

import { of } from "rxjs";import { elementAt } from "rxjs/operators";const source$ = of(1, 2, 3).pipe(elementAt(9, -1));source$.subscribe(console.log);// -1

single

single操作符用于判断上游是否只有一条符合条件的数据,是就向下游传递数据,否则抛出一条异常

import { of } from "rxjs";import { single } from "rxjs/operators";const source$ = of(1, 2, 1).pipe(single(value => value === 1));source$.subscribe(console.log);// 不满足,抛出异常// Uncaught Sequence contains more than one element

今天修了一天年假,在家休息,轻松学习的感觉真好 突然就很想念读大学的时候,每天都无忧无虑的学习Unity3D 站在人生的十字路口,难免迷茫