歡迎光臨
每天分享高質量文章

在 DotNetty 中實現同步請求

一、背景

DotNetty 本身是一個優秀的網絡通訊框架,不過它是基於異步事件驅動來處理另一端的響應,需要在單獨的 Handler 去處理相應的傳回結果。而在我們的實際使用當中,尤其是 客戶端程式 基本都是 請求-響應 模型,在發送了資料時候需要等待服務器的響應才能進行下一步操作,如果服務器傳回的是錯誤信息,則需要進行特殊的處理。

類似於下麵這種方式:

public async void Button1_Click()
{
    var result = await DotNettyClient.SendData("Hello");
    
    if(result == "Error")
    {
        throw new Exception("服務器傳回錯誤!");
    }
    
    Console.WriteLine($"Hello {result}");
}

二、解決思路

參閱了大部分資料之後,發現在 Java 的 Netty 當中可以使用 Future / Promise 來實現,那麼 C# 是否有類似的組件呢?答案是有的,他們對應的就是 Task 和 TaskCompletionSource,前者是給呼叫者的任務,而後者則是用於設置響應任務的結果。

那麼我們就可以這麼來處理,當客戶端發送請求時,附帶唯一的一個請求 ID,並將 TaskCompletionSource 放在一個請求佇列當中,之後傳回一個 Task。當客戶端接收到服務器響應的時候,通過 TaskCompletionSource 設置之前那個 Task 的結果,這樣我們接收到響應之後,就會從之前 await 的地方繼續執行。

這裡我自己的需求僅僅是類似於 同步阻塞式 的操作,所以我直接使用一個佇列來處理,並沒有用唯一的請求 ID 來表示不同的請求,因為我可以 保證在同一時間內有且僅有一個客戶端請求被髮起

三、代碼實現

實現起來超級簡單,只需要在發起請求的時候,創建一個 TaskCompletionSource 物件。這個泛型引數指的是你想要的傳回值型別,這裡我以 TResponse 代替,下麵的 DEMO 我會用 string 型別進行演示。

創建好一個 TaskCompletionSource 之後,在發送方法裡面,我們可以將其物件放在一個先進先出的佇列當中,然後將其 Task 屬性作為發送方法的傳回值。

我們再來到處理服務器響應的 Handler 當中,從佇列裡面拿去之前存放的 TaskCompletionSource 物件,呼叫其 SetResult() 方法,將具體響應進行設置。

通過以上的操作,我們在發送資料的時候,就可以使用 await 關鍵字等待服務端的響應,但不會阻塞執行緒,當客戶端接收到服務端響應時,就會恢復到之前 await 的位置繼續執行。

資料發送方法:

public static class DotNettyClient
{
    static DotNettyClient()
    {
        RequestQueue = new Queuestring>>();
    }
    
    public static Queuestring>> RequestQueue { get; set; }
    
    public static async Task<string> SendData(string data)
    {
        var resultTask = new TaskCompletionSource<string>();
        
        var buffer = new Unpooled.Buffer();
        buffer.WriteBytes(Encoding.UTF8.GetBytes(data));
        await _clientChannel.WriteAndFlushAsync(buffer);
        
        RequestQueue.Enqueue(resultTask);
        
        return await resultTask.Task;
    }
}

服務端響應處理:

public class ProtocolHandler : ChannelHandlerAdapter
{
    public override void ChannelRead(IChannelHandlerContext context, object message)
    {
        if(message is string response)
        {
            if(!DotNettyClient.RequestQueue.TryDequeue(out TaskCompletionSource<string> result)) return;
            result.SetResult(response);
        }
    }
}

這裡我就不再編寫解析器,主要說明一下代碼的思路,下麵在使用的時候就如同第一節說的一樣,直接使用 await 關鍵字等待響應結果即可。

四、缺陷

在這裡我並沒有展示多個異步請求的情況,如果是用戶同時發起多個請求的時候,你可以通過資料的唯一 ID 來標識每一個請求,這樣在接收服務端響應的時候就能處理這種情況了。

五、參考資料

  • DotNetty Github Issues
  • dotBlogs – 《[C#] 將事件驅動 (event-driven) 的樣式改為可等候的方法 (awaitable method)》
  • dotBlogs -《[C#.NET][TPL] 利用 TaskCompletionSource 將 EAP 轉換成 TAP》
  • HK-Zhang -《TaskCompletionSource的使用場景》
赞(0)

分享創造快樂