歡迎光臨
每天分享高質量文章

全面教程:在 RxJS 中建立流 | Linux 中國

有些時候,混用響應式和非響應式程式碼似乎很有用。然後大家就開始熱衷流的創造。不論是在編寫非同步程式碼或者是資料處理時,流都是一個不錯的方案。
— Oliver Flaggl


致謝
編譯自 | 
https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a
 
 作者 | Oliver Flaggl
 譯者 | 周家未 (BriFuture) ???共計翻譯:25 篇 貢獻時間:794 天

對大多數開發者來說,與 RxJS 的初次接觸是透過庫的形式,就像 Angular。一些函式會傳回stream,要使用它們就得把註意力放在運運算元上。

有些時候,混用響應式和非響應式程式碼似乎很有用。然後大家就開始熱衷流的創造。不論是在編寫非同步程式碼或者是資料處理時,流都是一個不錯的方案。

RxJS 提供很多方式來建立流。不管你遇到的是什麼情況,都會有一個完美的建立流的方式。你可能根本用不上它們,但瞭解它們可以節省你的時間,讓你少碼一些程式碼。

我把所有可能的方法,按它們的主要目的,放在四個分類當中:

◈ 流式化現有資料
◈ 生成資料
◈ 使用現有 API 進行互動
◈ 選擇現有的流,並結合起來

註意:示例用的是 RxJS 6,可能會以前的版本有所不同。已知的區別是你匯入函式的方式不同了。

RxJS 6

  1. import {of, from} from 'rxjs';

  2. of(...);

  3. from(...);

RxJS < 6

  1. import { Observable } from 'rxjs/Observable';

  2. import 'rxjs/add/observable/of';

  3. import 'rxjs/add/observable/from';

  4. Observable.of(...);

  5. Observable.from(...);

  6. //或

  7. import { of } from 'rxjs/observable/of';

  8. import { from } from 'rxjs/observable/from';

  9. of(...);

  10. from(...);

流的圖示中的標記:

◈ | 表示流結束了
◈ X 表示流出現錯誤並被終結
◈ ... 表示流的走向不定

流式化已有資料

你有一些資料,想把它們放到流中。有三種方式,並且都允許你把排程器當作最後一個引數傳入(你如果想深入瞭解排程器,可以看看我的 上一篇文章[1])。這些生成的流都是靜態的。

of

如果只有一個或者一些不同的元素,使用 of

  1. of(1,2,3)

  2.  .subscribe();

  1. // 結果

  2. // 1 2 3 |

from

如果有一個陣列或者 可迭代的物件 ,而且你想要其中的所有元素傳送到流中,使用 from。你也可以用它來把一個 promise 物件變成可觀測的。

  1. const foo = [1,2,3];

  2. from(foo)

  3.  .subscribe();

  1. // 結果

  2. // 1 2 3 |

pairs

流式化一個物件的鍵/值對。用這個物件表示字典時特別有用。

  1. const foo = { a: 1, b: 2};

  2. pairs(foo)

  3.  .subscribe();

  1. // 結果

  2. // [a,1] [b,2] |

那麼其他的資料結構呢?

也許你的資料儲存在自定義的結構中,而它又沒有實現 可迭代的物件 介面,又或者說你的結構是遞迴的、樹狀的。也許下麵某種選擇適合這些情況:

1. 先將資料提取到陣列裡
2. 使用下一節將會講到的 generate 函式,遍歷所有資料
3. 建立一個自定義流(見下一節)
4. 建立一個迭代器

稍後會講到選項 2 和 3 ,因此這裡的重點是建立一個迭代器。我們可以對一個 可迭代的物件 呼叫 from 建立一個流。 可迭代的物件 是一個物件,可以產生一個迭代器(如果你對細節感興趣,參考 這篇 mdn 文章[2])。

建立一個迭代器的簡單方式是 生成函式generator function[3]。當你呼叫一個生成函式時,它傳回一個物件,該物件同時遵循 可迭代的物件 介面和 迭代器 介面。

  1. // 自定義的資料結構

  2. class List {

  3.  add(element) ...

  4.  get(index) ...

  5.  get size() ...

  6.  ...

  7. }

  8. function* listIterator(list) {

  9.  for (let i = 0; i<list.size; i++) {

  10.    yield list.get(i);

  11.  }

  12. }

  13. const myList = new List();

  14. myList.add(1);

  15. myList.add(3);

  16. from(listIterator(myList))

  17.  .subscribe(console.log);

  1. // 結果

  2. // 1 3 |    

呼叫 listIterator 函式時,傳回值是一個 可迭代的物件 / 迭代器 。函式裡面的程式碼在呼叫 subscribe 前不會執行。

生成資料

你知道要傳送哪些資料,但想(或者必須)動態生成它。所有函式的最後一個引數都可以用來接收一個排程器。他們產生靜態的流。

範圍(range

從初始值開始,傳送一系列數字,直到完成了指定次數的迭代。

  1. range(10, 2)  // 從 10 開始,傳送兩個值

  2.  .subscribe();

  1. // 結果

  2. // 10 11 |

間隔(interval) / 定時器(timer

有點像範圍,但定時器是週期性的傳送累加的數字(就是說,不是立即傳送)。兩者的區別在於在於定時器允許你為第一個元素設定一個延遲。也可以只產生一個值,只要不指定週期。

  1. interval(1000) // 每 1000ms = 1 秒 傳送資料

  2.  .subscribe()

  1. // 結果

  2. // 0  1  2  3  4 ...

  1. delay(5000, 1000) // 和上面相同,在開始前先等待 5000ms

  2. delay(5000)

  3. .subscribe(i => console.log("foo");

  4. // 5 秒後列印 foo

大多數定時器將會用來週期性的處理資料:

  1. interval(10000).pipe(

  2.  flatMap(i => fetch("https://server/stockTicker")

  3. ).subscribe(updateChart)

這段程式碼每 10 秒獲取一次資料,更新螢幕。

生成(generate

這是個更加複雜的函式,允許你傳送一系列任意型別的物件。它有一些多載,這裡你看到的是最有意思的部分:

  1. generate(

  2.  0,           // 從這個值開始

  3.  x => x < 10, // 條件:只要值小於 10,就一直傳送

  4.  x => x*2     // 迭代:前一個值加倍

  5. ).subscribe();

  1. // 結果

  2. // 1 2 4 8 |

你也可以用它來迭代值,如果一個結構沒有實現 可迭代的物件 介面。我們用前面的串列例子來進行演示:

  1. const myList = new List();

  2. myList.add(1);

  3. myList.add(3);

  4. generate(

  5.  0,                  // 從這個值開始

  6.  i => i < list.size, // 條件:傳送資料,直到遍歷完整個串列

  7.  i => ++i,           // 迭代:獲取下一個索引

  8.  i => list.get(i)    // 選擇器:從串列中取值

  9. ).subscribe();

  1. // 結果

  2. // 1 3 |

如你所見,我添加了另一個引數:選擇器。它和 map 運運算元作用類似,將生成的值轉換為更有用的東西。

空的流

有時候你要傳遞或傳回一個不用傳送任何資料的流。有三個函式分別用於不同的情況。你可以給這三個函式傳遞排程器。empty 和 throwError 接收一個排程器引數。

empty

建立一個空的流,一個值也不傳送。

  1. empty()

  2.  .subscribe();

  1. // 結果

  2. // |

never

建立一個永遠不會結束的流,仍然不傳送值。

  1. never()

  2.  .subscribe();

  1. // 結果

  2. // ...

throwError

建立一個流,流出現錯誤,不傳送資料。

  1. throwError('error')

  2.  .subscribe();

  1. // 結果

  2. // X

掛鉤已有的 API

不是所有的庫和所有你之前寫的程式碼使用或者支援流。幸運的是 RxJS 提供函式用來橋接非響應式和響應式程式碼。這一節僅僅討論 RxJS 為橋接程式碼提供的模版。

你可能還對這篇出自 Ben Lesh[4] 的 全面的文章[5] 感興趣,這篇文章講了幾乎所有能與 promises 互動操作的方式。

from

我們已經用過它,把它列在這裡是因為,它可以封裝一個含有 observable 物件的 promise 物件。

  1. from(new Promise(resolve => resolve(1)))

  2.  .subscribe();

  1. // 結果

  2. // 1 |

fromEvent

fromEvent 為 DOM 元素新增一個事件監聽器,我確定你知道這個。但你可能不知道的是,也可以透過其它型別來新增事件監聽器,例如,一個 jQuery 物件。

  1. const element = $('#fooButton'); // 從 DOM 元素中建立一個 jQuery 物件

  2. from(element, 'click')

  3.  .subscribe();

  1. // 結果

  2. // clickEvent ...

fromEventPattern

要理解為什麼有 fromEvent 了還需要 fromEventPattern,我們得先理解 fromEvent 是如何工作的。看這段程式碼:

  1. from(document, 'click')

  2.  .subscribe();

這告訴 RxJS 我們想要監聽 document 中的點選事件。在提交過程中,RxJS 發現 document 是一個 EventTarget 型別,因此它可以呼叫它的 addEventListener 方法。如果我們傳入的是一個 jQuery 物件而非 document,那麼 RxJs 知道它得呼叫 on 方法。

這個例子用的是 fromEventPattern ,和 fromEvent 的工作基本上一樣:

  1. function addClickHandler(handler) {

  2.  document.addEventListener('click', handler);

  3. }

  4. function removeClickHandler(handler) {

  5.  document.removeEventListener('click', handler);

  6. }

  7. fromEventPattern(

  8.  addClickHandler,

  9.  removeClickHandler,

  10. )

  11. .subscribe(console.log);

  12. // 等效於

  13. fromEvent(document, 'click')

RxJS 自動建立實際的監聽器( handler )你的工作是新增或者移除監聽器。fromEventPattern的目的基本上是告訴 RxJS 如何註冊和移除事件監聽器。

現在想象一下你使用了一個庫,你可以呼叫一個叫做 registerListener 的方法。我們不能再用 fromEvent,因為它並不知道該怎麼處理這個物件。

  1. const listeners = [];

  2. class Foo {

  3.  registerListener(listener) {

  4.    listeners.push(listener);

  5.  }

  6.  emit(value) {

  7.    listeners.forEach(listener => listener(value));

  8.  }

  9. }

  10. const foo = new Foo();

  11. fromEventPattern(listener => foo.registerListener(listener))

  12.  .subscribe();

  13. foo.emit(1);

  1. // 結果

  2. // 1 ...

當我們呼叫 foo.emit(1) 時,RxJS 中的監聽器將被呼叫,然後它就能把值傳送到流中。

你也可以用它來監聽多個事件型別,或者結合所有可以透過回呼進行通訊的 API,例如,WebWorker API:

  1. const myWorker = new Worker('worker.js');

  2. fromEventPattern(

  3.  handler => { myWorker.onmessage = handler },

  4.  handler => { myWorker.onmessage = undefined }

  5. )

  6. .subscribe();

  1. // 結果

  2. // workerMessage ...

bindCallback

它和 fromEventPattern 相似,但它能用於單個值。就在回呼函式被呼叫時,流就結束了。用法當然也不一樣 —— 你可以用 bindCallBack 封裝函式,然後它就會在呼叫時魔術般的傳回一個流:

  1. function foo(value, callback) {

  2.  callback(value);

  3. }

  4. // 沒有流

  5. foo(1, console.log); //prints 1 in the console

  6. // 有流

  7. const reactiveFoo = bindCallback(foo);

  8. // 當我們呼叫 reactiveFoo 時,它傳回一個 observable 物件

  9. reactiveFoo(1)

  10.  .subscribe(console.log); // 在控制檯列印 1

  1. // 結果

  2. // 1 |

websocket

是的,你完全可以建立一個 websocket 連線然後把它暴露給流:

  1. import { webSocket } from 'rxjs/webSocket';

  2. let socket$ = webSocket('ws://localhost:8081');

  3. // 接收訊息

  4. socket$.subscribe(

  5.  (msg) => console.log('message received: ' + msg),

  6.  (err) => console.log(err),

  7.  () => console.log('complete') * );

  8. // 傳送訊息

  9. socket$.next(JSON.stringify({ op: 'hello' }));

把 websocket 功能新增到你的應用中真的很簡單。websocket 建立一個 subject。這意味著你可以訂閱它,透過呼叫 next 來獲得訊息和傳送訊息。

ajax

如你所知:類似於 websocket,提供 AJAX 查詢的功能。你可能用了一個帶有 AJAX 功能的庫或者框架。或者你沒有用,那麼我建議使用 fetch(或者必要的話用 polyfill),把傳回的 promise 封裝到一個 observable 物件中(參考稍後會講到的 defer 函式)。

定製流

有時候已有的函式用起來並不是足夠靈活。或者你需要對訂閱有更強的控制。

主題(Subject

Subject 是一個特殊的物件,它使得你的能夠把資料傳送到流中,並且能夠控制資料。Subject 本身就是一個可觀察物件,但如果你想要把流暴露給其它程式碼,建議你使用 asObservable 方法。這樣你就不能意外呼叫原始方法。

  1. const subject = new Subject();

  2. const observable = subject.asObservable();

  3. observable.subscribe();

  4. subject.next(1);

  5. subject.next(2);

  6. subject.complete();

  1. // 結果

  2. // 1 2 |

註意在訂閱前傳送的值將會“丟失”:

  1. const subject = new Subject();

  2. const observable = subject.asObservable();

  3. subject.next(1);

  4. observable.subscribe(console.log);

  5. subject.next(2);

  6. subject.complete();

  1. // 結果

  2. // 2

除了常規的 Subject,RxJS 還提供了三種特殊的版本。

AsyncSubject 在結束後只傳送最後的一個值。

  1. const subject = new AsyncSubject();

  2. const observable = subject.asObservable();

  3. observable.subscribe(console.log);

  4. subject.next(1);

  5. subject.next(2);

  6. subject.complete();

  1. // 輸出

  2. // 2

BehaviorSubject 使得你能夠提供一個(預設的)值,如果當前沒有其它值傳送的話,這個值會被髮送給每個訂閱者。否則訂閱者收到最後一個傳送的值。

  1. const subject = new BehaviorSubject(1);

  2. const observable = subject.asObservable();

  3. const subscription1 = observable.subscribe(console.log);

  4. subject.next(2);

  5. subscription1.unsubscribe();

  1. // 輸出

  2. // 1

  3. // 2

  1. const subscription2 = observable.subscribe(console.log);

  2. // 輸出

  3. // 2

ReplaySubject 儲存一定數量、或一定時間或所有的傳送過的值。所有新的訂閱者將會獲得所有儲存了的值。

  1. const subject = new ReplaySubject();

  2. const observable = subject.asObservable();

  3. subject.next(1);

  4. observable.subscribe(console.log);

  5. subject.next(2);

  6. subject.complete();

  1. // 輸出

  2. // 1

  3. // 2

你可以在 ReactiveX 檔案[6](它提供了一些其它的連線) 裡面找到更多關於 Subject 的資訊。Ben Lesh[4] 在 On The Subject Of Subjects[7] 上面提供了一些關於 Subject 的理解,Nicholas Jamieson[8] 在 in RxJS: Understanding Subjects[9] 上也提供了一些理解。

可觀察物件

你可以簡單地用 new 運運算元建立一個可觀察物件。透過你傳入的函式,你可以控制流,只要有人訂閱了或者它接收到一個可以當成 Subject 使用的觀察者,這個函式就會被呼叫,比如,呼叫 nextcomplet 和 error

讓我們回顧一下串列示例:

  1. const myList = new List();

  2. myList.add(1);

  3. myList.add(3);

  4. new Observable(observer => {

  5.  for (let i = 0; i<list.size; i++) {

  6.    observer.next(list.get(i));

  7.  }

  8.  observer.complete();

  9. })

  10. .subscribe();

  1. // 結果

  2. // 1 3 |

這個函式可以傳回一個 unsubcribe 函式,當有訂閱者取消訂閱時這個函式就會被呼叫。你可以用它來清楚或者執行一些收尾操作。

  1. new Observable(observer => {

  2.  // 流式化

  3.  return () => {

  4.                 //clean up

  5.               };

  6. })

  7. .subscribe();

繼承可觀察物件

在有可用的運運算元前,這是一種實現自定義運運算元的方式。RxJS 在內部擴充套件了 可觀察物件 。Subject 就是一個例子,另一個是 publisher 運運算元。它傳回一個 ConnectableObservable 物件,該物件提供額外的方法 connect

實現 Subscribable 介面

有時候你已經用一個物件來儲存狀態,並且能夠傳送值。如果你實現了 Subscribable 介面,你可以把它轉換成一個可觀察物件。Subscribable 介面中只有一個 subscribe 方法。

  1. interface Subscribable<T> {  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable}

結合和選擇現有的流

知道怎麼建立一個獨立的流還不夠。有時候你有好幾個流但其實只需要一個。有些函式也可作為運運算元,所以我不打算在這裡深入展開。推薦看看 Max NgWizard K[10] 所寫的一篇 文章[11],它還包含一些有趣的動畫。

還有一個建議:你可以透過拖拽元素的方式互動式的使用結合操作,參考 RxMarbles[12]

ObservableInput 型別

期望接收流的運運算元和函式通常不單獨和可觀察物件一起工作。相反,它們實際上期望的引數型別是 ObservableInput,定義如下:

  1. type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;

這意味著你可以傳遞一個 promises 或者陣列卻不需要事先把他們轉換成可觀察物件。

defer

主要的目的是把一個 observable 物件的建立延遲(defer)到有人想要訂閱的時間。在以下情況,這很有用:

◈ 建立可觀察物件的開銷較大
◈ 你想要給每個訂閱者新的可觀察物件
◈ 你想要在訂閱時候選擇不同的可觀察物件
◈ 有些程式碼必須在訂閱之後執行

最後一點包含了一個並不起眼的用例:Promises(defer 也可以傳回一個 promise 物件)。看看這個用到了 fetch API 的例子:

  1. function getUser(id) {

  2.  console.log("fetching data");

  3.  return fetch(`https://server/user/${id}`);

  4. }

  5. const userPromise = getUser(1);

  6. console.log("I don't want that request now");

  7. // 其它地方

  8. userPromise.then(response => console.log("done");

  1. // 輸出

  2. // fetching data

  3. // I don't want that request now

  4. // done

只要流在你訂閱的時候執行了,promise 就會立即執行。我們呼叫 getUser 的瞬間,就發送了一個請求,哪怕我們這個時候不想傳送請求。當然,我們可以使用 from 來把一個 promise 物件轉換成可觀察物件,但我們傳遞的 promise 物件已經建立或執行了。defer 讓我們能夠等到訂閱才傳送這個請求:

  1. const user$ = defer(() => getUser(1));

  2. console.log("I don't want that request now");

  3. // 其它地方

  4. user$.subscribe(response => console.log("done");

  1. // 輸出

  2. // I don't want that request now

  3. // fetching data

  4. // done

iif

iif 包含了一個關於 defer 的特殊用例:在訂閱時選擇兩個流中的一個:

  1. iif(

  2.  () => new Date().getHours() < 12,

  3.  of("AM"),

  4.  of("PM")

  5. )

  6. .subscribe();

  1. // 結果

  2. // AM before noon, PM afterwards

取用該檔案:

實際上 iif[13] 能夠輕鬆地用 defer[14] 實現,它僅僅是出於方便和可讀性的目的。

onErrorResumeNext

開啟第一個流並且在失敗的時候繼續進行下一個流。錯誤被忽略掉。

  1. const stream1$ = of(1, 2).pipe(

  2.  tap(i => { if(i>1) throw 'error'}) //fail after first element

  3. );

  4. const stream2$ = of(3,4);

  5. onErrorResumeNext(stream1$, stream2$)

  6.  .subscribe(console.log);

  1. // 結果

  2. // 1 3 4 |

如果你有多個 web 服務,這就很有用了。萬一主伺服器開啟失敗,那麼備份的服務就能自動呼叫。

forkJoin

它讓流並行執行,當流結束時傳送存在陣列中的最後的值。由於每個流只有最後一個值被髮送,它一般用在只傳送一個元素的流的情況,就像 HTTP 請求。你讓請求並行執行,在所有流收到響應時執行某些任務。

  1. function handleResponses([user, account]) {

  2.  // 執行某些任務

  3. }

  4. forkJoin(

  5.  fetch("https://server/user/1"),

  6.  fetch("https://server/account/1")

  7. )

  8. .subscribe(handleResponses);

merge / concat

傳送每一個從可觀察物件源中發出的值。

merge 接收一個引數,讓你定義有多少流能被同時訂閱。預設是無限制的。設為 1 就意味著監聽一個源流,在它結束的時候訂閱下一個。由於這是一個常見的場景,RxJS 為你提供了一個顯示的函式:concat

  1. merge(

  2.  interval(1000).pipe(mapTo("Stream 1"), take(2)),

  3.  interval(1200).pipe(mapTo("Stream 2"), take(2)),

  4.  timer(0, 1000).pipe(mapTo("Stream 3"), take(2)),

  5.  2 //two concurrent streams

  6. )

  7. .subscribe();

  8. // 只訂閱流 1 和流 2

  9. // 輸出

  10. // Stream 1 -> after 1000ms

  11. // Stream 2 -> after 1200ms

  12. // Stream 1 -> after 2000ms

  13. // 流 1 結束後,開始訂閱流 3

  14. // 輸出

  15. // Stream 3 -> after 0 ms

  16. // Stream 2 -> after 400 ms (2400ms from beginning)

  17. // Stream 3 -> after 1000ms

  18. merge(

  19.  interval(1000).pipe(mapTo("Stream 1"), take(2)),

  20.  interval(1200).pipe(mapTo("Stream 2"), take(2))

  21.  1

  22. )

  23. // 等效於

  24. concat(

  25.  interval(1000).pipe(mapTo("Stream 1"), take(2)),

  26.  interval(1200).pipe(mapTo("Stream 2"), take(2))

  27. )

  28. // 輸出

  29. // Stream 1 -> after 1000ms

  30. // Stream 1 -> after 2000ms

  31. // Stream 2 -> after 3200ms

  32. // Stream 2 -> after 4400ms

zip / combineLatest

merge 和 concat 一個接一個的傳送所有從源流中讀到的值,而 zip 和 combineLatest 是把每個流中的一個值結合起來一起傳送。zip 結合所有源流中傳送的第一個值。如果流的內容相關聯,那麼這就很有用。

  1. zip(

  2.  interval(1000),

  3.  interval(1200),

  4. )

  5. .subscribe();

  1. // 結果

  2. // [0, 0] [1, 1] [2, 2] ...

combineLatest 與之類似,但結合的是源流中傳送的最後一個值。直到所有源流至少傳送一個值之後才會觸發事件。這之後每次源流傳送一個值,它都會把這個值與其他流傳送的最後一個值結合起來。

  1. combineLatest(

  2.  interval(1000),

  3.  interval(1200),

  4. )

  5. .subscribe();

  1. // 結果

  2. // [0, 0] [1, 0] [1, 1] [2, 1] ...

兩個函式都讓允許傳遞一個選擇器函式,把元素結合成其它物件而不是陣列:

  1. zip(

  2.  interval(1000),

  3.  interval(1200),

  4.  (e1, e2) -> e1 + e2

  5. )

  6. .subscribe();

  1. // 結果

  2. // 0 2 4 6 ...

race

選擇第一個傳送資料的流。產生的流基本是最快的。

  1. race(

  2.  interval(1000),

  3.  of("foo")

  4. )

  5. .subscribe();

  1. // 結果

  2. // foo |

由於 of 立即產生一個值,因此它是最快的流,然而這個流就被選中了。

總結

已經有很多建立可觀察物件的方式了。如果你想要創造響應式的 API 或者想用響應式的 API 結合傳統 API,那麼瞭解這些方法很重要。

我已經向你展示了所有可用的方法,但它們其實還有很多內容可以講。如果你想更加深入地瞭解,我極力推薦你查閱 檔案[15] 或者閱讀相關文章。

RxViz[16] 是另一種值得瞭解的有意思的方式。你編寫 RxJS 程式碼,產生的流可以用圖形或動畫進行顯示。


via: https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a

作者:Oliver Flaggl[18] 譯者:BriFuture 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出

贊(0)

分享創造快樂

© 2024 知識星球   網站地圖