歡迎光臨
每天分享高質量文章

MediatR 知多少

引言

首先不用查字典了,詞典查無此詞。猜測是作者筆誤將Mediator寫成MediatR了。廢話少說,轉入正題。

先來簡單瞭解下這個開源專案MediatR(作者Jimmy Bogard,也是開源專案AutoMapper的建立者,在此表示膜拜):

Simple mediator implementation in .NET. In-process messaging with no dependencies. Supports request/response, commands, queries, notifications and events, synchronous and async with intelligent dispatching via C# generic variance.

.NET中的簡單中介者樣式實現,一種行程內訊息傳遞機制(無其他外部依賴)。 支援以同步或非同步的形式進行請求/響應,命令,查詢,通知和事件的訊息傳遞,並透過C#泛型支援訊息的智慧排程。

如上所述,其核心是一個中介者樣式的.NET實現,其目的是訊息傳送和訊息處理的解耦。它支援以單播和多播形式使用同步或非同步的樣式來釋出訊息,建立和偵聽事件。

中介者樣式

既然是對中介者樣式的一種實現,那麼我們就有必要簡要介紹下中介者這個設計樣式,以便後續展開。

中介者樣式:用一個中介物件封裝一系列的物件互動,中介者使各物件不需要顯示地相互作用,從而使耦合鬆散,而且可以獨立地改變它們之間的互動。

看上面的官方定義可能還是有點繞,那麼下麵這張圖應該能幫助你對中介者樣式有個直觀瞭解。

使用中介樣式,物件之間的互動將封裝在中介物件中。物件不再直接相互互動(解耦),而是透過中介進行互動。這減少了物件之間的依賴性,從而減少了耦合。

那其優缺點也在圖中很容易看出:

優點:中介者樣式的優點就是減少類間的依賴,把原有的一對多的依賴變成了一對一的依賴,同事類只依賴中介者,減少了依賴,當然同時也降低了類間的耦合

缺點:中介者樣式的缺點就是中介者會膨脹得很大,而且邏輯複雜,原本N個物件直接的相互依賴關係轉換為中介者和同事類的依賴關係,同事類越多,中介者的邏輯就越複雜。

Hello MeidatR

在開始之前,我們先來瞭解下其基本用法。

單播訊息傳輸

單播訊息傳輸,也就是一對一的訊息傳遞,一個訊息對應一個訊息處理。其透過 IRequest來抽象單播訊息,用 IRequestHandler進行訊息處理。

  1. //構建 訊息請求
  2. public class Ping : IRequest { }
  3. //構建 訊息處理
  4. public class PingHandler : IRequestHandler<Ping, string> {
  5.    public Task Handle(Ping request, CancellationToken cancellationToken) {
  6.        return Task.FromResult("Pong");
  7.    }
  8. }
  9. //傳送 請求
  10. var response = await mediator.Send(new Ping());
  11. Debug.WriteLine(response); // "Pong"

多播訊息傳輸

多播訊息傳輸,也就是一對多的訊息傳遞,一個訊息對應多個訊息處理。其透過 INotification來抽象多播訊息,對應的訊息處理型別為 INotificationHandler

  1. //構建 通知訊息
  2. public class Ping : INotification { }
  3. //構建 訊息處理器1
  4. public class Pong1 : INotificationHandler<Ping> {
  5.    public Task Handle(Ping notification, CancellationToken cancellationToken) {
  6.        Debug.WriteLine("Pong 1");
  7.        return Task.CompletedTask;
  8.    }
  9. }
  10. //構建 訊息處理器2
  11. public class Pong2 : INotificationHandler<Ping> {
  12.    public Task Handle(Ping notification, CancellationToken cancellationToken) {
  13.        Debug.WriteLine("Pong 2");
  14.        return Task.CompletedTask;
  15.    }
  16. }
  17. //釋出訊息
  18. await mediator.Publish(new Ping());

原始碼解析

對MediatR有了基本認識後,我們來看看原始碼,研究下其如何實現的。

從程式碼圖中我們可以看到其核心的物件主要包括:

  1. IRequest Vs IRequestHandler
  2. INotification Vs INoticifaitonHandler
  3. IMediator Vs Mediator
  4. Unit
  5. IPipelineBehavior

IRequest Vs IRequestHandler

其中 IRequestINotification分別對應單播和多播訊息的抽象。
對於單播訊息可以決定是否需要傳回值選用不同的介面:

  • IRequest – 有傳回值
  • IRequest – 無傳回值

這裡就不得不提到其中巧妙的設計,透過引入結構型別 Unit來代表無傳回的情況。

  1. ///


  2. /// 代表無需傳回值的請求
  3. ///
  • public interface IRequest : IRequest<Unit> { }
  • ///


  • /// 代表有傳回值的請求
  • ///
  • /// Response type
  • public interface IRequest<out TResponse> : IBaseRequest { }
  • ///


  • /// Allows for generic type constraints of objects implementing IRequest or IRequest{TResponse}
  • ///
  • public interface IBaseRequest { }

同樣對於 IRequestHandler也是透過結構型別 Unit來處理不需要傳回值的情況。

  1. public interface IRequestHandler<in TRequest, TResponse>
  2.    where TRequest : IRequest<TResponse>
  3. {
  4.    Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
  5. }
  6. public interface IRequestHandler<in TRequest> : IRequestHandler<TRequest, Unit>
  7.    where TRequest : IRequest<Unit>
  8. {
  9. }

從上面我們可以看出定義了一個方法名為 Handle傳回值為 Task的包裝型別,而因此賦予了其具有以同步和非同步的方式進行訊息處理的能力。我們再看一下其以非同步方式進行訊息處理(無傳回值)的預設實現 AsyncRequestHandler

  1. public abstract class AsyncRequestHandler<TRequest> : IRequestHandler<TRequest>
  2.    where TRequest : IRequest
  3. {
  4.    async Task<Unit> IRequestHandler<TRequest, Unit>.Handle(TRequest request, CancellationToken cancellationToken)
  5.    {
  6.        await Handle(request, cancellationToken).ConfigureAwait(false);
  7.        return Unit.Value;
  8.    }
  9.    protected abstract Task Handle(TRequest request, CancellationToken cancellationToken);
  10. }

從上面的程式碼來看,我們很容易看出這是裝飾樣式的實現方式,是不是很巧妙的解決了無需傳回值的場景。

最後我們來看下結構型別 Unit的定義:

  1. public struct Unit : IEquatable<Unit>, IComparable<Unit>, IComparable
  2. {
  3.    public static readonly Unit Value = new Unit();
  4.    public static readonly Task<Unit> Task = System.Threading.Tasks.Task.FromResult(Value);
  5.    // some other code
  6. }

IMediator Vs Mediator

IMediator主要定義了兩個方法 SendPublish,分別用於傳送訊息和釋出通知。其預設實現Mediator中定義了兩個集合,分別用來儲存請求與請求處理的對映關係。

  1. //Mediator.cs
  2. //儲存request和requesthandler的對映關係,1對1。
  3. private static readonly ConcurrentDictionary<Type, object> _requestHandlers = new ConcurrentDictionary<Type, object>();
  4. //儲存notification與notificationhandler的對映關係,
  5. private static readonly ConcurrentDictionary<Type, NotificationHandlerWrapper> _notificationHandlers = new ConcurrentDictionary<Type, NotificationHandlerWrapper>();

這裡面其又引入了兩個包裝類: RequestHandlerWrapperNotificationHandlerWrapper。這兩個包裝類的作用就是用來傳遞 ServiceFactory委託進行依賴解析。

所以說 Mediator藉助 publicdelegateobjectServiceFactory(TypeserviceType);完成對Ioc容器的一層抽象。這樣就可以對接任意你喜歡用的Ioc容器,比如:Autofac、Windsor或ASP.NET Core預設的Ioc容器,只需要在註冊 IMediator時指定 ServiceFactory型別的委託即可,比如ASP.NET Core中的做法:

在使用ASP.NET Core提供的原生Ioc容器有些問題:Service registration crashes when registering generic handlers

IPipelineBehavior

MeidatR支援按需配置請求管道進行訊息處理。即支援在請求處理前和請求處理後新增額外行為。僅需實現以下兩個介面,並註冊到Ioc容器即可。

  • IRequestPreProcessor 請求處理前介面
  • IRequestPostProcessor 請求處理後介面

其中 IPipelineBehavior的預設實現: RequestPreProcessorBehaviorRequestPostProcessorBehavior分別用來處理所有實現 IRequestPreProcessorIRequestPostProcessor介面定義的管道行為。

而處理管道是如何構建的呢?我們來看下 RequestHandlerWrapperImpl的具體實現:

  1. internal class RequestHandlerWrapperImpl<TRequest, TResponse> : RequestHandlerWrapper<TResponse>
  2.    where TRequest : IRequest<TResponse>
  3. {
  4.    public override Task<TResponse> Handle(IRequest<TResponse> request, CancellationToken cancellationToken,
  5.        ServiceFactory serviceFactory)
  6.    {
  7.        Task<TResponse> Handler() => GetHandler<IRequestHandler<TRequest, TResponse>>(serviceFactory).Handle((TRequest) request, cancellationToken);
  8.        return serviceFactory
  9.            .GetInstances<IPipelineBehavior<TRequest, TResponse>>()
  10.            .Reverse()
  11.            .Aggregate((RequestHandlerDelegate<TResponse>) Handler, (next, pipeline) => () => pipeline.Handle((TRequest)request, cancellationToken, next))();
  12.    }
  13. }

就這樣一個簡單的函式,涉及的知識點還真不少,說實話我花了不少時間來理清這個邏輯。
那都涉及到哪些知識點呢?我們一個一個的來理一理。

  1. C# 7.0的新特性 – 區域性函式
  2. C# 6.0的新特性 – 運算式形式的成員函式
  3. Linq高階函式 – Aggregate
  4. 匿名委託
  5. 構造委託函式鏈

關於第1、2個知識點,請看下麵這段程式碼:

  1. public delegate int SumDelegate();//定義委託
  2. public static void Main()
  3. {
  4.    //區域性函式(在函式內部定義函式)
  5.    //運算式形式的成員函式, 相當於 int Sum() { return 1 + 2;}
  6.    int Sum() => 1 + 2;
  7.    var sumDelegate = (SumDelegate)Sum;//轉換為委託
  8.    Console.WriteLine(sumDelegate());//委託呼叫,輸出:3
  9. }

再看第4個知識點,匿名委託:

  1. public delegate int SumDelegate();
  2. SumDelegate delegater1 = delegate(){ return 1+2; }
  3. //也相當於
  4. SumDelegate delegater2 => 1+2;

下麵再來介紹一下 Aggregate這個Linq高階函式。 Aggregate是對一個集合序列進行累加操作,透過指定初始值,累加函式,以及結果處理函式完成計算。

函式定義:

  1. public static TResult Aggregate<TSource,TAccumulate,TResult>
  2. (this IEnumerable<TSource> source,
  3. TAccumulate seed,
  4. Func<TAccumulate,TSource,TAccumulate> func,
  5. Func<TAccumulate,TResult> resultSelector);

根據函式定義我們來寫個簡單的demo:

  1. var nums = Enumerable.Range(2, 3);//[2,3,4]
  2. // 計算1到5的累加之和,再將結果乘以2
  3. var sum = nums.Aggregate(1, (total, next) => total + next, result => result * 2);// 相當於 (((1+2)+3)+4)*2=20
  4. Console.WriteLine(sum);//20

和函式引數進行一一對應:

  1. seed : 1
  2. Func func : (total, next) => total + next
  3. Func resultSelector : result => result * 2

基於上面的認識,我們再來回過頭梳理一下 RequestHandlerWrapperImpl
其主要是藉助委託: publicdelegateTask<TResponse>RequestHandlerDelegate<TResponse>();來構造委託函式鏈來構建處理管道。

Aggregate函式瞭解後,我們就不難理解處理管道的構建了。請看下圖中的程式碼解讀:

那如何保證先執行 IRequestPreProcessor再執行 IRequestPostProcessor呢?
就是在註冊到Ioc容器時必須保證順序,先註冊 IRequestPreProcessor再註冊 IRequestPostProcessor。(這一點很重要!!!)

看到這裡有沒有想到ASP.NET Core中請求管道中中介軟體的構建呢?是不是很像俄羅斯套娃?先由內而外構建管道,再由外而內執行!

至此,MediatR的實現思路算是理清了。

應用場景

如文章開頭提到:MediatR是一種行程內訊息傳遞機制。 支援以同步或非同步的形式進行請求/響應,命令,查詢,通知和事件的訊息傳遞,並透過C#泛型支援訊息的智慧排程。

那麼我們就應該明白,其核心是訊息的解耦。因為我們幾乎都是在與訊息打交道,那因此它的應用場景就很廣泛,比如我們可以基於MediatR實現CQRS、EventBus等。

另外,還有一種應用場景:我們知道藉助依賴註入的好處是,就是解除依賴,但我們又不得不思考一個問題,隨著業務邏輯複雜度的增加,建構式可能要註入更多的服務,當註入的依賴太多時,其會導致建構式膨脹。比如:

  1. public DashboardController(
  2.    ICustomerRepository customerRepository,
  3.    IOrderService orderService,
  4.    ICustomerHistoryRepository historyRepository,
  5.    IOrderRepository orderRepository,
  6.    IProductRespoitory productRespoitory,
  7.    IRelatedProductsRepository relatedProductsRepository,
  8.    ISupportService supportService,
  9.    ILog logger
  10.    )  

如果藉助 MediatR進行改造,也許僅需註入 IMediatR就可以了。

  1. public DashboardController(IMediatR mediatr)  

總結

看到這裡,也許你應該明白MediatR實質上並不是嚴格意義上的中介者樣式實現,我更傾向於其是基於Ioc容器的一層抽象,根據請求定位相應的請求處理器進行訊息處理,也就是服務定位。
那到這裡似乎也恍然大悟MediatR這個筆誤可能是有意為之了。序員,你怎麼看?

參考資料:

CQRS/MediatR implementation patterns

MediatR when and why I should use it?

ABP CQRS 實現案例:基於 MediatR 實現

贊(0)

分享創造快樂