歡迎光臨
每天分享高質量文章

使用高效能Pipelines構建.NET通訊程式

.NET Standard支援一組新的API,System.Span, System.Memory,還有System.IO.Pipelines。這幾個新的API極大了提升了.NET程式的效能,將來.NET很多基礎API都會使用它們進行重寫。

Pipelines旨在解決.NET編寫Socket通訊程式時的很多困難,相信讀者也對此不勝其煩,使用stream模型進行程式設計,就算能夠解決,也是實在麻煩。

System.IO.Pipelines使用簡單的記憶體片段來管理資料,可以極大的簡化編寫程式的過程。關於Pipelines的詳細介紹,可以看看這裡。現在ASP.NET Core中使用的Kestrel已經在使用這個API。(話說這個東西貌似就是Kestrel團隊搞出來的。)

可能是直接需要用Socket場景有限(物聯網用的還挺多的),Pipelines相關的資料感覺不是很多。官方給出的示例是基於ASCII協議的,有固定結尾的協議,這裡我以物聯網裝置常用的BINARY二進位制自定義協議為例,講解基於Pipelines的程式套路。

與基於Stream的方式不同,pipelines提供一個pipe,用於儲存資料,pipe中間儲存的資料有點連結串列的感覺,可以基於SequencePosition進行slice操作,這樣就能得到一個ReadOnlySequence物件。reader可以進行自定義操作,併在操作完成之後告訴pipe已經處理了多少資料,整個過程是不需要進行記憶體複製操作的,因此效能得到了提升,還少了很多麻煩。可以簡單理解作為伺服器端,流程:

接受資料迴圈:接到資料->放pipe裡面->告訴pipe放了多少資料
處理資料迴圈:在pipe裡面找一條完整資料->交給處理流程->告訴pipe處理了多少資料

有一款裝置,binary協議,資料包開頭0x75, 0xbd, 0x7e, 0x97一共4個位元組,隨後跟資料包長度2個位元組(固定2400位元組,不固定長度也可以參照),隨後是資料區。在裝置連線成功之後,資料主動從裝置傳送到PC。

雖然是.NET Core平臺的,但是.NET FRAMEWORK 4.6.1上面也可以nuget安裝,直接

install-package system.io.pipelines

進行安裝就可以了。Socket相關處理的程式碼不再寫了,只列關鍵的。

程式碼第一步是宣告pipe。

private async void InitPipe(Socket socket)
{
    Pipe pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(socket, pipe.Reader);

    await Task.WhenAll(reading, writing);
}

pipe有reader還有一個writer,reader負責讀取pipe資料,主要用在資料處理迴圈,writer負責將資料寫入pipe,主要用在資料接受迴圈。


private async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    
    const int minimumBufferSize = 1024 * 1024;

    while (running)
    {
        try
        {
            
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);

            
            if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment))
            {
                throw new InvalidOperationException("Buffer backed by array was expected");
            }
            
            int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }

            
            writer.Advance(bytesRead);
        }
        catch
        {
            break;
        }

        
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    
    writer.Complete();
}


private async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
    while (running)
    {
        
        ReadResult result = await reader.ReadAsync();
        
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;

        do
        {
            
            position = buffer.PositionOf((byte)0x75);
            if (position != null)
            {
                
                
                var headtoCheck = buffer.Slice(position.Value, 4).ToArray();
                
                if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))
                {
                    
                    if (buffer.Slice(position.Value).Length >= 2400)
                    {
                        
                        var mes = buffer.Slice(position.Value, 2400);
                        
                        await ProcessMessage(mes.ToArray());
                        
                        var next = buffer.GetPosition(2400, position.Value);
                        
                        buffer = buffer.Slice(next);
                    }
                    else
                    {
                        
                        break;
                    }
                }
                else
                {
                    
                    var next = buffer.GetPosition(1, position.Value);
                    buffer = buffer.Slice(next);
                }
            }
        }
        while (position != null);

        
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    reader.Complete();
}

以上程式碼基本解決了以下問題:

  • 資料接收不完整,找不到開頭結尾,導致資料大量丟棄,或者自己維護一個queue的程式碼複雜性
  • 資料接收與處理的同步問題
  • 一次性收到多條資料的情況

本文只是解釋了pipeline處理的樣式,對於茫茫多的ToArray方法,可以使用基於Span的操作進行最佳化(有時間就來填坑)。另外,如果在await ProcessMessage(mes.ToArray());這裡,直接使用Task.Run(()=>ProcessMessage(mes);代替的話,實測會出現莫名其妙的問題,很有可能是pipe執行快,在系統排程Task之前,已經將記憶體釋放導致的,如果需要最佳化這一塊的話,需要格外註意。

已同步到看一看
贊(0)

分享創造快樂