歡迎光臨
每天分享高質量文章

分散式訊息佇列 RocketMQ 原始碼分析 —— 高可用

摘要: 原創出處 http://www.iocoder.cn/RocketMQ/high-availability/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 RocketMQ 4.0.x 正式版

  • 1. 概述

  • 2. Namesrv 高可用

  • 2.1 Broker 註冊到 Namesrv

  • 2.2 Producer、Consumer 訪問 Namesrv

  • 3. Broker 高可用

  • 3.2 Broker 主從

    • 3.1.1 配置

    • 3.1.2 元件

    • 3.1.3 通訊協議

    • 3.1.4 Slave

    • 3.1.5 Master

    • 3.1.6 Master_SYNC

  • 3.2 Producer 傳送訊息

  • 3.3 Consumer 消費訊息

  • 4. 總結

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。


1. 概述

本文主要解析 NamesrvBroker 如何實現高可用, ProducerConsumer 怎麼與它們通訊保證高可用。

2. Namesrv 高可用

啟動多個 Namesrv 實現高可用。
相較於 ZookeeperConsulEtcd 等, Namesrv 是一個超輕量級的註冊中心,提供命名服務

2.1 Broker 註冊到 Namesrv

  • 多個 Namesrv 之間,沒有任何關係(不存在類似 Zookeeper 的 LeaderFollower 等角色),不進行通訊與資料同步。透過 Broker 迴圈註冊多個 Namesrv

  1.  1: // ⬇️⬇️⬇️【BrokerOuterAPI.java】

  2.  2: public RegisterBrokerResult registerBrokerAll(

  3.  3:     final String clusterName,

  4.  4:     final String brokerAddr,

  5.  5:     final String brokerName,

  6.  6:     final long brokerId,

  7.  7:     final String haServerAddr,

  8.  8:     final TopicConfigSerializeWrapper topicConfigWrapper,

  9.  9:     final List<String> filterServerList,

  10. 10:     final boolean oneway,

  11. 11:     final int timeoutMills) {

  12. 12:     RegisterBrokerResult registerBrokerResult = null;

  13. 13:

  14. 14:     List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

  15. 15:     if (nameServerAddressList != null) {

  16. 16:         for (String namesrvAddr : nameServerAddressList) { // 迴圈多個 Namesrv

  17. 17:             try {

  18. 18:                 RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,

  19. 19:                     haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);

  20. 20:                 if (result != null) {

  21. 21:                     registerBrokerResult = result;

  22. 22:                 }

  23. 23:

  24. 24:                 log.info("register broker to name server {} OK", namesrvAddr);

  25. 25:             } catch (Exception e) {

  26. 26:                 log.warn("registerBroker Exception, {}", namesrvAddr, e);

  27. 27:             }

  28. 28:         }

  29. 29:     }

  30. 30:

  31. 31:     return registerBrokerResult;

  32. 32: }

2.2 Producer、Consumer 訪問 Namesrv

  • Producer、 Consumer 從 Namesrv串列選擇一個可連線的進行通訊。

  1.  1: // ⬇️⬇️⬇️【NettyRemotingClient.java】

  2.  2: private Channel getAndCreateNameserverChannel() throws InterruptedException {

  3.  3:     // 傳回已選擇、可連線Namesrv

  4.  4:     String addr = this.namesrvAddrChoosed.get();

  5.  5:     if (addr != null) {

  6.  6:         ChannelWrapper cw = this.channelTables.get(addr);

  7.  7:         if (cw != null && cw.isOK()) {

  8.  8:             return cw.getChannel();

  9.  9:         }

  10. 10:     }

  11. 11:     //

  12. 12:     final List<String> addrList = this.namesrvAddrList.get();

  13. 13:     if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {

  14. 14:         try {

  15. 15:             // 傳回已選擇、可連線的Namesrv

  16. 16:             addr = this.namesrvAddrChoosed.get();

  17. 17:             if (addr != null) {

  18. 18:                 ChannelWrapper cw = this.channelTables.get(addr);

  19. 19:                 if (cw != null && cw.isOK()) {

  20. 20:                     return cw.getChannel();

  21. 21:                 }

  22. 22:             }

  23. 23:             // 從【Namesrv串列】中選擇一個連線的傳回

  24. 24:             if (addrList != null && !addrList.isEmpty()) {

  25. 25:                 for (int i = 0; i < addrList.size(); i++) {

  26. 26:                     int index = this.namesrvIndex.incrementAndGet();

  27. 27:                     index = Math.abs(index);

  28. 28:                     index = index % addrList.size();

  29. 29:                     String newAddr = addrList.get(index);

  30. 30:

  31. 31:                     this.namesrvAddrChoosed.set(newAddr);

  32. 32:                     Channel channelNew = this.createChannel(newAddr);

  33. 33:                     if (channelNew != null)

  34. 34:                         return channelNew;

  35. 35:                 }

  36. 36:             }

  37. 37:         } catch (Exception e) {

  38. 38:             log.error("getAndCreateNameserverChannel: create name server channel exception", e);

  39. 39:         } finally {

  40. 40:             this.lockNamesrvChannel.unlock();

  41. 41:         }

  42. 42:     } else {

  43. 43:         log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);

  44. 44:     }

  45. 45:

  46. 46:     return null;

  47. 47: }

3. Broker 高可用

啟動多個 Broker分組 形成 叢集 實現高可用。
Broker分組 = Master節點x1 + Slave節點xN。
類似 MySQLMaster節點 提供讀寫服務, Slave節點 只提供服務。

3.2 Broker 主從

  • 每個分組, Master節點 不斷傳送新的 CommitLog 給 Slave節點。 Slave節點 不斷上報本地的 CommitLog 已經同步到的位置給 Master節點。

  • Broker分組 與 Broker分組 之間沒有任何關係,不進行通訊與資料同步。

  • 消費進度 目前不支援 MasterSlave 同步。

叢集內, Master節點 有兩種型別: Master_SYNCMaster_ASYNC:前者在 Producer 傳送訊息時,等待 Slave節點 儲存完畢後再傳回傳送結果,而後者不需要等待。

3.1.1 配置

目前官方提供三套配置:

  • 2m-2s-async

brokerClusterName brokerName brokerRole brokerId
DefaultCluster broker-a ASYNC_MASTER 0
DefaultCluster broker-a SLAVE 1
DefaultCluster broker-b ASYNC_MASTER 0
DefaultCluster broker-b SLAVE 1
    • 2m-2s-sync

    brokerClusterName brokerName brokerRole brokerId
    DefaultCluster broker-a SYNC_MASTER 0
    DefaultCluster broker-a SLAVE 1
    DefaultCluster broker-b SYNC_MASTER 0
    DefaultCluster broker-b SLAVE 1
      • 2m-noslave

      brokerClusterName brokerName brokerRole brokerId
      DefaultCluster broker-a ASYNC_MASTER 0
      DefaultCluster broker-b ASYNC_MASTER 0

      3.1.2 元件

      再看具體實現程式碼之前,我們來看看 Master/ Slave節點 包含的元件:

      • Master節點

        • ReadSocketService :來自 Slave節點 的資料。

        • WriteSocketService :到往 Slave節點 的資料。

        • AcceptSocketService :接收 Slave節點 連線。

        • HAConnection

      • Slave節點

        • HAClient :對 Master節點 連線、讀寫資料。

      3.1.3 通訊協議

      Master節點 與 Slave節點 通訊協議很簡單,只有如下兩條。

      物件 用途 第幾位 欄位 資料型別 位元組數 說明
      Slave=>Master 上報CommitLog已經同步到的物理位置
      0 maxPhyOffset Long 8 CommitLog最大物理位置
      Master=>Slave 傳輸新的 CommitLog 資料
      0 fromPhyOffset Long 8 CommitLog開始物理位置
      1 size Int 4 傳輸CommitLog資料長度
      2 body Bytes size 傳輸CommitLog資料

      3.1.4 Slave


      • Slave 主迴圈,實現了不斷不斷不斷從 Master 傳輸 CommitLog 資料,上傳 Master 自己本地的 CommitLog 已經同步物理位置。

      1.  1: // ⬇️⬇️⬇️【HAClient.java】

      2.  2: public void run() {

      3.  3:     log.info(this.getServiceName() + " service started");

      4.  4:

      5.  5:     while (!this.isStopped()) {

      6.  6:         try {

      7.  7:             if (this.connectMaster()) {

      8.  8:                 // 若到滿足上報間隔,上報到Master進度

      9.  9:                 if (this.isTimeToReportOffset()) {

      10. 10:                     boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);

      11. 11:                     if (!result) {

      12. 12:                         this.closeMaster();

      13. 13:                     }

      14. 14:                 }

      15. 15:

      16. 16:                 this.selector.select(1000);

      17. 17:

      18. 18:                 // 處理讀取事件

      19. 19:                 boolean ok = this.processReadEvent();

      20. 20:                 if (!ok) {

      21. 21:                     this.closeMaster();

      22. 22:                 }

      23. 23:

      24. 24:                 // 若進度有變化,上報到Master進度

      25. 25:                 if (!reportSlaveMaxOffsetPlus()) {

      26. 26:                     continue;

      27. 27:                 }

      28. 28:

      29. 29:                 // Master過久未傳回資料,關閉連線

      30. 30:                 long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

      31. 31:                 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()

      32. 32:                     .getHaHousekeepingInterval()) {

      33. 33:                     log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress

      34. 34:                         + "] expired, " + interval);

      35. 35:                     this.closeMaster();

      36. 36:                     log.warn("HAClient, master not response some time, so close connection");

      37. 37:                 }

      38. 38:             } else {

      39. 39:                 this.waitForRunning(1000 * 5);

      40. 40:             }

      41. 41:         } catch (Exception e) {

      42. 42:             log.warn(this.getServiceName() + " service has exception. ", e);

      43. 43:             this.waitForRunning(1000 * 5);

      44. 44:         }

      45. 45:     }

      46. 46:

      47. 47:     log.info(this.getServiceName() + " service end");

      48. 48: }

      • 第 8 至 14 行 :固定間隔(預設5s)向 Master 上報 Slave 本地 CommitLog 已經同步到的物理位置。該操作還有心跳的作用。

      • 第 16 至 22 行 :處理 Master 傳輸 Slave 的 CommitLog 資料。


      • 我們來看看 #dispatchReadRequest(...) 與 #reportSlaveMaxOffset(...) 是怎麼實現的。

      1.  1: // 【HAClient.java】

      2.  2: /**

      3.  3:  * 讀取Master傳輸的CommitLog資料,並傳回是異常

      4.  4:  * 如果讀取到資料,寫入CommitLog

      5.  5:  * 異常原因:

      6.  6:  *   1. Master傳輸來的資料offset 不等於 Slave的CommitLog資料最大offset

      7.  7:  *   2. 上報到Master進度失敗

      8.  8:  *

      9.  9:  * @return 是否異常

      10. 10:  */

      11. 11: private boolean dispatchReadRequest() {

      12. 12:     final int msgHeaderSize = 8 + 4; // phyoffset + size

      13. 13:     int readSocketPos = this.byteBufferRead.position();

      14. 14:

      15. 15:     while (true) {

      16. 16:         // 讀取到請求

      17. 17:         int diff = this.byteBufferRead.position() - this.dispatchPostion;

      18. 18:         if (diff >= msgHeaderSize) {

      19. 19:             // 讀取masterPhyOffset、bodySize。使用dispatchPostion的原因是:處理資料“粘包”導致資料讀取不完整。

      20. 20:             long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);

      21. 21:             int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);

      22. 22:             // 校驗 Master傳輸來的資料offset 是否和 Slave的CommitLog資料最大offset 是否相同。

      23. 23:             long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

      24. 24:             if (slavePhyOffset != 0) {

      25. 25:                 if (slavePhyOffset != masterPhyOffset) {

      26. 26:                     log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "

      27. 27:                         + slavePhyOffset + " MASTER: " + masterPhyOffset);

      28. 28:                     return false;

      29. 29:                 }

      30. 30:             }

      31. 31:             // 讀取到訊息

      32. 32:             if (diff >= (msgHeaderSize + bodySize)) {

      33. 33:                 // 寫入CommitLog

      34. 34:                 byte[] bodyData = new byte[bodySize];

      35. 35:                 this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);

      36. 36:                 this.byteBufferRead.get(bodyData);

      37. 37:                 HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

      38. 38:                 // 設定處理到的位置

      39. 39:                 this.byteBufferRead.position(readSocketPos);

      40. 40:                 this.dispatchPostion += msgHeaderSize + bodySize;

      41. 41:                 // 上報到Master進度

      42. 42:                 if (!reportSlaveMaxOffsetPlus()) {

      43. 43:                     return false;

      44. 44:                 }

      45. 45:                 // 繼續迴圈

      46. 46:                 continue;

      47. 47:             }

      48. 48:         }

      49. 49:

      50. 50:         // 空間寫滿,重新分配空間

      51. 51:         if (!this.byteBufferRead.hasRemaining()) {

      52. 52:             this.reallocateByteBuffer();

      53. 53:         }

      54. 54:

      55. 55:         break;

      56. 56:     }

      57. 57:

      58. 58:     return true;

      59. 59: }

      60. 60:

      61. 61: /**

      62. 62:  * 上報進度

      63. 63:  *

      64. 64:  * @param maxOffset 進度

      65. 65:  * @return 是否上報成功

      66. 66:  */

      67. 67: private boolean reportSlaveMaxOffset(final long maxOffset) {

      68. 68:     this.reportOffset.position(0);

      69. 69:     this.reportOffset.limit(8);

      70. 70:     this.reportOffset.putLong(maxOffset);

      71. 71:     this.reportOffset.position(0);

      72. 72:     this.reportOffset.limit(8);

      73. 73:

      74. 74:     for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {

      75. 75:         try {

      76. 76:             this.socketChannel.write(this.reportOffset);

      77. 77:         } catch (IOException e) {

      78. 78:             log.error(this.getServiceName()

      79. 79:                 + "reportSlaveMaxOffset this.socketChannel.write exception", e);

      80. 80:             return false;

      81. 81:         }

      82. 82:     }

      83. 83:

      84. 84:     return !this.reportOffset.hasRemaining();

      85. 85: }

      3.1.5 Master

      • ReadSocketService 邏輯同 HAClient#processReadEvent(...) 基本相同,我們直接看程式碼。

      1.  1: // ⬇️⬇️⬇️【ReadSocketService.java】

      2.  2: private boolean processReadEvent() {

      3.  3:     int readSizeZeroTimes = 0;

      4.  4:

      5.  5:     // 清空byteBufferRead

      6.  6:     if (!this.byteBufferRead.hasRemaining()) {

      7.  7:         this.byteBufferRead.flip();

      8.  8:         this.processPostion = 0;

      9.  9:     }

      10. 10:

      11. 11:     while (this.byteBufferRead.hasRemaining()) {

      12. 12:         try {

      13. 13:             int readSize = this.socketChannel.read(this.byteBufferRead);

      14. 14:             if (readSize > 0) {

      15. 15:                 readSizeZeroTimes = 0;

      16. 16:

      17. 17:                 // 設定最後讀取時間

      18. 18:                 this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

      19. 19:

      20. 20:                 if ((this.byteBufferRead.position() - this.processPostion) >= 8) {

      21. 21:                     // 讀取Slave 請求來的CommitLog的最大位置

      22. 22:                     int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);

      23. 23:                     long readOffset = this.byteBufferRead.getLong(pos - 8);

      24. 24:                     this.processPostion = pos;

      25. 25:

      26. 26:                     // 設定Slave CommitLog的最大位置

      27. 27:                     HAConnection.this.slaveAckOffset = readOffset;

      28. 28:

      29. 29:                     // 設定Slave 第一次請求的位置

      30. 30:                     if (HAConnection.this.slaveRequestOffset < 0) {

      31. 31:                         HAConnection.this.slaveRequestOffset = readOffset;

      32. 32:                         log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);

      33. 33:                     }

      34. 34:

      35. 35:                     // 通知目前Slave進度。主要用於Master節點為同步型別的。

      36. 36:                     HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);

      37. 37:                 }

      38. 38:             } else if (readSize == 0) {

      39. 39:                 if (++readSizeZeroTimes >= 3) {

      40. 40:                     break;

      41. 41:                 }

      42. 42:             } else {

      43. 43:                 log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");

      44. 44:                 return false;

      45. 45:             }

      46. 46:         } catch (IOException e) {

      47. 47:             log.error("processReadEvent exception", e);

      48. 48:             return false;

      49. 49:         }

      50. 50:     }

      51. 51:

      52. 52:     return true;

      53. 53: }


      • WriteSocketService 計算 Slave開始同步的位置後,不斷向 Slave 傳輸新的 CommitLog資料。

      1.  1: // ⬇️⬇️⬇️【WriteSocketService.java】

      2.  2: @Override

      3.  3: public void run() {

      4.  4:     HAConnection.log.info(this.getServiceName() + " service started");

      5.  5:

      6.  6:     while (!this.isStopped()) {

      7.  7:         try {

      8.  8:             this.selector.select(1000);

      9.  9:

      10. 10:             // 未獲得Slave讀取進度請求,sleep等待。

      11. 11:             if (-1 == HAConnection.this.slaveRequestOffset) {

      12. 12:                 Thread.sleep(10);

      13. 13:                 continue;

      14. 14:             }

      15. 15:

      16. 16:             // 計算初始化nextTransferFromWhere

      17. 17:             if (-1 == this.nextTransferFromWhere) {

      18. 18:                 if (0 == HAConnection.this.slaveRequestOffset) {

      19. 19:                     long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();

      20. 20:                     masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());

      21. 21:                     if (masterOffset < 0) {

      22. 22:                         masterOffset = 0;

      23. 23:                     }

      24. 24:

      25. 25:                     this.nextTransferFromWhere = masterOffset;

      26. 26:                 } else {

      27. 27:                     this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;

      28. 28:                 }

      29. 29:

      30. 30:                 log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr

      31. 31:                     + "], and slave request " + HAConnection.this.slaveRequestOffset);

      32. 32:             }

      33. 33:

      34. 34:             if (this.lastWriteOver) {

      35. 35:                 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

      36. 36:                 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // 心跳

      37. 37:

      38. 38:                     // Build Header

      39. 39:                     this.byteBufferHeader.position(0);

      40. 40:                     this.byteBufferHeader.limit(essay-headerSize);

      41. 41:                     this.byteBufferHeader.putLong(this.nextTransferFromWhere);

      42. 42:                     this.byteBufferHeader.putInt(0);

      43. 43:                     this.byteBufferHeader.flip();

      44. 44:

      45. 45:                     this.lastWriteOver = this.transferData();

      46. 46:                     if (!this.lastWriteOver)

      47. 47:                         continue;

      48. 48:                 }

      49. 49:             } else { // 未傳輸完成,繼續傳輸

      50. 50:                 this.lastWriteOver = this.transferData();

      51. 51:                 if (!this.lastWriteOver)

      52. 52:                     continue;

      53. 53:             }

      54. 54:

      55. 55:             // 選擇新的CommitLog資料進行傳輸

      56. 56:             SelectMappedBufferResult selectResult =

      57. 57:                 HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);

      58. 58:             if (selectResult != null) {

      59. 59:                 int size = selectResult.getSize();

      60. 60:                 if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {

      61. 61:                     size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();

      62. 62:                 }

      63. 63:

      64. 64:                 long thisOffset = this.nextTransferFromWhere;

      65. 65:                 this.nextTransferFromWhere += size;

      66. 66:

      67. 67:                 selectResult.getByteBuffer().limit(size);

      68. 68:                 this.selectMappedBufferResult = selectResult;

      69. 69:

      70. 70:                 // Build Header

      71. 71:                 this.byteBufferHeader.position(0);

      72. 72:                 this.byteBufferHeader.limit(essay-headerSize);

      73. 73:                 this.byteBufferHeader.putLong(thisOffset);

      74. 74:                 this.byteBufferHeader.putInt(size);

      75. 75:                 this.byteBufferHeader.flip();

      76. 76:

      77. 77:                 this.lastWriteOver = this.transferData();

      78. 78:             } else { // 沒新的訊息,掛起等待

      79. 79:                 HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);

      80. 80:             }

      81. 81:         } catch (Exception e) {

      82. 82:

      83. 83:             HAConnection.log.error(this.getServiceName() + " service has exception.", e);

      84. 84:             break;

      85. 85:         }

      86. 86:     }

      87. 87:

      88. 88:     // 斷開連線 & 暫停寫執行緒 & 暫停讀執行緒 & 釋放CommitLog

      89. 89:     if (this.selectMappedBufferResult != null) {

      90. 90:         this.selectMappedBufferResult.release();

      91. 91:     }

      92. 92:

      93. 93:     this.makeStop();

      94. 94:

      95. 95:     readSocketService.makeStop();

      96. 96:

      97. 97:     haService.removeConnection(HAConnection.this);

      98. 98:

      99. 99:     SelectionKey sk = this.socketChannel.keyFor(this.selector);

      100. 100:     if (sk != null) {

      101. 101:         sk.cancel();

      102. 102:     }

      103. 103:

      104. 104:     try {

      105. 105:         this.selector.close();

      106. 106:         this.socketChannel.close();

      107. 107:     } catch (IOException e) {

      108. 108:         HAConnection.log.error("", e);

      109. 109:     }

      110. 110:

      111. 111:     HAConnection.log.info(this.getServiceName() + " service end");

      112. 112: }

      113. 113:

      114. 114: /**

      115. 115:  * 傳輸資料

      116. 116:  */

      117. 117: private boolean transferData() throws Exception {

      118. 118:     int writeSizeZeroTimes = 0;

      119. 119:     // Write Header

      120. 120:     while (this.byteBufferHeader.hasRemaining()) {

      121. 121:         int writeSize = this.socketChannel.write(this.byteBufferHeader);

      122. 122:         if (writeSize > 0) {

      123. 123:             writeSizeZeroTimes = 0;

      124. 124:             this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

      125. 125:         } else if (writeSize == 0) {

      126. 126:             if (++writeSizeZeroTimes >= 3) {

      127. 127:                 break;

      128. 128:             }

      129. 129:         } else {

      130. 130:             throw new Exception("ha master write essay-header error < 0");

      131. 131:         }

      132. 132:     }

      133. 133:

      134. 134:     if (null == this.selectMappedBufferResult) {

      135. 135:         return !this.byteBufferHeader.hasRemaining();

      136. 136:     }

      137. 137:

      138. 138:     writeSizeZeroTimes = 0;

      139. 139:

      140. 140:     // Write Body

      141. 141:     if (!this.byteBufferHeader.hasRemaining()) {

      142. 142:         while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {

      143. 143:             int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());

      144. 144:             if (writeSize > 0) {

      145. 145:                 writeSizeZeroTimes = 0;

      146. 146:                 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

      147. 147:             } else if (writeSize == 0) {

      148. 148:                 if (++writeSizeZeroTimes >= 3) {

      149. 149:                     break;

      150. 150:                 }

      151. 151:             } else {

      152. 152:                 throw new Exception("ha master write body error < 0");

      153. 153:             }

      154. 154:         }

      155. 155:     }

      156. 156:

      157. 157:     boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

      158. 158:

      159. 159:     if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {

      160. 160:         this.selectMappedBufferResult.release();

      161. 161:         this.selectMappedBufferResult = null;

      162. 162:     }

      163. 163:

      164. 164:     return result;

      165. 165: }

      3.1.6 Master_SYNC

      • Producer 傳送訊息時, Master_SYNC節點 會等待 Slave節點 儲存完畢後再傳回傳送結果。

      核心程式碼如下:

      1.  1: // ⬇️⬇️⬇️【CommitLog.java】

      2.  2: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

      3.  3:     // ....省略處理髮送程式碼

      4.  4:     // Synchronous write double 如果是同步Master,同步到從節點

      5.  5:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {

      6.  6:         HAService service = this.defaultMessageStore.getHaService();

      7.  7:         if (msg.isWaitStoreMsgOK()) {

      8.  8:             // Determine whether to wait

      9.  9:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {

      10. 10:                 if (null == request) {

      11. 11:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

      12. 12:                 }

      13. 13:                 service.putRequest(request);

      14. 14:

      15. 15:                 // 喚醒WriteSocketService

      16. 16:                 service.getWaitNotifyObject().wakeupAll();

      17. 17:

      18. 18:                 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

      19. 19:                 if (!flushOK) {

      20. 20:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "

      21. 21:                         + msg.getTags() + " client address: " + msg.getBornHostString());

      22. 22:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);

      23. 23:                 }

      24. 24:             }

      25. 25:             // Slave problem

      26. 26:             else {

      27. 27:                 // Tell the producer, slave not available

      28. 28:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);

      29. 29:             }

      30. 30:         }

      31. 31:     }

      32. 32:

      33. 33:     return putMessageResult;

      34. 34: }

      • 第 16 行 :喚醒 WriteSocketService

        • 喚醒後, WriteSocketService 掛起等待新訊息結束, Master 傳輸 Slave 新的 CommitLog 資料。

        • Slave 收到資料後,立即上報最新的 CommitLog 同步進度到 Master。 ReadSocketService 喚醒第 18 行: request#waitForFlush(...)

      我們來看下 GroupTransferService 的核心邏輯程式碼:

      1.  1: // ⬇️⬇️⬇️【GroupTransferService.java】

      2.  2: private void doWaitTransfer() {

      3.  3:     synchronized (this.requestsRead) {

      4.  4:         if (!this.requestsRead.isEmpty()) {

      5.  5:             for (CommitLog.GroupCommitRequest req : this.requestsRead) {

      6.  6:                 // 等待Slave上傳進度

      7.  7:                 boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

      8.  8:                 for (int i = 0; !transferOK && i < 5; i++) {

      9.  9:                     this.notifyTransferObject.waitForRunning(1000); // 喚醒

      10. 10:                     transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

      11. 11:                 }

      12. 12:

      13. 13:                 if (!transferOK) {

      14. 14:                     log.warn("transfer messsage to slave timeout, " + req.getNextOffset());

      15. 15:                 }

      16. 16:

      17. 17:                 // 喚醒請求,並設定是否Slave同步成功

      18. 18:                 req.wakeupCustomer(transferOK);

      19. 19:             }

      20. 20:

      21. 21:             this.requestsRead.clear();

      22. 22:         }

      23. 23:     }

      24. 24: }

      3.2 Producer 傳送訊息

      • Producer 傳送訊息時,會對 Broker叢集 的所有佇列進行選擇。

      核心程式碼如下:

      1.  1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】

      2.  2: private SendResult sendDefaultImpl(//

      3.  3:     Message msg, //

      4.  4:     final CommunicationMode communicationMode, //

      5.  5:     final SendCallback sendCallback, //

      6.  6:     final long timeout//

      7.  7: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

      8.  8:     // .... 省略:處理【校驗邏輯】

      9.  9:     // 獲取 Topic路由資訊

      10. 10:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

      11. 11:     if (topicPublishInfo != null && topicPublishInfo.ok()) {

      12. 12:         MessageQueue mq = null; // 最後選擇訊息要傳送到的佇列

      13. 13:         Exception exception = null;

      14. 14:         SendResult sendResult = null; // 最後一次傳送結果

      15. 15:         int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次呼叫

      16. 16:         int times = 0; // 第幾次傳送

      17. 17:         String[] brokersSent = new String[timesTotal]; // 儲存每次傳送訊息選擇的broker名

      18. 18:         // 迴圈呼叫傳送訊息,直到成功

      19. 19:         for (; times < timesTotal; times++) {

      20. 20:             String lastBrokerName = null == mq ? null : mq.getBrokerName();

      21. 21:             MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 選擇訊息要傳送到的佇列

      22. 22:             if (tmpmq != null) {

      23. 23:                 mq = tmpmq;

      24. 24:                 brokersSent[times] = mq.getBrokerName();

      25. 25:                 try {

      26. 26:                     beginTimestampPrev = System.currentTimeMillis();

      27. 27:                     // 呼叫傳送訊息核心方法

      28. 28:                     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);

      29. 29:                     endTimestamp = System.currentTimeMillis();

      30. 30:                     // 更新Broker可用性資訊

      31. 31:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

      32. 32:                     // .... 省略:處理【傳送傳回結果】

      33. 33:                     }

      34. 34:                 } catch (e) { // .... 省略:處理【異常】

      35. 35:                    

      36. 36:                 }

      37. 37:             } else {

      38. 38:                 break;

      39. 39:             }

      40. 40:         }

      41. 41:         // .... 省略:處理【傳送傳回結果】

      42. 42:     }

      43. 43:     // .... 省略:處理【找不到訊息路由】

      44. 44: }

      如下是除錯 #sendDefaultImpl(...)TopicPublishInfo 的結果, Producer 獲得到了 broker-a, broker-b兩個 Broker分組 的訊息佇列:

      3.3 Consumer 消費訊息

      • Consumer 消費訊息時,會對 Broker叢集 的所有佇列進行選擇。

      4. 總結

      贊(0)

      分享創造快樂