歡迎光臨
每天分享高質量文章

分散式訊息佇列 RocketMQ 原始碼分析 —— RPC 通訊(一)

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 


摘要: 原創出處 https://mp.weixin.qq.com/s/V_nOevq_2cbrH2_zgOSP-w 「胡宗棠」歡迎轉載,保留摘要,謝謝!

  • 一、RocketMQ中Remoting通訊模組概覽

  • 二、RocketMQ中Remoting通訊模組的具體實現

    • 1、Remoting通訊模組的類結構圖

    • 2、訊息的協議設計與編碼解碼

    • 3、訊息的通訊方式和通訊流程

  • 三、總結

  • 666. 彩蛋


文章摘要:借用小廝的一句話“訊息佇列的本質在於訊息的傳送、儲存和接收”。那麼,對於一款訊息佇列來說,如何做到訊息的高效傳送與接收是重點和關鍵**

一、RocketMQ中Remoting通訊模組概覽

RocketMQ訊息佇列的整體部署架構如下圖所示:

RocketMQ整體的架構叢集圖.jpg

先來說下RocketMQ訊息佇列叢集中的幾個角色:

(1)NameServer:在MQ叢集中做的是做命名服務,更新和路由發現 broker服務;

(2)Broker-Master:broker 訊息主機伺服器;

(3)Broker-Slave:broker 訊息從機伺服器;

(4)Producer:訊息生產者;

(5)Consumer:訊息消費者;

其中,RocketMQ叢集的一部分通訊如下:

(1)Broker啟動後需要完成一次將自己註冊至NameServer的操作;隨後每隔30s時間定期向NameServer上報Topic路由資訊;

(2)訊息生產者Producer作為客戶端傳送訊息時候,需要根據Msg的Topic從本地快取的TopicPublishInfoTable獲取路由資訊。如果沒有則更新路由資訊會從NameServer上重新拉取;

(3)訊息生產者Producer根據(2)中獲取的路由資訊選擇一個佇列(MessageQueue)進行訊息傳送;Broker作為訊息的接收者收訊息並落盤儲存; 從上面(1)~(3)中可以看出在訊息生產者, Broker和NameServer之間都會發生通訊(這裡只說了MQ的部分通訊),因此如何設計一個良好的網路通訊模組在MQ中至關重要,它將決定RocketMQ叢集整體的訊息傳輸能力與最終的效能。 rocketmq-remoting 模組是 RocketMQ訊息佇列中負責網路通訊的模組,它幾乎被其他所有需要網路通訊的模組(諸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依賴和取用。為了實現客戶端與伺服器之間高效的資料請求與接收,RocketMQ訊息佇列自定義了通訊協議併在Netty的基礎之上擴充套件了通訊模組。ps:鑒於RocketMQ的通訊模組是建立在Netty基礎之上的,因此在閱讀RocketMQ的原始碼之前,讀者最好先對Netty的多執行緒模型、JAVA NIO模型均有一定的瞭解,這樣子理解RocketMQ原始碼會較為快一些。 作者閱讀的RocketMQ版本是4.2.0, 依賴的netty版本是4.0.42.Final. RocketMQ的程式碼結構圖如下:

RocketMQ的Remoting原始碼目錄結構.png

原始碼部分主要可以分為

rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模組

,通訊框架就封裝在rocketmq-remoting模組中。 本文主要從RocketMQ的協議格式,訊息編解碼,通訊方式(同步/非同步/單向)和具體的傳送/接收訊息的通訊流程來進行闡述等。

二、RocketMQ中Remoting通訊模組的具體實現

1、Remoting通訊模組的類結構圖

RocketMQ的Remoting模組類結構圖.png

從類層次結構來看:

(1)RemotingService

:為最上層的介面,提供了三個方法:

void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);

(2)RemotingClient/RemotingSever:兩個介面繼承了最上層介面—RemotingService,分別各自為Client和Server提供所必需的方法,下麵所列的是RemotingServer的方法:

/**
     * 同RemotingClient端一樣
     *
     * @param requestCode
     * @param processor
     * @param executor
     */

    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor)
;

    /**
     * 註冊預設的處理器
     *
     * @param processor
     * @param executor
     */

    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    /**
     * 根據請求code來獲取不同的處理Pair
     *
     * @param requestCode
     * @return
     */

    Pair getProcessorPair(final int requestCode);

    /**
     * 同RemotingClient端一樣,同步通訊,有傳回RemotingCommand
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */

    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
 throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException
;

    /**
     * 同RemotingClient端一樣,非同步通訊,無傳回RemotingCommand
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */

    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
 throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException
;

    /**
     * 同RemotingClient端一樣,單向通訊,諸如心跳包
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */

    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException
;

(3)NettyRemotingAbstract:Netty通訊處理的抽象類,定義並封裝了Netty處理的公共處理方法; (4)NettyRemotingClient/NettyRemotingServer:分別實現了RemotingClient和RemotingServer, 都繼承了NettyRemotingAbstract抽象類。RocketMQ中其他的元件(如client、nameServer、broker在進行訊息的傳送和接收時均使用這兩個元件)

2、訊息的協議設計與編碼解碼

在Client和Server之間完成一次訊息傳送時,需要對傳送的訊息進行一個協議約定,因此就有必要自定義RocketMQ的訊息協議。同時,為了高效地在網路中傳輸訊息和對收到的訊息讀取,就需要對訊息進行編解碼。在RocketMQ中,RemotingCommand這個類在訊息傳輸過程中對所有資料內容的封裝,不但包含了所有的資料結構,還包含了編碼解碼操作。 RemotingCommand類的部分成員變數如下:

Header欄位 型別 Request說明 Response說明
code int 請求操作碼,應答方根據不同的請求碼進行不同的業務處理 應答響應碼。0表示成功,非0則表示各種錯誤
language LanguageCode 請求方實現的語言 應答方實現的語言
version int 請求方程式的版本 應答方程式的版本
opaque int 相當於reqeustId,在同一個連線上的不同請求標識碼,與響應訊息中的相對應 應答不做修改直接傳回
flag int 區分是普通RPC還是onewayRPC得標誌 區分是普通RPC還是onewayRPC得標誌
remark String 傳輸自定義文字資訊 傳輸自定義文字資訊
extFields HashMap 請求自定義擴充套件資訊 響應自定義擴充套件資訊

這裡展示下Broker向NameServer傳送一次心跳註冊的報文:

[
code=103,//這裡的103對應的code就是broker向nameserver註冊自己的訊息
language=JAVA,
version=137,
opaque=58,//這個就是requestId
flag(B)=0,
remark=null,
extFields={
    brokerId=0,
    clusterName=DefaultCluster,
    brokerAddr=ip1: 10911,
    haServerAddr=ip1: 10912,
    brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON

下麵來看下RocketMQ通訊協議的格式:

RocketMQ中Remoting協議格式.png

可見傳輸內容主要可以分為以下4部分:

(1)訊息長度

:總長度,四個位元組儲存,佔用一個int型別;

(2)序列化型別&訊息頭長度

:同樣佔用一個int型別,第一個位元組表示序列化型別,後面三個位元組表示訊息頭長度;

(3)訊息頭資料

:經過序列化後的訊息頭資料;

(4)訊息主體資料

:訊息主體的二進位制位元組資料內容; 訊息的編碼和解碼分別在RemotingCommand類的encode和decode方法中完成,下麵是訊息編碼encode方法的具體實現:

public ByteBuffer encode() {
    // 1> essay-header length size
    int length = 4;    //訊息總長度

    // 2> essay-header data length
    //將訊息頭編碼成byte[]
    byte[] essay-headerData = this.essay-headerEncode();
    //計算頭部長度
    length += essay-headerData.length;

    // 3> body data length
    if (this.body != null) {
        //訊息主體長度
        length += body.length;
    }
    //分配ByteBuffer, 這邊加了4,
    //這是因為在訊息總長度的計算中沒有將儲存頭部長度的4個位元組計算在內
    ByteBuffer result = ByteBuffer.allocate(4 + length);

    // length
    //將訊息總長度放入ByteBuffer
    result.putInt(length);

    // essay-header length
    //將訊息頭長度放入ByteBuffer
    result.put(markProtocolType(essay-headerData.length, serializeTypeCurrentRPC));

    // essay-header data
    //將訊息頭資料放入ByteBuffer
    result.put(essay-headerData);

    // body data;
    if (this.body != null) {
        //將訊息主體放入ByteBuffer
        result.put(this.body);
    }
    //重置ByteBuffer的position位置
    result.flip();

    return result;
}

    /**
     * markProtocolType方法是將RPC型別和essay-headerData長度編碼放到一個byte[4]陣列中
     *
     * @param source
     * @param type
     * @return
     */

    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];

        result[0] = type.getCode();
        //右移16位後再和255與->“16-24位”
        result[1] = (byte) ((source >> 16) & 0xFF);
        //右移8位後再和255與->“8-16位”
        result[2] = (byte) ((source >> 8) & 0xFF);
        //右移0位後再和255與->“8-0位”
        result[3] = (byte) (source & 0xFF);
        return result;
    }

訊息解碼decode方法是編碼的逆向過程,其具體實現如下:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        //獲取byteBuffer的總長度
        int length = byteBuffer.limit();

        //獲取前4個位元組,組裝int型別,該長度為總長度
        int oriHeaderLen = byteBuffer.getInt();

        //獲取訊息頭的長度,這裡和0xFFFFFF做與運算,編碼時候的長度即為24位
        int essay-headerLength = getHeaderLength(oriHeaderLen);

        byte[] essay-headerData = new byte[essay-headerLength];
        byteBuffer.get(essay-headerData);

        RemotingCommand cmd = essay-headerDecode(essay-headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - essay-headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

3、訊息的通訊方式和通訊流程

在RocketMQ訊息佇列中支援通訊的方式主要有以下三種:

(1)同步(sync)

(2)非同步(async)

(3)單向(oneway)

其中“同步”通訊樣式相對簡單,一般用在傳送心跳包場景下,無需關註其Response。本文將主要介紹RocketMQ的非同步通訊流程(限於篇幅,讀者可以按照同樣的樣式進行分析同步通訊流程)。 下麵先給出了RocketMQ非同步通訊的整體流程圖:

RocketMQ非同步通訊的整體時序圖.png

下麵兩小節內容主要介紹了Client端傳送請求訊息、Server端接收訊息的具體實現並簡要分析的Client端的回呼。

3.1、Client傳送請求訊息的具體實現

當客戶端呼叫非同步通訊介面—invokeAsync時候,先由RemotingClient的實現類—NettyRemotingClient根據addr獲取相應的channel(如果本地快取中沒有則建立),隨後呼叫invokeAsyncImpl方法,將資料流轉給抽象類NettyRemotingAbstract處理(真正做完傳送請求動作的是在NettyRemotingAbstract抽象類的invokeAsyncImpl方法裡面)。具體傳送請求訊息的原始碼如下所示:

    /**
     * invokeAsync(非同步呼叫)
     *
     */

    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)

        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException 
{
        //相當於request ID, RemotingCommand會為每一個request產生一個request ID, 從0開始, 每次加1

        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            //根據request ID構建ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            //將ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
                //使用Netty的channel傳送請求資料
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    //訊息傳送後執行
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            //如果傳送訊息成功給Server,那麼這裡直接Set後return
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            //執行回呼
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            //釋放訊號量
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel  failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                //異常處理
                responseFuture.release();
                log.warn("send a request command to channel  + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

在Client端傳送請求訊息時有個比較重要的資料結構需要註意下:

(1)responseTable—儲存請求碼與響應關聯對映

protected final ConcurrentHashMap/* opaque */, ResponseFuture> responseTable

opaque表示請求發起方在同個連線上不同的請求標識程式碼,每次傳送一個訊息的時候,可以選擇同步阻塞/非同步非阻塞的方式。無論是哪種通訊方式,都會儲存請求操作碼至ResponseFuture的Map對映—responseTable中。

(2)ResponseFuture—儲存傳迴響應(包括回呼執行方法和訊號量)

public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
        SemaphoreReleaseOnlyOnce once)
 
{
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
        this.invokeCallback = invokeCallback;
        this.once = once;
    }

對於同步通訊來說,第三、四個引數為null;而對於非同步通訊來說,invokeCallback是在收到訊息響應的時候能夠根據responseTable找到請求碼對應的回呼執行方法,semaphore引數用作流控,當多個執行緒同時往一個連線寫資料時可以透過訊號量控制permit同時寫許可的數量。

(3)異常傳送流程處理—定時掃描responseTable本地快取 在傳送訊息時候,如果遇到異常情況(比如服務端沒有response傳回給客戶端或者response因網路而丟失),上面所述的responseTable的本地快取Map將會出現堆積情況。這個時候需要一個定時任務來專門做responseTable的清理回收。在RocketMQ的客戶端/服務端啟動時候會產生一個頻率為1s呼叫一次來的定時任務檢查所有的responseTable快取中的responseFuture變數,判斷是否已經得到傳回, 併進行相應的處理。

public void scanResponseTable() {
        final List rfList = new LinkedList();
        Iterator> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

3.2、Server端接收訊息併進行處理的具體實現

Server端接收訊息的處理入口在NettyServerHandler類的channelRead0方法中,其中呼叫了processMessageReceived方法(這裡省略了Netty服務端訊息流轉的大部分流程和邏輯)。其中服務端最為重要的處理請求方法實現如下:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //根據RemotingCommand中的code獲取processor和ExecutorService
    final Pair matched = this.processorTable.get(cmd.getCode());
    final Pair pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //rpc hook
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    //processor處理請求
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //rpc hook
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //封裝requestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            //想執行緒池提交requestTask
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        //構建response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

上面的請求處理方法中根據RemotingCommand的請求業務碼來匹配到相應的業務處理器;然後生成一個新的執行緒提交至對應的業務執行緒池進行非同步處理。

(1)processorTable—請求業務碼與業務處理、業務執行緒池的對映變數

    protected final HashMap/* request code */, Pair> processorTable =
        new HashMap>(64);

我想RocketMQ這種做法是為了給不同型別的請求業務碼指定不同的處理器Processor處理,同時訊息實際的處理並不是在當前執行緒,而是被封裝成task放到業務處理器Processor對應的執行緒池中完成非同步執行。(在RocketMQ中能看到很多地方都是這樣的處理,這樣的設計能夠最大程度的保證非同步,保證每個執行緒都專註處理自己負責的東西

3.3、Client端非同步回呼執行的實現分析

看到這裡可能有一些同學會疑問Client端的非同步回呼究竟在哪裡執行的?從上面“RocketMQ非同步通訊的整體時序圖”來看,回呼執行處理的流程的確是放在了Client端來完成,而rocketmq-remoting通訊模組中只是給非同步回呼處理提供了介面。這裡可以看下rocketmq-client模組非同步傳送訊息的部分程式碼(限於篇幅也只是列舉了非同步回呼執行的部分程式碼):

private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    )
 throws InterruptedException, RemotingException 
{
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                //先從Server端傳回的responseFuture變數中獲取RemotingCommand的值
                RemotingCommand response = responseFuture.getResponseCommand();
              if (null == sendCallback && response != null) {

                    try {
                        //Client端處理髮送訊息的Reponse傳回(包括對訊息傳回體的頭部進行解碼,
                        //取得“topic”、“BrokerName”、“QueueId”等值)
                        //隨後構建sendResult物件並設定Context背景關係中
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        if (context != null && sendResult != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    return;
                }
            //省略其他部分程式碼
            //......
}

這裡需要結合3.1節的內容和NettyRemotingAbstract抽象類的processResponseCommand方法,便可以明白Client端實現非同步回呼的大致流程了。在Client端傳送非同步訊息時候(rocketmq-client模組最終呼叫sendMessageAsync方法時),會將InvokeCallback的介面註入,而在Server端的非同步執行緒由上面所講的業務執行緒池真正執行後,傳回response給Client端時候才會去觸發執行。NettyRemotingAbstract抽象類的processResponseCommand方法的具體程式碼如下:

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        //從RemotingCommand中獲取opaque值
        final int opaque = cmd.getOpaque();‘
        //從本地快取的responseTable這個Map中取出本次非同步通訊連線對應的ResponseFuture變數
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                //在這裡真正去執行Client註入進來的非同步回呼方法
                executeInvokeCallback(responseFuture);
            } else {
                //否則釋放responseFuture變數
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

三、總結

剛開始看RocketMQ原始碼—RPC通訊模組可能覺得略微有點複雜,但是隻要能夠抓住Client端傳送請求訊息、Server端接收訊息並處理的流程以及回呼過程來分析和梳理,那麼整體來說並不複雜。RPC通訊部分也是RocketMQ原始碼中最重要的部分之一,想要對其中的全過程和細節有更為深刻的理解,還需要多在本地環境Debug和分析對應的日誌。同時,鑒於篇幅所限,本篇還沒有來得及對RocketMQ的Netty多執行緒模型進行介紹,將在訊息中介軟體—RocketMQ的RPC通訊(二)篇中來做詳細地介紹。 在此順便為自己打個Call,有興趣的朋友可以關註下我的個人公眾號:“匠心獨運的部落格”,對於Java併發、Spring、資料庫和訊息佇列的一些細節、問題的文章將會在這個公眾號上釋出,歡迎交流與討論。

666. 彩蛋




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 叢集容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

贊(0)

分享創造快樂