歡迎光臨
每天分享高質量文章

分散式訊息佇列 RocketMQ 原始碼分析 —— RPC 通訊(二)

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 https://mp.weixin.qq.com/s/iJww26xFSwEytoz8NjpFRw 「胡宗棠」歡迎轉載,保留摘要,謝謝!

  • 一、為何要使用Netty作為高效能的通訊庫?

  • 二、RocketMQ中RPC通訊的Netty多執行緒模型

    • 2.1、Netty的Reactor多執行緒模型設計概念與簡述

    • 2.2、RocketMQ中RPC通訊的1+N+M1+M2的Reactor多執行緒設計與實現

  • 三、總結

  • 666. 彩蛋


文章摘要:如何設計RPC通訊層模型是任何一款效能強勁的MQ所要重點考慮的問題

在(一)篇中主要介紹了RocketMQ的協議格式,訊息編解碼,通訊方式(同步/非同步/單向)、訊息傳送/接收以及非同步回呼的主要通訊流程。而本篇將主要對RocketMQ訊息佇列RPC通訊部分的Netty多執行緒模型進行重點介紹。

一、為何要使用Netty作為高效能的通訊庫?

在看RocketMQ的RPC通訊部分時候,可能有不少同學有這樣子的疑問,RocketMQ為何要選擇Netty而不直接使用JDK的NIO進行網路程式設計呢?這裡有必要先來簡要介紹下Netty。
Netty是一個封裝了JDK的NIO庫的高效能網路通訊開源框架。它提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。
下麵主要列舉了下一般系統的RPC通訊模組會選擇Netty作為底層通訊庫的理由(作者認為RocketMQ的RPC同樣也是基於此選擇了Netty):

(1)Netty的程式設計API使用簡單,開發門檻低,無需程式設計者去關註和瞭解太多的NIO程式設計模型和概念;

(2)對於程式設計者來說,可根據業務的要求進行定製化地開發,透過Netty的ChannelHandler對通訊框架進行靈活的定製化擴充套件;

(3)Netty框架本身支援拆包/解包,異常檢測等機制,讓程式設計者可以從JAVA NIO的繁瑣細節中解脫,而只需要關註業務處理邏輯;

(4)Netty解決了(準確地說應該是採用了另一種方式完美規避了)JDK NIO的Bug(Epoll bug,會導致Selector空輪詢,最終導致CPU 100%);

(5)Netty框架內部對執行緒,selector做了一些細節的最佳化,精心設計的reactor多執行緒模型,可以實現非常高效地併發處理;

(6)Netty已經在多個開源專案(Hadoop的RPC框架avro使用Netty作為通訊框架)中都得到了充分驗證,健壯性/可靠性比較好。

二、RocketMQ中RPC通訊的Netty多執行緒模型

RocketMQ的RPC通訊部分採用了“1+N+M1+M2”的Reactor多執行緒樣式,對網路通訊部分進行了一定的擴充套件與最佳化,這一節主要讓我們來看下這一部分的具體設計與實現內容。

2.1、Netty的Reactor多執行緒模型設計概念與簡述

這裡有必要先來簡要介紹下Netty的Reactor多執行緒模型。Reactor多執行緒模型的設計思想是分而治之+事件驅動。

(1)分而治之
一般來說,一個網路請求連線的完整處理過程可以分為接受(accept)、資料讀取(read)、解碼/編碼(decode/encode)、業務處理(process)、傳送響應(send)這幾步驟。Reactor模型將每個步驟都對映成為一個任務,服務端執行緒執行的最小邏輯單元不再是一次完整的網路請求,而是這個任務,且採用以非阻塞方式執行。

(2)事件驅動
每個任務對應特定網路事件。當任務準備就緒時,Reactor收到對應的網路事件通知,並將任務分發給系結了對應網路事件的Handler執行。

2.2、RocketMQ中RPC通訊的1+N+M1+M2的Reactor多執行緒設計與實現

(1)RocketMQ中RPC通訊的Reactor多執行緒設計與流程
RocketMQ的RPC通訊採用Netty元件作為底層通訊庫,同樣也遵循了Reactor多執行緒模型,同時又在這之上做了一些擴充套件和最佳化。下麵先給出一張RocketMQ的RPC通訊層的Netty多執行緒模型框架圖,讓大家對RocketMQ的RPC通訊中的多執行緒分離設計有一個大致的瞭解。

RocketMQ的RPC通訊層—1+N+M1+M2模型.png


從上面的框圖中可以大致瞭解RocketMQ中NettyRemotingServer的Reactor 多執行緒模型。一個 Reactor 主執行緒(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網路連線請求,建立好連線後丟給Reactor 執行緒池(eventLoopGroupSelector,即為上面的“N”,原始碼中預設設定為3),它負責將建立好連線的socket 註冊到 selector上去(RocketMQ的原始碼中會自動根據OS的型別選擇NIO和Epoll,也可以透過引數配置),然後監聽真正的網路資料。拿到網路資料後,再丟給Worker執行緒池(defaultEventExecutorGroup,即為上面的“M1”,原始碼中預設設定為8)
為了更為高效的處理RPC的網路請求,這裡的Worker執行緒池是專門用於處理Netty網路通訊相關的(包括編碼/解碼、空閑連結管理、網路連線管理以及網路請求處理)。而處理業務操作放在業務執行緒池中執行(這個內容在“RocketMQ的RPC通訊(一)篇”中也有提到),根據 RomotingCommand 的業務請求碼code去processorTable這個本地快取變數中找到對應的 processor,然後封裝成task任務後,提交給對應的業務processor處理執行緒池來執行(sendMessageExecutor,以傳送訊息為例,即為上面的 “M2”)
下麵以表格的方式列舉了下上面所述的“1+N+M1+M2”Reactor多執行緒模型

執行緒數 執行緒名 執行緒具體說明
1 NettyBoss_%d
N NettyServerEPOLLSelector_%d_%d
M1 NettyServerCodecThread_%d
M2 RemotingExecutorThread_%d

(2)RocketMQ中RPC通訊的Reactor多執行緒的程式碼具體實現
說完了Reactor多執行緒整體的設計與流程,大家應該就對RocketMQ的RPC通訊的Netty部分有了一個比較全面的理解了,那接下來就從原始碼上來看下一些細節部分(在看該部分程式碼時候需要讀者對JAVA NIO和Netty的相關概念與技術點有所瞭解)。
在NettyRemotingServer的實體初始化時,會初始化各個相關的變數包括serverBootstrap、nettyServerConfig引數、channelEventListener監聽器並同時初始化eventLoopGroupBoss和eventLoopGroupSelector兩個Netty的EventLoopGroup執行緒池(這裡需要註意的是,如果是Linux平臺,並且開啟了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;否則,就用Java NIO的NioEventLoopGroup。),具體程式碼如下:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener)
 
{
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;
      //省略部分程式碼
      //初始化時候nThreads設定為1,說明RemotingServer端的Disptacher連結管理和分發請求的執行緒為1,用於接收客戶端的TCP連線
        this.eventLoopGroupBoss = new NioEventLoopGroup(1new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d"this.threadIndex.incrementAndGet()));
            }
        });

        /**
         * 根據配置設定NIO還是Epoll來作為Selector執行緒池
         * 如果是Linux平臺,並且開啟了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;否則,就用Java NIO的NioEventLoopGroup。
         *
         */

        if (useEpoll()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        //省略部分程式碼

在NettyRemotingServer實體初始化完成後,就會將其啟動。Server端在啟動階段會將之前實體化好的1個acceptor執行緒(eventLoopGroupBoss),N個IO執行緒(eventLoopGroupSelector),M1個worker 執行緒(defaultEventExecutorGroup)系結上去。前面部分也已經介紹過各個執行緒池的作用了。
這裡需要說明的是,Worker執行緒拿到網路資料後,就交給Netty的ChannelPipeline(其採用責任鏈設計樣式),從Head到Tail的一個個Handler執行下去,這些 Handler是在建立NettyRemotingServer實體時候指定的。NettyEncoder和NettyDecoder 負責網路傳輸資料和 RemotingCommand 之間的編解碼。NettyServerHandler 拿到解碼得到的 RemotingCommand 後,根據 RemotingCommand.type 來判斷是 request 還是 response來進行相應處理,根據業務請求碼封裝成不同的task任務後,提交給對應的業務processor處理執行緒池處理。

 @Override
    public void start() {
        //預設的處理執行緒池組,使用預設的處理執行緒池組用於處理後面的多個Netty Handler的邏輯操作

        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {

                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
        /**
         * 首先來看下 RocketMQ NettyServer 的 Reactor 執行緒模型,
         * 一個 Reactor 主執行緒負責監聽 TCP 連線請求;
         * 建立好連線後丟給 Reactor 執行緒池,它負責將建立好連線的 socket 註冊到 selector
         * 上去(這裡有兩種方式,NIO和Epoll,可配置),然後監聽真正的網路資料;
         * 拿到網路資料後,再丟給 Worker 執行緒池;
         *
         */

        //RocketMQ-> Java NIO的1+N+M模型:1個acceptor執行緒,N個IO執行緒,M1個worker 執行緒。
        ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        //服務端處理客戶端連線請求是順序處理的,所以同一時間只能處理一個客戶端連線,多個客戶端來的時候,服務端將不能處理的客戶端連線請求放在佇列中等待處理,backlog引數指定了佇列的大小
                        .option(ChannelOption.SO_REUSEADDR, true)//這個引數表示允許重覆使用本地地址和埠
                        .option(ChannelOption.SO_KEEPALIVE, false)//當設定該選項以後,如果在兩小時內沒有資料的通訊時,TCP會自動傳送一個活動探測資料報文。
                        .childOption(ChannelOption.TCP_NODELAY, true)//該引數的作用就是禁止使用Nagle演演算法,使用於小資料即時傳輸
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//這兩個引數用於操作接收緩衝區和傳送緩衝區
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        .childHandler(new ChannelInitializer() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {

                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                                        .addLast(defaultEventExecutorGroup,
                                                new NettyEncoder(),//rocketmq解碼器,他們分別改寫了父類的encode和decode方法
                                                new NettyDecoder(),//rocketmq編碼器
                                                new IdleStateHandler(00, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty自帶的心跳管理器
                                                new NettyConnectManageHandler(),//連線管理器,他負責捕獲新連線、連線斷開、異常等事件,然後統一排程到NettyEventExecuter處理器處理。
                                                new NettyServerHandler()//當一個訊息經過前面的解碼等步驟後,然後排程到channelRead0方法,然後根據訊息型別進行分發
                                        );
                            }
                        });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        //定時掃描responseTable,獲取傳回結果,並且處理超時
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 31000);
    }

從上面的描述中可以概括得出RocketMQ的RPC通訊部分的Reactor執行緒池模型框圖。

RocketMQ的RPC通訊層—Reactor執行緒池.png


整體可以看出RocketMQ的RPC通訊藉助Netty的多執行緒模型,其服務端監聽執行緒和IO執行緒分離,同時將RPC通訊層的業務邏輯與處理具體業務的執行緒進一步相分離。時間可控的簡單業務都直接放在RPC通訊部分來完成,複雜和時間不可控的業務提交至後端業務執行緒池中處理,這樣提高了通訊效率和MQ整體的效能。(ps:其中抽象出NioEventLoop來表示一個不斷迴圈執行處理任務的執行緒,每個NioEventLoop有一個selector,用於監聽系結在其上的socket鏈路。)

三、總結

仔細閱讀RocketMQ的過程中收穫了很多關於網路通訊設計技術和知識點。對於剛接觸開源版的RocketMQ的童鞋來說,想要自己掌握RPC通訊部分的各個技術知識點,還需要不斷地使用本地環境進行debug除錯和閱讀原始碼反覆思考。限於筆者的才疏學淺,對本文內容可能還有理解不到位的地方,如有闡述不合理之處還望留言一起探討。後續還會陸續釋出RocketMQ其他模組(Client、Broker和NameServer等)的相關技術文章,敬請關註。
在此順便為自己打個Call,有興趣的朋友可以關註下我的個人公眾號:“匠心獨運的部落格”,對於Java併發、Spring、資料庫和訊息佇列的一些細節、問題的文章將會在這個公眾號上釋出,歡迎交流與討論。

666. 彩蛋




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 叢集容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

贊(0)

分享創造快樂