歡迎光臨
每天分享高質量文章

響應式編程知多少 | Rx.NET 瞭解下

1. 引言

An API for asynchronous programming with observable streams.
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.ReactiveX 使用可觀察資料流進行異步編程的API。
ReactiveX結合了觀察者樣式、迭代器樣式和函式式編程的精華

關於Reactive(本文統一譯作響應式),有一個The Reactive Manifesto【響應式宣言】:響應式系統(Reactive System)具備以下特質:即時響應性(Responsive)、回彈性(Resilient)、彈性(Elastic)以及訊息驅動(Message Driven)。

很顯然開發一個響應式系統,並不簡單。
那本文就來講一講如何基於Rx.NET進行響應式編程,進而開發更加靈活、松耦合、可伸縮的響應式系統。

2. 編程範式

在開始之前呢,我們有必要瞭解下幾種編程範式:命令式編程、宣告式編程、函式式編程和響應式編程。

命令式編程:命令式編程的主要思想是關註計算機執行的步驟,即一步一步告訴計算機先做什麼再做什麼。

  1. //1. 宣告變數
  2. List results = new List();
  3. //2. 迴圈變數
  4. foreach(var num in Enumerable.Range(1,10))
  5. {
  6.    //3. 添加條件
  7.    if (num > 5)
  8.    {  
  9.        //4. 添加處理邏輯
  10.        results.Add(num);
  11.        Console.WriteLine(num);
  12.    }
  13. }

宣告式編程:宣告式編程是以資料結構的形式來表達程式執行的邏輯。它的主要思想是告訴計算機應該做什麼,但不指定具體要怎麼做。

  1. var nums = from num in Enumerable.Range(1,10) where num > 5 select num

函式式編程:主要思想是把運算過程儘量寫成一系列嵌套的函式呼叫。

  1. Enumerable.Range(1, 10).Where(num => num > 5).ToList().ForEach(Console.WriteLine);

響應式編程:響應式編程是一種面向資料流和變化傳播的編程範式,旨在簡化事件驅動應用的實現。響應式編程專註於如何創建依賴於變更的資料流並對變化做出響應。

  1. IObservable nums = Enumerable.Range(1, 10).ToObservable();
  2.  
  3. IDisposable subscription = nums.Where(num => num > 5).Subscribe(Console.WriteLine);
  4.  
  5. subscription.Dispose();

3. Hello Rx.NET

從一個簡單的Demo開始。
假設我們現在模擬電熱壺燒水,實時輸出當前水溫,一般我們會這樣做:

  1. Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine);
  2. // do something else. 阻塞

假設當前程式是智慧家居的中控設備,不僅控制電熱壺燒水,還控制其他設備,為了避免阻塞主執行緒。一般我們會創建一個Thread或Task去做。

  1. Task.Run(() => Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine));
  2. // do something else. 非阻塞

假設現在我們不僅要在控制台輸出而且還要實時通過揚聲器報警。這時我們應該想到委托和事件。

  1. class Heater
  2. {
  3.    private delegate void TemperatureChanged(int temperature);
  4.    private event TemperatureChanged TemperatureChangedEvent;
  5.    public void BoilWater()
  6.    {
  7.        TemperatureChangedEvent += ShowTemperature;
  8.        TemperatureChangedEvent += MakeAlerm;
  9.        Task.Run(
  10.            () =>
  11.        Enumerable.Range(1, 100).ToList().ForEach((temperature) => TemperatureChangedEvent(temperature))
  12.        );
  13.    }
  14.    private void ShowTemperature(int temperature)
  15.    {
  16.        Console.WriteLine($"當前溫度:{temperature}");
  17.    }
  18.    private void MakeAlerm(int temperature)
  19.    {
  20.        Console.WriteLine($"嘟嘟嘟,當前水溫{temperature}");
  21.    }
  22. }
  23. class Program
  24. {
  25.    static void Main(string[] args)
  26.    {
  27.        Heater heater = new Heater();        
  28.        heater.BoilWater();
  29.    }
  30. }

瞬間代碼量就上去了。但是借助Rx.NET,我們可以簡化成以下代碼:

  1. var observable = Enumerable.Range(1, 100).ToObservable(NewTheadScheduler.Default);//申明可觀察序列
  2. Subject subject = new Subject();//申明Subject
  3. subject.Subscribe((temperature) => Console.WriteLine($"當前溫度:{temperature}"));//訂閱subject
  4. subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,當前水溫:{temperature}"));//訂閱subject
  5. observable.Subscribe(subject);//訂閱observable

僅僅通過以下三步:

  1. 呼叫 ToObservable將列舉序列轉換為可觀察序列。
  2. 通過指定 NewTheadScheduler.Default來指定在單獨的執行緒進行列舉。
  3. 呼叫 Subscribe方法進行事件註冊。
  4. 借助 Subject進行多播傳輸

通過以上我們可以看到Rx.NET大大簡化了事件處理的步驟,而這隻是Rx的冰山一角。

4. Rx.NET 核心

Reactive Extensions(Rx)是一個為.NET應用提供響應式編程模型的庫,用來構建異步基於事件流的應用,通過安裝 System.ReactiveNuget包進行取用。Rx將事件流抽象為Observable sequences(可觀察序列)表示異步資料流,使用LINQ運算子查詢異步資料流,並使用 Scheduler來控制異步資料流中的併發性。簡單地說:Rx = Observables + LINQ + Schedulers。

在軟體系統中,事件是一種訊息用於指示發生了某些事情。事件由Event Source(事件源)引發並由Event Handler(事件處理程式)使用。
在Rx中,事件源可以由observable表示,事件處理程式可以由observer表示。
但是應用程式使用的資料如何表示呢,例如資料庫中的資料或從Web服務器獲取的資料。而在應用程式中我們一般處理的資料無外乎兩種:靜態資料和動態資料。 但無論使用何種型別的資料,其都可以作為流來觀察。換句話說,資料流本身也是可觀察的。也就意味著,我們也可以用observable來表示資料流。

講到這裡,Rx.NET的核心也就一目瞭然了:

  1. 一切皆為資料流
  2. Observable 是對資料流的抽象
  3. Observer是對Observable的響應

在Rx中,分別使用 IObservable<T>IObserver<T>接口來表示可觀察序列和觀察者。它們預置在system命名空間下,其定義如下:

  1. public interface IObservable<out T>
  2. {
  3.      //Notifies the provider that an observer is to receive notifications.
  4.      IDisposable Subscribe(IObserver<T> observer);
  5. }
  6.  
  7. public interface IObserver<in T>
  8. {
  9.    //Notifies the observer that the provider has finished sending push-based notifications.
  10.    void OnCompleted();
  11.  
  12.    //Notifies the observer that the provider has experienced an error condition.
  13.    void OnError(Exception error);
  14.  
  15.    //Provides the observer with new data.
  16.    void OnNext(T value);
  17. }

5. 創建IObservable

創建 IObservable<T>主要有以下幾種方式:

1. 直接實現 IObservable<T>接口

2. 使用 Observable.Create創建

  1. Observable.Create(observer=>{
  2.    for (int i = 0; i < 5; i++)
  3.    {
  4.        observer.OnNext(i);
  5.    }
  6.    observer.OnCompleted();
  7.    return Disposable.Empty;
  8. })

3. 使用 Observable.Deffer進行延遲創建(當有觀察者訂閱時才創建)比如要連接資料庫進行查詢,如果沒有觀察者,那麼資料庫連接會一直被占用,這樣會造成資源浪費。使用Deffer可以解決這個問題。

  1. Observable.Defer(() =>
  2. {
  3.    var connection = Connect(user, password);
  4.    return connection.ToObservable();
  5. });

4. 使用 Observable.Generate創建迭代型別的可觀察序列

  1. IObservable observable =
  2.    Observable.Generate(
  3.        0,              //initial state
  4.        i => i < 10,    //condition (false means terminate)
  5.        i => i + 1,     //next iteration step
  6.        i => i * 2);      //the value in each iteration

5. 使用 Observable.Range創建指定區間的可觀察序列

  1. IObservable observable = Observable.Range (0, 10).Select (i => i * 2);

6. 創建特殊用途的可觀察序列

  1. Observable.Return ("Hello World");//創建單個元素的可觀察序列
  2. Observable.Never ();//創建一個空的永遠不會結束的可觀察序列
  3. Observable.Throw<ApplicationException> (
  4. new ApplicationException ("something bad happened"))//創建一個丟擲指定異常的可觀察序列
  5. Observable.Empty ()//創建一個空的立即結束的可觀察序列

7. 使用 ToObservable轉換 IEnumerate和Task型別

  1. Enumerable.Range(1, 10).ToObservable();
  2. IObservable<IEnumerable> resultsA = searchEngineA.SearchAsync(term).ToObservable();

8. 使用 Observable.FromEventPattern<T>Observable.FromEvent<TDelegate,TEventArgs>進行事件的轉換

  1. public delegate void RoutedEventHandler(object sender,
  2. System.Windows.RoutedEventArgs e)
  3. IObservable<EventPattern<RoutedEventArgs>> clicks =
  4.                Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
  5.                    h => theButton.Click += h,
  6.                    h => theButton.Click -= h);
  7. clicks.Subscribe(eventPattern => output.Text += "button clicked" + Environment.NewLine);

9. 使用 Observable.Using進行資源釋放

  1. IObservable lines =
  2.    Observable.Using (
  3.        () => File.OpenText ("TextFile.txt"), // opens the file and returns the stream we work with
  4.        stream =>
  5.        Observable.Generate (
  6.            stream, //initial state
  7.            s => !s.EndOfStream, //we continue until we reach the end of the file
  8.            s => s, //the stream is our state, it holds the position in the file
  9.            s => s.ReadLine ()) //each iteration will emit the current line (and moves to the next)
  10.    );

10. 使用 Observable.Interval創建指定間隔可觀察序列

11. 使用 Observable.Timer創建可觀察的計時器

6. RX 運算子

創建完IObservable後,我們可以對其應用系列Linq運算子,對其進行查詢、過濾、聚合等等。Rx內置了以下系列運算子:下麵通過圖示來解釋常用運算子的作用:

7. 多播傳輸靠:Subject

基於以上示例,我們瞭解到,借助Rx可以簡化事件模型的實現,而其實質上就是對觀察者樣式的擴展。提到觀察者樣式,我們知道一個Subject可以被多個觀察者訂閱,從而完成訊息的多播。同樣,在Rx中,也引入了Subject用於多播訊息傳輸,不過Rx中的Subject具有雙重身份——即是觀察者也是被觀察者。

  1. interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>
  2. {
  3. }

Rx中預設提供了以下四種實現:

Subject– 向所有觀察者廣播每個通知

AsyncSubject– 當可觀察序列完成後有且僅發送一個通知

ReplaySubject– 快取指定通知以對後續訂閱的觀察者進行重放

BehaviorSubject– 推送預設值或最新值給觀察者

但對於第一種 Subject<T>有一點需要指出,當其有多個觀察者序列時,一旦其中一個停止發送訊息,則Subject就停止廣播所有其他序列後續發送的任何訊息。

8. 有溫度的可觀察者序列

對於Observable,它們是有溫度的,有冷熱之分。它們的區別如下圖所示:

Cold Observable:有且僅當有觀察者訂閱時才發送通知,且每個觀察者獨享一份完整的觀察者序列。

Hot Observable:不管有無觀察者訂閱都會發送通知,且所有觀察者共享同一份觀察者序列。

9. 一切皆在掌控:Scheduler

在Rx中,使用Scheduler來控制併發。而對於Scheduler我們可以理解為程式調度,通過Scheduler來規定在什麼時間什麼地點執行什麼事情。Rx提供了以下幾種Scheduler:

  1. NewThreadScheduler:即在新執行緒上執行

  2. ThreadPoolScheduler:即在執行緒池中執行

  3. TaskPoolScheduler:同ThreadPoolScheduler

  4. CurrentThreadScheduler:在當前執行緒執行

  5. ImmediateScheduler:在當前執行緒立即執行

  6. EventLoopScheduler:創建一個後臺執行緒按序執行所有操作

舉例而言:

  1. Observable.Return("Hello",NewThreadScheduler.Default)
  2. .Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")
  3. );
  4. Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");
  5.  
  6. 以上輸出:
  7. Current ThreadId1
  8. Hello on ThreadId4

10. 最後

羅里吧嗦的總算把《Rx.NET In Action》這本書的內容大致梳理了一遍,對Rx也有了一個更深的認識,Rx擴展了觀察者樣式用於支持資料和事件序列,內置系列運算子允許我們以宣告式的方式組合這些序列,且無需關註底層的實現進行事件驅動開發:如執行緒、同步、執行緒安全、併發資料結構和非阻塞IO。

但事無巨細,難免疏漏。對響應式編程有興趣的不妨拜讀下此書,相信對你會大有裨益。

參考資料:

Rx.NET in Action.pdf

ReactiveX

.Net中的反應式編程(Reactive Programming)

 

    赞(0)

    分享創造快樂