歡迎光臨
每天分享高質量文章

分佈式訊息佇列 RocketMQ原始碼解析:事務訊息

摘要: 原創出處 http://www.iocoder.cn/RocketMQ/message-transaction/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!

本文主要基於 RocketMQ 4.0.x 正式版

  • 1. 概述

  • 2. 事務訊息發送

    • 2.1 Producer 發送事務訊息

    • 2.2 Broker 處理結束事務請求

    • 2.3 Broker 生成 ConsumeQueue

  • 3. 事務訊息回查

    • 3.1 Broker 發起【事務訊息回查】

      • 3.1.1 官方V3.1.4:基於檔案系統

        • 3.1.1.1 儲存訊息到 CommitLog

        • 3.1.1.2 寫【事務訊息】狀態儲存(TranStateTable)

        • 3.1.1.3 【事務訊息】回查

        • 3.1.1.4 初始化【事務訊息】狀態儲存(TranStateTable)

        • 3.1.1.5 補充

      • 3.1.2 官方V4.0.0:基於資料庫

    • 3.2 Producer 接收【事務訊息回查】


  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。


1. 概述

必須必須必須 前置閱讀內容:

  • 《事務訊息(阿裡雲)》

2. 事務訊息發送

2.1 Producer 發送事務訊息

  • 活動圖如下(結合 核心代碼 理解):

  • 實現代碼如下:

  1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
 2: /**
 3:  * 發送事務訊息
 4:  *
 5:  * @param msg 訊息
 6:  * @param tranExecuter 【本地事務】執行器
 7:  * @param arg 【本地事務】執行器引數
 8:  * @return 事務發送結果
 9:  * @throws MQClientException 當 Client 發生異常時
10:  */

11: public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
12:     throws MQClientException
{
13:     if (null == tranExecuter) {
14:         throw new MQClientException("tranExecutor is null", null);
15:     }
16:     Validators.checkMessage(msg, this.defaultMQProducer);
17:
18:     // 發送【Half訊息】
19:     SendResult sendResult;
20:     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
21:     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
22:     try {
23:         sendResult = this.send(msg);
24:     } catch (Exception e) {
25:         throw new MQClientException("send message Exception", e);
26:     }
27:
28:     // 處理髮送【Half訊息】結果
29:     LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
30:     Throwable localException = null;
31:     switch (sendResult.getSendStatus()) {
32:         // 發送【Half訊息】成功,執行【本地事務】邏輯
33:         case SEND_OK: {
34:             try {
35:                 if (sendResult.getTransactionId() != null) { // 事務編號。目前開源版本暫時沒用到,猜想ONS在使用。
36:                     msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
37:                 }
38:
39:                 // 執行【本地事務】邏輯
40:                 localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
41:                 if (null == localTransactionState) {
42:                     localTransactionState = LocalTransactionState.UNKNOW;
43:                 }
44:
45:                 if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
46:                     log.info("executeLocalTransactionBranch return {}", localTransactionState);
47:                     log.info(msg.toString());
48:                 }
49:             } catch (Throwable e) {
50:                 log.info("executeLocalTransactionBranch exception", e);
51:                 log.info(msg.toString());
52:                 localException = e;
53:             }
54:         }
55:         break;
56:         // 發送【Half訊息】失敗,標記【本地事務】狀態為回滾
57:         case FLUSH_DISK_TIMEOUT:
58:         case FLUSH_SLAVE_TIMEOUT:
59:         case SLAVE_NOT_AVAILABLE:
60:             localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
61:             break;
62:         default:
63:             break;
64:     }
65:
66:     // 結束事務:提交訊息 COMMIT / ROLLBACK
67:     try {
68:         this.endTransaction(sendResult, localTransactionState, localException);
69:     } catch (Exception e) {
70:         log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
71:     }
72:
73:     // 傳回【事務發送結果】
74:     TransactionSendResult transactionSendResult = new TransactionSendResult();
75:     transactionSendResult.setSendStatus(sendResult.getSendStatus());
76:     transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
77:     transactionSendResult.setMsgId(sendResult.getMsgId());
78:     transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
79:     transactionSendResult.setTransactionId(sendResult.getTransactionId());
80:     transactionSendResult.setLocalTransactionState(localTransactionState);
81:     return transactionSendResult;
82: }
83:
84: /**
85:  * 結束事務:提交訊息 COMMIT / ROLLBACK
86:  *
87:  * @param sendResult 發送【Half訊息】結果
88:  * @param localTransactionState 【本地事務】狀態
89:  * @param localException 執行【本地事務】邏輯產生的異常
90:  * @throws RemotingException 當遠程呼叫發生異常時
91:  * @throws MQBrokerException 當 Broker 發生異常時
92:  * @throws InterruptedException 當執行緒中斷時
93:  * @throws UnknownHostException 當解碼訊息編號失敗是
94:  */

95: public void endTransaction(//
96:     final SendResult sendResult, //
97:     final LocalTransactionState localTransactionState, //
98:     final Throwable localException)
throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException
{
99:     // 解碼訊息編號
100:     final MessageId id;
101:     if (sendResult.getOffsetMsgId() != null) {
102:         id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
103:     } else {
104:         id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
105:     }
106:
107:     // 創建請求
108:     String transactionId = sendResult.getTransactionId();
109:     final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
110:     EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
111:     requestHeader.setTransactionId(transactionId);
112:     requestHeader.setCommitLogOffset(id.getOffset());
113:     switch (localTransactionState) {
114:         case COMMIT_MESSAGE:
115:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
116:             break;
117:         case ROLLBACK_MESSAGE:
118:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
119:             break;
120:         case UNKNOW:
121:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
122:             break;
123:         default:
124:             break;
125:     }
126:     requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
127:     requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
128:     requestHeader.setMsgId(sendResult.getMsgId());
129:     String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
130:
131:     // 提交訊息 COMMIT / ROLLBACK。!!!通信方式為:Oneway!!!
132:     this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
133: }

2.2 Broker 處理結束事務請求

  • ? 查詢請求的訊息,進行提交 / 回滾。實現代碼如下:

  1: // ⬇️⬇️⬇️【EndTransactionProcessor.java】
 2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
 3:     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 4:     final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
 5:
 6:     // 省略代碼 =》打印日誌(只處理 COMMIT / ROLLBACK)
 7:
 8:     // 查詢提交的訊息
 9:     final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
10:     if (msgExt != null) {
11:         // 省略代碼 =》校驗訊息
12:
13:         // 生成訊息
14:         MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
15:         msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
16:         msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
17:         msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
18:         msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
19:         if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
20:             msgInner.setBody(null);
21:         }
22:
23:         // 儲存生成訊息
24:         final MessageStore messageStore = this.brokerController.getMessageStore();
25:         final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
26:
27:         // 處理儲存結果
28:         if (putMessageResult != null) {
29:             switch (putMessageResult.getPutMessageStatus()) {
30:                 // Success
31:                 case PUT_OK:
32:                 case FLUSH_DISK_TIMEOUT:
33:                 case FLUSH_SLAVE_TIMEOUT:
34:                 case SLAVE_NOT_AVAILABLE:
35:                     response.setCode(ResponseCode.SUCCESS);
36:                     response.setRemark(null);
37:                     break;
38:                 // Failed
39:                 case CREATE_MAPEDFILE_FAILED:
40:                     response.setCode(ResponseCode.SYSTEM_ERROR);
41:                     response.setRemark("create maped file failed.");
42:                     break;
43:                 case MESSAGE_ILLEGAL:
44:                 case PROPERTIES_SIZE_EXCEEDED:
45:                     response.setCode(ResponseCode.MESSAGE_ILLEGAL);
46:                     response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
47:                     break;
48:                 case SERVICE_NOT_AVAILABLE:
49:                     response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
50:                     response.setRemark("service not available now.");
51:                     break;
52:                 case OS_PAGECACHE_BUSY:
53:                     response.setCode(ResponseCode.SYSTEM_ERROR);
54:                     response.setRemark("OS page cache busy, please try another machine");
55:                     break;
56:                 case UNKNOWN_ERROR:
57:                     response.setCode(ResponseCode.SYSTEM_ERROR);
58:                     response.setRemark("UNKNOWN_ERROR");
59:                     break;
60:                 default:
61:                     response.setCode(ResponseCode.SYSTEM_ERROR);
62:                     response.setRemark("UNKNOWN_ERROR DEFAULT");
63:                     break;
64:             }
65:
66:             return response;
67:         } else {
68:             response.setCode(ResponseCode.SYSTEM_ERROR);
69:             response.setRemark("store putMessage return null");
70:         }
71:     } else {
72:         response.setCode(ResponseCode.SYSTEM_ERROR);
73:         response.setRemark("find prepared transaction message failed");
74:         return response;
75:     }
76:
77:     return response;
78: }

2.3 Broker 生成 ConsumeQueue

  • ? 事務訊息,提交(COMMIT)後才生成 ConsumeQueue

  1: // ⬇️⬇️⬇️【DefaultMessageStore.java】
 2: public void doDispatch(DispatchRequest req) {
 3:     // 非事務訊息 或 事務提交訊息 建立 訊息位置信息 到 ConsumeQueue
 4:     final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
 5:     switch (tranType) {
 6:         case MessageSysFlag.TRANSACTION_NOT_TYPE: // 非事務訊息
 7:         case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 事務訊息COMMIT
 8:             DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
 9:                 req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
10:             break;
11:         case MessageSysFlag.TRANSACTION_PREPARED_TYPE: // 事務訊息PREPARED
12:         case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // 事務訊息ROLLBACK
13:             break;
14:     }
15:     // 省略代碼 =》 建立 索引信息 到 IndexFile
16: }

3. 事務訊息回查

  • 【事務訊息回查】功能曾經開源過,目前(V4.0.0)暫未開源。如下是該功能的開源情況:

版本 【事務訊息回查】
官方V3.0.4 ~ V3.1.4 基於 檔案系統 實現 已開源
官方V3.1.5 ~ V4.0.0 基於 資料庫 實現 未完全開源

我們來看看兩種情況下是怎麼實現的。

3.1 Broker 發起【事務訊息回查】

3.1.1 官方V3.1.4:基於檔案系統

倉庫地址:https://github.com/YunaiV/rocketmq-3.1.9/tree/release_3.1.4

相較於普通訊息,【事務訊息】多依賴如下三個組件:

  • TransactionStateService :事務狀態服務,負責對【事務訊息】進行管理,包括儲存與更新事務訊息狀態、回查事務訊息狀態等等。

  • TranStateTable :【事務訊息】狀態儲存。基於 MappedFileQueue 實現,預設儲存路徑為 ~/store/transaction/statetable,每條【事務訊息】狀態儲存結構如下:

第幾位 欄位 說明 資料型別 位元組數
1 offset CommitLog 物理儲存位置 Long 8
2 size 訊息長度 Int 4
3 timestamp 訊息儲存時間,單位:秒 Int 4
4 producerGroupHash producerGroup 求 HashCode Int 4
5 state 事務狀態 Int 4
    • TranRedoLog :TranStateTable重放日誌,每次寫操作 TranStateTable記錄重放日誌。當 Broker 異常關閉時,使用 TranRedoLog 恢復 TranStateTable。基於 ConsumeQueue 實現,Topic 為 TRANSACTION_REDOLOG_TOPIC_XXXX,預設儲存路徑為 ~/store/transaction/redolog


    簡單手繪邏輯圖如下?:

    3.1.1.1 儲存訊息到 CommitLog

    • ?儲存【half訊息】到 CommitLog 時,訊息佇列位置(queueOffset)使用 TranStateTable 最大物理位置(可寫入物理位置)。這樣,訊息可以索引到自己對應的 TranStateTable 的位置和記錄。

    核心代碼如下:

      1: // ⬇️⬇️⬇️【DefaultAppendMessageCallback.java】
     2: class DefaultAppendMessageCallback implements AppendMessageCallback {
     3:     public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,  final int maxBlank, final Object msg) {
     4:         // ...省略代碼
     5:
     6:         // 事務訊息需要特殊處理
     7:         final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
     8:         switch (tranType) {
     9:         case MessageSysFlag.TransactionPreparedType: // 訊息佇列位置(queueOffset)使用 TranStateTable 最大物理位置(可寫入物理位置)
    10:             queueOffset = CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().get();
    11:             break;
    12:         case MessageSysFlag.TransactionRollbackType:
    13:             queueOffset = msgInner.getQueueOffset();
    14:             break;
    15:         case MessageSysFlag.TransactionNotType:
    16:         case MessageSysFlag.TransactionCommitType:
    17:         default:
    18:             break;
    19:         }
    20:
    21:         // ...省略代碼
    22:
    23:         switch (tranType) {
    24:         case MessageSysFlag.TransactionPreparedType:
    25:             // 更新 TranStateTable 最大物理位置(可寫入物理位置)
    26:             CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().incrementAndGet();
    27:             break;
    28:         case MessageSysFlag.TransactionRollbackType:
    29:             break;
    30:         case MessageSysFlag.TransactionNotType:
    31:         case MessageSysFlag.TransactionCommitType:
    32:             // 更新下一次的ConsumeQueue信息
    33:             CommitLog.this.topicQueueTable.put(key, ++queueOffset);
    34:             break;
    35:         default:
    36:             break;
    37:         }
    38:
    39:         // 傳回結果
    40:         return result;
    41:     }
    42: }

    3.1.1.2 寫【事務訊息】狀態儲存(TranStateTable)

    • ?處理【Half訊息】時,新增【事務訊息】狀態儲存(TranStateTable)。

    • ?處理【Commit / Rollback訊息】時,更新 【事務訊息】狀態儲存(TranStateTable) COMMIT / ROLLBACK。

    • ?每次寫操作【事務訊息】狀態儲存(TranStateTable),記錄重放日誌(TranRedoLog)。

    核心代碼如下:

      1: // ⬇️⬇️⬇️【DispatchMessageService.java】
     2: private void doDispatch() {
     3:     if (!this.requestsRead.isEmpty()) {
     4:         for (DispatchRequest req : this.requestsRead) {
     5:
     6:             // ...省略代碼
     7:
     8:             // 2、寫【事務訊息】狀態儲存(TranStateTable)
     9:             if (req.getProducerGroup() != null) {
    10:                 switch (tranType) {
    11:                 case MessageSysFlag.TransactionNotType:
    12:                     break;
    13:                 case MessageSysFlag.TransactionPreparedType:
    14:                     // 新增 【事務訊息】狀態儲存(TranStateTable)
    15:                     DefaultMessageStore.this.getTransactionStateService().appendPreparedTransaction(
    16:                         req.getCommitLogOffset(), req.getMsgSize(), (int) (req.getStoreTimestamp() / 1000), req.getProducerGroup().hashCode());
    17:                     break;
    18:                 case MessageSysFlag.TransactionCommitType:
    19:                 case MessageSysFlag.TransactionRollbackType:
    20:                     // 更新 【事務訊息】狀態儲存(TranStateTable) COMMIT / ROLLBACK
    21:                     DefaultMessageStore.this.getTransactionStateService().updateTransactionState(
    22:                         req.getTranStateTableOffset(), req.getPreparedTransactionOffset(), req.getProducerGroup().hashCode(), tranType);
    23:                     break;
    24:                 }
    25:             }
    26:             // 3、記錄 TranRedoLog
    27:             switch (tranType) {
    28:             case MessageSysFlag.TransactionNotType:
    29:                 break;
    30:             case MessageSysFlag.TransactionPreparedType:
    31:                 // 記錄 TranRedoLog
    32:                 DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
    33:                         req.getCommitLogOffset(), req.getMsgSize(), TransactionStateService.PreparedMessageTagsCode,
    34:                         req.getStoreTimestamp(), 0L);
    35:                 break;
    36:             case MessageSysFlag.TransactionCommitType:
    37:             case MessageSysFlag.TransactionRollbackType:
    38:                 // 記錄 TranRedoLog
    39:                 DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
    40:                         req.getCommitLogOffset(), req.getMsgSize(), req.getPreparedTransactionOffset(),
    41:                         req.getStoreTimestamp(), 0L);
    42:                 break;
    43:             }
    44:         }
    45:
    46:         // ...省略代碼
    47:     }
    48: }
    49: // ⬇️⬇️⬇️【TransactionStateService.java】
    50: /**
    51:  * 新增事務狀態
    52:  *
    53:  * @param clOffset commitLog 物理位置
    54:  * @param size 訊息長度
    55:  * @param timestamp 訊息儲存時間
    56:  * @param groupHashCode groupHashCode
    57:  * @return 是否成功
    58:  */

    59: public boolean appendPreparedTransaction(//
    60:         final long clOffset,//
    61:         final int size,//
    62:         final int timestamp,//
    63:         final int groupHashCode//
    64: )
    {
    65:     MapedFile mapedFile = this.tranStateTable.getLastMapedFile();
    66:     if (null == mapedFile) {
    67:         log.error("appendPreparedTransaction: create mapedfile error.");
    68:         return false;
    69:     }
    70:
    71:     // 首次創建,加入定時任務中
    72:     if (0 == mapedFile.getWrotePostion()) {
    73:         this.addTimerTask(mapedFile);
    74:     }
    75:
    76:     this.byteBufferAppend.position(0);
    77:     this.byteBufferAppend.limit(TSStoreUnitSize);
    78:
    79:     // Commit Log Offset
    80:     this.byteBufferAppend.putLong(clOffset);
    81:     // Message Size
    82:     this.byteBufferAppend.putInt(size);
    83:     // Timestamp
    84:     this.byteBufferAppend.putInt(timestamp);
    85:     // Producer Group Hashcode
    86:     this.byteBufferAppend.putInt(groupHashCode);
    87:     // Transaction State
    88:     this.byteBufferAppend.putInt(MessageSysFlag.TransactionPreparedType);
    89:
    90:     return mapedFile.appendMessage(this.byteBufferAppend.array());
    91: }
    92:
    93: /**
    94:  * 更新事務狀態
    95:  *
    96:  * @param tsOffset tranStateTable 物理位置
    97:  * @param clOffset commitLog 物理位置
    98:  * @param groupHashCode groupHashCode
    99:  * @param state 事務狀態
    100:  * @return 是否成功
    101:  */

    102: public boolean updateTransactionState(
    103:         final long tsOffset,
    104:         final long clOffset,
    105:         final int groupHashCode,
    106:         final int state)
    {
    107:     SelectMapedBufferResult selectMapedBufferResult = this.findTransactionBuffer(tsOffset);
    108:     if (selectMapedBufferResult != null) {
    109:         try {
    110:
    111:             // ....省略代碼:校驗是否能夠更新
    112:
    113:             // 更新事務狀態
    114:             selectMapedBufferResult.getByteBuffer().putInt(TS_STATE_POS, state);
    115:         }
    116:         catch (Exception e) {
    117:             log.error("updateTransactionState exception", e);
    118:         }
    119:         finally {
    120:             selectMapedBufferResult.release();
    121:         }
    122:     }
    123:
    124:     return false;
    125: }

    3.1.1.3 【事務訊息】回查

    • ?TranStateTable 每個 MappedFile 都對應一個 TimerTimer 固定周期(預設:60s)遍歷 MappedFile,查找【half訊息】,向 Producer 發起【事務訊息】回查請求。【事務訊息】回查結果的邏輯不在此處進行,在 CommitLog dispatch時執行。

    實現代碼如下:

      1: // ⬇️⬇️⬇️【TransactionStateService.java】
     2: /**
     3:  * 初始化定時任務
     4:  */

     5: private void initTimerTask() {
     6:     //
     7:     final List mapedFiles = this.tranStateTable.getMapedFiles();
     8:     for (MapedFile mf : mapedFiles) {
     9:         this.addTimerTask(mf);
    10:     }
    11: }
    12:
    13: /**
    14:  * 每個檔案初始化定時任務
    15:  * @param mf 檔案
    16:  */

    17: private void addTimerTask(final MapedFile mf) {
    18:     this.timer.scheduleAtFixedRate(new TimerTask() {
    19:         private final MapedFile mapedFile = mf;
    20:         private final TransactionCheckExecuter transactionCheckExecuter = TransactionStateService.this.defaultMessageStore.getTransactionCheckExecuter();
    21:         private final long checkTransactionMessageAtleastInterval = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
    22:                     .getCheckTransactionMessageAtleastInterval();
    23:         private final boolean slave = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
    24:
    25:         @Override
    26:         public void run() {
    27:             // Slave不需要回查事務狀態
    28:             if (slave) {
    29:                 return;
    30:             }
    31:             // Check功能是否開啟
    32:             if (!TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
    33:                 .isCheckTransactionMessageEnable()) {
    34:                 return;
    35:             }
    36:
    37:             try {
    38:                 SelectMapedBufferResult selectMapedBufferResult = mapedFile.selectMapedBuffer(0);
    39:                 if (selectMapedBufferResult != null) {
    40:                     long preparedMessageCountInThisMapedFile = 0; // 回查的【half訊息】數量
    41:                     int i = 0;
    42:                     try {
    43:                         // 迴圈每條【事務訊息】狀態,對【half訊息】進行回查
    44:                         for (; i < selectMapedBufferResult.getSize(); i += TSStoreUnitSize) {
    45:                             selectMapedBufferResult.getByteBuffer().position(i);
    46:
    47:                             // Commit Log Offset
    48:                             long clOffset = selectMapedBufferResult.getByteBuffer().getLong();
    49:                             // Message Size
    50:                             int msgSize = selectMapedBufferResult.getByteBuffer().getInt();
    51:                             // Timestamp
    52:                             int timestamp = selectMapedBufferResult.getByteBuffer().getInt();
    53:                             // Producer Group Hashcode
    54:                             int groupHashCode = selectMapedBufferResult.getByteBuffer().getInt();
    55:                             // Transaction State
    56:                             int tranType = selectMapedBufferResult.getByteBuffer().getInt();
    57:
    58:                             // 已經提交或者回滾的訊息跳過
    59:                             if (tranType != MessageSysFlag.TransactionPreparedType) {
    60:                                 continue;
    61:                             }
    62:
    63:                             // 遇到時間不符合最小輪詢間隔,終止
    64:                             long timestampLong = timestamp * 1000;
    65:                             long diff = System.currentTimeMillis() - timestampLong;
    66:                             if (diff < checkTransactionMessageAtleastInterval) {
    67:                                 break;
    68:                             }
    69:
    70:                             preparedMessageCountInThisMapedFile++;
    71:
    72:                             // 回查Producer
    73:                             try {
    74:                                 this.transactionCheckExecuter.gotoCheck(groupHashCode, getTranStateOffset(i), clOffset, msgSize);
    75:                             } catch (Exception e) {
    76:                                 tranlog.warn("gotoCheck Exception", e);
    77:                             }
    78:                         }
    79:
    80:                         // 無回查的【half訊息】數量,且遍歷完,則終止定時任務
    81:                         if (0 == preparedMessageCountInThisMapedFile //
    82:                                 && i == mapedFile.getFileSize()) {
    83:                             tranlog.info("remove the transaction timer task, because no prepared message in this mapedfile[{}]", mapedFile.getFileName());
    84:                             this.cancel();
    85:                         }
    86:                     } finally {
    87:                         selectMapedBufferResult.release();
    88:                     }
    89:
    90:                     tranlog.info("the transaction timer task execute over in this period, {} Prepared Message: {} Check Progress: {}/{}", mapedFile.getFileName(),//
    91:                             preparedMessageCountInThisMapedFile, i / TSStoreUnitSize, mapedFile.getFileSize() / TSStoreUnitSize);
    92:                 } else if (mapedFile.isFull()) {
    93:                     tranlog.info("the mapedfile[{}] maybe deleted, cancel check transaction timer task", mapedFile.getFileName());
    94:                     this.cancel();
    95:                     return;
    96:                 }
    97:             } catch (Exception e) {
    98:                 log.error("check transaction timer task Exception", e);
    99:             }
    100:         }
    101:
    102:
    103:         private long getTranStateOffset(final long currentIndex) {
    104:             long offset = (this.mapedFile.getFileFromOffset() + currentIndex) / TransactionStateService.TSStoreUnitSize;
    105:             return offset;
    106:         }
    107:     }, 1000 * 60, this.defaultMessageStore.getMessageStoreConfig().getCheckTransactionMessageTimerInterval());
    108: }
    109:
    110: // 【DefaultTransactionCheckExecuter.java】
    111: @Override
    112: public void gotoCheck(int producerGroupHashCode, long tranStateTableOffset, long commitLogOffset,
    113:         int msgSize)
    {
    114:     // 第一步、查詢Producer
    115:     final ClientChannelInfo clientChannelInfo = this.brokerController.getProducerManager().pickProducerChannelRandomly(producerGroupHashCode);
    116:     if (null == clientChannelInfo) {
    117:         log.warn("check a producer transaction state, but not find any channel of this group[{}]", producerGroupHashCode);
    118:         return;
    119:     }
    120:
    121:     // 第二步、查詢訊息
    122:     SelectMapedBufferResult selectMapedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(commitLogOffset, msgSize);
    123:     if (null == selectMapedBufferResult) {
    124:         log.warn("check a producer transaction state, but not find message by commitLogOffset: {}, msgSize: ", commitLogOffset, msgSize);
    125:         return;
    126:     }
    127:
    128:     // 第三步、向Producer發起請求
    129:     final CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
    130:     requestHeader.setCommitLogOffset(commitLogOffset);
    131:     requestHeader.setTranStateTableOffset(tranStateTableOffset);
    132:     this.brokerController.getBroker2Client().checkProducerTransactionState(clientChannelInfo.getChannel(), requestHeader, selectMapedBufferResult);
    133: }

    3.1.1.4 初始化【事務訊息】狀態儲存(TranStateTable)

    • ?根據最後 Broker 關閉是否正常,會有不同的初始化方式。

    核心代碼如下:

    // 微信文章長度限制,請點擊【閱讀原文】

    3.1.1.5 補充

    • 為什麼 V3.1.5 開始,使用 資料庫 實現【事務狀態】的儲存?如下是來自官方文件的說明,可能是一部分原因:

    RocketMQ 這種實現事務方式,沒有通過 KV 儲存做,而是通過 Offset 方式,存在一個顯著缺陷,即通過 Offset 更改資料,會令系統的臟頁過多,需要特別關註。

    3.1.2 官方V4.0.0:基於資料庫

    倉庫地址:https://github.com/apache/incubator-rocketmq

    官方V4.0.0 暫時未完全開源【事務訊息回查】功能,So 我們需要進行一些猜想,可能不一定正確?

    ?我們來對比【官方V3.1.4:基於檔案】的實現。

    • TransactionRecord :記錄每條【事務訊息】。類似 TranStateTable

    TranStateTable TransactionRecord
    offset offset
    producerGroupHash producerGroup
    size 非必須欄位:【事務訊息】回查時,使用 offset 讀取 CommitLog 獲得。
    timestamp 非必須欄位:【事務訊息】回查時,使用 offset 讀取 CommitLog 獲得。
    state 非必須欄位: 事務開始,增加記錄;事務結束,刪除記錄。

    另外,資料庫本身保證了資料儲存的可靠性,無需 TranRedoLog


    簡單手繪邏輯圖如下?:

    3.2 Producer 接收【事務訊息回查】

    • 順序圖如下:

    • 核心代碼如下:

    // 微信文章長度限制,請點擊【閱讀原文】

    赞(0)

    分享創造快樂