Skip to content

《深入浅出 RxJS》笔记

RxJS 兼具函数式和反应式编程特点,擅长处理异步操作,因为它对数据采用 “推” 的处理方式,当一个数据产生的时候,会被推送给对应的处理函数,这个函数不用关心数据是同步产生还是异步产生的。

函数反应式编程 FRP

  • 问题
    • 如何控制大量代码的复杂度
    • 如何保持代码的可读性/可维护性
    • 如何处理异步操作
  • 函数式编程特征
    • 声明式(对应命令式)
      • 不直接操作数据,以传递函数类型的参数来封装功能(只需表达想要做什么,而不是怎么去做)
    • 纯函数
      • 函数的执行结果完全由输入参数决定,不会受除参数之外的任何数据影响
      • 函数内不会修改任何外部状态(如引用类型的参数属性)
    • 数据不可变性
      • 不要去修改现有数据,而是产生一个新的数据来体现 “程序的功能”
  • 反应式编程
    • Rx 是一套通过监听流(程序的输入可以看成是一个数据流)来做异步编程的 API
    • Rx 诞生的主要目的是解决异步问题,而实际上使用 RxJS 通常不需要关心自己是被同步执行还是异步执行
      • 数据流抽象了很多现实问题
      • 擅长处理异步操作
      • 把复杂问题分解成简单的问题组合
        • 实现某个小功能的函数就是操作符
        • RxJS 就是学习如何组合操作符来解决复杂问题

RxJS 基础知识

tree-shaking 可以处理 import 方式的导入,最好是用一个文件专门导入 RxJS 相关功能,然后再在入口导入这个文件

  • Observable 是可被观察者(数据流),Observer 是观察者

    • 代表 “流” 的变量标识符都以 $ 符号结尾(芬兰式命名法)

      • 数据流相对于数组有个好处,就是如果有个永无止境的数据流,内存不会无限增加
    • 连接两者的桥梁就是 Observable 对象的函数 subscribe,而 Observer 作为这个函数的参数

      • 如何产生事件 -> Observable -> 发布者
      • 如何响应事件 -> Observer -> 观察者
      • 什么样的发布者关联什么样的观察者 -> 何时调用 subscribe
      js
      import { Observable } from 'rxjs/Observable
      const onSubscribe = observer => {
        observer.next(1) // 把数据 “推给” observer
        observer.next(2)
        observer.next(3)
        // observer.error('err') // 通知出错 -> 弹珠图表示符号 X
        // observer.complete() // 通知完结 -> 弹珠图表示符号 |
        // 以上终结状态 error complete 只要其中一个被调用,后面的都不会执行了
        return {
          // 退订时
          unsubscribe: () => {
            // 清除定时器之类的。。。
          }
        }
      }
      const source$ = new Observable(onSubscribe)
      // 这里并不是直接传给 onSubscribe,而是被包装过的
      const observer = {
        next: item => {}, // 只有 next 是必须
        error: err => {},
        complete: () => {}
      }
      // subscribe 参数是观察者,可以用对象形式
      // 也可以用多参形式:(nextFn, errorFn, completeFn)
      // 只有 subscribe 被调用时,onSubscribe 才执行,observer 才响应
      const subscription = source$.subscribe(observer)
      // 观察者退订 - 终端 observer 与 source$ 的连接
      // -> observer 不会再响应了,但 source$ 还有可能继续发射
      subscription.unsubscribe()
  • Hot Cold Observable

    • 有多个观察者在不同时间订阅的情况下,是否响应之前时间产生的数据
    • Cold -> “点播” 每次 subscribe 都产生一个心的生产者
    js
    const clod = new Observable((observer) => {
      const producer = new Producer() // 产生新的生产者 - 新启动 - 冷 -_-
      // ...
    })
    • Hot -> “直播”
    js
    const producer = new Producer() // 生产者 - 已启动 - 热
    const clod = new Observable((observer) => {
      // ...
    })
  • 操作符:对数据流进行创建/处理/转化(静态方法、实例方法)的函数

操作符基础

返回一个 Observable 对象、对上游和下游的订阅及退订处理、处理异常境况、及时释放资源

  • (按输入)返回一个 Observable 对象的函数

    1. 根据其他 Observable 对象产生 -> 实例

      import 'rxjs/add/operators/map'

    2. 利用其他类型的输入产生 -> 静态

      import 'rxjs/add/observable/of'

    3. 不需要输入可以凭空产生 -> 静态

      Observable.create(...)

    4. 静态操作符只能出现在链式调用的首位,有些操作符既有静态类也有实例类
  • pipeable 操作符 通过 pipe 串接,有利于 tree shaking

    js
    import {of} from 'rxjs/add/observable;
    import {map, filter} from 'rxjs/add/operators;
    
    const source$ = of(1, 2, 3)
    const res$ = source$.pipe(
      filter(x => x % 2 === 0),
      map(x => x * 2)
    )

创建数据流

不依赖其他 Observable 对象产生 Observable

功能需求适用的操作符
直接操作观察者create(observer => {...})
根据有限的数据产生同步数据流of(1, 2, 3)
产生一个数值范围内的数据range(1, 100)
以循环方是产生数据generate
重复产生数据流中的数据repeat 和 repeatWhen
产生空数据流empty -> 直接产生一个完结(complete)的 Observable 对象
产生直接出错的数据流throw -> 直接 error
产生永不完结的数据流never -> 永不 complete 或 error
间隔给定时间持续产生interval 和 time
从数组等枚举类型数据产生数据流from(数组、字符串、generator、arguments)
从 Promise 对象产生数据流fromPromise
从外部事件对象产生数据流fromEvent 和 fromEventPattern
从 ajax 请求结果产生数据流ajax
延迟产生数据流defer() 参数是

创建同步数据流

  • generate -> 类似 for 循环

    js
    const res = []
    for (let i = 0; i < 10; i++) {
      res.push(i * i)
    }
    // ---- 对照 ----
    const res$ = Observable.generate(
      0, // let i = 0
      (i) => i < 10,
      (i) => i++,
      (i) => i * i
    )
  • repeat(2) -> 上游 complete 之后再重复 -> 被订阅 2 次,也被退订 2 次

创建异步数据流

  • interval(1000) -> setInterval -> . 0 . 1 . 2 ...

  • time(1000) -> setTimeout -> . 0

    • time 第二个参数作用同 interval 所以 time 是 interval 超集
  • fromEvent

    js
    const evt$ = fromEvent(dom, 'click/mousemove')
    // 也可以是自定义 event 和 action
  • fromEventPattern(addHandler, removeHandler)

    • addHandler, removeHandler 函数表示添加、移除事件的动作(无需传入事件对象)
    • 当 Observable 对象调用 subscribe 时 addHandler 被调用
    • Observable 调用 subscribe 的返回值再调用 unsubscribe 时 removeHandler 被调用
  • repeatWhen 相对于 repeat 来说还可以控制再次订阅的时间

    js
    const notifier = (notification$) => {
      return notification$.defer(1000) // 上游完结后 1s 之后再重新订阅
      // return 一个 Observable 通知重新订阅
    }
    const repeated$ = source$.repeateWhen(notifier)
  • defer 延迟产生 Observable

    js
    const observableFa = () => Observable.ajax(ajaxUrl)
    const source$ = Observable.defer(observableFa)
    // 只有在 source$ 被订阅时 observableFa 才会被调用

合并数据流

把多个 Observable 对象合并到 一个 Observable 中

功能需求适用的操作符
把多个数据流以首尾相连的方式合并concat 和 concatAll
把多个数据流以先到先得的方式合并merge 和 mergeAll
把多个数据流以一对一的方式合并zip 和 zipAll
持续合并多个数据流中最新产生的数据combineLatest combineAll withLatestFrom
从多个数据流中选取第一个产生内容的数据流race
在数据流前面添加一个指定数据startWith
只获取多个数据流最后产生的那个数据forkJoin
从高阶数据流中切换数据源switch exhaust
  • concat 从 source1$ 中获取数据并传给下游,当 source1$ complete 之后,就会调用 source1$.unsubscribe 然后调用 source2$.subscribe 继续从 source2$ 获取数据并传给下游
  • merge 会第一时间订阅所有上游 Observable 任何一个上游 Observable 有数据了就立刻传给下游,所有上游都 complete 时,下游 Observable 才会 complete

    避免使用 merge 去合并同步数据流,因为上游同步数据是一下子全传给下游的,和 concat 差不多了

  • zip 把上游的数据转化为数组形式传给下游,每个上游 Observable 产生的数据,都会在对应的数组中有一席之地。只要任意一个上游 complete 且这个 Observable 最后产生的数据也配对之后就把这最后一个数组传给下游同时下游 Observable 也 complete
  • combineLatest 合并上游产生的最新的数据,从所有上游都有数据产生开始,只要任意一个上游的发出新的数据,都会合并一次传给下游。所有上游都完结之后(不会再有新发出数据)时,下游也就完结了
    • combineLatest 的第二个参数是函数,用于"定义合并"这个操作,可以不必传数组给下游
  • withLatestFrom 与 combineLatest 不同的是它给下游推送数据只能由一个上游 Observable 驱动(主导产生数据的节奏)只有这个上游更新才会产生一次合并推送

    这个没有静态操作符,因为它需要一个做主导

  • race 多个上游 Observable 一起,看谁最先产生数据,就完全采用这个的数据推送给下游,其他的会被退订
  • forkJoin 所有上游完结时,合并最后的数据产生唯一一个数据推送给下游
  • 高阶 Observable
    • 产生的数据依然是 Observable 的 Observable 对象 -> 高阶 Observable 完结了并不代表内部 Observable 对象 完结
    • concatAll mergeAll zipAll combineAll
      • All 代表全部,把一个高阶 Observable 的所有内部 Observable 都组合起来
      • 没有任何参数,所有的输入都来自上游的 Observable 对象
    • switch 总是切换到最新的内部 Observable 对象上获取数据
      • 每当上游高阶 Observable 对象内部产生新的 Observable 就立即订阅新的,退订掉旧的
      • 完结条件:上游高阶 Observable 完结 + 当前内部 Observable 完结
    • exhaust 只有耗尽当前内部 Observable 后,才有空订阅下一个新产生的内部 Observable

辅助类操作符

功能需求适用的操作符
统计数据流中产生的所有数据个数count
获取数据流中最大或最小的数据max min
对数据流中所有数据做规约操作Reduce
判断是否所有数据满足某个判定条件every
找到第一个满足判定条件的数据find findIndex
判断一个数据流是否不包含任何数据isEmpty
如果一个数据流为空就默认产生一个指定的数据defaultIfEmpty

过滤数据流

功能需求适用操作符
过滤掉不满足判定条件的操作符filter
获得满足判定条件的第一个数据first
获得满足判定条件的最后一个数据last
从数据流中选取最先出现的若干个数据take
从数据流中选取最后出现的若干个数据takeLast
从数据流中选取数据直到某种情况发生takeWhile takeUntil
从数据流中忽略最先出现的若干个数据skip
从数据流中忽略数据直到某种情况发生skipWhile 和 skipUntil
基于数据内容的数据流筛选throttle debounce audit
基于时间的数据流筛选throttleTime debounceTime auditTime
基于采样方式的数据流筛选sample sampleTime
删除重复的数据distnct
删除重复的连续数据distnctUntilChanged distnctUntilKeyChanged
忽略数据流中的所有数据(只关心 error 和 complete)ignoreElements
只选取指定出现位置的数据elementAt
判断是否只有一个数据满足条件single
  • first 与 find findIndex 的区别

    js
    import 'rxjs/add/observable/of'
    import 'rxjs/add/operator/first'
    
    const source$ = Observable.of(3, 4, 2, 3, 4, 7)
    
    source$.first(
      (x) => x % 2 === 0,
      (value, index) => [value, index], // 可选, 定义输出结果(数组)
      -1 // 可选
    )
    // 第二个参数可选,用于定制最终吐出的数据
    // 如果没有满足条件的,将会抛出 error。
    // 但是可以传递三个参数,表示没有结果时输出的值 -1
  • takeLast 与 take 的差别:takeLast 只有在上游数据完结时,才能产生数据(才知道哪 n 个最后 n 个),而且时一次性传给下游(不会有时间间隔)

  • takeUntil 特点:上游的数据直接转手给下游,直到参数 notifier(是一个 Observable)吐出一个数据或者完结(-_- 它只要一有动静,takeUntil 就完结。。。

  • takeWhile skipWhile 的参数时一个返回 boollen 的判定函数

  • throttleTime 限制在 duration 时间范围内,从上游传递给下游的数据的个数(1 个)

  • debounceTime 传递给下游的数据间隔不能小于给定的时间 duetime

  • throttle debounce 与上面的操作符的区别是,不是用时间来控制流量,而是用 Observable 中的数据来控制流量

    • 参数是一个返回 Observable 的函数,可以理解成当 Observable 为固定时间吐数据时,与 throttleTime debounceTime 效果一样
    • 大部分场景 throttleTime debounceTime 足够,只有当固定时间的回压控制不满足要求时,才用 throttle debounce
  • auditTime 和 audit

    • auditTime 与 throttle 类似,区别是:auditTime 取时间段内的最后一个数据
  • sampleTime 与 sample 不管上游产生的数据节奏怎样,完全根据自己参数指定的间隔节奏来给下游传递数据

转化数据流

功能需求适用的操作符
将每个元素用映射函数产生新的数据map
将数据流中每个元素映射为同一个元素mapTo
提取数据流中每个数据的某个字段pluck
产生高阶 Observable 对象windowTime windowCount windowWhen windowToggle window
产生数组构成的数据流bufferTime bufferCount bufferWhen bufferToggle buffer
映射产生高阶 Observable 对象然后合并concatMap mergeMap switchMap exhaustMap
产生规约运算的结果组成的数据流scan mergeScan
  • 无损回压控制:把上游在一段时间内产生的数据放到一个数据集合(数组、Observable)里,然后把这个数据集合一次丢给下游。支持数组的以 buffer 开头,支持 Observable 的以 window 开头
    • windowTime 和 bufferTime 每间隔固定时间段来收集
    • windowCount 和 bufferCount 每收集固定个数
    • windowWhen 和 bufferWhen 参数是一个返回 Observable 的函数,Observable 的一次产生数据~完成被认为是一个“缓冲区块”
    • windowToggle 和 bufferToggle 接收 2 个参数,第一个参数 opening$ 产生一个数据代表缓冲窗口(区块)的开始,同时第二个参数 closingSelector 被调用,用来获取缓冲窗口的结束

      closingSelector 是指一个返回 Observable 的函数(它的参数是 opening$ 产生的数据)

    • window 和 buffer 的参数只是一个 notifer$ (吐出数据的节奏)分隔上游数据序列
  • 高阶的 map
    • 高阶 map 的参数(project 函数)把一个数据映射为一个 Observable 对象,而且把每个内部 Observable 的数据做组合(砸平)传给下游

      project 函数 的参数(map 传下来的数据,序号)

    • concatMap = map + concatAll
    • mergeMap = map + mergeAll
    • switchMap = map + switch
    • exhoustMap = map + exhoust 先产生的 Observable 优先级更高(喜欢旧的数据)
  • 数据分组 - 把一个数据流拆分成多个数据流
    • groupBy 传递给下游的是一个高阶 Observable
    • partition 返回一个数组(包含 2 个 Observable)-> 唯一一个不返回 Observable 的操作符
  • 累计数据 规约操作
    • scan 对上游每一个数据都会产生一个规约结果传递给下游,无需上游数据完结(与 reduce 的区别)
    • mergeScan 区别是:规约函数返回的是一个 Observable 而不是一个数据(使用频率低)

异常错误处理

对错误异常的处理可以分为 2 类:恢复(发生错误时依然让运算继续下去)、重试(重新尝试之前发生错误的操作)

功能需求适用的操作符
捕获并处理上游产生的异常错误catch
当上游产生错误时进行重试retry retryWhen
无论是否出错都要进行一些操作(上游完结 or 出错时发挥一次作用)finally
  • catch “恢复”
js
// err: 错误对象
// caught$ 代表上游 Observable 对象(直接返回 caught$ 可以模拟重试的效果)
// 返回的 observable 吐出的数据会替代发生错误的那一个数据
const catch$ = error$.catch((err, caught$) => Observable.of(8))
  • retry "重试"(本质是重新订阅一次上游的 Observable,在订阅的同时取消上一次订阅,对于 Hot 数据流其实并不是真的 “重试”,只不过是重新订阅而已)
  • retryWhen 参数是一个返回 OBservable 的函数(notice$ 节奏控制器)

多播

让一个数据流的内容被多个 Observer 订阅 Subject BehaviorSubject ReplaySubject AsyncSubject 类型

功能需求适用的操作符
灵活选取 Subject 对象进行多播multicast share publish
只多播数据流中最后一个数据publishLast
对数据流中给定数量的数据进行多播publishReplay
拥有默认数据的多播publishBehavior
  • Hot Clod 数据流

    • Hot 操作符的数据源来自外部,实现的是 多播
    • Clod 实现的是单播(每次订阅都是重新发数据)
  • Subject 包装 Clod 产生一个新的 Hot

    • Subject 实例命名通常不以 $ 结尾
    • Subject 兼具 Observable 和 Observer 的性质 -> 实现多播效果
    js
    import { Subject } from 'rxjs/Subject'
    const sub1 = new Subject()
    // sub1.subscribe(...) // 接受订阅
    // sub1.next(1) 发数据 -> 所以它调用 error complete 之后作为 Observable 的生命周期也就结束了
    const sub2 = new Subject()
    source$.subcribe(sub2)
    // source2$.subcribe(sub2) -> 它可以有多个上游来发数据
    // sub2.subscribe(...)
    // sub2.subscribe(...) // 使 source$ 多播效果
  • multicast (比较基础的操作符

js
// multicast 参数还可以是一个返回 Subject(中间人)的函数
// -> 当有人订阅但中间人已经退订时,产生一个新的中间人
const hotSource$ = coldSource$.multicast(new Subject())
// hotSource$.connect() 开始播
// 或者 hotSource$ = hotSource$.refCount() 根据下游 Observer 的个数来决定对上游的连接
  • publish 是通过 multicast 来实现的
js
// publish -> multicast(new Subject())
const hotSource$ = coldSource$.publish().refCount()
  • share
js
// share -> multicast(() => new Subject()).refCount()
const hotSource$ = coldSource$.share()
  • publishLastAsyncSubject
    • 当上游 Cold 完结的时候,才把最后一个数据传给 Observer
  • publishReplayReplaySubject TODO:
  • publishBehaviorBehaviorSubject TODO:

掌握时间的 Scheduler

RxJS 调试和测试