歡迎光臨
每天分享高質量文章

如何把開源專案用好?圖解 RocketMQ 核心原理

導讀:如何把開源專案用好,很大程度上是由學習路徑決定的:

  1. fork下來,起一個demo,上一個測試環境,遇到問題再去社區提問或找些實踐文章;

  2. 把官方文件通讀一遍,理解下產品、特點和應用場景;

  3. 先看一遍原始碼,理解清楚其中的代碼邏輯;

  4. 看原始碼太費勁,找本社區推薦的書系統的梳理下。

本文來自 Apache RocketMQ 的資深用戶丁威,他和 MyCat 的核心開發者周繼鋒合著了《RocketMQ技術內幕:架構設計與實現原理》一書,目的是希望用圖解的方式梳理 RocketMQ的核心原理

包括 RocketMQ Topic 的路由註冊與剔除機制、訊息發送高可用設計、訊息儲存檔案設計、併發訊息拉取與訊息消費流程、主從同步(HA)、事務訊息基本實現原理等,幫助開發者在使用 RocketMQ 的同時,還能對其核心原理瞭然於心。

 

 

作者:丁威

來源:阿裡巴巴中間件(ID:Aliware_2018)

 

▲Photo by Lucas Gallone on Unsplash

01 Topic 的路由機制 

介紹路由註冊機制之前,先簡單看下 RocketMQ 的整體架構:

  • Producer:訊息生產者,用於向訊息服務器發送訊息;

  • NameServer:路由註冊中心;

  • Broker:訊息儲存服務器;

  • Consumer:訊息消費者,該流程圖中未涉及。

1. 聯通性

  1. NameServer 之間互不通信,無法感知對方的存在。

  2. Producer 生產者與 NameServer 集群中的一臺服務器建立長連接,並持有整個 NameServer 集群的串列。

  3. Broker 服務會與每台 NameServer 保持長連接。

2. Topic路由註冊與剔除流程

  1. Broker 每30s向 NameServer 發送心跳包,心跳包中包含主題的路由信息(主題的讀寫佇列數、操作權限等),NameServer 會通過 HashMap 更新 Topic 的路由信息,並記錄最後一次收到 Broker 的時間戳。

  2. NameServer 以每10s的頻率清除已宕機的 Broker,NameServer 認為 Broker 宕機的依據是如果當前系統時間戳減去最後一次收到 Broker 心跳包的時間戳大於120s。

  3. 訊息生產者以每30s的頻率去拉取主題的路由信息,即訊息生產者並不會立即感知 Broker 服務器的新增與刪除。

3. 該部分涉及到的編程技巧

  1. 基於長連接的編程模型、心跳包。

  2. 多執行緒編程,讀寫鎖經典使用場景。

思考:由於訊息生產者無法實時感知 Broker 服務器的宕機,那訊息發送的高可用性如何保證呢?

02 訊息發送高可用設計

訊息發送佇列負載預設採用輪詢機制,訊息發送時預設選擇重試機制來保證訊息發送的高可用。

當 Broker 宕機後,雖然訊息發送者無法第一時間感知 Broker 宕機,但是當訊息發送者向 Broker 發送訊息傳回異常後,生產者會在接下來一定時間內,例如5分鐘內不會再次選擇該 Broker上的佇列,這樣就規避了發生故障的 Broker,結合重試機制,巧妙實現訊息發送的高可用。

03 訊息儲存檔案設計

RocketMQ 儲存設計主要包含 CommitLog 檔案、ConsumeQueue 檔案和 IndexFile 檔案。

1. CommitLog 檔案

訊息儲存檔案,所有主題的訊息隨著到達 Broker 的順序寫入 CommitLog 檔案,每個檔案預設為1G,檔案的命名也及其巧妙,使用該儲存在訊息檔案中的第一個全域性偏移量來命名檔案,這樣的設計主要是方便根據訊息的物理偏移量,快速定位到訊息所在的物理檔案。RocketMQ CommitLog 檔案使用順序寫,極大提高了檔案的寫性能。

2. ConsumeQueue 檔案

訊息消費佇列檔案,是 CommitLog 檔案的基於 Topic 的索引檔案,主要用於消費者根據 Topic消費訊息,其組織方式為 /topic/queue,同一個佇列中存在多個檔案,ConsumeQueue 設計極具技巧性,其每個條目使用固定長度(8位元組 CommitLog 物理偏移量、4位元組訊息長度、8位元組 Tag HashCode)。

 

這裡不是儲存 tag 的原始字串,而是儲存 HashCode,目的就是確保每個條目的長度固定,可以使用訪問類似陣列下標的方式來快速定位條目,極大的提高了 ConsumeQueue檔案的讀取性能。

 

試想一下,訊息消費者根據 Topic、訊息消費進度(ConsumeQueue 邏輯偏移量),即第幾個 ConsumeQueue 條目,這樣根據消費進度去訪問訊息的方法為使用邏輯偏移量logicOffset* 20即可找到該條目的起始偏移量( ConsumeQueue 檔案中的偏移量),然後讀取該偏移量後20個位元組即得到了一個條目,無需遍歷 ConsumeQueue 檔案。

3. IndexFile 檔案

基於物理磁盤檔案實現 Hash 索引。其檔案由40位元組的檔案頭、500W個 Hash 槽,每個 Hash 槽為4個位元組,最後由2000萬個 Index 條目,每個條目由20個位元組構成,分別為4位元組的索引key的 HashCode、8位元組訊息物理偏移量、4位元組時間戳、4位元組的前一個Index條目( Hash 衝突的鏈表結構)。

4. 儲存檔案部分的編程技巧

  1. 記憶體映射檔案編程技巧。

  2. 記憶體鎖定技術。

  3. 基於檔案的Hash索引實現技巧。

  4. 多執行緒協作技巧。

  5. 異步刷盤機制實現。

04 併發訊息拉取和訊息消費流程

訊息消費通常涉及到訊息佇列負載、訊息拉取、訊息過濾、訊息消費(處理訊息)、消費進度反饋等方面。併發訊息拉取與訊息消費流程如圖所示:

▲註:下麵有關訊息消費闡述的相關觀點主要基於集群消費樣式下的併發消費機制

1. 訊息佇列負載

集群內(同一消費組)內的消費者共同承擔主題下所有訊息的消費,即一條訊息只能被集群中一個消費者消費。RocketMQ的佇列負載原則是一個消費者可以承擔同一主題下的多個訊息消費佇列,但同一個訊息消費佇列同一時間只允許被分配給一個消費者。

2. RebalaceService 執行緒

其職責是負責訊息消費佇列的負載,預設以20s的間隔按照佇列負載演算法進行佇列分配,如果此次分配到的佇列與上一次分配的佇列不相同,則需要觸發訊息佇列的更新操作:

A. 如果是新分配的佇列,則創建 PullReqeust 物件(拉取訊息任務),添加到 PullMessageService 執行緒內部的阻塞佇列 pullRequestQueue 中。如果該佇列中存在拉取任務,則 PullMessageService 會向 Broker 拉取訊息。

B. 如果是上次分配但本次未分配的佇列,將其處理佇列 ProcessQueue 的狀態設置為丟棄,然後 PullMessageService 執行緒在根據 PullRequest 拉取訊息時首先會判斷 ProcessQueue 佇列的狀態,如果是已丟棄狀態,則直接丟棄 PullRequest 物件,停止拉取該佇列中的訊息,否則向Broker 拉取訊息,拉取到一批訊息後,提交到一個處理執行緒池,然後繼續將 PullRequest 物件添加到 pullRequestQueue,即很快就會再次觸發對該訊息消費佇列的再次拉取,這也是 RocketMQ 實現 PUSH 樣式的本質。

消費者消費執行緒池處理完一條訊息時,消費者需要向 Broker 彙報消費的進度,以防訊息重覆消費。這樣當消費者重啟後,指示消費者應該從哪條訊息開始消費。併發消費樣式下,由於多執行緒消費的緣故,提交到執行緒池消費的訊息預設情況下無法保證訊息消費的順序。

例如,執行緒池正在消費偏移量為1,2,3的訊息,並不保證偏移量為1的訊息先消費完成,如果訊息的處理完成順序為3,1,2,使用訊息完成的順序去更新訊息消費進度顯然是有問題的,有可能會造成訊息丟失,故RocketMQ的訊息消費進度反饋策略是每一條訊息處理完成後,並不是用訊息自身的偏移量去更新訊息消費進度,而是使用處理佇列中最小的偏移量去更新。

在此例中,如果是訊息3的訊息先處理完成,則會使用偏移量為1去更新訊息消費進度。當然這種處理保證了不丟訊息,但卻帶來了另外一個問題,訊息有可能會重覆訊息。

在 PUSH 樣式下,PullMessageService 拉取完一批訊息後,將訊息提交到執行緒池後會“馬不蹄停”去拉下一批訊息,如果此時訊息消費執行緒池處理速度很慢,處理佇列中的訊息會越積越多,占用的記憶體也隨之飆升,最終引發記憶體上限溢位,更加不能接受的訊息消費進度並不會向前推進,因為只要該處理佇列中偏移量最小的訊息未處理完成,整個訊息消費進度則無法向前推進,如果消費端重啟,又得重覆拉取訊息並造成大量訊息重覆消費。RocketMQ 解決該問題的策略是引入消費端的限流機制。

3. RocketMQ 訊息消費端的限流的兩個維度

A. 訊息堆積數量

如果訊息消費處理佇列中的訊息條數超過1000條會觸發消費端的流控,其具體做法是放棄本次拉取動作,並且延遲50ms後將放入該拉取任務放入到pullRequestQueue中,每1000次流控會打印一次消費端流控日誌。

B. 訊息堆積大小

如果處理佇列中堆積的訊息總記憶體大小超過100M,同樣觸發一次流控。

註:上述只需滿足條件之一就會觸發一次流控。

05 主從同步(HA)

RocketMQ 的主從同步機制如下:

  1. 首先啟動Master併在指定端口監聽;

  2. 客戶端啟動,主動連接Master,建立TCP連接;

  3. 客戶端以每隔5s的間隔時間向服務端拉取訊息,如果是第一次拉取的話,先獲取本地commitlog檔案中最大的偏移量,以該偏移量向服務端拉取訊息;

  4. 服務端解析請求,並傳回一批資料給客戶端;

  5. 客戶端收到一批訊息後,將訊息寫入本地commitlog檔案中,然後向Master彙報拉取進度,並更新下一次待拉取偏移量;

  6. 然後重覆第3步。

06 事務訊息

RocketMQ事務訊息的實現原理是類似基於二階段提交與事務狀態回查來實現的。事務訊息的發送只支持同步方式,其實現的關鍵點包括:

A. 在應用程式端,在一個本地事務中,通過發送訊息API向Broker發送Prepare狀態的訊息,收到訊息服務器傳回成功後執行事件回呼函式,在事件函式的職責就是記錄該訊息的事務狀態,通常採用訊息發送本地事務表,即往本地事務表中插入一條記錄,如果業務處理成功,則訊息本地事務中會存在相關記錄;如果本地事務執行失敗而導致事務回滾,此時本地事務狀態中不存在該訊息的事務狀態。

B.訊息服務端收到Prepare的訊息時,如何保證訊息不會被消費端立即處理呢?原來訊息服務端收到Prepare狀態的訊息,會先備份原訊息的主題與佇列,然後變更主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC,佇列為0。

C. 訊息服務端會開啟一個專門的執行緒,以每60s的頻率從RMQ_SYS_TRANS_OP_HALF_TOPIC中拉取一批訊息,進行事務狀態的回查,其實現原理是根據訊息所屬的訊息生產者組名隨機獲取一個生產者,向其詢問該訊息對應的本地事務是否成功,如果本地事務成功(該部分是由業務提供的事務回查監聽器來實現),則訊息服務端執行提交動作;如果事務狀態傳回失敗,則訊息服務端執行回滾動作;如果事務狀態未知,則不做處理,待下一次定時任務觸發再檢查。預設如果連續5次回查都無法得到確切的事務狀態,則執行回滾動作。

以上只是 RocketMQ 所有核心的一部分,在文章的結尾處,我想再分享一下我學習 RocketMQ的一些心得:

  1. 通讀 RocketMQ 官方文件,從全域性上瞭解 RocketMQ。

  2. 在IDE工具中搭建 RocketMQ 除錯環境,啟動 NameServer、Broker 服務器,並重點關註原始碼的 example 包,運行一個快速入門示例。

  3. 根據功能模塊進行學習,例如訊息發送、訊息儲存、訊息消費,同時註意不要發散,例如在學習訊息發送相關的流程時,遇到訊息儲存後,可暫時不去理會訊息儲存相關的細節,先一筆帶過,待學完訊息發送後,再去重點學習其他分支,例如儲存、刷盤,主從同步等。

關於作者:丁威,RocketMQ 官方直播講師,《RocketMQ技術內幕》作者。

 

延伸閱讀《RocketMQ技術內幕》

推薦語:本書從原始碼的角度對 RocketMQ 的核心技術架構,以及 NameServer、訊息發送及高可用、訊息儲存、訊息消費、訊息過濾、順序訊息、事務訊息、主從同步(HA)等主要功能模塊的實現原理進行了深入分析;同時展示了原始碼閱讀的相關技巧;併在實戰篇總結了大量的 RocketMQ 的使用技巧,並展示RocketMQ 運維管理界面的使用以及簡單介紹了 RocketMQ 39個運維命令的基本實現原理,最後在附錄部分羅列了RocketMQ所有的配置引數。

    赞(0)

    分享創造快樂