歡迎光臨
每天分享高質量文章

分佈式事務 TCC-Transaction 原始碼解析 —— 事務儲存器

本文主要基於 TCC-Transaction 1.2.3.3 正式版

  • 1. 概述

  • 2. 序列化

    • 2.1 JDK 序列化實現

    • 2.2 Kyro 序列化實現

    • 2.3 JSON 序列化實現

  • 3. 儲存器

    • 3.1 可快取的事務儲存器抽象類

    • 3.2 JDBC 事務儲存器

    • 3.3 Redis 事務儲存器

    • 3.4 Zookeeper 事務儲存器

    • 3.5 File 事務儲存器

  • 666. 彩蛋


友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】】搞基嗨皮。

友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】】搞基嗨皮。


1. 概述

本文分享 事務儲存器。主要涉及如下 Maven 專案:

  • tcc-transaction-core :tcc-transaction 底層實現。

在 TCC 的過程中,根據應用記憶體中的事務信息完成整個事務流程。But 實際業務場景中,將事務信息只放在應用記憶體中是遠遠不夠可靠的。例如:

  1. 應用行程異常崩潰,未完成的事務信息將丟失。

  2. 應用行程集群,當提供遠程服務呼叫時,事務信息需要集群內共享。

  3. 發起事務的應用需要重啟部署新版本,因為各種原因,有未完成的事務。

因此,TCC-Transaction 將事務信息添加到記憶體中的同時,會使用外部儲存進行持久化。目前提供四種外部儲存:

  • JdbcTransactionRepository,JDBC 事務儲存器

  • RedisTransactionRepository,Redis 事務儲存器

  • ZooKeeperTransactionRepository,Zookeeper 事務儲存器

  • FileSystemTransactionRepository,File 事務儲存器

本文涉及到的類關係如下圖( 打開大圖 ):

你行好事會因為得到贊賞而愉悅 
同理,開源專案貢獻者會因為 Star 而更加有動力 
為 TCC-Transaction 點贊!傳送門

ps:筆者假設你已經閱讀過《tcc-transaction 官方文件 —— 使用指南1.2.x》。

2. 序列化

在《TCC-Transaction 原始碼分析 —— TCC 實現》「4. 事務與參與者」,可以看到 Transaction 是一個比較複雜的物件,內嵌 Participant 陣列,而 Participant 本身也是複雜的物件,內嵌了更多的其他物件,因此,儲存器在持久化 Transaction 時,需要序列化後才能儲存。

org.mengyun.tcctransaction.serializer.ObjectSerializer,物件序列化接口。實現代碼如下:

public interface ObjectSerializer<T> {

   byte[] serialize(T t);

   T deserialize(byte[] bytes);

}

目前提供 JDK自帶序列化 和 Kyro序列化 兩種實現。

2.1 JDK 序列化實現

org.mengyun.tcctransaction.serializer.JdkSerializationSerializer,JDK 序列化實現。比較易懂,點擊鏈接直接查看。

TCC-Transaction 使用的預設的序列化

2.2 Kyro 序列化實現

org.mengyun.tcctransaction.serializer.KryoTransactionSerializer,Kyro 序列化實現。比較易懂,點擊鏈接直接查看。

2.3 JSON 序列化實現

JDK 和 Kyro 的序列化實現,肉眼無法直觀具體儲存事務的信息,你可以通過實現 ObjectSerializer 接口,實現自定義的 JSON 序列化。

3. 儲存器

org.mengyun.tcctransaction.TransactionRepository,事務儲存器接口。實現代碼如下:

public interface TransactionRepository {

   /**
    * 新增事務
    *
    * @param transaction 事務
    * @return 新增數量
    */

   int create(Transaction transaction);

   /**
    * 更新事務
    *
    * @param transaction 事務
    * @return 更新數量
    */

   int update(Transaction transaction);

   /**
    * 刪除事務
    *
    * @param transaction 事務
    * @return 刪除數量
    */

   int delete(Transaction transaction);

   /**
    * 獲取事務
    *
    * @param xid 事務編號
    * @return 事務
    */

   Transaction findByXid(TransactionXid xid);

   /**
    * 獲取超過指定時間的事務集合
    *
    * @param date 指定時間
    * @return 事務集合
    */

   List findAllUnmodifiedSince(Date date);
}

不同的儲存器通過實現該接口,提供事務的增刪改查功能。

3.1 可快取的事務儲存器抽象類

org.mengyun.tcctransaction.repository.CachableTransactionRepository可快取的事務儲存器抽象類,實現增刪改查事務時,同時快取事務信息。在上面類圖,我們也可以看到 TCC-Transaction 自帶的多種儲存器都繼承該抽象類。

CachableTransactionRepository 構造方法實現代碼如下:

public abstract class CachableTransactionRepository implements TransactionRepository {

   /**
    * 快取過期時間
    */

   private int expireDuration = 120;
   /**
    * 快取
    */

   private Cache transactionXidCompensableTransactionCache;

   public CachableTransactionRepository() {
       transactionXidCompensableTransactionCache = CacheBuilder.newBuilder().expireAfterAccess(expireDuration, TimeUnit.SECONDS).maximumSize(1000).build();
   }
}
  • 使用 Guava Cache 記憶體快取事務信息,設置最大快取個數為 1000 個,快取過期時間為最後訪問時間 120 秒。


#create(...) 實現代碼如下:

@Override
public int create(Transaction transaction) {
  int result = doCreate(transaction);
  if (result > 0) {
      putToCache(transaction);
  }
  return result;
}

/**
* 添加到快取
*
* @param transaction 事務
*/

protected void putToCache(Transaction transaction) {
  transactionXidCompensableTransactionCache.put(transaction.getXid(), transaction);
}

/**
* 新增事務
*
* @param transaction 事務
* @return 新增數量
*/

protected abstract int doCreate(Transaction transaction);
  • 呼叫 #doCreate(...) 方法,新增事務。新增成功後,呼叫 #putToCache(...) 方法,添加事務到快取。

  • #doCreate(...) 為抽象方法,子類實現該方法,提供新增事務功能。


#update(...) 實現代碼如下:

@Override
public int update(Transaction transaction) {
  int result = 0;
  try {
      result = doUpdate(transaction);
      if (result > 0) {
          putToCache(transaction);
      } else {
          throw new OptimisticLockException();
      }
  } finally {
      if (result <= 0) { // 更新失敗,移除快取。下次訪問,從儲存器讀取
          removeFromCache(transaction);
      }
  }
  return result;
}

/**
* 移除事務從快取
*
* @param transaction 事務
*/

protected void removeFromCache(Transaction transaction) {
  transactionXidCompensableTransactionCache.invalidate(transaction.getXid());
}

/**
* 更新事務
*
* @param transaction 事務
* @return 更新數量
*/

protected abstract int doUpdate(Transaction transaction);
  • 呼叫 #doUpdate(...) 方法,更新事務。

    • 若更新成功後,呼叫 #putToCache(...) 方法,添加事務到快取。

    • 若更新失敗後,丟擲 OptimisticLockException 異常。有兩種情況會導致更新失敗:(1) 該事務已經被提交,被刪除;(2) 樂觀鎖更新時,快取的事務的版本號( Transaction.version )和儲存器里的事務的版本號不同,更新失敗。為什麼?在《TCC-Transaction 原始碼分析 —— 事務恢復》詳細解析。更新失敗,意味著快取已經不不一致,呼叫 #removeFromCache(...) 方法,移除事務從快取中。

  • #doUpdate(...) 為抽象方法,子類實現該方法,提供更新事務功能。


#delete(...) 實現代碼如下:

@Override
public int delete(Transaction transaction) {
  int result;
  try {
      result = doDelete(transaction);
  } finally {
      removeFromCache(transaction);
  }
  return result;
}

/**
* 刪除事務
*
* @param transaction 事務
* @return 刪除數量
*/

protected abstract int doDelete(Transaction transaction);
  • 呼叫 #doDelete(...) 方法,刪除事務。

  • 呼叫 #removeFromCache(...) 方法,移除事務從快取中。

  • #doDelete(...) 為抽象方法,子類實現該方法,提供刪除事務功能。


#findByXid(...) 實現代碼如下:

@Override
public Transaction findByXid(TransactionXid transactionXid) {
  Transaction transaction = findFromCache(transactionXid);
  if (transaction == null) {
      transaction = doFindOne(transactionXid);
      if (transaction != null) {
          putToCache(transaction);
      }
  }
  return transaction;
}

/**
* 獲得事務從快取中
*
* @param transactionXid 事務編號
* @return 事務
*/

protected Transaction findFromCache(TransactionXid transactionXid) {
  return transactionXidCompensableTransactionCache.getIfPresent(transactionXid);
}

/**
* 查詢事務
*
* @param xid 事務編號
* @return 事務
*/

protected abstract Transaction doFindOne(Xid xid);
  • 呼叫 #findFromCache() 方法,優先從快取中獲取事務。

  • 呼叫 #doFindOne() 方法,快取中事務不存在,從儲存器中獲取。獲取到後,呼叫 #putToCache() 方法,添加事務到快取中。

  • #doFindOne(...) 為抽象方法,子類實現該方法,提供查詢事務功能。


#findAllUnmodifiedSince(...) 實現代碼如下:

@Override
public List findAllUnmodifiedSince(Date date) {
  List transactions = doFindAllUnmodifiedSince(date);
  // 添加到快取
  for (Transaction transaction : transactions) {
      putToCache(transaction);
  }
  return transactions;
}

/**
* 獲取超過指定時間的事務集合
*
* @param date 指定時間
* @return 事務集合
*/

protected abstract List doFindAllUnmodifiedSince(Date date);
  • 呼叫 #findAllUnmodifiedSince(...) 方法,從儲存器獲取超過指定時間的事務集合。呼叫 #putToCache(...) 方法,迴圈事務集合添加到快取。

  • #doFindAllUnmodifiedSince(...) 為抽象方法,子類實現該方法,提供獲取超過指定時間的事務集合功能。

3.2 JDBC 事務儲存器

org.mengyun.tcctransaction.repository.JdbcTransactionRepository,JDBC 事務儲存器,通過 JDBC 驅動,將 Transaction 儲存到 MySQL / Oracle / PostgreSQL / SQLServer 等關係資料庫。實現代碼如下:

public class JdbcTransactionRepository extends CachableTransactionRepository {

   /**
    * 領域
    */

   private String domain;
   /**
    * 表後綴
    */

   private String tbSuffix;
   /**
    * 資料源
    */

   private DataSource dataSource;
   /**
    * 序列化
    */

   private ObjectSerializer serializer = new JdkSerializationSerializer();
}
  • domain,領域,或者也可以稱為模塊名,應用名,用於唯一標識一個資源。例如,Maven 模塊 xxx-order,我們可以配置該屬性為 ORDER

  • tbSuffix,表後綴。預設儲存表名為 TCC_TRANSACTION,配置表名後,為 TCC_TRANSACTION${tbSuffix}

  • dataSource,儲存資料的資料源。

  • serializer,序列化。當資料庫里已經有資料的情況下,不要更換別的序列化,否則會導致反序列化報錯。建議:TCC-Transaction 儲存時,新增欄位,記錄序列化的方式。

表結構如下:

CREATE TABLE `TCC_TRANSACTION` (
 `TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT,
 `DOMAIN` varchar(100) DEFAULT NULL,
 `GLOBAL_TX_ID` varbinary(32) NOT NULL,
 `BRANCH_QUALIFIER` varbinary(32) NOT NULL,
 `CONTENT` varbinary(8000) DEFAULT NULL,
 `STATUS` int(11) DEFAULT NULL,
 `TRANSACTION_TYPE` int(11) DEFAULT NULL,
 `RETRIED_COUNT` int(11) DEFAULT NULL,
 `CREATE_TIME` datetime DEFAULT NULL,
 `LAST_UPDATE_TIME` datetime DEFAULT NULL,
 `VERSION` int(11) DEFAULT NULL,
 PRIMARY KEY (`TRANSACTION_ID`),
 UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`)
) ENGINE
=InnoDB DEFAULT CHARSET=utf8
  • TRANSACTION_ID,僅僅資料庫自增,無實際用途。

  • CONTENT,Transaction 序列化。

ps:點擊鏈接查看 JdbcTransactionRepository 代碼實現,已經添加完整中文註釋。

3.3 Redis 事務儲存器

org.mengyun.tcctransaction.repository.RedisTransactionRepository,Redis 事務儲存器,將 Transaction 儲存到 Redis。實現代碼如下:

public class RedisTransactionRepository extends CachableTransactionRepository {

   /**
    * Jedis Pool
    */

   private JedisPool jedisPool;
   /**
    * key 前綴
    */

   private String keyPrefix = "TCC:";
   /**
    * 序列化
    */

   private ObjectSerializer serializer = new JdkSerializationSerializer();

}
  • keyPrefix,key 前綴。類似 JdbcTransactionRepository 的 domain 屬性。

一個事務儲存到 Reids,使用 Redis 的資料結構為 HASHES。

  • key : 使用 keyPrefix + xid,實現代碼如下:

    /**
    * 創建事務的 Redis Key
    *
    * @param keyPrefix key 前綴
    * @param xid 事務
    * @return Redis Key
    */

    public static byte[] getRedisKey(String keyPrefix, Xid xid) {
      byte[] prefix = keyPrefix.getBytes();
      byte[] globalTransactionId = xid.getGlobalTransactionId();
      byte[] branchQualifier = xid.getBranchQualifier();
      // 拼接 key
      byte[] key = new byte[prefix.length + globalTransactionId.length + branchQualifier.length];
      System.arraycopy(prefix, 0, key, 0, prefix.length);
      System.arraycopy(globalTransactionId, 0, key, prefix.length, globalTransactionId.length);
      System.arraycopy(branchQualifier, 0, key, prefix.length + globalTransactionId.length, branchQualifier.length);
      return key;
    }
  • HASHES 的 key :使用 version

    • 添加和更新 Transaction 時,使用 Redis HSETNX,不存在當前版本的值時,進行設置,重而實現類似樂觀鎖的更新。

    • 讀取 Transaction 時,使用 Redis HGETALL,將 Transaction 所有 version 對應的值讀取到記憶體後,取 version值最大的對應的值。

  • HASHES 的 value :呼叫 TransactionSerializer#serialize(...) 方法,序列化 Transaction。實現代碼如下:

    public static byte[] serialize(ObjectSerializer serializer, Transaction transaction) {
      Map map = new HashMap();
      map.put("GLOBAL_TX_ID", transaction.getXid().getGlobalTransactionId());
      map.put("BRANCH_QUALIFIER", transaction.getXid().getBranchQualifier());
      map.put("STATUS", transaction.getStatus().getId());
      map.put("TRANSACTION_TYPE", transaction.getTransactionType().getId());
      map.put("RETRIED_COUNT", transaction.getRetriedCount());
      map.put("CREATE_TIME", transaction.getCreateTime());
      map.put("LAST_UPDATE_TIME", transaction.getLastUpdateTime());
      map.put("VERSION", transaction.getVersion());
      // 序列化
      map.put("CONTENT", serializer.serialize(transaction));
      return serializer.serialize(map);
    }
    • TODO 為什麼序列化兩次

在實現 #doFindAllUnmodifiedSince(date) 方法,無法像資料庫使用時間條件進行過濾,因此,加載所有事務後在記憶體中過濾。實現代碼如下:

@Override
protected List doFindAllUnmodifiedSince(Date date) {
  // 獲得所有事務
  List allTransactions = doFindAll();
  // 過濾時間
  List allUnmodifiedSince = new ArrayList();
  for (Transaction transaction : allTransactions) {
      if (transaction.getLastUpdateTime().compareTo(date) < 0) {
          allUnmodifiedSince.add(transaction);
      }
  }
  return allUnmodifiedSince;
}

ps:點擊鏈接查看 RedisTransactionRepository 代碼實現,已經添加完整中文註釋。

FROM 《TCC-Transaction 官方文件 —— 使用指南1.2.x》 
使用 RedisTransactionRepository 需要配置 Redis 服務器如下: 
appendonly yes 
appendfsync always

3.4 Zookeeper 事務儲存器

org.mengyun.tcctransaction.repository.ZooKeeperTransactionRepository,Zookeeper 事務儲存器,將 Transaction 儲存到 Zookeeper。實現代碼如下:

public class ZooKeeperTransactionRepository extends CachableTransactionRepository {

   /**
    * Zookeeper 服務器地址陣列
    */

   private String zkServers;
   /**
    * Zookeeper 超時時間
    */

   private int zkTimeout;
   /**
    * TCC 儲存 Zookeeper 根目錄
    */

   private String zkRootPath = "/tcc";
   /**
    * Zookeeper 連接
    */

   private volatile ZooKeeper zk;
   /**
    * 序列化
    */

   private ObjectSerializer serializer = new JdkSerializationSerializer();
}
  • zkRootPath,儲存 Zookeeper 根目錄,類似 JdbcTransactionRepository 的 domain 屬性。

一個事務儲存到 Zookeeper,使用 Zookeeper 的持久資料節點

  • path:${zkRootPath} + / + ${xid}。實現代碼如下:

    // ZooKeeperTransactionRepository.java
    private String getTxidPath(Xid xid) {
      return String.format("%s/%s", zkRootPath, xid);
    }

    // TransactionXid.java
    @Override
    public String toString() {
      StringBuilder stringBuilder = new StringBuilder();
      stringBuilder.append("globalTransactionId:").append(UUID.nameUUIDFromBytes(globalTransactionId).toString());
      stringBuilder.append(",").append("branchQualifier:").append(UUID.nameUUIDFromBytes(branchQualifier).toString());
      return stringBuilder.toString();
    }
  • data:呼叫 TransactionSerializer#serialize(...) 方法,序列化 Transaction。

  • version:使用 Zookeeper 資料節點自帶版本功能。這裡要註意下,Transaction 的版本從 1 開始,而 Zookeeper 資料節點版本從 0 開始。

ps:點擊鏈接查看 ZooKeeperTransactionRepository 代碼實現,已經添加完整中文註釋。

另外,在生產上暫時不建議使用 ZooKeeperTransactionRepository,原因有兩點:

  • 不支持 Zookeeper 安全認證。

  • 使用 Zookeeper 時,未考慮斷網重連等情況。

如果你要使用 Zookeeper 進行事務的儲存,可以考慮使用 Apache Curator 操作 Zookeeper,重寫 ZooKeeperTransactionRepository 部分代碼。

3.5 File 事務儲存器

org.mengyun.tcctransaction.repository.FileSystemTransactionRepository,File 事務儲存器,將 Transaction 儲存到檔案系統。

實現上和 ZooKeeperTransactionRepository,區別主要在於不支持樂觀鎖更新。有興趣的同學點擊鏈接查看,這裡就不拓展開來。

另外,在生產上不建議使用 FileSystemTransactionRepository,因為不支持多節點共享。用分佈式儲存掛載檔案另說,當然還是不建議,因為不支持樂觀鎖併發更新。

666. 彩蛋

這篇略( 超 )微( 級 )水更,哈哈哈,為《TCC-Transaction 原始碼分析 —— 事務恢復》做鋪墊啦。

使用 RedisTransactionRepository 和 ZooKeeperTransactionRepository 儲存事務還是 Get 蠻多點的。

胖友,分享一個朋友圈可好?

赞(0)

分享創造快樂