导航
导航
文章目录󰁋
  1. 一、前言
    1. 1.1 Promise缺点
    2. 1.2 Observable
    3. 1.3 ReactiveX宝石图
  2. 二、Rxjs介绍
    1. 2.1 介绍
    2. 2.2 Promise和RxJS处理异步对比
    3. 2.3 Rxjs基本概念
    4. 2.4 Rxjs使用场景
  3. 三、基础入门
    1. 3.1 典型的写法
    2. 3.2 简单创建器
      1. 3.2.1 of – 单一值转为流
      2. 3.2.2 from – 数组转为流
      3. 3.2.3 range – 范围转为流
      4. 3.2.4 fromPromise – Promise 转为流
      5. 3.2.5 defer – 惰性创建流
      6. 3.2.6 timer – 定时器流
      7. 3.2.7 interval – 定时器流
    3. 3.3 Subject – 主体对象
    4. 3.4 合并创建器
      1. 3.4.1 merge – 并联
      2. 3.4.2 concat – 串联
      3. 3.4.3 zip – 拉链
    5. 3.5 操作符
      1. 3.5.1 retry – 失败时重试
      2. 3.5.2 repeat – 成功时重试
      3. 3.5.3 delay – 延迟
      4. 3.5.4 toArray – 收集为数组
      5. 3.5.5 debounceTime – 防抖
      6. 3.5.6 switchMap – 切换成另一个流
  4. 四、进一步使用
    1. 4.1 Rxjs unsubscribe 取消订阅
    2. 4.2 Rxjs 订阅后多次执行
    3. 五、RXJS 实例操作符小结
  5. 六、更多参考

关注作者公众号

和万千小伙伴一起学习

异步编程入门之RxJs(一)

一、前言

1.1 Promise缺点

  • Promise 的特点是无论有没有人关心它的执行结果,它都会立即开始执行,并且你没有机会取消这次执行。显然,在某些情况下这么做是浪费的甚至错误的
  • 以电商为例,如果某商户的订单不允许取消,你还会去买吗?
  • 如果你发起了一个 Ajax 请求,然后用户导航到了另一个路由,显然,你这个请求如果还没有完成就应该被取消,而不应该发出去
  • 使用 Promise,你做不到,不是因为实现方面的原因,而是因为它在概念层(接口定义上)就无法支持取消
  • 由于 Promise 只会承载一个值,因此当我们要处理的是一个集合的时候就比较困难了。比如对于一个随机数列(总数未知),如果我们要借助 Web API 检查每个数字的有效性,然后对前一百个有效数字进行求和,那么用 Promise 写就比较麻烦了

1.2 Observable

  • 它就是可观察对象,Observable 顾名思义就是可以被别人观察的对象,当它变化时,观察者就可以得到通知。换句话说,它负责生产数据,别人可以消费它生产的数据
  • Observable 就像个传送带。这个传送带不断运行,围绕这个传送带建立了一条生产线,包括一系列工序,不同的工序承担单一而确定的职责。每个工位上有一个工人
  • 整个传送带的起点是原料箱,原料箱中的原料不断被放到传送带上。工人只需要待在自己的工位上,对面前的原料进行加工,然后放回传送带上或放到另一条传送带上即可,简单、高效、无意外

1.3 ReactiveX宝石图

image-20210208204624236

  • 中间的带箭头的线就像传送带,用来表示数据序列,这个数据序列被称为“流”。上方的流叫做输入流,下方的流叫做输出流。输入流可能有多个,但是输出流只会有一个(不过,流中的每个数据项也可以是别的流)
  • 数据序列上的每个圆圈表示一个数据项,圆圈的位置表示数据出现的先后顺序,但是一般不会表示精确的时间比例,比如在一毫秒内接连出现的两个数据之间仍然会有较大的距离。只有少数涉及到时间的操作,其宝石图才会表现出精确的时间比例
  • 圆圈的最后,通常会有一条竖线或者一个叉号。竖线表示这个流正常终止了,也就是说不会再有更多的数据提供出来了。而叉号表示这个流抛出错误导致异常中止了。还有一种流,既没有竖线也没有叉号,这种叫做无尽流,比如一个由所有自然数组成的流就不会主动终止。但是要注意,无尽流仍然是可以处理的,因为需要多少项是由消费者决定的。你可以把这个“智能”传送带理解为由下一个工位“叫号”的,没“叫号”下一项数据就不会过来
  • 中间的大方框表示一个操作,也就是 operator —— 一个函数,比如这个图中的操作就是把输入流中的条目乘以十后放入输出流中。
  • 看懂了宝石图,就能很形象的理解各种操作符了

二、Rxjs介绍

2.1 介绍

  • RxJSReactiveX 编程理念的 JavaScript 版本。ReactiveX 来自微软,它是一种针对异步数据 流的编程。简单来说,它将一切数据,包括 HTTP 请求,DOM 事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能
  • RxJS 是一种针对异步数据流编程工具,或者叫响应式扩展编程;可不管如何解释 RxJS 其目 标就是异步编程,Angular 引入 RxJS 为了就是让异步可控、更简单。

目前常见的异步编程的几种方法:

  • 回调函数
  • 事件监听/发布订阅
  • Promise
  • Rxjs

2.2 Promise和RxJS处理异步对比

// promise异步处理
// Promise 处理异步
getPromiseData() {
return new Promise(resolve = >{
setTimeout(() = >{
resolve('---promise timeout---');
},
2000);
});

// 使用

getPromiseData().then(d=>console.log(d))
// RxJS 处理异步:
getRxjsData() {
return new Observable(observer = >{
setTimeout(() = >{
observer.next('observable timeout');
},
2000);
});
}
// 使用
getRxjsData().subscribe(d=>console.log(d))

从上面列子可以看到 RxJSPromise的基本用法非常类似,除了一些关键词不同。Promise 里面用的是 then()resolve(),而 RxJS里面用的是 next()subscribe()
Rxjs相比Promise要强大很多。 比如 Rxjs 中可以中途撤回、Rxjs 可以发射多个值、Rxjs 提供了多种工具函数等等

2.3 Rxjs基本概念

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
    Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器):用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他

2.4 Rxjs使用场景

  • 涉及复杂的时序操作:在游戏的某一关卡中,连续按下上上下下左右左右baba,每次点按间隔不超过0.4ms,就发送信息到服务器A
  • 涉及复杂的条件处理:用户输入时,每输入一个字符,发送信息给服务器A,如果服务器A返回的数据有问题,则请求服务器B,如果用户输入’fuck’则停止上述操作并请求服务器c
  • 涉及复杂的状态管理:早上的时候每隔10秒检查一次用户信息,晚上的时候每隔5秒检查一次用户信息,检测到变更后响应式更新所有视图
  • 真要用Rx建议还是Angular环境下

三、基础入门

3.1 典型的写法

of(1,2,3).pipe(
filter(item=>item % 2 === 1),
map(item=>item * 3),
).subscribe(item=> console.log(item))
  • of 称为创建器,用来创建流,它返回一个 Observable 类型的对象,filter 和 map 称为操作符(operator),用来对条目进行处理。这些操作符被当作 Observable 对象的 pipe 方法的参数传进去
  • Observable 对象的 subscribe 方法表示消费者要订阅这个流,当流中出现数据时,传给 subscribe 方法的回调函数就会被调用,并且把这个数据传进去。这个回调函数可能被调用很多次,取决于这个流中有多少条数据
  • 注意,Observable 必须被 subscribe 之后才会开始生产数据。如果没人 subscribe 它,那就什么都不会做

3.2 简单创建器

  • 广义上,创建器也是操作符的一种,不过这里我们把它单独拿出来讲。要启动生产线,我们得先提供原料。本质上,这个提供者就是一组函数,当流水线需要拿新的原料时,就会调用它
  • 你当然可以自己实现这个提供者,但通常是不用的。RxJS 提供了很多预定义的创建器,而且将来可能还会增加新的。不过,那些眼花缭乱的创建器完全没必要全记住,只要记住少数几个就够了,其它的有时间慢慢看。

3.2.1 of – 单一值转为流

image-20210208204644634

它接收任意多个参数,参数可以是任意类型,然后它会把这些参数逐个放入流中

3.2.2 from – 数组转为流

image-20210208204656811

它接受一个数组型参数,数组中可以有任意数据,然后把数组的每个元素逐个放入流中

3.2.3 range – 范围转为流

image-20210208204705194

它接受两个数字型参数,一个起点,一个终点,然后按 1 递增,把中间的每个数字(含边界值)放入流中

3.2.4 fromPromise – Promise 转为流

  • 接受一个 Promise,当这个 Promise 有了输出时,就把这个输出放入流中。
  • 要注意的是,当 Promise 作为参数传给 fromPromise 时,这个 Promise 就开始执行了,你没有机会防止它被执行。
  • 如果你需要这个 Promise 被消费时才执行,那就要改用接下来要讲的 defer 创建器

3.2.5 defer – 惰性创建流

image-20210208204715704

  • 它的参数是一个用来生产流的工厂函数。也就是说,当消费方需要流(注意不是需要流中的值)的时候,就会调用这个函数,创建一个流,并从这个流中进行消费(取数据)
  • 因此,当我们定义 defer的时候,实际上还不存在一个真正的流,只是给出了创建这个流的方法,所以叫惰性创建流

3.2.6 timer – 定时器流

image-20210208204727266

  • 它有两个数字型的参数,第一个是首次等待时间,第二个是重复间隔时间。从图上可以看出,它实际上是个无尽流 —— 没有终止线。因此它会按照预定的规则往流中不断重复发出数据。
  • 要注意,虽然名字有相关性,但它不是 setTimeout的等价物,事实上它的行为更像是 setInterval

3.2.7 interval – 定时器流

image-20210208204736607

  • 它和 timer 唯一的差别是它只接受一个参数。事实上,它就是一个语法糖,相当于 timer(1000, 1000),也就是说初始等待时间和间隔时间是一样的。
  • 如果需求确实是 interval 的语义,那么就优先使用这个语法糖,毕竟,从行为上它和 setInterval 几乎是一样的

3.3 Subject – 主体对象

它和创建器不同,创建器是供直接调用的函数,而 Subject 则是一个实现了 observable 接口的类。也就是说,你要先把它 new 出来(假设实例叫 subject),然后你就可以通过程序控制的方式往流里手动放数据了。它的典型用法是用来管理事件,比如当用户点击了某个按钮时,你希望发出一个事件,那么就可以调用 subject.next(someValue) 来把事件内容放进流中

  • 当你希望手动控制往这个流中放数据的时机时,这种特性非常有用。
  • 当然,Subject 其实并没有这么简单,用法也很多

3.4 合并创建器

我们不但可以直接创建流,还可以对多个现有的流进行不同形式的合并,创建一个新的流。常见的合并方式有三种:并联、串联、拉链

3.4.1 merge – 并联

img

  • 从图上我们可以看到两个流中的内容被合并到了一个流中。只要任何一个流中出现了值就会立刻被输出,哪怕其中一个流是完全空的也不影响结果 —— 等同于原始流。
  • 这种工作方式非常像电路中的并联行为,因此我称其为并联创建器。
  • 并联在什么情况下起作用呢?举个例子吧:有一个列表需要每隔 5 秒钟定时刷新一次,但是一旦用户按了搜索按钮,就必须立即刷新,而不能等待 5 秒间隔。这时候就可以用一个定时器流和一个自定义的用户操作流(subjectmerge 在一起。这样,无论哪个流中出现了数据,都会进行刷新

3.4.2 concat – 串联

img

  • 从图中我们可以看到两个流中的内容被按照顺序放进了输出流中。前面的流尚未结束时(注意竖线),后面的流就会一直等待
  • 这种工作方式非常像电路中的串联行为,因此我称其为串联创建器。
  • 串联的适用场景就很容易想象了,比如我们需要先通过 Web API 进行登录,然后取学生名册。这两个操作就是异步且串联工作的

3.4.3 zip – 拉链

image-20210208204828426

  • zip 的直译就是拉链,事实上,有些压缩软件的图标就是一个带拉链的钥匙包。拉链的特点是两边各有一个“齿”,两者会啮合在一起。这里的 zip 操作也是如此。

从图上我们可以看到,两个输入流中分别出现了一些数据,当仅仅输入流 A 中出现了数据时,输出流中什么都没有,因为它还在等另一个“齿”。当输出流 B 中出现了数据时,两个“齿”都凑齐了,于是对这两个齿执行中间定义的运算(取 A 的形状,B 的颜色,并合成为输出数据)

  • 可以看到,当任何一个流先行结束之后,整个输出流也就结束了。
  • 拉链创建器适用的场景要少一些,通常用于合并两个数据有对应关系的数据源。比如一个流中是姓名,另一个流中是成绩,还有一个流中是年龄,如果这三个流中的每个条目都有精确的对应关系,那么就可以通过 zip 把它们合并成一个由表示学生成绩的对象组成的流。

3.5 操作符

RxJS 有很多操作符,事实上比创建器还要多一些,但是我们并不需要一一讲解,因为它们中的很大一部分都是函数式编程中的标配,比如 mapreducefilter

3.5.1 retry – 失败时重试

image-20210208204837805

  • 有些错误是可以通过重试进行恢复的,比如临时性的网络丢包。甚至一些流程的设计还会故意借助重试机制,比如当你发起请求时,如果后端发现你没有登录过,就会给你一个 401 错误,然后你可以完成登录并重新开始整个流程。
  • retry 操作符就是负责在失败时自动发起重试的,它可以接受一个参数,用来指定最大重试次数。

这里我为什么一直在强调失败时重试呢?因为还有一个操作符负责成功时重试

3.5.2 repeat – 成功时重试

image-20210208204846757

  • 除了重复的条件之外,repeat 的行为几乎和 retry 一模一样。
  • repeat 很少会单独用,一般会组合上 delay 操作,以提供暂停时间,否则就容易 DoS 了服务器

3.5.3 delay – 延迟

image-20210208204856423

  • 这才是真正的 setTimeout 的等价操作。它接受一个毫秒数(图中是 20 毫秒),每当它从输入流中读取一个数据之后,会先等待 20 毫秒,然后再放到输出流中。
  • 可以看到,输入流和输出流内容是完全一样的,只是时机上,输出流中的每个条目都恰好比输入流晚 20 毫秒出现

3.5.4 toArray – 收集为数组

image-20210208204905368

  • 事实上,你几乎可以把它看做是 from 的逆运算。 from 把数组打散了逐个放进流中,而 toArray 恰好相反,把流中的内容收集到一个数组中 —— 直到这个流结束。
  • 这个操作符几乎总是放在最后一步,因为 RxJS 的各种 operator 本身就可以对流中的数据进行很多类似数组的操作,比如查找最小值、最大值、过滤等。所以通常会先使用各种 operator 对数据流进行处理,等到要脱离 RxJS 的体系时,再转换成数组传出去

3.5.5 debounceTime – 防抖

image-20210208204912979

  • underscore/lodash 中这是常用函数。 所谓防抖其实就是“等它平静下来”。比如预输入(type ahead)功能,当用户正在快速打字的时候,你没必要立刻去查服务器,否则可能直接让服务器挂了,而应该等用户稍作停顿(平静下来)时再发起查询。
  • debounceTime 就是这样,你传入一个最小平静时间,在这个时间窗口内连续过来的数据一概被忽略,一旦平静时间超过它,就会往把接收到的下一条数据放到流中。这样消费者就只能看到平静时间超时之后发来的最后一条数据

3.5.6 switchMap – 切换成另一个流

image-20210208204929447

  • 有时候,我们会希望根据一个立即数发起一个远程查询,并且把这个异步取回的结果放进流中。比如,流中是一些学生的 id,每过来一个 id,你要发起一个 Ajax 请求来根据这个 id 获取这个学生的详情,并且把详情放进输出流中。
  • 注意,这是一个异步操作,所以你没法用普通的 map 来实现,否则映射出来的结果就会是一个个 Observable 对象。
  • switchMap 就是用来解决这个问题的。它在回调函数中接受从输入流中传来的数据,并转换成一个新的 Observable 对象(新的流,每个流中包括三个值,每个值都等于输入值的十倍),switchMap 会订阅这个 Observable 对象,并把它的值放入输出流中。注意图中竖线的位置 —— 只有当所有新的流都结束时,输出流才会结束

四、进一步使用

4.1 Rxjs unsubscribe 取消订阅

Promise 的创建之后,动作是无法撤回的。Observable 不一样,动作可以通过 unsbscribe() 方法中途撤回,而且 Observable 在内部做了智能的处理.

Promise 创建之后动作无法撤回

let promise = new Promise(resolve = >{
setTimeout(() = >{
resolve('---promise timeout---');
},
2000);
});
promise.then(value = >console.log(value));

Rxjs 可以通过 unsubscribe() 可以撤回 subscribe 的动作

let stream = new Observable(observer = >{
let timeout = setTimeout(() = >{
clearTimeout(timeout);
observer.next('observable timeout');
},
2000);
});
let disposable = stream.subscribe(value = >console.log(value));
setTimeout(() = >{
//取消执行 disposable.unsubscribe();
},
1000);

4.2 Rxjs 订阅后多次执行

如果我们想让异步里面的方法多次执行,比如下面代码。
这一点 Promise是做不到的,对于 Promise来说,最终结果要么 resolve(兑现)、要么 reject (拒绝),而且都只能触发一次。如果在同一个 Promise 对象上多次调用 resolve 方法, 则会抛异常。而 Observable不一样,它可以不断地触发下一个值,就像 next()这个方法的 名字所暗示的那样。

// promise
let promise = new Promise(resolve = >{
setInterval(() = >{
resolve('---promise setInterval---');
},
2000);
});
promise.then(value = >console.log(value));
// Rxjs

let stream = new Observable < number > (observer = >{
let count = 0;
setInterval(() = >{
observer.next(count++);
},
1000);
});
stream.subscribe(value = >console.log("Observable>" + value));

五、RXJS 实例操作符小结

常用的实例方法

  • pip :功能类似于 let 操作符
  • map :转换输出的数据
  • pluck :提取属性值并输出
  • do :不做数据格式化,可用于调试
  • filter :用于过滤一些Observable
  • take:表示取几条数据
  • takeWhile :满足什么条件时开始取数据
  • skip :表示跳过多少条数据后开始取
  • distinctUntilChanged :比较输入的Observable计算得出的值当前与后最后的值是否相等使用===判断
  • scan :功能有点类似于reduce这个方法,可用于累加数据同时可以使用startWith的数据用途scan的初始值,最后返回累加的数据
  • delay :表示Observable延时多久开始处理订阅数据
  • toArray :把输出值格式化成数据形式
  • toMap :给当前的输出取个名字或标签
  • expand :实现递归
  • forkJoin :类似于Promise.all,只有数据全部返回且状态为complete时,表示成功处理了请求,否则失败
  • let :这个操作符可以获取完整的输入Observable对象,做相应的处理后返回新的Observable对象
  • catch :用于Observable处理数据异常的处理
  • combineLatest :用于组且各个输入的Observable,并获取和返回各个Observable最新的数据
  • merge :用于把两个Observable合成一个处理

六、更多参考

支持一下
扫一扫,支持poetries
  • 微信扫一扫
  • 支付宝扫一扫