歡迎光臨
每天分享高質量文章

分散式訊息佇列 RocketMQ 原始碼分析 —— 定時訊息與訊息重試

摘要: 原創出處 http://www.iocoder.cn/RocketMQ/message-schedule-and-retry/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

排版又崩了,請【閱讀原文】。

本文主要基於 RocketMQ 4.0.x 正式版

  • 1. 概述

  • 2. 定時訊息

    • 2.1 延遲級別

    • 2.2 Producer 傳送定時訊息

    • 2.3 Broker 儲存定時訊息

    • 2.4 Broker 傳送定時訊息

    • 2.5 Broker 持久化定時傳送進度

  • 3. 訊息重試

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。


1. 概述

建議前置閱讀內容:

  • 《RocketMQ 原始碼分析 —— Message 傳送與接收》

  • 《RocketMQ 原始碼分析 —— Message 拉取與消費(下)》

? 為什麼把定時訊息訊息重試放在一起?你猜。
? 你猜我猜不猜。

2. 定時訊息

定時訊息是指訊息發到 Broker 後,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間後才能被消費。

下圖是定時訊息的處理邏輯圖:

2.1 延遲級別

RocketMQ 目前只支援固定精度的定時訊息。官方說法如下:

如果要支援任意的時間精度,在 Broker 層面,必須要做訊息排序,如果再涉及到持久化,那麼訊息排序要不可避免的產生巨大效能開銷。

  • 延遲級別:

延遲級別 時間
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h
    • 核心原始碼如下:

    1.  1: // ⬇️⬇️⬇️【MessageStoreConfig.java】

    2: /**
    3: * 訊息延遲級別字串配置
    4: */
    5: private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
    6:
    7: // ⬇️⬇️⬇️【ScheduleMessageService.java】
    8: /**
    9: * 解析延遲級別
    10: *
    11: * @return 是否解析成功
    12: */
    13: public boolean parseDelayLevel() {
    14: HashMap timeUnitTable = new HashMap<>();
    15: timeUnitTable.put(“s”, 1000L);
    16: timeUnitTable.put(“m”, 1000L * 60);
    17: timeUnitTable.put(“h”, 1000L * 60 * 60);
    18: timeUnitTable.put(“d”, 1000L * 60 * 60 * 24);
    19:
    20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    21: try {
    22: String[] levelArray = levelString.split(” “);
    23: for (int i = 0; i < levelArray.length; i++) {
    24: String value = levelArray[i];
    25: String ch = value.substring(value.length() – 1);
    26: Long tu = timeUnitTable.get(ch);
    27:
    28: int level = i + 1;
    29: if (level > this.maxDelayLevel) {
    30: this.maxDelayLevel = level;
    31: }
    32: long num = Long.parseLong(value.substring(0, value.length() – 1));
    33: long delayTimeMillis = tu * num;
    34: this.delayLevelTable.put(level, delayTimeMillis);
    35: }
    36: } catch (Exception e) {
    37: log.error(“parseDelayLevel exception”, e);
    38: log.info(“levelString String = {}”, levelString);
    39: return false;
    40: }
    41:
    42: return true;
    43: }

    2.2 Producer 傳送定時訊息

      • ?傳送時,設定訊息的延遲級別

      1. Message msg = new Message(...);

      2. msg.setDelayTimeLevel(level);

      2.3 Broker 儲存定時訊息

      • ? 儲存訊息時,延遲訊息進入 Topic 為 SCHEDULE_TOPIC_XXXX

      • ? 延遲級別 與 訊息佇列編號 做固定對映:QueueId = DelayLevel – 1

      核心程式碼如下:

      1.  1: // ⬇️⬇️⬇️【CommitLog.java】

      2.  2: /**

      3.  3:  * 新增訊息,傳回訊息結果

      4.  4:  *

      5.  5:  * @param msg 訊息

      6.  6:  * @return 結果

      7.  7:  */

      8.  8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

      9.  9:     // ....(省略程式碼)

      10. 10:

      11. 11:     // 定時訊息處理

      12. 12:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

      13. 13:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//

      14. 14:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

      15. 15:         // Delay Delivery

      16. 16:         if (msg.getDelayTimeLevel() > 0) {

      17. 17:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {

      18. 18:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());

      19. 19:             }

      20. 20:

      21. 21:             // 儲存訊息時,延遲訊息進入 `Topic` 為 `SCHEDULE_TOPIC_XXXX` 。

      22. 22:             topic = ScheduleMessageService.SCHEDULE_TOPIC;

      23. 23:

      24. 24:             // 延遲級別 與 訊息佇列編號 做固定對映

      25. 25:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

      26. 26:

      27. 27:             // Backup real topic, queueId

      28. 28:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());

      29. 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

      30. 30:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

      31. 31:

      32. 32:             msg.setTopic(topic);

      33. 33:             msg.setQueueId(queueId);

      34. 34:         }

      35. 35:     }

      36. 36:

      37. 37:     // ....(省略程式碼)

      38. 38: }

      39. 39:

      40. 40: // ⬇️⬇️⬇️【ScheduleMessageService.java】

      41. 41: /**

      42. 42:  * 根據 延遲級別 計算 訊息佇列編號

      43. 43:  * QueueId = DelayLevel - 1

      44. 44:  *

      45. 45:  * @param delayLevel 延遲級別

      46. 46:  * @return 訊息佇列編號

      47. 47:  */

      48. 48: public static int delayLevel2QueueId(final int delayLevel) {

      49. 49:     return delayLevel - 1;

      50. 50: }


      • ? 生成 ConsumeQueue 時,每條訊息的 tagsCode 使用【訊息計劃消費時間】。這樣, ScheduleMessageService在輪詢 ConsumeQueue 時,可以使用 tagsCode 進行過濾。

      核心程式碼如下:

      1.  1: // ⬇️⬇️⬇️【CommitLog.java】

      2.  2: /**

      3.  3:  * check the message and returns the message size

      4.  4:  *

      5.  5:  * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure

      6.  6:  */

      7.  7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {

      8.  8:     try {

      9.  9:         // // ....(省略程式碼)

      10. 10:

      11. 11:         // 17 properties

      12. 12:         short propertiesLength = byteBuffer.getShort();

      13. 13:         if (propertiesLength > 0) {

      14. 14:             // ....(省略程式碼)

      15. 15:             String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);

      16. 16:             if (tags != null && tags.length() > 0) {

      17. 17:                 tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);

      18. 18:             }

      19. 19:

      20. 20:             // Timing message processing

      21. 21:             {

      22. 22:                 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);

      23. 23:                 if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {

      24. 24:                     int delayLevel = Integer.parseInt(t);

      25. 25:

      26. 26:                     if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {

      27. 27:                         delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();

      28. 28:                     }

      29. 29:

      30. 30:                     if (delayLevel > 0) {

      31. 31:                         tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,

      32. 32:                             storeTimestamp);

      33. 33:                     }

      34. 34:                 }

      35. 35:             }

      36. 36:         }

      37. 37:

      38. 38:         // ....(省略程式碼)

      39. 39:

      40. 40:         return new DispatchRequest(//

      41. 41:             topic, // 1

      42. 42:             queueId, // 2

      43. 43:             physicOffset, // 3

      44. 44:             totalSize, // 4

      45. 45:             tagsCode, // 5

      46. 46:             storeTimestamp, // 6

      47. 47:             queueOffset, // 7

      48. 48:             keys, // 8

      49. 49:             uniqKey, //9

      50. 50:             sysFlag, // 9

      51. 51:             preparedTransactionOffset// 10

      52. 52:         );

      53. 53:     } catch (Exception e) {

      54. 54:     }

      55. 55:

      56. 56:     return new DispatchRequest(-1, false /* success */);

      57. 57: }

      58. 58:

      59. 59: // ⬇️⬇️⬇️【ScheduleMessageService.java】

      60. 60: /**

      61. 61:  * 計算 投遞時間【計劃消費時間】

      62. 62:  *

      63. 63:  * @param delayLevel 延遲級別

      64. 64:  * @param storeTimestamp 儲存時間

      65. 65:  * @return 投遞時間【計劃消費時間】

      66. 66:  */

      67. 67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {

      68. 68:     Long time = this.delayLevelTable.get(delayLevel);

      69. 69:     if (time != null) {

      70. 70:         return time + storeTimestamp;

      71. 71:     }

      72. 72:

      73. 73:     return storeTimestamp + 1000;

      74. 74: }

      2.4 Broker 傳送定時訊息

      • ? 對 SCHEDULE_TOPIC_XXXX 每條消費佇列對應單獨一個定時任務進行輪詢,傳送 到達投遞時間【計劃消費時間】 的訊息。

      下圖是傳送定時訊息的處理邏輯圖:

      實現程式碼如下:

      1.  1: /**

      2.  2:  * ⬇️⬇️⬇️ 傳送(投遞)延遲訊息定時任務

      3.  3:  */

      4.  4: class DeliverDelayedMessageTimerTask extends TimerTask {

      5.  5:     /**

      6.  6:      * 延遲級別

      7.  7:      */

      8.  8:     private final int delayLevel;

      9.  9:     /**

      10. 10:      * 位置

      11. 11:      */

      12. 12:     private final long offset;

      13. 13:

      14. 14:     public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {

      15. 15:         this.delayLevel = delayLevel;

      16. 16:         this.offset = offset;

      17. 17:     }

      18. 18:

      19. 19:     @Override

      20. 20:     public void run() {

      21. 21:         try {

      22. 22:             this.executeOnTimeup();

      23. 23:         } catch (Exception e) {

      24. 24:             // XXX: warn and notify me

      25. 25:             log.error("ScheduleMessageService, executeOnTimeup exception", e);

      26. 26:             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(

      27. 27:                 this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);

      28. 28:         }

      29. 29:     }

      30. 30:

      31. 31:     /**

      32. 32:      * 糾正可投遞時間。

      33. 33:      * 因為傳送級別對應的傳送間隔可以調整,如果超過當前間隔,則修正成當前配置,避免後面的訊息無法傳送。

      34. 34:      *

      35. 35:      * @param now 當前時間

      36. 36:      * @param deliverTimestamp 投遞時間

      37. 37:      * @return 糾正結果

      38. 38:      */

      39. 39:     private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {

      40. 40:         long result = deliverTimestamp;

      41. 41:

      42. 42:         long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);

      43. 43:         if (deliverTimestamp > maxTimestamp) {

      44. 44:             result = now;

      45. 45:         }

      46. 46:

      47. 47:         return result;

      48. 48:     }

      49. 49:

      50. 50:     public void executeOnTimeup() {

      51. 51:         ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,  delayLevel2QueueId(delayLevel));

      52. 52:

      53. 53:         long failScheduleOffset = offset;

      54. 54:

      55. 55:         if (cq != null) {

      56. 56:             SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

      57. 57:             if (bufferCQ != null) {

      58. 58:                 try {

      59. 59:                     long nextOffset = offset;

      60. 60:                     int i = 0;

      61. 61:                     for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

      62. 62:                         long offsetPy = bufferCQ.getByteBuffer().getLong();

      63. 63:                         int sizePy = bufferCQ.getByteBuffer().getInt();

      64. 64:                         long tagsCode = bufferCQ.getByteBuffer().getLong();

      65. 65:

      66. 66:                         long now = System.currentTimeMillis();

      67. 67:                         long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

      68. 68:

      69. 69:                         nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

      70. 70:

      71. 71:                         long countdown = deliverTimestamp - now;

      72. 72:

      73. 73:                         if (countdown <= 0) { // 訊息到達可傳送時間

      74. 74:                             MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

      75. 75:                             if (msgExt != null) {

      76. 76:                                 try {

      77. 77:                                     // 傳送訊息

      78. 78:                                     MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

      79. 79:                                     PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);

      80. 80:                                     if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 傳送成功

      81. 81:                                         continue;

      82. 82:                                     } else { // 傳送失敗

      83. 83:                                         // XXX: warn and notify me

      84. 84:                                         log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());

      85. 85:

      86. 86:                                         // 安排下一次任務

      87. 87:                                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);

      88. 88:

      89. 89:                                         // 更新進度

      90. 90:                                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

      91. 91:                                         return;

      92. 92:                                     }

      93. 93:                                 } catch (Exception e) {

      94. 94:                                     // XXX: warn and notify me

      95. 95:                                     log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="

      96. 96:                                             + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);

      97. 97:                                 }

      98. 98:                             }

      99. 99:                         } else {

      100. 100:                             // 安排下一次任務

      101. 101:                             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);

      102. 102:

      103. 103:                             // 更新進度

      104. 104:                             ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

      105. 105:                             return;

      106. 106:                         }

      107. 107:                     } // end of for

      108. 108:

      109. 109:                     nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

      110. 110:

      111. 111:                     // 安排下一次任務

      112. 112:                     ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);

      113. 113:

      114. 114:                     // 更新進度

      115. 115:                     ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

      116. 116:                     return;

      117. 117:                 } finally {

      118. 118:                     bufferCQ.release();

      119. 119:                 }

      120. 120:             } // end of if (bufferCQ != null)

      121. 121:             else { // 消費佇列已經被刪除部分,跳轉到最小的消費進度

      122. 122:                 long cqMinOffset = cq.getMinOffsetInQueue();

      123. 123:                 if (offset < cqMinOffset) {

      124. 124:                     failScheduleOffset = cqMinOffset;

      125. 125:                     log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="

      126. 126:                         + cqMinOffset + ", queueId=" + cq.getQueueId());

      127. 127:                 }

      128. 128:             }

      129. 129:         } // end of if (cq != null)

      130. 130:

      131. 131:         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);

      132. 132:     }

      133. 133:

      134. 134:     /**

      135. 135:      * 設定訊息內容

      136. 136:      *

      137. 137:      * @param msgExt 訊息

      138. 138:      * @return 訊息

      139. 139:      */

      140. 140:     private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {

      141. 141:         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();

      142. 142:         msgInner.setBody(msgExt.getBody());

      143. 143:         msgInner.setFlag(msgExt.getFlag());

      144. 144:         MessageAccessor.setProperties(msgInner, msgExt.getProperties());

      145. 145:

      146. 146:         TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());

      147. 147:         long tagsCodeValue =

      148. 148:             MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());

      149. 149:         msgInner.setTagsCode(tagsCodeValue);

      150. 150:         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

      151. 151:

      152. 152:         msgInner.setSysFlag(msgExt.getSysFlag());

      153. 153:         msgInner.setBornTimestamp(msgExt.getBornTimestamp());

      154. 154:         msgInner.setBornHost(msgExt.getBornHost());

      155. 155:         msgInner.setStoreHost(msgExt.getStoreHost());

      156. 156:         msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

      157. 157:

      158. 158:         msgInner.setWaitStoreMsgOK(false);

      159. 159:         MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

      160. 160:

      161. 161:         msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

      162. 162:

      163. 163:         String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);

      164. 164:         int queueId = Integer.parseInt(queueIdStr);

      165. 165:         msgInner.setQueueId(queueId);

      166. 166:

      167. 167:         return msgInner;

      168. 168:     }

      169. 169: }

      2.5 Broker 持久化定時傳送進度

      • ? 定時訊息傳送進度儲存在檔案( ../config/delayOffset.json)裡

      • ? 每 10s 定時持久化傳送進度。

      核心程式碼如下:

      1.  1: // ⬇️⬇️⬇️【ScheduleMessageService.java】

      2.  2: /**

      3.  3: public void start() {

      4.  4:     // 定時傳送訊息

      5.  5:     for (Map.Entry entry : this.delayLevelTable.entrySet()) {

      6.  6:         Integer level = entry.getKey();

      7.  7:         Long timeDelay = entry.getValue();

      8.  8:         Long offset = this.offsetTable.get(level);

      9.  9:         if (null == offset) {

      10. 10:             offset = 0L;

      11. 11:         }

      12. 12:

      13. 13:         if (timeDelay != null) {

      14. 14:             this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);

      15. 15:         }

      16. 16:     }

      17. 17:

      18. 18:     // 定時持久化傳送進度

      19. 19:     this.timer.scheduleAtFixedRate(new TimerTask() {

      20. 20:

      21. 21:         @Override

      22. 22:         public void run() {

      23. 23:             try {

      24. 24:                 ScheduleMessageService.this.persist();

      25. 25:             } catch (Exception e) {

      26. 26:                 log.error("scheduleAtFixedRate flush exception", e);

      27. 27:             }

      28. 28:         }

      29. 29:     }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

      30. 30: }

      3. 訊息重試

      Consumer 消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。

      • Consumer 將消費失敗的訊息發回 Broker,進入延遲訊息佇列。即,消費失敗的訊息,不會立即消費。

      核心程式碼如下:

      1.  1: // ⬇️⬇️⬇️【SendMessageProcessor.java】

      2.  2: /**

      3.  3:  * 消費者發回訊息

      4.  4:  *

      5.  5:  * @param ctx ctx

      6.  6:  * @param request 請求

      7.  7:  * @return 響應

      8.  8:  * @throws RemotingCommandException 當遠端呼叫異常

      9.  9:  */

      10. 10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)

      11. 11:     throws RemotingCommandException {

      12. 12:     // ....(省略程式碼)

      13. 13:     // 處理 delayLevel(獨有)。

      14. 14:     int delayLevel = requestHeader.getDelayLevel();

      15. 15:     int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();

      16. 16:     if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {

      17. 17:         maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();

      18. 18:     }

      19. 19:     if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//

      20. 20:     // ....(省略程式碼)

      21. 21:     } else {

      22. 22:         if (0 == delayLevel) {

      23. 23:             delayLevel = 3 + msgExt.getReconsumeTimes();

      24. 24:         }

      25. 25:         msgExt.setDelayTimeLevel(delayLevel);

      26. 26:     }

      27. 27:

      28. 28:     // ....(省略程式碼)

      29. 29:     return response;

      30. 30: }

      贊(0)

      分享創造快樂