今天再來介紹幾個常用的「建立/組合」類型的 operators。

combineLatest

combineLatest 跟昨天介紹過的 zip 非常像,差別在於 zip 會依序組合,而 combineLatest 會在資料流有事件發生時,直接跟目前其他資料流的「最後一個事件」組合在一起,也因此這個 operator 是 latest 結尾,另一個不同的地方是, comeintLatest 內的參數是一個 Observable 陣列,訂閱後會把陣列內的這些 Observables 組合起來;直接看看程式碼:

const sourceA$ = interval(1000).pipe(
  map(data => `A${data + 1}`)
const sourceB$ = interval(2000).pipe(
  map(data => `B${data + 1}`)
const sourceC$ = interval(3000).pipe(
  map(data => `C${data + 1}`)
const subscription = combineLatest([sourceA$, sourceB$, sourceC$])
	.subscribe(data => console.log(`combineLatest 範例: ${data}`));
// combineLatest 範例: A3,B1,C1
// combineLatest 範例: A4,B1,C1
// combineLatest 範例: A4,B2,C1
// combineLatest 範例: A5,B2,C1
// combineLatest 範例: A6,B2,C1
// combineLatest 範例: A6,B3,C1
// combineLatest 範例: A6,B3,C2
// combineLatest 範例: A7,B3,C2
// combineLatest 範例: A8,B3,C2
// combineLatest 範例: A8,B4,C2
// combineLatest 範例: A9,B4,C2
// combineLatest 範例: A9,B4,C3

RxJS 5 之前參數不用放在陣列內,直接一個一個放進去就好,不過後來改成用陣列了,因此不放陣列的方法也被標記為棄用囉。

從結果可以看到每次有事件發生時都會將其他 Observable 最後發生的事件值組合起來,而 A1 發生時,因為其他 Observable 還沒有任何新事件,因此沒有辦法組合,所以直到 C1 發生時,全部 Observable 都有「最後一次事件值」,才進行組合。

sourceA$: --A1--A2--A3        --A4        --A5......           
sourceB$:   ----B1            --B2        ....
sourceC$:     ------C1                          
zip(sourceA$, sourceB$, sourceC$)
              ------**        --**        --**.......
                [A3,B1,C1]  [A4,B1,C1]  
                            [A4,B2,C1] (兩個來源 Observable 同時發生事件)

combineLatest 在實務上組合各種 Observables 的時候還蠻常用的,之後實戰練習的時候再來詳細介紹一下。

程式碼:https://stackblitz.com/edit/mastering-rxjs-operator-combinelatest

forkJoin

forkJoin 會同時訂閱傳入的 Observables,直到每個 Observable 都「結束」後,將每個 Observable 的「最後一筆值」組合起來,一樣直接上程式碼:

import { interval, forkJoin } from 'rxjs';
import { map, take } from 'rxjs/operators';
const sourceA$ = interval(1000).pipe(
  map(data => `A${data + 1}`),
  take(5)
const sourceB$ = interval(2000).pipe(
  map(data => `B${data + 1}`),
  take(4)
const sourceC$ = interval(3000).pipe(
  map(data => `C${data + 1}`),
  take(3)
forkJoin([sourceA$, sourceB$, sourceC$]).subscribe({
  next: data => console.log(`forkJoin 範例: ${data}`),
  complete: () => console.log('forkJoin 結束')
// forkJoin 範例: A5,B4,C3
// forkJoin 結束

因為要等所有 Observables 都結束,因此這裡示範使用了 take() 這個 operators 會在事件發生指定次數後結束整個 Observable。

上面的程式碼,最後結束的會是sourceC$ 的 C3,此時 sourceA$sourceB 都已經結束了,事件值分別是 A5 和 B4,因此最後訂閱時會得到一個 [A5, B4, C3] 然後結束。

sourceA$: --A1--A2--A3--A4--A5|
sourceB$: ----B1  ----B2  ----B3|
sourceC$:     ------C1    ------C2    ------C3|
forkJoin(sourceA$, sourceB$, sourceC#)
              ------      ------      ------**|
                                        [A5,B3,C3]

在實務上,我們會使用 forkJoin 去平行發送多個沒有順序性的 HTTP 請求,因為 HTTP 請求只會發生一次回傳就結束,如果每個請求之前沒有順序性,那麼一起發送會是比較快可以拿到全部資料的方法,例如:

const profile$ = ajax('/me/rofile');
const friends$ = ajax('/me/friends');
forkJoin(profile$, friend$).subscribe((profile, friends) => {
  // 同時處理 profile 和 friends 資料

程式碼:https://stackblitz.com/edit/mastering-rxjs-operator-forkjoin

race 本身就有「競速」的意思,因此這個 operator 接受的參數一樣是數個 Observables,當訂閱發生時,這些 Observables 會同時開跑,當其中一個 Observable 率先發生事件後,就會以這個 Observable 為主,並退訂其他的 Observables,也就是先到先贏,其他都是輸家:

const sourceA$ = interval(1000).pipe(
  map(data => `A${data + 1}`)
const sourceB$ = interval(2000).pipe(
  map(data => `B${data + 1}`)
const sourceC$ = interval(3000).pipe(
  map(data => `C${data + 1}`)
const subscription = race([sourceA$, sourceB$, sourceC$])
  .subscribe(data => console.log(`race 範例: ${data}`));
// A1
// A2
// A3
// ... (因為 sourceA$ 已經先到了,其他 Observables 就退訂不處理)
sourceA$: --A1--A2--A3.....
sourceB$:   ----B1.........
sourceC$:     ------C1.....
race(sourceA$, sourceB$, sourceC$)
          --A1--A2--A3..... 
            ^ sourceA$ 先到了,因此退訂 sourceB$ 和 sourceC$

這種只有一條 Observable 可以存活的感覺,讓人想忍不住到這張圖啊:

程式碼:https://stackblitz.com/edit/mastering-rxjs-operator-race

這兩天我們介紹了「組合/建立類型」的 operators,這類型的 opereators 都是透過不同的 Observables 來源組合成新的 Observables,讓我們能更靈活的操作各種 Observables。

combineLatest:同時訂閱所有內部 Observables,並將內部 Observables 裡面的最後一次事件資料組合起來。 forkJoin:同時訂閱所有內部 Observables,並將內部 Observables 「完成」前的最後一個事件資料組合起來。 race:同時訂閱所有內部 Observables,當其中一個 Observable 先發生第一次事件後,以此 Observable 為主,並將其他 Observable 取消訂閱。

跟建立類型相關的 operators 介紹到此為止,明天開始讓我們介紹這種轉換資料流向的 operators 囉。

Operators - combineLatest Operators - forkJoin Operators - race