原文地址:http://www.yunai.me/RocketMQ/message-send-and-receive/?mp
(建議使用原文地址閱讀:1、閱讀體驗;2、程式碼排版混亂因而省略。)RocketMQ 帶註釋原始碼地址 :https://github.com/YunaiV/incubator-rocketmq
?本系列每 1-2 周更新一篇,歡迎訂閱、關註、收藏 公眾號
- 
1、概述
 - 
2、Producer 傳送訊息
 - 
DefaultMQProducerImpl#tryToFindTopicPublishInfo()
 - 
MQFaultStrategy
 - 
DefaultMQProducerImpl#sendKernelImpl()
 - 
MQFaultStrategy
 - 
LatencyFaultTolerance
 - 
LatencyFaultToleranceImpl
 - 
FaultItem
 - 
DefaultMQProducer#send(Message)
 - 
DefaultMQProducerImpl#sendDefaultImpl()
 - 
3、Broker 接收訊息
 - 
AbstractSendMessageProcessor#msgCheck
 - 
SendMessageProcessor#sendMessage
 - 
DefaultMessageStore#putMessage
 - 
4、某種結尾
 
1、概述
- 
Producer傳送訊息。主要是同步傳送訊息原始碼,涉及到 非同步/Oneway傳送訊息,事務訊息會跳過。 - 
Broker接收訊息。(儲存訊息在《RocketMQ 原始碼分析 —— Message 儲存》解析) 
2、Producer 傳送訊息
DefaultMQProducer#send(Message)
  // .... 省略程式碼
- 
說明:傳送同步訊息,
DefaultMQProducer#send(Message)對DefaultMQProducerImpl#send(Message)進行封裝。 
DefaultMQProducerImpl#sendDefaultImpl()
 // .... 省略程式碼
- 
說明 :傳送訊息。步驟:獲取訊息路由資訊,選擇要傳送到的訊息佇列,執行訊息傳送核心方法,並對傳送結果進行封裝傳回。
 - 
第 1 至 7 行:對
sendsendDefaultImpl(...)進行封裝。 - 
第 20 行 :
invokeID僅僅用於列印日誌,無實際的業務用途。 - 
第 25 行 :獲取 Topic路由資訊, 詳細解析見:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
 - 
第 30 & 34 行 :計算呼叫傳送訊息到成功為止的最大次數,併進行迴圈。同步或非同步傳送訊息會呼叫多次,預設配置為3次。
 - 
第 36 行 :選擇訊息要傳送到的佇列,詳細解析見:MQFaultStrategy
 - 
第 43 行 :呼叫傳送訊息核心方法,詳細解析見:DefaultMQProducerImpl#sendKernelImpl()
 - 
第 46 行 :更新
Broker可用性資訊。在選擇傳送到的訊息佇列時,會參考Broker傳送訊息的延遲,詳細解析見:MQFaultStrategy - 
第 62 至 68 行:當丟擲
RemotingException時,如果進行訊息傳送失敗重試,則可能導致訊息傳送重覆。例如,傳送訊息超時(RemotingTimeoutException),實際Broker接收到該訊息並處理成功。因此,Consumer在消費時,需要保證冪等性。 
DefaultMQProducerImpl#tryToFindTopicPublishInfo()
 // .... 省略程式碼
- 
說明 :獲得 Topic釋出資訊。優先從快取
topicPublishInfoTable,其次從Namesrv中獲得。 - 
第 3 行 :從快取
topicPublishInfoTable中獲得 Topic釋出資訊。 - 
第 5 至 9 行 :從
Namesrv中獲得 Topic釋出資訊。 - 
第 13 至 17 行 :當從
Namesrv無法獲取時,使用{@link DefaultMQProducer#createTopicKey}對應的 Topic釋出資訊。目的是當Broker開啟自動建立 Topic開關時,Broker接收到訊息後自動建立Topic,詳細解析見《RocketMQ 原始碼分析 —— Topic》。 
MQFaultStrategy
MQFaultStrategy
 // .... 省略程式碼
- 
說明 :
Producer訊息傳送容錯策略。預設情況下容錯策略關閉,即sendLatencyFaultEnable=false。 - 
第 30 至 62 行 :容錯策略選擇訊息佇列邏輯。優先獲取可用佇列,其次選擇一個broker獲取佇列,最差傳回任意broker的一個佇列。
 - 
第 64 行 :未開啟容錯策略選擇訊息佇列邏輯。
 - 
第 74 至 79 行 :更新延遲容錯資訊。當
Producer傳送訊息時間過長,則邏輯認為N秒內不可用。按照latencyMax,notAvailableDuration的配置,對應如下:Producer傳送訊息消耗時長 Broker不可用時長 >= 15000 ms 600 * 1000 ms >= 3000 ms 180 * 1000 ms >= 2000 ms 120 * 1000 ms >= 1000 ms 60 * 1000 ms >= 550 ms 30 * 1000 ms >= 100 ms 0 ms >= 50 ms 0 ms  
LatencyFaultTolerance
  // .... 省略程式碼
- 
說明 :延遲故障容錯介面
 
LatencyFaultToleranceImpl
  // .... 省略程式碼
- 
說明 :延遲故障容錯實現。維護每個物件的資訊。
 
FaultItem
 // .... 省略程式碼
- 
說明 :物件故障資訊。維護物件的名字、延遲、開始可用的時間。
 
DefaultMQProducerImpl#sendKernelImpl()
 // .... 省略程式碼
- 
說明 :傳送訊息核心方法。該方法真正發起網路請求,傳送訊息給
Broker。 - 
第 21 行 :生產訊息編號,詳細解析見《RocketMQ 原始碼分析 —— Message 基礎》。
 - 
第 64 至 121 行 :構建傳送訊息請求
SendMessageRequestHeader。 - 
第 107 至 117 行 :執行
MQClientInstance#sendMessage(...)發起網路請求。 
3、Broker 接收訊息
SendMessageProcessor#sendMessage
 // .... 省略程式碼
- 
#processRequest()說明 :處理訊息請求。 - 
#sendMessage()說明 :傳送訊息,並傳回傳送訊息結果。 - 
第 51 至 55 行 :訊息配置(Topic配置)校驗,詳細解析見:AbstractSendMessageProcessor#msgCheck()。
 - 
第 60 至 64 行 :訊息佇列編號小於0時,
Broker可以設定隨機選擇一個訊息佇列。 - 
第 72 至 103 行 :對RETRY型別的訊息處理。如果超過最大消費次數,則topic修改成”%DLQ%” + 分組名, 即加 死信隊 (Dead Letter Queue),詳細解析見:《RocketMQ 原始碼分析 —— Topic》。
 - 
第 105 至 118 行 :建立
MessageExtBrokerInner。 - 
第 132 :儲存訊息,詳細解析見:DefaultMessageStore#putMessage()。
 - 
第 133 至 183 行 :處理訊息傳送結果,設定響應結果和提示。
 - 
第 186 至 214 行 :傳送成功,響應。這裡
doResponse(ctx, request, response)進行響應,最後return null,原因是:響應給Producer可能發生異常,#doResponse(ctx, request, response)捕捉了該異常並輸出日誌。這樣做的話,我們進行排查Broker接收訊息成功後響應是否存在異常會方便很多。 
AbstractSendMessageProcessor#msgCheck
 // .... 省略程式碼
- 
說明:校驗訊息是否正確,主要是Topic配置方面,例如:
Broker是否有寫入許可權,topic配置是否存在,佇列編號是否正確。 - 
第 11 至 18 行 :檢查Topic是否可以被髮送。目前是
{@link MixAll.DEFAULT_TOPIC}不被允許傳送。 - 
第 20 至 51 行 :當找不到Topic配置,則進行建立。當然,建立會存在不成功的情況,例如說:
defaultTopic的Topic配置不存在,又或者是 存在但是不允許繼承,詳細解析見《RocketMQ 原始碼分析 —— Topic》。 
DefaultMessageStore#putMessage
  // .... 省略程式碼
- 
說明:儲存訊息封裝,最終儲存需要
CommitLog實現。 - 
第 7 至 27 行 :校驗
Broker是否可以寫入。 - 
第 29 至 39 行 :訊息格式與大小校驗。
 - 
第 47 行 :呼叫
CommitLong進行儲存,詳細邏輯見:《RocketMQ 原始碼分析 —— Message 儲存》 
4、某種結尾
感謝閱讀、收藏、點贊本文的工程師同學。
閱讀原始碼是件令自己很愉悅的事情,編寫原始碼解析是讓自己腦細胞死傷無數的過程,痛並快樂著。
如果有內容寫的存在錯誤,或是不清晰的地方,見笑了,?。歡迎加 QQ:7685413 我們一起探討,共進步。
再次感謝閱讀、收藏、點贊本文的工程師同學。
知識星球



