Appearance
《深入浅出 RxJS》笔记
RxJS 兼具函数式和反应式编程特点,擅长处理异步操作,因为它对数据采用 “推” 的处理方式,当一个数据产生的时候,会被推送给对应的处理函数,这个函数不用关心数据是同步产生还是异步产生的。
函数反应式编程 FRP
- 问题
- 如何控制大量代码的复杂度
- 如何保持代码的可读性/可维护性
- 如何处理异步操作
- 函数式编程特征
- 声明式(对应命令式)
- 不直接操作数据,以传递函数类型的参数来封装功能(只需表达想要做什么,而不是怎么去做)
- 纯函数
- 函数的执行结果完全由输入参数决定,不会受除参数之外的任何数据影响
- 函数内不会修改任何外部状态(如引用类型的参数属性)
- 数据不可变性
- 不要去修改现有数据,而是产生一个新的数据来体现 “程序的功能”
- 声明式(对应命令式)
- 反应式编程
- Rx 是一套通过监听流(程序的输入可以看成是一个数据流)来做异步编程的 API
- Rx 诞生的主要目的是解决异步问题,而实际上使用 RxJS 通常不需要关心自己是被同步执行还是异步执行
- 数据流抽象了很多现实问题
- 擅长处理异步操作
- 把复杂问题分解成简单的问题组合
- 实现某个小功能的函数就是操作符
- RxJS 就是学习如何组合操作符来解决复杂问题
RxJS 基础知识
tree-shaking可以处理import方式的导入,最好是用一个文件专门导入 RxJS 相关功能,然后再在入口导入这个文件
Observable是可被观察者(数据流),Observer是观察者代表 “流” 的变量标识符都以 $ 符号结尾(芬兰式命名法)
- 数据流相对于数组有个好处,就是如果有个永无止境的数据流,内存不会无限增加
连接两者的桥梁就是
Observable对象的函数subscribe,而Observer作为这个函数的参数- 如何产生事件 ->
Observable-> 发布者 - 如何响应事件 ->
Observer-> 观察者 - 什么样的发布者关联什么样的观察者 -> 何时调用
subscribe
jsimport { Observable } from 'rxjs/Observable const onSubscribe = observer => { observer.next(1) // 把数据 “推给” observer observer.next(2) observer.next(3) // observer.error('err') // 通知出错 -> 弹珠图表示符号 X // observer.complete() // 通知完结 -> 弹珠图表示符号 | // 以上终结状态 error complete 只要其中一个被调用,后面的都不会执行了 return { // 退订时 unsubscribe: () => { // 清除定时器之类的。。。 } } } const source$ = new Observable(onSubscribe) // 这里并不是直接传给 onSubscribe,而是被包装过的 const observer = { next: item => {}, // 只有 next 是必须 error: err => {}, complete: () => {} } // subscribe 参数是观察者,可以用对象形式 // 也可以用多参形式:(nextFn, errorFn, completeFn) // 只有 subscribe 被调用时,onSubscribe 才执行,observer 才响应 const subscription = source$.subscribe(observer) // 观察者退订 - 终端 observer 与 source$ 的连接 // -> observer 不会再响应了,但 source$ 还有可能继续发射 subscription.unsubscribe()- 如何产生事件 ->
Hot Cold Observable
- 有多个观察者在不同时间订阅的情况下,是否响应之前时间产生的数据
- Cold -> “点播” 每次
subscribe都产生一个心的生产者
jsconst clod = new Observable((observer) => { const producer = new Producer() // 产生新的生产者 - 新启动 - 冷 -_- // ... })- Hot -> “直播”
jsconst producer = new Producer() // 生产者 - 已启动 - 热 const clod = new Observable((observer) => { // ... })操作符:对数据流进行创建/处理/转化(静态方法、实例方法)的函数
操作符基础
返回一个 Observable 对象、对上游和下游的订阅及退订处理、处理异常境况、及时释放资源
(按输入)返回一个 Observable 对象的函数
- 根据其他 Observable 对象产生 -> 实例
import 'rxjs/add/operators/map' - 利用其他类型的输入产生 -> 静态
import 'rxjs/add/observable/of' - 不需要输入可以凭空产生 -> 静态
Observable.create(...) - 静态操作符只能出现在链式调用的首位,有些操作符既有静态类也有实例类
- 根据其他 Observable 对象产生 -> 实例
pipeable 操作符 通过 pipe 串接,有利于 tree shaking
jsimport {of} from 'rxjs/add/observable; import {map, filter} from 'rxjs/add/operators; const source$ = of(1, 2, 3) const res$ = source$.pipe( filter(x => x % 2 === 0), map(x => x * 2) )
创建数据流
不依赖其他 Observable 对象产生 Observable
| 功能需求 | 适用的操作符 |
|---|---|
| 直接操作观察者 | create(observer => {...}) |
| 根据有限的数据产生同步数据流 | of(1, 2, 3) |
| 产生一个数值范围内的数据 | range(1, 100) |
| 以循环方是产生数据 | generate |
| 重复产生数据流中的数据 | repeat 和 repeatWhen |
| 产生空数据流 | empty -> 直接产生一个完结(complete)的 Observable 对象 |
| 产生直接出错的数据流 | throw -> 直接 error |
| 产生永不完结的数据流 | never -> 永不 complete 或 error |
| 间隔给定时间持续产生 | interval 和 time |
| 从数组等枚举类型数据产生数据流 | from(数组、字符串、generator、arguments) |
| 从 Promise 对象产生数据流 | fromPromise |
| 从外部事件对象产生数据流 | fromEvent 和 fromEventPattern |
| 从 ajax 请求结果产生数据流 | ajax |
| 延迟产生数据流 | defer() 参数是 |
创建同步数据流
generate -> 类似 for 循环
jsconst res = [] for (let i = 0; i < 10; i++) { res.push(i * i) } // ---- 对照 ---- const res$ = Observable.generate( 0, // let i = 0 (i) => i < 10, (i) => i++, (i) => i * i )repeat(2)-> 上游complete之后再重复 -> 被订阅 2 次,也被退订 2 次
创建异步数据流
interval(1000) -> setInterval -> . 0 . 1 . 2 ...
time(1000) -> setTimeout -> . 0
- time 第二个参数作用同 interval 所以 time 是 interval 超集
fromEvent
jsconst evt$ = fromEvent(dom, 'click/mousemove') // 也可以是自定义 event 和 actionfromEventPattern(addHandler, removeHandler)
addHandler,removeHandler函数表示添加、移除事件的动作(无需传入事件对象)- 当 Observable 对象调用 subscribe 时
addHandler被调用 - Observable 调用 subscribe 的返回值再调用 unsubscribe 时
removeHandler被调用
repeatWhen 相对于 repeat 来说还可以控制再次订阅的时间
jsconst notifier = (notification$) => { return notification$.defer(1000) // 上游完结后 1s 之后再重新订阅 // return 一个 Observable 通知重新订阅 } const repeated$ = source$.repeateWhen(notifier)defer 延迟产生 Observable
jsconst observableFa = () => Observable.ajax(ajaxUrl) const source$ = Observable.defer(observableFa) // 只有在 source$ 被订阅时 observableFa 才会被调用
合并数据流
把多个 Observable 对象合并到 一个 Observable 中
| 功能需求 | 适用的操作符 |
|---|---|
| 把多个数据流以首尾相连的方式合并 | concat 和 concatAll |
| 把多个数据流以先到先得的方式合并 | merge 和 mergeAll |
| 把多个数据流以一对一的方式合并 | zip 和 zipAll |
| 持续合并多个数据流中最新产生的数据 | combineLatest combineAll withLatestFrom |
| 从多个数据流中选取第一个产生内容的数据流 | race |
| 在数据流前面添加一个指定数据 | startWith |
| 只获取多个数据流最后产生的那个数据 | forkJoin |
| 从高阶数据流中切换数据源 | switch exhaust |
concat从 source1$ 中获取数据并传给下游,当 source1$ complete 之后,就会调用 source1$.unsubscribe 然后调用 source2$.subscribe 继续从 source2$ 获取数据并传给下游merge会第一时间订阅所有上游 Observable 任何一个上游 Observable 有数据了就立刻传给下游,所有上游都 complete 时,下游 Observable 才会 complete避免使用 merge 去合并同步数据流,因为上游同步数据是一下子全传给下游的,和 concat 差不多了
zip把上游的数据转化为数组形式传给下游,每个上游 Observable 产生的数据,都会在对应的数组中有一席之地。只要任意一个上游 complete 且这个 Observable 最后产生的数据也配对之后就把这最后一个数组传给下游同时下游 Observable 也 completecombineLatest合并上游产生的最新的数据,从所有上游都有数据产生开始,只要任意一个上游的发出新的数据,都会合并一次传给下游。所有上游都完结之后(不会再有新发出数据)时,下游也就完结了- combineLatest 的第二个参数是函数,用于"定义合并"这个操作,可以不必传数组给下游
withLatestFrom与 combineLatest 不同的是它给下游推送数据只能由一个上游 Observable 驱动(主导产生数据的节奏)只有这个上游更新才会产生一次合并推送这个没有静态操作符,因为它需要一个做主导
race多个上游 Observable 一起,看谁最先产生数据,就完全采用这个的数据推送给下游,其他的会被退订forkJoin所有上游完结时,合并最后的数据产生唯一一个数据推送给下游- 高阶 Observable
- 产生的数据依然是 Observable 的 Observable 对象 -> 高阶 Observable 完结了并不代表内部 Observable 对象 完结
- concatAll mergeAll zipAll combineAll
- All 代表全部,把一个高阶 Observable 的所有内部 Observable 都组合起来
- 没有任何参数,所有的输入都来自上游的 Observable 对象
switch总是切换到最新的内部 Observable 对象上获取数据- 每当上游高阶 Observable 对象内部产生新的 Observable 就立即订阅新的,退订掉旧的
- 完结条件:上游高阶 Observable 完结 + 当前内部 Observable 完结
exhaust只有耗尽当前内部 Observable 后,才有空订阅下一个新产生的内部 Observable
辅助类操作符
| 功能需求 | 适用的操作符 |
|---|---|
| 统计数据流中产生的所有数据个数 | count |
| 获取数据流中最大或最小的数据 | max min |
| 对数据流中所有数据做规约操作 | Reduce |
| 判断是否所有数据满足某个判定条件 | every |
| 找到第一个满足判定条件的数据 | find findIndex |
| 判断一个数据流是否不包含任何数据 | isEmpty |
| 如果一个数据流为空就默认产生一个指定的数据 | defaultIfEmpty |
过滤数据流
| 功能需求 | 适用操作符 |
|---|---|
| 过滤掉不满足判定条件的操作符 | filter |
| 获得满足判定条件的第一个数据 | first |
| 获得满足判定条件的最后一个数据 | last |
| 从数据流中选取最先出现的若干个数据 | take |
| 从数据流中选取最后出现的若干个数据 | takeLast |
| 从数据流中选取数据直到某种情况发生 | takeWhile takeUntil |
| 从数据流中忽略最先出现的若干个数据 | skip |
| 从数据流中忽略数据直到某种情况发生 | skipWhile 和 skipUntil |
| 基于数据内容的数据流筛选 | throttle debounce audit |
| 基于时间的数据流筛选 | throttleTime debounceTime auditTime |
| 基于采样方式的数据流筛选 | sample sampleTime |
| 删除重复的数据 | distnct |
| 删除重复的连续数据 | distnctUntilChanged distnctUntilKeyChanged |
| 忽略数据流中的所有数据(只关心 error 和 complete) | ignoreElements |
| 只选取指定出现位置的数据 | elementAt |
| 判断是否只有一个数据满足条件 | single |
first与 find findIndex 的区别jsimport 'rxjs/add/observable/of' import 'rxjs/add/operator/first' const source$ = Observable.of(3, 4, 2, 3, 4, 7) source$.first( (x) => x % 2 === 0, (value, index) => [value, index], // 可选, 定义输出结果(数组) -1 // 可选 ) // 第二个参数可选,用于定制最终吐出的数据 // 如果没有满足条件的,将会抛出 error。 // 但是可以传递三个参数,表示没有结果时输出的值 -1takeLast与 take 的差别:takeLast 只有在上游数据完结时,才能产生数据(才知道哪 n 个最后 n 个),而且时一次性传给下游(不会有时间间隔)takeUntil特点:上游的数据直接转手给下游,直到参数 notifier(是一个 Observable)吐出一个数据或者完结(-_- 它只要一有动静,takeUntil 就完结。。。takeWhileskipWhile的参数时一个返回 boollen 的判定函数throttleTime限制在 duration 时间范围内,从上游传递给下游的数据的个数(1 个)debounceTime让传递给下游的数据间隔不能小于给定的时间 duetimethrottledebounce与上面的操作符的区别是,不是用时间来控制流量,而是用 Observable 中的数据来控制流量- 参数是一个返回 Observable 的函数,可以理解成当 Observable 为固定时间吐数据时,与 throttleTime debounceTime 效果一样
- 大部分场景 throttleTime debounceTime 足够,只有当固定时间的回压控制不满足要求时,才用 throttle debounce
auditTime 和 audit
- auditTime 与 throttle 类似,区别是:auditTime 取时间段内的最后一个数据
sampleTime 与 sample 不管上游产生的数据节奏怎样,完全根据自己参数指定的间隔节奏来给下游传递数据
转化数据流
| 功能需求 | 适用的操作符 |
|---|---|
| 将每个元素用映射函数产生新的数据 | map |
| 将数据流中每个元素映射为同一个元素 | mapTo |
| 提取数据流中每个数据的某个字段 | pluck |
| 产生高阶 Observable 对象 | windowTime windowCount windowWhen windowToggle window |
| 产生数组构成的数据流 | bufferTime bufferCount bufferWhen bufferToggle buffer |
| 映射产生高阶 Observable 对象然后合并 | concatMap mergeMap switchMap exhaustMap |
| 产生规约运算的结果组成的数据流 | scan mergeScan |
- 无损回压控制:把上游在一段时间内产生的数据放到一个数据集合(数组、Observable)里,然后把这个数据集合一次丢给下游。支持数组的以
buffer开头,支持Observable的以 window 开头- windowTime 和 bufferTime 每间隔固定时间段来收集
- windowCount 和 bufferCount 每收集固定个数
- windowWhen 和 bufferWhen 参数是一个返回 Observable 的函数,Observable 的一次产生数据~完成被认为是一个“缓冲区块”
- windowToggle 和 bufferToggle 接收 2 个参数,第一个参数 opening$ 产生一个数据代表缓冲窗口(区块)的开始,同时第二个参数
closingSelector被调用,用来获取缓冲窗口的结束closingSelector 是指一个返回 Observable 的函数(它的参数是 opening$ 产生的数据)
- window 和 buffer 的参数只是一个 notifer$ (吐出数据的节奏)分隔上游数据序列
- 高阶的 map
- 高阶 map 的参数(project 函数)把一个数据映射为一个 Observable 对象,而且把每个内部 Observable 的数据做组合(砸平)传给下游
project 函数 的参数(map 传下来的数据,序号)
- concatMap = map + concatAll
- mergeMap = map + mergeAll
- switchMap = map + switch
- exhoustMap = map + exhoust 先产生的 Observable 优先级更高(喜欢旧的数据)
- 高阶 map 的参数(project 函数)把一个数据映射为一个 Observable 对象,而且把每个内部 Observable 的数据做组合(砸平)传给下游
- 数据分组 - 把一个数据流拆分成多个数据流
groupBy传递给下游的是一个高阶Observablepartition返回一个数组(包含 2 个Observable)-> 唯一一个不返回 Observable 的操作符
- 累计数据 规约操作
scan对上游每一个数据都会产生一个规约结果传递给下游,无需上游数据完结(与 reduce 的区别)mergeScan区别是:规约函数返回的是一个 Observable 而不是一个数据(使用频率低)
异常错误处理
对错误异常的处理可以分为 2 类:恢复(发生错误时依然让运算继续下去)、重试(重新尝试之前发生错误的操作)
| 功能需求 | 适用的操作符 |
|---|---|
| 捕获并处理上游产生的异常错误 | catch |
| 当上游产生错误时进行重试 | retry retryWhen |
| 无论是否出错都要进行一些操作(上游完结 or 出错时发挥一次作用) | finally |
catch“恢复”
js
// err: 错误对象
// caught$ 代表上游 Observable 对象(直接返回 caught$ 可以模拟重试的效果)
// 返回的 observable 吐出的数据会替代发生错误的那一个数据
const catch$ = error$.catch((err, caught$) => Observable.of(8))retry"重试"(本质是重新订阅一次上游的 Observable,在订阅的同时取消上一次订阅,对于 Hot 数据流其实并不是真的 “重试”,只不过是重新订阅而已)retryWhen参数是一个返回 OBservable 的函数(notice$ 节奏控制器)
多播
让一个数据流的内容被多个 Observer 订阅
SubjectBehaviorSubjectReplaySubjectAsyncSubject类型
| 功能需求 | 适用的操作符 |
|---|---|
| 灵活选取 Subject 对象进行多播 | multicast share publish |
| 只多播数据流中最后一个数据 | publishLast |
| 对数据流中给定数量的数据进行多播 | publishReplay |
| 拥有默认数据的多播 | publishBehavior |
Hot Clod 数据流
- Hot 操作符的数据源来自外部,实现的是 多播
- Clod 实现的是单播(每次订阅都是重新发数据)
Subject包装 Clod 产生一个新的 Hot- Subject 实例命名通常不以 $ 结尾
- Subject 兼具 Observable 和 Observer 的性质 -> 实现多播效果
jsimport { Subject } from 'rxjs/Subject' const sub1 = new Subject() // sub1.subscribe(...) // 接受订阅 // sub1.next(1) 发数据 -> 所以它调用 error complete 之后作为 Observable 的生命周期也就结束了 const sub2 = new Subject() source$.subcribe(sub2) // source2$.subcribe(sub2) -> 它可以有多个上游来发数据 // sub2.subscribe(...) // sub2.subscribe(...) // 使 source$ 多播效果multicast(比较基础的操作符
js
// multicast 参数还可以是一个返回 Subject(中间人)的函数
// -> 当有人订阅但中间人已经退订时,产生一个新的中间人
const hotSource$ = coldSource$.multicast(new Subject())
// hotSource$.connect() 开始播
// 或者 hotSource$ = hotSource$.refCount() 根据下游 Observer 的个数来决定对上游的连接publish是通过multicast来实现的
js
// publish -> multicast(new Subject())
const hotSource$ = coldSource$.publish().refCount()share
js
// share -> multicast(() => new Subject()).refCount()
const hotSource$ = coldSource$.share()publishLast和AsyncSubject- 当上游 Cold 完结的时候,才把最后一个数据传给 Observer
publishReplay和ReplaySubjectTODO:publishBehavior和BehaviorSubjectTODO: