很长一段时间没有写 Angular 了 (哎...全栈的命),近期计划又要开始回去写了,于是就开始做复习咯。

我的复习是从 JS > TS > RxJS > Angular,与此同时当然是顺便写一系列半教程半复习的文章咯,我当年写了许多 Angular 的学习笔记,只是文笔太烂,这次得好好写了。

JS 已经复习到七七八八了,TS 老是提不起劲去写,所以就改成边写 TS 边写 RxJS 吧。

鐵人賽 – 30 天精通 RxJS 系列

鐵人賽 – 打通 RxJS 任督二脈 系列

以前写过相关的文章:

angular2 学习笔记 ( Rxjs, Promise, Async/Await 的区别 )

angular2 学习笔记 ( rxjs 流 )

什么是流 (stream) ?

RxJS 参杂了许多概念,什么函数式,观察者,异步等等。

但我个人觉得最容易理解的部分是 stream (流)。

stream 是什么?它表示一段时间内一个变化的状态。

在 JS 里,状态可以被理解为某个值,variable 的值。

时间则是用户使用 App 的时间。

看例子吧:

上图 (gif) 大概是 5 秒钟,这个就是时间,在这 5 秒中里面,价钱 (值) 变化了好几次 (160 -> 190 -> 200 -> 250)

一个有时间,有变化的值就可以理解为一个 stream,所以价钱就是一个 Stream。

Why Stream? Because... 管理

为什么要用 "stream" 概念去理解这些 "值"?不能简单的理解为 "点击" -> "更新 value" ?

当然可以,其实 stream 概念并不是为了理解,而是为了管理。

当程序里出现越来越多,变来变去的值以后,出现 bug 的几率会越来越高,而追踪 bug 也越来越吃力。

所以就必须整一套概念来管理它们,这就好比你用 Redux 来管理 React 的 state 一样。

以前有许多人拿 redux 去管理简单的程序,结果就是大材小用,反而是 redux 本身增加了整个系统的复杂度...幸好后来出现了 hook 才把这群人拉了出来...(永远记得,软件开发一定要看清楚当前项目需求,选择合适的方案而不是最牛逼的方案)

Computed

上面提到了,stream 的其中一个特色就是变化。一个东西变化了,那么依赖它的东西通常也会跟着变化 -- 蝴蝶效应🦋

我们在写 Excel 的时候经常会写这样的逻辑 cell

full name 这个值,来自 first name + ' ' + last name,

而每当 first name 或 last name 变化以后,full name 也随之变化。

在上面这个例子里,first name, last name 就是 stream。随着时间它会发生变化。

而 full name 算是一个 depend and addon stream。它也会变化,同时它依赖其它的 stream 和一些额外的处理逻辑。

用 RxJS 来表达这类型的场景会非常贴切。

体验一下:

Without RxJS 实现:

const firstName = document.querySelector<HTMLInputElement>('#first-name')!;
const lastName = document.querySelector<HTMLInputElement>('#last-name')!;
const fullName = document.querySelector<HTMLSpanElement>('#full-name')!;
for (const input of [firstName, lastName]) {
  input.addEventListener('input', () => {
    fullName.textContent = `${firstName.value} ${lastName.value}`;

用 RxJS 来实现:

const firstNameInput = document.querySelector<HTMLInputElement>('#first-name')!;
const lastNameInput = document.querySelector<HTMLInputElement>('#last-name')!;
const fullNameSpan = document.querySelector<HTMLSpanElement>('#full-name')!;
// 表达 stream
const firstName$ = fromEvent(firstNameInput, 'input').pipe(
  map(() => firstNameInput.value),
  startWith(firstNameInput.value)
const lastName$ = fromEvent(lastNameInput, 'input').pipe(
  map(() => lastNameInput.value),
  startWith(lastNameInput.value)
const fullName$ = combineLatest([firstName$, lastName$]).pipe(
  map(([firstName, lastName]) => `${firstName} ${lastName}`)
// 消费 stream
fullName$.subscribe(fullName => {
  fullNameSpan.textContent = fullName;

哇...怎么更复杂了...所以啊,上面说了,程序简单就没必要搞 RxJS 啊。

但你看看它的管理是不错的,表达 stream 负责描述 stream 的来源。

尤其是那个 combine stream 的表达尤其加分。

消费 stream 则可以做许多事情 (比如 render view)

这样 stream 可以被多个地方复用。

赠送一个优化版本:

// 这个可以封装起来
function fromInput(input: HTMLInputElement): Observable<string> {
  return fromEvent(input, 'input').pipe(
    map(() => input.value),
    startWith(input.value)
// 表达 stream
const firstName$ = fromInput(document.querySelector<HTMLInputElement>('#first-name')!);
const lastName$ = fromInput(document.querySelector<HTMLInputElement>('#last-name')!);
const fullName$ = combineLatest([firstName$, lastName$]).pipe(
  map(([firstName, lastName]) => `${firstName} ${lastName}`)
// 消费 stream
const fullNameSpan = document.querySelector<HTMLSpanElement>('#full-name')!;
fullName$.subscribe(fullName => {
  fullNameSpan.textContent = fullName; // render view

Stream like a production line

Stream 通常指河流,但我觉得 RxJS Stream 更像是工厂里的生产线 / 流水线。

我们想象一间工厂的生产线长什么样。

  • 生产线是一条长长的输送带
  • 输送带旁边有 Operators 操作员 (或者 robot)
  • 输送带上面有原材料、半成品、产品
  • 流水线的源头是原材料,结尾是产品
  • 流水线在运作的过程中,原材料从源头往结尾输送,它们会经过 Operators,
    Operators 会对原材料加工,变成半成品,然后再加工,变成最终的产品。
    往细节讲,Operators 还可能负责把次品选出来拿去 rework 等等不同的操作。
  • 上面是 Overview,我们再细看它的流程。

  • 生产线不是一开始就运作的,如果没有订单,生产线是不启动的,输送带也不会跑,输送带上也没有任何东西。
  • 当订单来了,生产线开始运作,输送带开始跑。但是源头的原材料不一定马上就有,因为还得等供应商提供。
    当供应商供应原材料后,产品开始生产出货。
  • 当订单完成或者被取消,生产线就关闭了。
  • RxJS 有几个基础步骤,大致上可以对应上面的各个场景。

  • Observable
    Observable 就是一个生产线,它负责定义源头
    比如下面这句
    const firstNameInput = document.querySelector('input')!;
    const inputEvent$: Observable<Event> = fromEvent(firstNameInput, 'input');

    它的意思是创建了一个生产线,生产线的供应商是 input element 的 input event。
    当 input event dispatch 生产线就得到 input event 对象,这个就是原材料。

  • Pipe
    Pipe 就是输送带,它没有实际意义,你可以把它理解为 Operators 的一个 container。
    const inputEvent$: Observable<Event> = fromEvent(firstNameInput, 'input').pipe();
  • Operators
    Operators 就是操作员或 robot。
    const firstName$: Observable<string> = fromEvent(firstNameInput, 'input').pipe(
      map(e => (e.currentTarget as HTMLInputElement).value),
      filter(firstName => firstName !== '')
    

    map 是一个 Operator,它负责把原材料 input event 加工变成半成品/产品 input value。
    filter 是一个 Operator,它负责过滤出合格的产品,比如 value !== '' 才算合格的产品,不合格的不可以交给买家。

  • Subscribe
    subscribe 就是下订单
    const firstName$: Observable<string> = fromEvent(firstNameInput, 'input').pipe(
      map(() => firstNameInput.value),
      filter(firstName => firstName !== '')
    firstName$.subscribe(firstName => console.log(firstName));

    整个过程是这样发生的:

  • 供应商是 input event listening
  • 原材料是 input event
  • map 操作员负责把原材料 input event 加工成产品 input value
  • filter 操作员负责过滤出合格的产品 -- value !== ''
  • 在下订单 (subscribe) 前,生产线 (Observable) 是停滞的,工厂也不会去跟供应商订货 (no yet input.addEventListener)
  • 下订单后,工厂才开始想供应商要原材料 (input.addEventListener),此时生产线任然是空的,要等待供应商发货 (input dispatch event)
  • 当原材料来了以后,经过 map operator 加工,filter operator 过滤次品,如果最终有产品就交付给买家。
  • Deferred Execution (延期执行)

    上一 part 我们提到,如果没有人下订单 (subscribe),生产线 (Observable) 就是停滞的状态。

    这个在 RxJS 被称为 Deferred Execution (延期执行)。

    读历史的就知道,RxJS 是 C# LINQ 的衍生品,Deferred Execution 正是 LINQ 的特色之一。

    const documentClicked$ = fromEvent(document, 'click');
    setTimeout(() => {
      documentClicked$.subscribe(() => console.log('clicked'));
    }, 1000);

    fromEvent 是 document.addEventListener 的 RxJS 写法。

    当 fromEvent 调用后,RxJS 并不会马上去 addEventListener。

    而是等到 1 秒后 documentClicked$ stream 被 subscribe 后,才去 addEventListner。

    这就是所谓的 Deferred Execution。

    如果没有了 subscribe,所有 RxJS 都只是 declaration 而已。

    Stream 与 Array 的关系

    Stream 是一段时间内一个变化的状态,如果把每一次的改变放在一起看,那么它会长得像 Array。

    let value = 1;
    value = 2;
    value = 3;
    const value$ = [1, 2, 3];

    Array 有 map, filter

    Stream 也有 map, filter

    因为这些都是对 value 的加工处理,这是它俩相像的地方。

    它俩的主要区别在处理 value 的 timing。

    [1, 2, 3].map(v => v + 1); // [2, 3 ,4]

    Array 是静态的,一开始就有 [1, 2, 3] -> 然后 map -> 输出 [2, 3, 4] -> 结束。

    Stream 是动态的,一开始是空,某个事件发布后 -> 有了 1 -> 经过 map 输出 2,此时还么结束。

    又发布一个 2 -> 经过 map 输出 3 -> 又发布一个 3 -> 以此类推...

    总结:它们的处理过程很像,只是 Stream 多了一个动态和时间的概念。

    RxJS 与 Angular 的关系

    Angular 为什么引入了 RxJS 概念?

    其最大的原因就是为了实现 change detection,当 Model 改变的时候 View 需要被更新,这就是一个典型的观察者模式。

    Angular 虽然使用 RxJS,但并没有很重,常见的地方只有 HttpClient、Router、Form。

    我们在写 Angular Application 的时候也不需要强制自己去写 RxJS,适量的运用就可以了。

    Observable vs Promise

    两者区别还是挺大的:

  • Promise 一定是异步,Observable 可能是同步,也可能是异步。
  • Promise 只会发布一次。Observable 可能会发布多次。
  • Observable 会延迟执行,Promise 会立刻执行。
  • Observable 被 subscribe 多次会导致多次执行 (unitcast 概念),Promise 被 then 多次依然只会执行一次。
  • 当 Observable 被立刻 subscribe 执行,同时它内部是一个异步发布,而且只发布一次,这个时候它和 Promise 最像,通常使用 Promise 会更恰当。
  •