歡迎光臨
每天分享高質量文章

分散式訊息佇列 RocketMQ 原始碼分析 —— Message 順序傳送與消費

摘要: 原創出處 http://www.iocoder.cn/RocketMQ/message-send-and-consume-orderly/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 RocketMQ 4.0.x 正式版

  • 1. 概述

  • 2. Producer 順序傳送

  • 3. Consumer 嚴格順序消費

  • 3.1 獲得(鎖定)訊息佇列

  • 3.2 移除訊息佇列

  • 3.3 消費訊息佇列

    • 3.1.1 消費訊息

    • 3.1.2 處理消費結果

    • 3.13 訊息處理佇列核心方法

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。


1. 概述

建議前置閱讀內容:

  • 《RocketMQ 原始碼分析 —— Message 傳送與接收》

  • 《RocketMQ 原始碼分析 —— Message 拉取與消費(下)》

當然對 Message 傳送與消費已經有一定瞭解的同學,可以選擇跳過。


RocketMQ 提供了兩種順序級別:

  • 普通順序訊息 : Producer 將相關聯的訊息傳送到相同的訊息佇列。

  • 完全嚴格順序 :在 普通順序訊息 的基礎上, Consumer 嚴格順序消費。

絕大部分場景下只需要用到普通順序訊息
例如說:給使用者傳送簡訊訊息 + 傳送推送訊息,將兩條訊息傳送到不同的訊息佇列,若其中一條訊息佇列消費較慢造成堵塞,使用者可能會收到兩條訊息會存在一定的時間差,帶來的體驗會相對較差。當然類似這種場景,即使有一定的時間差,不會產生系統邏輯上BUG。另外, 普通順序訊息效能能更加好。
那麼什麼時候使用使用完全嚴格順序?如下是來自官方檔案的說明:

目前已知的應用只有資料庫 binlog 同步強依賴嚴格順序訊息,其他應用絕大部分都可以容忍短暫亂序,推薦使用普通的順序訊息


?上程式碼!!!

2. Producer 順序傳送

官方傳送順序訊息的例子

  1.  1: package org.apache.rocketmq.example.ordermessage;

  2.  2:

  3.  3: import java.io.UnsupportedEncodingException;

  4.  4: import java.util.List;

  5.  5: import org.apache.rocketmq.client.exception.MQBrokerException;

  6.  6: import org.apache.rocketmq.client.exception.MQClientException;

  7.  7: import org.apache.rocketmq.client.producer.DefaultMQProducer;

  8.  8: import org.apache.rocketmq.client.producer.MQProducer;

  9.  9: import org.apache.rocketmq.client.producer.MessageQueueSelector;

  10. 10: import org.apache.rocketmq.client.producer.SendResult;

  11. 11: import org.apache.rocketmq.common.message.Message;

  12. 12: import org.apache.rocketmq.common.message.MessageQueue;

  13. 13: import org.apache.rocketmq.remoting.common.RemotingHelper;

  14. 14: import org.apache.rocketmq.remoting.exception.RemotingException;

  15. 15:

  16. 16: public class Producer {

  17. 17:     public static void main(String[] args) throws UnsupportedEncodingException {

  18. 18:         try {

  19. 19:             MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

  20. 20:             producer.start();

  21. 21:

  22. 22:             String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

  23. 23:             for (int i = 0; i < 100; i++) {

  24. 24:                 int orderId = i % 10;

  25. 25:                 Message msg =

  26. 26:                     new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,

  27. 27:                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

  28. 28:                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  29. 29:                     @Override

  30. 30:                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  31. 31:                         Integer id = (Integer) arg;

  32. 32:                         int index = id % mqs.size();

  33. 33:                         return mqs.get(index);

  34. 34:                     }

  35. 35:                 }, orderId);

  36. 36:

  37. 37:                 System.out.printf("%s%n", sendResult);

  38. 38:             }

  39. 39:

  40. 40:             producer.shutdown();

  41. 41:         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {

  42. 42:             e.printStackTrace();

  43. 43:         }

  44. 44:     }

  45. 45: }

  • 第 28 至 35 行 :實現了根據 id%mqs.size() 來進行訊息佇列的選擇。當前例子,我們傳遞 orderId 作為引數,那麼相同的 orderId 能夠進入相同的訊息佇列


MessageQueueSelector 介面的原始碼

  1.  1: public interface MessageQueueSelector {

  2.  2:

  3.  3:     /**

  4.  4:      * 選擇訊息佇列

  5.  5:      *

  6.  6:      * @param mqs 訊息佇列

  7.  7:      * @param msg 訊息

  8.  8:      * @param arg 引數

  9.  9:      * @return 訊息佇列

  10. 10:      */

  11. 11:     MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);

  12. 12: }


Producer 選擇佇列傳送訊息方法的原始碼

  1. 16: private SendResult sendSelectImpl(//

  2. 17:     Message msg, //

  3. 18:     MessageQueueSelector selector, //

  4. 19:     Object arg, //

  5. 20:     final CommunicationMode communicationMode, //

  6. 21:     final SendCallback sendCallback, final long timeout//

  7. 22: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

  8. 23:     this.makeSureStateOK();

  9. 24:     Validators.checkMessage(msg, this.defaultMQProducer);

  10. 25:

  11. 26:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

  12. 27:     if (topicPublishInfo != null && topicPublishInfo.ok()) {

  13. 28:         MessageQueue mq = null;

  14. 29:         try {

  15. 30:             mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);

  16. 31:         } catch (Throwable e) {

  17. 32:             throw new MQClientException("select message queue throwed exception.", e);

  18. 33:         }

  19. 34:

  20. 35:         if (mq != null) {

  21. 36:             return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);

  22. 37:         } else {

  23. 38:             throw new MQClientException("select message queue return null.", null);

  24. 39:         }

  25. 40:     }

  26. 41:

  27. 42:     throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);

  28. 43: }

  • 第 30 行 :選擇訊息佇列。

  • 第 36 行 :傳送訊息。

3. Consumer 嚴格順序消費

Consumer 在嚴格順序消費時,透過 把鎖保證嚴格順序消費。

  • Broker 訊息佇列鎖(分散式鎖) :

    • 叢集樣式下, Consumer 從 Broker 獲得該鎖後,才能進行訊息拉取、消費。

    • 廣播樣式下, Consumer 無需該鎖。

  • Consumer 訊息佇列鎖(本地鎖) : Consumer 獲得該鎖才能操作訊息佇列。

  • Consumer 訊息處理佇列消費鎖(本地鎖) : Consumer 獲得該鎖才能消費訊息佇列。

可能同學有疑問,為什麼有 Consumer 訊息佇列鎖還需要有 Consumer 訊息佇列消費鎖呢??讓我們帶著疑問繼續往下看。


3.1 獲得(鎖定)訊息佇列

叢集樣式下, Consumer 更新屬於自己的訊息佇列時,會向 Broker 鎖定該訊息佇列(廣播樣式下不需要)。如果鎖定失敗,則更新失敗,即該訊息佇列不屬於自己,不能進行消費。核心程式碼如下:

  1.  1: // ⬇️⬇️⬇️【RebalanceImpl.java】

  2.  2: private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {

  3.  3: // ..... 此處省略部分程式碼

  4.  4:     // 增加 不在processQueueTable && 存在於mqSet 裡的訊息佇列。

  5.  5:     List<PullRequest> pullRequestList = new ArrayList<>(); // 拉訊息請求陣列

  6.  6:     for (MessageQueue mq : mqSet) {

  7.  7:         if (!this.processQueueTable.containsKey(mq)) {

  8.  8:             if (isOrder && !this.lock(mq)) { // 順序訊息鎖定訊息佇列

  9.  9:                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);

  10. 10:                 continue;

  11. 11:             }

  12. 12:

  13. 13:             this.removeDirtyOffset(mq);

  14. 14:             ProcessQueue pq = new ProcessQueue();

  15. 15:             long nextOffset = this.computePullFromWhere(mq);

  16. 16:             if (nextOffset >= 0) {

  17. 17:                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);

  18. 18:                 if (pre != null) {

  19. 19:                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);

  20. 20:                 } else {

  21. 21:                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);

  22. 22:                     PullRequest pullRequest = new PullRequest();

  23. 23:                     pullRequest.setConsumerGroup(consumerGroup);

  24. 24:                     pullRequest.setNextOffset(nextOffset);

  25. 25:                     pullRequest.setMessageQueue(mq);

  26. 26:                     pullRequest.setProcessQueue(pq);

  27. 27:                     pullRequestList.add(pullRequest);

  28. 28:                     changed = true;

  29. 29:                 }

  30. 30:             } else {

  31. 31:                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);

  32. 32:             }

  33. 33:         }

  34. 34:     }

  35. 35:

  36. 36: // ..... 此處省略部分程式碼

  37. 37: }

  38. 38:

  39. 39: // ⬇️⬇️⬇️【RebalanceImpl.java】

  40. 40: /**

  41. 41:  * 請求Broker獲得指定訊息佇列的分散式鎖

  42. 42:  *

  43. 43:  * @param mq 佇列

  44. 44:  * @return 是否成功

  45. 45:  */

  46. 46: public boolean lock(final MessageQueue mq) {

  47. 47:     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);

  48. 48:     if (findBrokerResult != null) {

  49. 49:         LockBatchRequestBody requestBody = new LockBatchRequestBody();

  50. 50:         requestBody.setConsumerGroup(this.consumerGroup);

  51. 51:         requestBody.setClientId(this.mQClientFactory.getClientId());

  52. 52:         requestBody.getMqSet().add(mq);

  53. 53:

  54. 54:         try {

  55. 55:             // 請求Broker獲得指定訊息佇列的分散式鎖

  56. 56:             Set<MessageQueue> lockedMq =

  57. 57:                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

  58. 58:

  59. 59:             // 設定訊息處理佇列鎖定成功。鎖定訊息佇列成功,可能本地沒有訊息處理佇列,設定鎖定成功會在lockAll()方法。

  60. 60:             for (MessageQueue mmqq : lockedMq) {

  61. 61:                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);

  62. 62:                 if (processQueue != null) {

  63. 63:                     processQueue.setLocked(true);

  64. 64:                     processQueue.setLastLockTimestamp(System.currentTimeMillis());

  65. 65:                 }

  66. 66:             }

  67. 67:

  68. 68:             boolean lockOK = lockedMq.contains(mq);

  69. 69:             log.info("the message queue lock {}, {} {}",

  70. 70:                 lockOK ? "OK" : "Failed",

  71. 71:                 this.consumerGroup,

  72. 72:                 mq);

  73. 73:             return lockOK;

  74. 74:         } catch (Exception e) {

  75. 75:             log.error("lockBatchMQ exception, " + mq, e);

  76. 76:         }

  77. 77:     }

  78. 78:

  79. 79:     return false;

  80. 80: }

  • ⬆️⬆️⬆️

  • 第 8 至 11 行 :順序消費時,鎖定訊息佇列。如果鎖定失敗,新增訊息處理佇列失敗。


Broker 訊息佇列鎖會過期,預設配置 30s。因此, Consumer 需要不斷向 Broker 掃清該鎖過期時間,預設配置 20s 掃清一次。核心程式碼如下:

  1.  1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】

  2.  2: public void start() {

  3.  3:     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {

  4.  4:         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  5.  5:             @Override

  6.  6:             public void run() {

  7.  7:                 ConsumeMessageOrderlyService.this.lockMQPeriodically();

  8.  8:             }

  9.  9:         }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);

  10. 10:     }

  11. 11: }

3.2 移除訊息佇列

叢集樣式下, Consumer 移除自己的訊息佇列時,會向 Broker 解鎖該訊息佇列(廣播樣式下不需要)。核心程式碼如下:

  1.  1: // ⬇️⬇️⬇️【RebalancePushImpl.java】

  2.  2: /**

  3.  3:  * 移除不需要的佇列相關的資訊

  4.  4:  * 1. 持久化消費進度,並移除之

  5.  5:  * 2. 順序消費&叢集樣式,解鎖對該佇列的鎖定

  6.  6:  *

  7.  7:  * @param mq 訊息佇列

  8.  8:  * @param pq 訊息處理佇列

  9.  9:  * @return 是否移除成功

  10. 10:  */

  11. 11: @Override

  12. 12: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {

  13. 13:     // 同步佇列的消費進度,並移除之。

  14. 14:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);

  15. 15:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);

  16. 16:     // 叢集樣式下,順序消費移除時,解鎖對佇列的鎖定

  17. 17:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()

  18. 18:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {

  19. 19:         try {

  20. 20:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {

  21. 21:                 try {

  22. 22:                     return this.unlockDelay(mq, pq);

  23. 23:                 } finally {

  24. 24:                     pq.getLockConsume().unlock();

  25. 25:                 }

  26. 26:             } else {

  27. 27:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //

  28. 28:                     mq, //

  29. 29:                     pq.getTryUnlockTimes());

  30. 30:

  31. 31:                 pq.incTryUnlockTimes();

  32. 32:             }

  33. 33:         } catch (Exception e) {

  34. 34:             log.error("removeUnnecessaryMessageQueue Exception", e);

  35. 35:         }

  36. 36:

  37. 37:         return false;

  38. 38:     }

  39. 39:     return true;

  40. 40: }

  41. 41:

  42. 42: // ⬇️⬇️⬇️【RebalancePushImpl.java】

  43. 43: /**

  44. 44:  * 延遲解鎖 Broker 訊息佇列鎖

  45. 45:  * 當訊息處理佇列不存在訊息,則直接解鎖

  46. 46:  *

  47. 47:  * @param mq 訊息佇列

  48. 48:  * @param pq 訊息處理佇列

  49. 49:  * @return 是否解鎖成功

  50. 50:  */

  51. 51: private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {

  52. 52:     if (pq.hasTempMessage()) { // TODO 疑問:為什麼要延遲移除

  53. 53:         log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);

  54. 54:         this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {

  55. 55:             @Override

  56. 56:             public void run() {

  57. 57:                 log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);

  58. 58:                 RebalancePushImpl.this.unlock(mq, true);

  59. 59:             }

  60. 60:         }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);

  61. 61:     } else {

  62. 62:         this.unlock(mq, true);

  63. 63:     }

  64. 64:     return true;

  65. 65: }

  • ⬆️⬆️⬆️

  • 第 20 至 32 行 :獲取訊息佇列消費鎖,避免和訊息佇列消費衝突。如果獲取鎖失敗,則移除訊息佇列失敗,等待下次重新分配消費佇列時,再進行移除。如果未獲得鎖而進行移除,則可能出現另外的 Consumer 和當前 Consumer 同時消費該訊息佇列,導致訊息無法嚴格順序消費。

  • 第 51 至 64 行 :解鎖 Broker 訊息佇列鎖。如果訊息處理佇列存在剩餘訊息,則延遲解鎖 Broker 訊息佇列鎖。❓為什麼訊息處理佇列存在剩餘訊息不能直接解鎖呢??我也不知道,百思不得其解。如果有知道的同學麻煩教育下俺。

3.3 消費訊息佇列

?本節會類比併發消費消費佇列,建議對照 PushConsumer併發消費訊息 一起理解。

3.1.1 消費訊息

  1.  1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】

  2.  2: class ConsumeRequest implements Runnable {

  3.  3:

  4.  4:     /**

  5.  5:      * 訊息處理佇列

  6.  6:      */

  7.  7:     private final ProcessQueue processQueue;

  8.  8:     /**

  9.  9:      * 訊息佇列

  10. 10:      */

  11. 11:     private final MessageQueue messageQueue;

  12. 12:

  13. 13:     @Override

  14. 14:     public void run() {

  15. 15:         if (this.processQueue.isDropped()) {

  16. 16:             log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);

  17. 17:             return;

  18. 18:         }

  19. 19:

  20. 20:         // 獲得 Consumer 訊息佇列鎖

  21. 21:         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);

  22. 22:         synchronized (objLock) {

  23. 23:             // (廣播樣式) 或者 (叢集樣式 && Broker訊息佇列鎖有效)

  24. 24:             if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

  25. 25:                 || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {

  26. 26:                 final long beginTime = System.currentTimeMillis();

  27. 27:                 // 迴圈

  28. 28:                 for (boolean continueConsume = true; continueConsume; ) {

  29. 29:                     if (this.processQueue.isDropped()) {

  30. 30:                         log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);

  31. 31:                         break;

  32. 32:                     }

  33. 33:

  34. 34:                     // 訊息佇列分散式鎖未鎖定,提交延遲獲得鎖並消費請求

  35. 35:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

  36. 36:                         && !this.processQueue.isLocked()) {

  37. 37:                         log.warn("the message queue not locked, so consume later, {}", this.messageQueue);

  38. 38:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);

  39. 39:                         break;

  40. 40:                     }

  41. 41:                     // 訊息佇列分散式鎖已經過期,提交延遲獲得鎖並消費請求

  42. 42:                     if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

  43. 43:                         && this.processQueue.isLockExpired()) {

  44. 44:                         log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);

  45. 45:                         ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);

  46. 46:                         break;

  47. 47:                     }

  48. 48:

  49. 49:                     // 當前週期消費時間超過連續時長,預設:60s,提交延遲消費請求。預設情況下,每消費1分鐘休息10ms。

  50. 50:                     long interval = System.currentTimeMillis() - beginTime;

  51. 51:                     if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {

  52. 52:                         ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);

  53. 53:                         break;

  54. 54:                     }

  55. 55:

  56. 56:                     // 獲取消費訊息。此處和併發訊息請求不同,併發訊息請求已經帶了消費哪些訊息。

  57. 57:                     final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

  58. 58:                     List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);

  59. 59:                     if (!msgs.isEmpty()) {

  60. 60:                         final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

  61. 61:

  62. 62:                         ConsumeOrderlyStatus status = null;

  63. 63:

  64. 64:                         // ....省略程式碼:Hook:before

  65. 65:

  66. 66:                         // 執行消費

  67. 67:                         long beginTimestamp = System.currentTimeMillis();

  68. 68:                         ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

  69. 69:                         boolean hasException = false;

  70. 70:                         try {

  71. 71:                             this.processQueue.getLockConsume().lock(); // 鎖定佇列消費鎖

  72. 72:

  73. 73:                             if (this.processQueue.isDropped()) {

  74. 74:                                 log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",

  75. 75:                                     this.messageQueue);

  76. 76:                                 break;

  77. 77:                             }

  78. 78:

  79. 79:                             status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);

  80. 80:                         } catch (Throwable e) {

  81. 81:                             log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //

  82. 82:                                 RemotingHelper.exceptionSimpleDesc(e), //

  83. 83:                                 ConsumeMessageOrderlyService.this.consumerGroup, //

  84. 84:                                 msgs, //

  85. 85:                                 messageQueue);

  86. 86:                             hasException = true;

  87. 87:                         } finally {

  88. 88:                             this.processQueue.getLockConsume().unlock(); // 鎖定佇列消費鎖

  89. 89:                         }

  90. 90:

  91. 91:                         // ....省略程式碼:解析消費結果狀態

  92. 92:

  93. 93:                         // ....省略程式碼:Hook:after

  94. 94:

  95. 95:                         ConsumeMessageOrderlyService.this.getConsumerStatsManager()

  96. 96:                             .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

  97. 97:

  98. 98:                         // 處理消費結果

  99. 99:                         continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);

  100. 100:                     } else {

  101. 101:                         continueConsume = false;

  102. 102:                     }

  103. 103:                 }

  104. 104:             } else {

  105. 105:                 if (this.processQueue.isDropped()) {

  106. 106:                     log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);

  107. 107:                     return;

  108. 108:                 }

  109. 109:

  110. 110:                 ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);

  111. 111:             }

  112. 112:         }

  113. 113:     }

  114. 114:

  115. 115: }

  • ⬆️⬆️⬆️

  • 第 20 行 :獲得 Consumer 訊息佇列鎖。

  • 第 58 行 :從訊息處理佇列順序獲得訊息。和併發消費獲得訊息不同。併發消費請求在請求建立時,已經設定好消費哪些訊息。

  • 第 71 行 :獲得 Consumer 訊息處理佇列消費鎖。相比【 Consumer訊息佇列鎖】,其粒度較小。這就是上文提到的❓為什麼有 Consumer訊息佇列鎖還需要有 Consumer 訊息佇列消費鎖呢的原因。

  • 第 79 行 :執行消費

  • 第 99 行 :處理消費結果。

3.1.2 處理消費結果

順序消費訊息結果 ( ConsumeOrderlyStatus) 有四種情況:

  • SUCCESS :消費成功但不提交

  • ROLLBACK :消費失敗,消費回滾。

  • COMMIT :消費成功提交並且提交。

  • SUSPEND_CURRENT_QUEUE_A_MOMENT :消費失敗,掛起消費佇列一會會,稍後繼續消費。

考慮到 ROLLBACKCOMMIT 暫時只使用在 MySQLbinlog 場景,官方將這兩狀態標記為 @Deprecated。當然,相應的實現邏輯依然保留。

併發消費場景時,如果消費失敗, Consumer 會將消費失敗訊息發回到 Broker 重試佇列,跳過當前訊息,等待下次拉取該訊息再進行消費。

但是在完全嚴格順序消費消費時,這樣做顯然不行。也因此,消費失敗的訊息,會掛起佇列一會會,稍後繼續消費。

不過消費失敗的訊息一直失敗,也不可能一直消費。當超過消費重試上限時, Consumer 會將消費失敗超過上限的訊息發回到 Broker 死信佇列。

讓我們來看看程式碼:

  1.  1: // ⬇️⬇️⬇️【ConsumeMessageOrderlyService.java】

  2.  2: /**

  3.  3:  * 處理消費結果,並傳回是否繼續消費

  4.  4:  *

  5.  5:  * @param msgs 訊息

  6.  6:  * @param status 消費結果狀態

  7.  7:  * @param context 消費Context

  8.  8:  * @param consumeRequest 消費請求

  9.  9:  * @return 是否繼續消費

  10. 10:  */

  11. 11: public boolean processConsumeResult(//

  12. 12:     final List<MessageExt> msgs, //

  13. 13:     final ConsumeOrderlyStatus status, //

  14. 14:     final ConsumeOrderlyContext context, //

  15. 15:     final ConsumeRequest consumeRequest//

  16. 16: ) {

  17. 17:     boolean continueConsume = true;

  18. 18:     long commitOffset = -1L;

  19. 19:     if (context.isAutoCommit()) {

  20. 20:         switch (status) {

  21. 21:             case COMMIT:

  22. 22:             case ROLLBACK:

  23. 23:                 log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());

  24. 24:             case SUCCESS:

  25. 25:                 // 提交訊息已消費成功到訊息處理佇列

  26. 26:                 commitOffset = consumeRequest.getProcessQueue().commit();

  27. 27:                 // 統計

  28. 28:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  29. 29:                 break;

  30. 30:             case SUSPEND_CURRENT_QUEUE_A_MOMENT:

  31. 31:                 // 統計

  32. 32:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  33. 33:                 if (checkReconsumeTimes(msgs)) { // 計算是否暫時掛起(暫停)消費N毫秒,預設:10ms

  34. 34:                     // 設定訊息重新消費

  35. 35:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);

  36. 36:                     // 提交延遲消費請求

  37. 37:                     this.submitConsumeRequestLater(//

  38. 38:                         consumeRequest.getProcessQueue(), //

  39. 39:                         consumeRequest.getMessageQueue(), //

  40. 40:                         context.getSuspendCurrentQueueTimeMillis());

  41. 41:                     continueConsume = false;

  42. 42:                 } else {

  43. 43:                     commitOffset = consumeRequest.getProcessQueue().commit();

  44. 44:                 }

  45. 45:                 break;

  46. 46:             default:

  47. 47:                 break;

  48. 48:         }

  49. 49:     } else {

  50. 50:         switch (status) {

  51. 51:             case SUCCESS:

  52. 52:                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  53. 53:                 break;

  54. 54:             case COMMIT:

  55. 55:                 // 提交訊息已消費成功到訊息處理佇列

  56. 56:                 commitOffset = consumeRequest.getProcessQueue().commit();

  57. 57:                 break;

  58. 58:             case ROLLBACK:

  59. 59:                 // 設定訊息重新消費

  60. 60:                 consumeRequest.getProcessQueue().rollback();

  61. 61:                 this.submitConsumeRequestLater(//

  62. 62:                     consumeRequest.getProcessQueue(), //

  63. 63:                     consumeRequest.getMessageQueue(), //

  64. 64:                     context.getSuspendCurrentQueueTimeMillis());

  65. 65:                 continueConsume = false;

  66. 66:                 break;

  67. 67:             case SUSPEND_CURRENT_QUEUE_A_MOMENT: // 計算是否暫時掛起(暫停)消費N毫秒,預設:10ms

  68. 68:                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

  69. 69:                 if (checkReconsumeTimes(msgs)) {

  70. 70:                     // 設定訊息重新消費

  71. 71:                     consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);

  72. 72:                     // 提交延遲消費請求

  73. 73:                     this.submitConsumeRequestLater(//

  74. 74:                         consumeRequest.getProcessQueue(), //

  75. 75:                         consumeRequest.getMessageQueue(), //

  76. 76:                         context.getSuspendCurrentQueueTimeMillis());

  77. 77:                     continueConsume = false;

  78. 78:                 }

  79. 79:                 break;

  80. 80:             default:

  81. 81:                 break;

  82. 82:         }

  83. 83:     }

  84. 84:

  85. 85:     // 訊息處理佇列未dropped,提交有效消費進度

  86. 86:     if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {

  87. 87:         this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);

  88. 88:     }

  89. 89:

  90. 90:     return continueConsume;

  91. 91: }

  92. 92:

  93. 93: private int getMaxReconsumeTimes() {

  94. 94:     // default reconsume times: Integer.MAX_VALUE

  95. 95:     if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {

  96. 96:         return Integer.MAX_VALUE;

  97. 97:     } else {

  98. 98:         return this.defaultMQPushConsumer.getMaxReconsumeTimes();

  99. 99:     }

  100. 100: }

  101. 101:

  102. 102: /**

  103. 103:  * 計算是否要暫停消費

  104. 104:  * 不暫停條件:存在訊息都超過最大消費次數並且都發回broker成功

  105. 105:  *

  106. 106:  * @param msgs 訊息

  107. 107:  * @return 是否要暫停

  108. 108:  */

  109. 109: private boolean checkReconsumeTimes(List<MessageExt> msgs) {

  110. 110:     boolean suspend = false;

  111. 111:     if (msgs != null && !msgs.isEmpty()) {

  112. 112:         for (MessageExt msg : msgs) {

  113. 113:             if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {

  114. 114:                 MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));

  115. 115:                 if (!sendMessageBack(msg)) { // 發回失敗,中斷

  116. 116:                     suspend = true;

  117. 117:                     msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

  118. 118:                 }

  119. 119:             } else {

  120. 120:                 suspend = true;

  121. 121:                 msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

  122. 122:             }

  123. 123:         }

  124. 124:     }

  125. 125:     return suspend;

  126. 126: }

  127. 127:

  128. 128: /**

  129. 129:  * 發回訊息。

  130. 130:  * 訊息發回broker後,對應的訊息佇列是死信佇列。

  131. 131:  *

  132. 132:  * @param msg 訊息

  133. 133:  * @return 是否傳送成功

  134. 134:  */

  135. 135: public boolean sendMessageBack(final MessageExt msg) {

  136. 136:     try {

  137. 137:         // max reconsume times exceeded then send to dead letter queue.

  138. 138:         Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

  139. 139:         String originMsgId = MessageAccessor.getOriginMessageId(msg);

  140. 140:         MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

  141. 141:         newMsg.setFlag(msg.getFlag());

  142. 142:         MessageAccessor.setProperties(newMsg, msg.getProperties());

  143. 143:         MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

  144. 144:         MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));

  145. 145:         MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));

  146. 146:         newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

  147. 147:

  148. 148:         this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);

  149. 149:         return true;

  150. 150:     } catch (Exception e) {

  151. 151:         log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);

  152. 152:     }

  153. 153:

  154. 154:     return false;

  155. 155: }

  • ⬆️⬆️⬆️

  • 第 21 至 29 行 :消費成功。在自動提交進度( AutoCommit )的情況下, COMMIT、 ROLLBACK、 SUCCESS 邏輯已經統一

  • 第 30 至 45 行 :消費失敗。當訊息重試次數超過上限(預設 :16次)時,將訊息傳送到 Broker 死信佇列,跳過這些訊息。此時,訊息佇列無需掛起,繼續消費後面的訊息。

  • 第 85 至 88 行 :提交消費進度。

3.13 訊息處理佇列核心方法

?涉及到的四個核心方法的原始碼:

  1.  1: // ⬇️⬇️⬇️【ProcessQueue.java】

  2.  2: /**

  3.  3:  * 訊息對映

  4.  4:  * key:訊息佇列位置

  5.  5:  */

  6.  6: private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();    /**

  7.  7:  * 訊息對映臨時儲存(消費中的訊息)

  8.  8:  */

  9.  9: private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<>();

  10. 10:

  11. 11: /**

  12. 12:  * 回滾消費中的訊息

  13. 13:  * 邏輯類似於{@link #makeMessageToCosumeAgain(List)}

  14. 14:  */

  15. 15: public void rollback() {

  16. 16:     try {

  17. 17:         this.lockTreeMap.writeLock().lockInterruptibly();

  18. 18:         try {

  19. 19:             this.msgTreeMap.putAll(this.msgTreeMapTemp);

  20. 20:             this.msgTreeMapTemp.clear();

  21. 21:         } finally {

  22. 22:             this.lockTreeMap.writeLock().unlock();

  23. 23:         }

  24. 24:     } catch (InterruptedException e) {

  25. 25:         log.error("rollback exception", e);

  26. 26:     }

  27. 27: }

  28. 28:

  29. 29: /**

  30. 30:  * 提交消費中的訊息已消費成功,傳回消費進度

  31. 31:  *

  32. 32:  * @return 消費進度

  33. 33:  */

  34. 34: public long commit() {

  35. 35:     try {

  36. 36:         this.lockTreeMap.writeLock().lockInterruptibly();

  37. 37:         try {

  38. 38:             // 消費進度

  39. 39:             Long offset = this.msgTreeMapTemp.lastKey();

  40. 40:

  41. 41:             //

  42. 42:             msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));

  43. 43:

  44. 44:             //

  45. 45:             this.msgTreeMapTemp.clear();

  46. 46:

  47. 47:             // 傳回消費進度

  48. 48:             if (offset != null) {

  49. 49:                 return offset + 1;

  50. 50:             }

  51. 51:         } finally {

  52. 52:             this.lockTreeMap.writeLock().unlock();

  53. 53:         }

  54. 54:     } catch (InterruptedException e) {

  55. 55:         log.error("commit exception", e);

  56. 56:     }

  57. 57:

  58. 58:     return -1;

  59. 59: }

  60. 60:

  61. 61: /**

  62. 62:  * 指定訊息重新消費

  63. 63:  * 邏輯類似於{@link #rollback()}

  64. 64:  *

  65. 65:  * @param msgs 訊息

  66. 66:  */

  67. 67: public void makeMessageToCosumeAgain(List<MessageExt> msgs) {

  68. 68:     try {

  69. 69:         this.lockTreeMap.writeLock().lockInterruptibly();

  70. 70:         try {

  71. 71:             for (MessageExt msg : msgs) {

  72. 72:                 this.msgTreeMapTemp.remove(msg.getQueueOffset());

  73. 73:                 this.msgTreeMap.put(msg.getQueueOffset(), msg);

  74. 74:             }

  75. 75:         } finally {

  76. 76:             this.lockTreeMap.writeLock().unlock();

  77. 77:         }

  78. 78:     } catch (InterruptedException e) {

  79. 79:         log.error("makeMessageToCosumeAgain exception", e);

  80. 80:     }

  81. 81: }

  82. 82:

  83. 83: /**

  84. 84:  * 獲得持有訊息前N條

  85. 85:  *

  86. 86:  * @param batchSize 條數

  87. 87:  * @return 訊息

  88. 88:  */

  89. 89: public List<MessageExt> takeMessags(final int batchSize) {

  90. 90:     List<MessageExt> result = new ArrayList<>(batchSize);

  91. 91:     final long now = System.currentTimeMillis();

  92. 92:     try {

  93. 93:         this.lockTreeMap.writeLock().lockInterruptibly();

  94. 94:         this.lastConsumeTimestamp = now;

  95. 95:         try {

  96. 96:             if (!this.msgTreeMap.isEmpty()) {

  97. 97:                 for (int i = 0; i < batchSize; i++) {

  98. 98:                     Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();

  99. 99:                     if (entry != null) {

  100. 100:                         result.add(entry.getValue());

  101. 101:                         msgTreeMapTemp.put(entry.getKey(), entry.getValue());

  102. 102:                     } else {

  103. 103:                         break;

  104. 104:                     }

  105. 105:                 }

  106. 106:             }

  107. 107:

  108. 108:             if (result.isEmpty()) {

  109. 109:                 consuming = false;

  110. 110:             }

  111. 111:         } finally {

  112. 112:             this.lockTreeMap.writeLock().unlock();

  113. 113:         }

  114. 114:     } catch (InterruptedException e) {

  115. 115:         log.error("take Messages exception", e);

  116. 116:     }

  117. 117:

  118. 118:     return result;

  119. 119: }

贊(0)

分享創造快樂