一、前言
1.1 Promise缺点
Promise的特点是无论有没有人关心它的执行结果,它都会立即开始执行,并且你没有机会取消这次执行。显然,在某些情况下这么做是浪费的甚至错误的- 以电商为例,如果某商户的订单不允许取消,你还会去买吗?
- 如果你发起了一个
Ajax请求,然后用户导航到了另一个路由,显然,你这个请求如果还没有完成就应该被取消,而不应该发出去 - 使用
Promise,你做不到,不是因为实现方面的原因,而是因为它在概念层(接口定义上)就无法支持取消 - 由于
Promise只会承载一个值,因此当我们要处理的是一个集合的时候就比较困难了。比如对于一个随机数列(总数未知),如果我们要借助Web API检查每个数字的有效性,然后对前一百个有效数字进行求和,那么用Promise写就比较麻烦了
1.2 Observable
- 它就是可观察对象,
Observable顾名思义就是可以被别人观察的对象,当它变化时,观察者就可以得到通知。换句话说,它负责生产数据,别人可以消费它生产的数据 Observable就像个传送带。这个传送带不断运行,围绕这个传送带建立了一条生产线,包括一系列工序,不同的工序承担单一而确定的职责。每个工位上有一个工人- 整个传送带的起点是原料箱,原料箱中的原料不断被放到传送带上。工人只需要待在自己的工位上,对面前的原料进行加工,然后放回传送带上或放到另一条传送带上即可,简单、高效、无意外
1.3 ReactiveX宝石图

- 中间的带箭头的线就像传送带,用来表示数据序列,这个数据序列被称为“流”。上方的流叫做输入流,下方的流叫做输出流。输入流可能有多个,但是输出流只会有一个(不过,流中的每个数据项也可以是别的流)
- 数据序列上的每个圆圈表示一个数据项,圆圈的位置表示数据出现的先后顺序,但是一般不会表示精确的时间比例,比如在一毫秒内接连出现的两个数据之间仍然会有较大的距离。只有少数涉及到时间的操作,其宝石图才会表现出精确的时间比例
- 圆圈的最后,通常会有一条竖线或者一个叉号。竖线表示这个流正常终止了,也就是说不会再有更多的数据提供出来了。而叉号表示这个流抛出错误导致异常中止了。还有一种流,既没有竖线也没有叉号,这种叫做无尽流,比如一个由所有自然数组成的流就不会主动终止。但是要注意,无尽流仍然是可以处理的,因为需要多少项是由消费者决定的。你可以把这个“智能”传送带理解为由下一个工位“叫号”的,没“叫号”下一项数据就不会过来
- 中间的大方框表示一个操作,也就是 operator —— 一个函数,比如这个图中的操作就是把输入流中的条目乘以十后放入输出流中。
- 看懂了宝石图,就能很形象的理解各种操作符了
二、Rxjs介绍
2.1 介绍
RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据 流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能RxJS是一种针对异步数据流编程工具,或者叫响应式扩展编程;可不管如何解释RxJS其目 标就是异步编程,Angular引入RxJS为了就是让异步可控、更简单。
目前常见的异步编程的几种方法:
- 回调函数
- 事件监听/发布订阅
PromiseRxjs
2.2 Promise和RxJS处理异步对比
// promise异步处理 |
// RxJS 处理异步: |
从上面列子可以看到
RxJS和Promise的基本用法非常类似,除了一些关键词不同。Promise里面用的是then()和resolve(),而RxJS里面用的是next()和subscribe()Rxjs相比Promise要强大很多。 比如Rxjs中可以中途撤回、Rxjs可以发射多个值、Rxjs提供了多种工具函数等等
2.3 Rxjs基本概念
Observable(可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。Observer(观察者): 一个回调函数的集合,它知道如何去监听由Observable提供的值。Subscription(订阅): 表示Observable的执行,主要用于取消Observable的执行。Operators(操作符): 采用函数式编程风格的纯函数 (pure function),使用像map、filter、concat、flatMap等这样的操作符来处理集合。Subject(主体): 相当于EventEmitter,并且是将值或事件多路推送给多个Observer的唯一方式。Schedulers(调度器):用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如setTimeout或requestAnimationFrame或其他
2.4 Rxjs使用场景
- 涉及复杂的时序操作:在游戏的某一关卡中,连续按下上上下下左右左右baba,每次点按间隔不超过0.4ms,就发送信息到服务器
A - 涉及复杂的条件处理:用户输入时,每输入一个字符,发送信息给服务器
A,如果服务器A返回的数据有问题,则请求服务器B,如果用户输入’fuck’则停止上述操作并请求服务器c - 涉及复杂的状态管理:早上的时候每隔
10秒检查一次用户信息,晚上的时候每隔5秒检查一次用户信息,检测到变更后响应式更新所有视图 - 真要用
Rx建议还是Angular环境下
三、基础入门
3.1 典型的写法
of(1,2,3).pipe( |
of称为创建器,用来创建流,它返回一个Observable类型的对象,filter和 map 称为操作符(operator),用来对条目进行处理。这些操作符被当作Observable对象的pipe方法的参数传进去Observable对象的subscribe方法表示消费者要订阅这个流,当流中出现数据时,传给subscribe方法的回调函数就会被调用,并且把这个数据传进去。这个回调函数可能被调用很多次,取决于这个流中有多少条数据- 注意,
Observable必须被subscribe之后才会开始生产数据。如果没人subscribe它,那就什么都不会做
3.2 简单创建器
- 广义上,创建器也是操作符的一种,不过这里我们把它单独拿出来讲。要启动生产线,我们得先提供原料。本质上,这个提供者就是一组函数,当流水线需要拿新的原料时,就会调用它
- 你当然可以自己实现这个提供者,但通常是不用的。
RxJS提供了很多预定义的创建器,而且将来可能还会增加新的。不过,那些眼花缭乱的创建器完全没必要全记住,只要记住少数几个就够了,其它的有时间慢慢看。
3.2.1 of – 单一值转为流

它接收任意多个参数,参数可以是任意类型,然后它会把这些参数逐个放入流中
3.2.2 from – 数组转为流

它接受一个数组型参数,数组中可以有任意数据,然后把数组的每个元素逐个放入流中
3.2.3 range – 范围转为流

它接受两个数字型参数,一个起点,一个终点,然后按
1递增,把中间的每个数字(含边界值)放入流中
3.2.4 fromPromise – Promise 转为流
- 接受一个
Promise,当这个Promise有了输出时,就把这个输出放入流中。 - 要注意的是,当
Promise作为参数传给fromPromise时,这个Promise就开始执行了,你没有机会防止它被执行。 - 如果你需要这个
Promise被消费时才执行,那就要改用接下来要讲的defer创建器
3.2.5 defer – 惰性创建流

- 它的参数是一个用来生产流的工厂函数。也就是说,当消费方需要流(注意不是需要流中的值)的时候,就会调用这个函数,创建一个流,并从这个流中进行消费(取数据)
- 因此,当我们定义
defer的时候,实际上还不存在一个真正的流,只是给出了创建这个流的方法,所以叫惰性创建流
3.2.6 timer – 定时器流

- 它有两个数字型的参数,第一个是首次等待时间,第二个是重复间隔时间。从图上可以看出,它实际上是个无尽流 —— 没有终止线。因此它会按照预定的规则往流中不断重复发出数据。
- 要注意,虽然名字有相关性,但它不是
setTimeout的等价物,事实上它的行为更像是setInterval
3.2.7 interval – 定时器流

- 它和
timer唯一的差别是它只接受一个参数。事实上,它就是一个语法糖,相当于timer(1000, 1000),也就是说初始等待时间和间隔时间是一样的。 - 如果需求确实是
interval的语义,那么就优先使用这个语法糖,毕竟,从行为上它和setInterval几乎是一样的
3.3 Subject – 主体对象
它和创建器不同,创建器是供直接调用的函数,而
Subject则是一个实现了observable接口的类。也就是说,你要先把它new出来(假设实例叫subject),然后你就可以通过程序控制的方式往流里手动放数据了。它的典型用法是用来管理事件,比如当用户点击了某个按钮时,你希望发出一个事件,那么就可以调用subject.next(someValue)来把事件内容放进流中
- 当你希望手动控制往这个流中放数据的时机时,这种特性非常有用。
- 当然,
Subject其实并没有这么简单,用法也很多
3.4 合并创建器
我们不但可以直接创建流,还可以对多个现有的流进行不同形式的合并,创建一个新的流。常见的合并方式有三种:并联、串联、拉链
3.4.1 merge – 并联

- 从图上我们可以看到两个流中的内容被合并到了一个流中。只要任何一个流中出现了值就会立刻被输出,哪怕其中一个流是完全空的也不影响结果 —— 等同于原始流。
- 这种工作方式非常像电路中的并联行为,因此我称其为并联创建器。
- 并联在什么情况下起作用呢?举个例子吧:有一个列表需要每隔
5秒钟定时刷新一次,但是一旦用户按了搜索按钮,就必须立即刷新,而不能等待5秒间隔。这时候就可以用一个定时器流和一个自定义的用户操作流(subject)merge在一起。这样,无论哪个流中出现了数据,都会进行刷新
3.4.2 concat – 串联

- 从图中我们可以看到两个流中的内容被按照顺序放进了输出流中。前面的流尚未结束时(注意竖线),后面的流就会一直等待
- 这种工作方式非常像电路中的串联行为,因此我称其为串联创建器。
- 串联的适用场景就很容易想象了,比如我们需要先通过
Web API进行登录,然后取学生名册。这两个操作就是异步且串联工作的
3.4.3 zip – 拉链

zip的直译就是拉链,事实上,有些压缩软件的图标就是一个带拉链的钥匙包。拉链的特点是两边各有一个“齿”,两者会啮合在一起。这里的zip操作也是如此。
从图上我们可以看到,两个输入流中分别出现了一些数据,当仅仅输入流
A中出现了数据时,输出流中什么都没有,因为它还在等另一个“齿”。当输出流B中出现了数据时,两个“齿”都凑齐了,于是对这两个齿执行中间定义的运算(取A的形状,B的颜色,并合成为输出数据)
- 可以看到,当任何一个流先行结束之后,整个输出流也就结束了。
- 拉链创建器适用的场景要少一些,通常用于合并两个数据有对应关系的数据源。比如一个流中是姓名,另一个流中是成绩,还有一个流中是年龄,如果这三个流中的每个条目都有精确的对应关系,那么就可以通过
zip把它们合并成一个由表示学生成绩的对象组成的流。
3.5 操作符
RxJS有很多操作符,事实上比创建器还要多一些,但是我们并不需要一一讲解,因为它们中的很大一部分都是函数式编程中的标配,比如map、reduce、filter等
3.5.1 retry – 失败时重试

- 有些错误是可以通过重试进行恢复的,比如临时性的网络丢包。甚至一些流程的设计还会故意借助重试机制,比如当你发起请求时,如果后端发现你没有登录过,就会给你一个
401错误,然后你可以完成登录并重新开始整个流程。 retry操作符就是负责在失败时自动发起重试的,它可以接受一个参数,用来指定最大重试次数。
这里我为什么一直在强调失败时重试呢?因为还有一个操作符负责成功时重试
3.5.2 repeat – 成功时重试

- 除了重复的条件之外,
repeat的行为几乎和retry一模一样。 repeat很少会单独用,一般会组合上delay操作,以提供暂停时间,否则就容易 DoS 了服务器
3.5.3 delay – 延迟

- 这才是真正的
setTimeout的等价操作。它接受一个毫秒数(图中是20毫秒),每当它从输入流中读取一个数据之后,会先等待20毫秒,然后再放到输出流中。 - 可以看到,输入流和输出流内容是完全一样的,只是时机上,输出流中的每个条目都恰好比输入流晚
20毫秒出现
3.5.4 toArray – 收集为数组

- 事实上,你几乎可以把它看做是
from的逆运算。from把数组打散了逐个放进流中,而toArray恰好相反,把流中的内容收集到一个数组中 —— 直到这个流结束。 - 这个操作符几乎总是放在最后一步,因为
RxJS的各种operator本身就可以对流中的数据进行很多类似数组的操作,比如查找最小值、最大值、过滤等。所以通常会先使用各种 operator 对数据流进行处理,等到要脱离RxJS的体系时,再转换成数组传出去
3.5.5 debounceTime – 防抖

- 在
underscore/lodash中这是常用函数。 所谓防抖其实就是“等它平静下来”。比如预输入(type ahead)功能,当用户正在快速打字的时候,你没必要立刻去查服务器,否则可能直接让服务器挂了,而应该等用户稍作停顿(平静下来)时再发起查询。 debounceTime就是这样,你传入一个最小平静时间,在这个时间窗口内连续过来的数据一概被忽略,一旦平静时间超过它,就会往把接收到的下一条数据放到流中。这样消费者就只能看到平静时间超时之后发来的最后一条数据
3.5.6 switchMap – 切换成另一个流

- 有时候,我们会希望根据一个立即数发起一个远程查询,并且把这个异步取回的结果放进流中。比如,流中是一些学生的
id,每过来一个id,你要发起一个Ajax请求来根据这个id获取这个学生的详情,并且把详情放进输出流中。 - 注意,这是一个异步操作,所以你没法用普通的
map来实现,否则映射出来的结果就会是一个个Observable对象。 switchMap就是用来解决这个问题的。它在回调函数中接受从输入流中传来的数据,并转换成一个新的Observable对象(新的流,每个流中包括三个值,每个值都等于输入值的十倍),switchMap会订阅这个Observable对象,并把它的值放入输出流中。注意图中竖线的位置 —— 只有当所有新的流都结束时,输出流才会结束
四、进一步使用
4.1 Rxjs unsubscribe 取消订阅
Promise的创建之后,动作是无法撤回的。Observable不一样,动作可以通过unsbscribe()方法中途撤回,而且Observable在内部做了智能的处理.
Promise 创建之后动作无法撤回
let promise = new Promise(resolve = >{ |
Rxjs 可以通过 unsubscribe() 可以撤回 subscribe 的动作
let stream = new Observable(observer = >{ |
4.2 Rxjs 订阅后多次执行
如果我们想让异步里面的方法多次执行,比如下面代码。
这一点Promise是做不到的,对于Promise来说,最终结果要么resolve(兑现)、要么reject(拒绝),而且都只能触发一次。如果在同一个Promise对象上多次调用resolve方法, 则会抛异常。而Observable不一样,它可以不断地触发下一个值,就像next()这个方法的 名字所暗示的那样。
// promise |
// Rxjs |
五、RXJS 实例操作符小结
常用的实例方法
pip:功能类似于let操作符map:转换输出的数据pluck:提取属性值并输出do:不做数据格式化,可用于调试filter:用于过滤一些Observabletake:表示取几条数据takeWhile:满足什么条件时开始取数据skip:表示跳过多少条数据后开始取distinctUntilChanged:比较输入的Observable计算得出的值当前与后最后的值是否相等使用===判断scan:功能有点类似于reduce这个方法,可用于累加数据同时可以使用startWith的数据用途scan的初始值,最后返回累加的数据delay:表示Observable延时多久开始处理订阅数据toArray:把输出值格式化成数据形式toMap:给当前的输出取个名字或标签expand:实现递归forkJoin:类似于Promise.all,只有数据全部返回且状态为complete时,表示成功处理了请求,否则失败let:这个操作符可以获取完整的输入Observable对象,做相应的处理后返回新的Observable对象catch:用于Observable处理数据异常的处理combineLatest:用于组且各个输入的Observable,并获取和返回各个Observable最新的数据merge:用于把两个Observable合成一个处理