歡迎光臨
每天分享高質量文章

深度剖析一站式分佈式事務方案 Seata-Server

 

本文作者:李釗,公眾號「咖啡拿鐵」作者,分佈式事務 Seata 社區 Contributor。

1.關於 Seata

在前不久,我寫了一篇關於分佈式事務中間件 Fescar 的解析,沒過幾天 Fescar 團隊對其進行了品牌升級,取名為 Seata(Simpe Extensible Autonomous Transcaction Architecture),而以前的 Fescar 的英文全稱為 Fast & EaSy Commit And Rollback。可以看見 Fescar 從名字上來看更加局限於 Commit 和 Rollback,而新的品牌名字 Seata 旨在打造一套一站式分佈式事務解決方案。更換名字之後,我對其未來的發展更有信心。

這裡先大概回憶一下 Seata 的整個過程模型:

  • TM:事務的發起者。用來告訴 TC,全域性事務的開始,提交,回滾。

  • RM:具體的事務資源,每一個 RM 都會作為一個分支事務註冊在 TC。

  • TC 事務的協調者。也可以看做是 Fescar-Server,用於接收我們的事務的註冊,提交和回滾。

在之前的文章中對整個角色有個大體的介紹,在這篇文章中我將重點介紹其中的核心角色 TC,也就是事務協調器。

2.Transaction Coordinator

為什麼之前一直強調 TC 是核心呢?那因為 TC 這個角色就好像上帝一樣,管控著芸芸眾生的 RM 和 TM。如果 TC 一旦不好使,那麼 RM 和 TM 一旦出現小問題,那必定會亂的一塌糊塗。所以要想瞭解 Seata,那麼必須要瞭解他的 TC。

那麼一個優秀的事務協調者應該具備哪些能力呢?我覺得應該有以下幾個:

  • 正確的協調:能正確的協調 RM 和 TM 接下來應該做什麼,做錯了應該怎麼辦,做對了應該怎麼辦。

  • 高可用:事務協調器在分佈式事務中很重要,如果不能保證高可用,那麼他也沒有存在的必要了。

  • 高性能:事務協調器的性能一定要高,如果事務協調器性能有瓶頸,那麼他所管理的 RM 和 TM 會經常遇到超時,從而引起回滾頻繁。

  • 高擴展性:這個特點是屬於代碼層面的,如果是一個優秀的框架,那麼需要給使用方很多自定義擴展,比如服務註冊/發現,讀取配置等等。

下麵我也將逐步闡述 Seata 是如何做到上面四點。

2.1 Seata-Server 的設計

Seata-Server 整體的模塊圖如上所示:

  • Coordinator Core:最下麵的模塊是事務協調器核心代碼,主要用來處理事務協調的邏輯,如是否 Commit、Rollback 等協調活動。

  • Store:儲存模塊,用來將資料持久化,防止重啟或者宕機資料丟失。

  • Discover:服務註冊/發現模塊,用於將 Server 地址暴露給 Client。

  • Config:用來儲存和查找服務端的配置。

  • Lock:鎖模塊,用於給 Seata 提供全域性鎖的功能。

  • Rpc:用於和其他端通信。

  • HA-Cluster:高可用集群,目前還沒開源,為 Seata 提供可靠的高可用功能。

2.2 Discover

首先來講講比較基礎的 Discover 模塊,又稱服務註冊/發現模塊。我們將 Seata-Server 啟動之後,需要將自己的地址暴露給其他使用者,那麼就需要這個模塊幫忙。

這個模塊有個核心接口 RegistryService,如上圖所示:

  • register:服務端使用,進行服務註冊。

  • unregister:服務端使用,一般在 JVM 關閉鉤子,ShutdownHook 中呼叫。

  • subscribe:客戶端使用,註冊監聽事件,用來監聽地址的變化。

  • unsubscribe:客戶端使用,取消註冊監聽事件。

  • lookup:客戶端使用,根據 Key 查找服務地址串列。

  • close:都可以使用,用於關閉 Register 資源。

如果需要添加自己定義的服務註冊/發現,那麼實現這個接口即可。截止目前在社區的不斷開發推動下,已經有四種服務註冊/發現,分別是 redis、zk、nacos、eruka。下麵簡單介紹下 Nacos 的實現:

2.2.1 register 接口

step1:校驗地址是否合法;

step2:獲取 Nacos 的 Name 實體,然後將地址註冊到當前 Cluster 名稱上面。

unregister 接口類似,這裡不做詳解。

2.2.2 lookup 接口

step1:獲取當前 clusterName 名字;

step2:判斷當前 Cluster 是否已經獲取過了,如果獲取過就從 Map 中取;

step3:從 Nacos 拿到地址資料,將其轉換成所需要的;

step4:將事件變動的 Listener 註冊到 Nacos。

2.2.3 subscribe 接口

這個接口比較簡單,具體分兩步:

step1:將 Clstuer 和 Listener 添加進 Map 中;

step2:向 Nacos 註冊。

2.3 Config

配置模塊也是一個比較基礎,比較簡單的模塊。我們需要配置一些常用的引數比如:Netty 的 Select 執行緒數量,Work 執行緒數量,Session 允許最大為多少等等,當然這些引數在 Seata 中都有自己的預設設置。

同樣的在 Seata 中也提供了一個接口 Configuration,用來自定義需要的獲取配置的地方:

  • getInt/Long/Boolean/Config():通過 DataId 來獲取對應的值。

  • putConfig:用於添加配置。

  • removeConfig:刪除一個配置。

  • add/remove/get ConfigListener:添加/刪除/獲取 配置監聽器,一般用來監聽配置的變更。

目前為止有四種方式獲取 Config:File(檔案獲取)、Nacos、Apollo、ZK。在 Seata 中首先需要配置 registry.conf,來配置 conf 的型別。實現 conf 比較簡單這裡就不深入分析。

2.4 Store

儲存層的實現對於 Seata 是否高性能,是否可靠非常關鍵。
如果儲存層沒有實現好,那麼如果發生宕機,在 TC 中正在進行分佈式事務處理的資料將會被丟失。既然使用了分佈式事務,那麼其肯定不能容忍丟失。如果儲存層實現好了,但是其性能有很大問題,RM 可能會發生頻繁回滾那麼其完全無法應對高併發的場景。

在 Seata 中預設提供了檔案方式的儲存,下麵定義儲存的資料為 Session,而 TM 創造的全域性事務資料叫 GlobalSession,RM 創造的分支事務叫 BranchSession,一個 GlobalSession 可以擁有多個 BranchSession。我們的目的就是要將這麼多 Session 儲存下來。

在 FileTransactionStoreManager#writeSession 代碼中:

上面的代碼主要分為下麵幾步:

step1:生成一個 TransactionWriteFuture。

step2:將這個 futureRequest 丟進一個 LinkedBlockingQueue 中。為什麼需要將所有資料都丟進佇列中呢?當然這裡其實也可以用鎖來實現,在另外一個阿裡開源的 RocketMQ 中使用的鎖。不論是佇列還是鎖,他們的目的是為了保證單執行緒寫,這又是為什麼呢?有人會解釋說,需要保證順序寫,這樣速度就很快,這個理解是錯誤的,我們的 FileChannel 其實是執行緒安全的,已經能保證順序寫了。保證單執行緒寫其實是為了讓這個寫邏輯都是單執行緒的,因為可能有些檔案寫滿或者記錄寫資料位置等等邏輯,當然這些邏輯都可以主動加鎖去做,但是為了實現簡單方便,直接再整個寫邏輯加鎖是最為合適的。

step3:呼叫 future.get,等待該條資料寫邏輯完成通知。

我們將資料提交到佇列之後,接下來需要對其進行消費,代碼如下:

這裡將一個 WriteDataFileRunnable() 提交進執行緒池,這個 Runnable 的 run() 方法如下:

分為下麵幾步:

step1:判斷是否停止,如果 stopping 為 true 則傳回 null。

step2:從佇列中獲取資料。

step3:判斷 future 是否已經超時了,如果超時,則設置結果為 false,此時我們生產者 get() 方法會接觸阻塞。

step4:將資料寫進檔案,此時資料還在 pageCache 層並沒有掃清到磁盤,如果寫成功然後根據條件判斷是否進行刷盤操作。

step5:當寫入數量到達一定的時候,或者寫入時間到達一定的時候,需要將當前的檔案儲存為歷史檔案,刪除以前的歷史檔案,然後創建新的檔案。這一步是為了防止檔案無限增長,大量無效資料浪費磁盤資源。

在 writeDataFile 中有如下代碼:

step1:首先獲取 ByteBuffer,如果超出最大迴圈 BufferSize 就直接創建一個新的,否則就使用快取的 Buffer。這一步可以很大的減少 GC。

step2:然後將資料添加進入 ByteBuffer。

step3:最後將 ByteBuffer 寫入 fileChannel,這裡會重試三次。此時的資料還在 pageCache 層,受兩方面的影響,OS 有自己的掃清策略,但是這個業務程式不能控制,為了防止宕機等事件出現造成大量資料丟失,所以就需要業務自己控制 flush。下麵是 flush 的代碼:

這裡 flush 的條件寫入一定數量或者寫的時間超過一定時間,這樣也會有個小問題如果是停電,那麼 pageCache 中有可能還有資料並沒有被刷盤,會導致少量的資料丟失。目前還不支持同步樣式,也就是每條資料都需要做刷盤操作,這樣可以保證每條訊息都落盤,但是性能也會受到極大的影響,當然後續會不斷的演進支持。

Store 核心流程主要是上面幾個方法,當然還有一些比如 Session 重建等,這些比較簡單,讀者可以自行閱讀。

2.5 Lock

大家知道資料庫實現隔離級別主要是通過鎖來實現的,同樣的再分佈式事務框架 Seata 中要實現隔離級別也需要通過鎖。一般在資料庫中資料庫的隔離級別一共有四種:讀未提交、讀已提交、可重覆讀、串行化。在 Seata 中可以保證寫的互斥,而讀的隔離級別一般是未提交,但是提供了達到讀已提交隔離的手段。

Lock 模塊也就是 Seata 實現隔離級別的核心模塊。在 Lock 模塊中提供了一個接口用於管理鎖:

其中有三個方法:

  • acquireLock:用於對 BranchSession 加鎖,這裡雖然是傳的分支事務 Session,實際上是對分支事務的資源加鎖,成功傳回 true。

  • isLockable:根據事務 ID,資源 ID,鎖住的 Key 來查詢是否已經加鎖。

  • cleanAllLocks:清除所有的鎖。

對於鎖我們可以在本地實現,也可以通過 redis 或者 mysql 來幫助我們實現。官方預設提供了本地全域性鎖的實現:

在本地鎖的實現中有兩個常量需要關註:

  • BUCKET_PER_TABLE:用來定義每個 table 有多少個 bucket,目的是為了後續對同一個表加鎖的時候減少競爭。

  • LOCK_MAP:這個 Map 從定義上來看非常複雜,裡裡外外套了很多層 Map,這裡用個表格具體說明一下:

層數

key

value

1-LOCK_MAP

resourceId(jdbcUrl)

dbLockMap

2-dbLockMap

tableName (表名)

tableLockMap

3-tableLockMap

PK.hashcode%Bucket (主鍵值的 hashcode%bucket)

bucketLockMap

4-bucketLockMap

PK

trascationId

可以看見實際上的加鎖在 bucketLockMap 這個 Map 中,這裡具體的加鎖方法比較簡單就不作詳細闡述,主要是逐步的找到 bucketLockMap ,然後將當前 TrascationId 塞進去,如果這個主鍵當前有 TranscationId,那麼比較是否是自己,如果不是則加鎖失敗。

2.6 RPC

保證 Seata 高性能的關鍵之一也是使用了 Netty 作為 RPC 框架,採用預設配置的執行緒模型如下圖所示:

如果採用預設的基本配置那麼會有一個 Acceptor 執行緒用於處理客戶端的鏈接,會有 cpu*2 數量的 NIO-Thread,再這個執行緒中不會做業務太重的事情,只會做一些速度比較快的事情,比如編解碼,心跳事件和TM註冊。一些比較費時間的業務操作將會交給業務執行緒池,預設情況下業務執行緒池配置為最小執行緒為 100,最大為 500。

這裡需要提一下的是 Seata 的心跳機制,這裡是使用 Netty 的 IdleStateHandler 完成的,如下:

在 Server 端對於寫沒有設置最大空閑時間,對於讀設置了最大空閑時間,預設為 15s,如果超過 15s 則會將鏈接斷開,關閉資源。

step1:判斷是否是讀空閑的檢測事件;

step2:如果是,則斷開鏈接,關閉資源。

2.7 HA-Cluster

目前官方沒有公佈 HA-Cluster,但是通過一些其他中間件和官方的一些透露,可以將 HA-Cluster 用如下方式設計:

具體的流程如下:

step1:客戶端發佈信息的時候根據 TranscationId 保證同一個 Transcation 是在同一個 Master 上,通過多個 Master 水平擴展,提供併發處理性能。

step2:在 Server 端中一個 Master 有多個 Slave,Master 中的資料近實時同步到 Slave上,保證當 Master 宕機的時候,還能有其他 Slave 頂上來可以用。

當然上述一切都是猜測,具體的設計實現還得等 0.5 版本之後。目前有一個 Go 版本的 Seata-Server 也捐贈給了 Seata (還在流程中),其通過 Raft 實現副本一致性,其他細節不是太清楚。

2.8 Metrics & Tracing

這個模塊也是一個沒有具體公佈實現的模塊,當然有可能會提供插件口,讓其他第三方 metric 接入進來。另外最近 Apache SkyWalking 正在和 Seata 小組商討如何接入進來。

3.Coordinator Core

上面我們講了很多 Server 基礎模塊,想必大家對 Seata 的實現已經有個大概,接下來我會講解事務協調器具體邏輯是如何實現的,讓大家更加瞭解 Seata 的實現內幕。

3.1 啟動流程

啟動方法在 Server 類有個 main 方法,定義了我們啟動流程:

step1:創建一個 RpcServer,再這個裡麵包含了我們網絡的操作,用 Netty 實現了服務端。

step2:解析端口號和檔案地址。

step3:初始化 SessionHolder,其中最重要的重要就是重我們 dataDir 這個檔案夾中恢復我們的資料,重建我們的Session。

step4:創建一個CoorDinator,這個也是我們事務協調器的邏輯核心代碼,然後將其初始化,其內部初始化的邏輯會創建四個定時任務:

  • retryRollbacking:重試 rollback 定時任務,用於將那些失敗的 rollback 進行重試的,每隔 5ms 執行一次。

  • retryCommitting:重試 commit 定時任務,用於將那些失敗的 commit 進行重試的,每隔 5ms 執行一次。

  • asyncCommitting:異步 commit 定時任務,用於執行異步的 commit,每隔 10ms 一次。

  • timeoutCheck:超時定時任務檢測,用於檢測超時的任務,然後執行超時的邏輯,每隔 2ms 執行一次。

step5: 初始化 UUIDGenerator 這個也是我們生成各種 ID(transcationId,branchId) 的基本類。

step6:將本地 IP 和監聽端口設置到 XID 中,初始化 RpcServer 等待客戶端的連接。

啟動流程比較簡單,下麵我會介紹分佈式事務框架中的常見的一些業務邏輯 Seata 是如何處理的。

3.2 Begin – 開啟全域性事務

一次分佈式事務的起始點一定是開啟全域性事務,首先我們看看全域性事務 Seata 是如何實現的:

step1: 根據應用 ID,事務分組,名字,超時時間創建一個 GlobalSession,這個再前面也提到過他和 branchSession 分別是什麼。

step2:對其添加一個 RootSessionManager 用於監聽一些事件,這裡要說一下目前在 Seata 裡面有四種型別的 Listener (這裡要說明的是所有的 sessionManager 都實現了 SessionLifecycleListener):

  • ROOTSESSIONMANAGER:最全,最大的,擁有所有的 Session。

  • ASYNCCOMMITTINGSESSION_MANAGER:用於管理需要做異步 commit 的 Session。

  • RETRYCOMMITTINGSESSION_MANAGER:用於管理重試 commit 的 Session。

  • RETRYROLLBACKINGSESSION_MANAGER:用於管理重試回滾的 Session。
    由於這裡是開啟事務,其他 SessionManager 不需要關註,我們只添加 RootSessionManager 即可。

step3:開啟 GlobalSession:

這一步會把狀態變為 Begin,記錄開始時間,並且呼叫 RootSessionManager的 onBegin 監聽方法,將 Session 儲存到 Map 並寫入到我們的檔案。

step4:最後傳回 XID,這個 XID 是由 ip+port+transactionId 組成的,非常重要,當 TM 申請到之後需要將這個 ID 傳到 RM 中,RM 通過 XID 來決定到底應該訪問哪一臺 Server。

3.3 BranchRegister – 分支事務註冊

當全域性事務在 TM 開啟之後,RM 的分支事務也需要註冊到全域性事務之上,這裡看看是如何處理的:

step1:通過 transactionId 獲取並校驗全域性事務是否是開啟狀態。

step2:創建一個新的分支事務,也就是 BranchSession。

step3:對分支事務進行加全域性鎖,這裡的邏輯就是使用鎖模塊的邏輯。

step4:添加 branchSession,主要是將其添加到 GlobalSession 物件中,並寫入到我們的檔案中。

step5:傳回 branchId,這個 ID 也很重要,我們後續需要用它來回滾我們的事務,或者對我們分支事務狀態更新。

分支事務註冊之後,還需要彙報分支事務的後續狀態到底是成功還是失敗,在 Server 目前只是簡單的做一下儲存記錄,彙報的目的是,就算這個分支事務失敗,如果 TM 還是執意要提交全域性事務,那麼再遍歷提交分支事務的時候,這個失敗的分支事務就不需要提交。

3.4 GlobalCommit – 全域性提交

當分支事務執行完成之後,就輪到 TM – 事務管理器來決定是提交還是回滾,如果是提交,那麼就會走到下麵的邏輯:

step1:首先找到 GlobalSession。如果他為 Null 證明已經被 Commit 過了,那麼直接冪等操作,傳回成功。

step2:關閉  GlobalSession 防止再次有新的 branch 進來。

step3:如果 status 是等於 Begin,那麼久證明還沒有提交過,改變其狀態為 Committing 也就是正在提交。

step4:判斷是否是可以異步提交,目前只有AT樣式可以異步提交,因為是通過 Undolog 的方式去做的。MT 和 TCC 都需要走同步提交的代碼。

step5:如果是異步提交,直接將其放進 ASYNCCOMMITTINGSESSION_MANAGER,讓其再後臺執行緒異步去做  step6,如果是同步的那麼直接執行 step6。

step6:遍歷 BranchSession 進行提交,如果某個分支事務失敗,根據不同的條件來判斷是否進行重試,異步不需要重試,因為其本身都在 manager 中,只要沒有成功就不會被刪除會一直重試,如果是同步提交的會放進異步重試佇列進行重試。

3.5 GlobalRollback – 全域性回滾

如果 TM 決定全域性回滾,那麼會走到下麵的邏輯:

這個邏輯和提交流程基本一致,可以看作是他的反向,這裡就不展開講了。

4.總結

最後再總結一下開始我們提出了分佈式事務的關鍵四點,Seata 到底是怎麼解決的:

  • 正確的協調:通過後臺定時任務各種正確的重試,並且未來會推出監控平臺有可能可以手動回滾。

  • 高可用: 通過 HA-Cluster 保證高可用。

  • 高性能:檔案順序寫,RPC 通過 Netty 實現,Seata 未來可以水平擴展,提高處理性能。

  • 高擴展性:提供給用戶可以自由實現的地方,比如配置,服務發現和註冊,全域性鎖等等。

最後希望大家能從這篇文章能瞭解 Seata-Server 的核心設計原理,當然你也可以想象如果你自己去實現一個分佈式事務的 Server 應該怎樣去設計?

文中涉及的相關鏈接

    赞(0)

    分享創造快樂