歡迎光臨
每天分享高質量文章

高可用分佈式儲存 etcd 的實現原理

原文鏈接:https://draveness.me/etcd-introduction
在上一篇文章《詳解分佈式協調服務 ZooKeeper》中,我們介紹過分佈式協調服務 ZooKeeper 的實現原理以及應用,今天想要介紹的 etcd 其實也是在生產環境中經常被使用的協調服務,它與 ZooKeeper 一樣,也能夠為整個集群提供服務發現、配置以及分佈式協調的功能。
這篇文章將會介紹 etcd 的實現原理,其中包括 Raft 協議、儲存兩大模塊,在最後我們也會簡單介紹 etcd 一些具體應用場景。
簡介

 

etcd 的官方將它定位成一個可信賴的分佈式鍵值儲存服務,它能夠為整個分佈式集群儲存一些關鍵資料,協助分佈式集群的正常運轉。 
我們可以簡單看一下 etcd 和 ZooKeeper 在定義上有什麼不同:
  • etcd is a distributed reliable key-value store for the most critical data of a distributed system…

  • ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

其中前者是一個用於儲存關鍵資料的鍵值儲存,後者是一個用於管理配置等信息的中心化服務。
etcd 的使用其實非常簡單,它對外提供了 gRPC 接口,我們可以通過 Protobuf 和 gRPC 直接對 etcd 中儲存的資料進行管理,也可以使用官方提供的 etcdctl 操作儲存的資料。
  1. service KV {
  2.  rpc Range(RangeRequest) returns (RangeResponse) {
  3.      option (google.api.http) = {
  4.        post: "/v3beta/kv/range"
  5.        body: "*"
  6.    };
  7.  }
  8.  
  9.  rpc Put(PutRequest) returns (PutResponse) {
  10.      option (google.api.http) = {
  11.        post: "/v3beta/kv/put"
  12.        body: "*"
  13.    };
  14.  }
  15. }
文章並不會展開介紹 etcd 的使用方法,這一小節將逐步介紹幾大核心模塊的實現原理,包括 etcd 使用 Raft 協議同步各個節點資料的過程以及 etcd 底層儲存資料使用的結構。
Raft

 

在每一個分佈式系統中,etcd 往往都扮演了非常重要的地位,由於很多服務配置發現以及配置的信息都儲存在 etcd 中,所以整個集群可用性的上限往往就是 etcd 的可用性,而使用 3 ~ 5 個 etcd 節點構成高可用的集群往往都是常規操作。 
正是因為 etcd 在使用的過程中會啟動多個節點,如何處理幾個節點之間的分佈式一致性就是一個比較有挑戰的問題了。
解決多個節點資料一致性的方案其實就是共識演算法,在之前的文章中我們簡單介紹過 ZooKeeper 使用的 Zab 協議 以及常見的 共識演算法 Paxos 和 Raft,etcd 使用的共識演算法就是 Raft,這一節我們將詳細介紹 Raft 以及 etcd 中 Raft 的一些實現細節。
介紹
Raft 從一開始就被設計成一個易於理解和實現的共識演算法,它在容錯和性能上與 Paxos 協議比較類似,區別在於它將分佈式一致性的問題分解成了幾個子問題,然後一一進行解決。
每一個 Raft 集群中都包含多個服務器,在任意時刻,每一臺服務器只可能處於 Leader、Follower 以及 Candidate 三種狀態;在處於正常的狀態時,集群中只會存在一個 Leader,其餘的服務器都是 Follower。 
*上述圖片修改自 In Search of an Understandable Consensus Algorithm(https://raft.github.io/raft.pdf)一文 5.1 小結中圖四。
所有的 Follower 節點都是被動的,它們不會主動發出任何的請求,只會響應 Leader 和 Candidate 發出的請求,對於每一個用戶的可變操作,都會被路由給 Leader 節點進行處理,除了 Leader 和 Follower 節點之外,Candidate 節點其實只是集群運行過程中的一個臨時狀態。
Raft 集群中的時間也被切分成了不同的幾個任期(Term),每一個任期都會由 Leader 的選舉開始,選舉結束後就會進入正常操作的階段,直到 Leader 節點出現問題才會開始新一輪的選擇。 
每一個服務器都會儲存當前集群的最新任期,它就像是一個單調遞增的邏輯時鐘,能夠同步各個節點之間的狀態,當前節點持有的任期會隨著每一個請求被傳遞到其他的節點上。
Raft 協議在每一個任期的開始時都會從一個集群中選出一個節點作為集群的 Leader 節點,這個節點會負責集群中的日誌的複製以及管理工作。
 
我們將 Raft 協議分成三個子問題:節點選舉、日誌複製以及安全性,文章會以 etcd 為例介紹 Raft 協議是如何解決這三個子問題的。
節點選舉
使用 Raft 協議的 etcd 集群在啟動節點時,會遵循 Raft 協議的規則,所有節點一開始都被初始化為 Follower 狀態,新加入的節點會在 NewNode 中做一些配置的初始化,包括用於接收各種信息的 Channel:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/node.go#L190-225
  2. func StartNode(c *Config, peers []Peer) Node {
  3.    r := newRaft(c)
  4.    r.becomeFollower(1, None)
  5.    r.raftLog.committed = r.raftLog.lastIndex()
  6.    for _, peer := range peers {
  7.        r.addNode(peer.ID)
  8.    }
  9.  
  10.    n := newNode()
  11.    go n.run(r)
  12.    return &n
  13. }
在做完這些初始化的節點和 Raft 配置的事情之後,就會進入一個由 for 和 select 組成的超大型迴圈,這個迴圈會從 Channel 中獲取待處理的事件:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/node.go#L291-423
  2. func (n *node) run(r *raft) {
  3.    lead := None
  4.  
  5.    for {
  6.        if lead != r.lead {
  7.            lead = r.lead
  8.        }
  9.  
  10.        select {
  11.        case m := n.recvc:
  12.            r.Step(m)
  13.        case n.tickc:
  14.            r.tick()
  15.        case n.stop:
  16.            close(n.done)
  17.            return
  18.        }
  19.    }
  20. }
作者對整個迴圈內的代碼進行了簡化,因為當前只需要關心三個 Channel 中的訊息,也就是用於接受其他節點訊息的 recvc、用於觸發定時任務的 tickc 以及用於暫停當前節點的 stop。 
除了 stop Channel 中介紹到的訊息之外,recvc 和 tickc 兩個 Channel 中介紹到事件時都會交給當前節點持有 Raft 結構體處理。
 
定時器與心跳
當節點從任意狀態(包括啟動)呼叫 becomeFollower 時,都會將節點的定時器設置為 tickElection:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L636-643
  2. func (r *raft) tickElection() {
  3.    r.electionElapsed++
  4.  
  5.    if r.promotable() && r.pastElectionTimeout() {
  6.        r.electionElapsed = 0
  7.        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  8.    }
  9. }
如果當前節點可以成為 Leader 並且上一次收到 Leader 節點的訊息或者心跳已經超過了等待的時間,當前節點就會發送 MsgHup 訊息嘗試開始新的選舉。
但是如果 Leader 節點正常運行,就能夠同樣通過它的定時器 tickHeartbeat 向所有的 Follower 節點廣播心跳請求,也就是 MsgBeat 型別的 RPC 訊息:
  1. func (r *raft) tickHeartbeat() {
  2.    r.heartbeatElapsed++
  3.    r.electionElapsed++
  4.  
  5.    if r.heartbeatElapsed >= r.heartbeatTimeout {
  6.        r.heartbeatElapsed = 0
  7.        r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  8.    }
  9. }
上述代碼段 Leader 節點中呼叫的 Step 函式,最終會呼叫 stepLeader 方法,該方法會根據訊息的型別進行不同的處理:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L931-1142
  2. func stepLeader(r *raft, m pb.Message) error {
  3.    switch m.Type {
  4.    case pb.MsgBeat:
  5.        r.bcastHeartbeat()
  6.        return nil
  7.    // ...
  8.    }
  9.  
  10.    //...
  11. }
bcastHeartbeat 方法最終會向所有的 Follower 節點發送 MsgHeartbeat 型別的訊息,通知它們目前 Leader 的存活狀態,重置所有 Follower 持有的超時計時器。
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L518-534
  2. func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
  3.    commit := min(r.getProgress(to).Match, r.raftLog.committed)
  4.    m := pb.Message{
  5.        To:      to,
  6.        Type:    pb.MsgHeartbeat,
  7.        Commit:  commit,
  8.        Context: ctx,
  9.    }
  10.  
  11.    r.send(m)
  12. }
作為集群中的 Follower,它們會在 stepFollower 方法中處理接收到的全部訊息,包括 Leader 節點發送的心跳 RPC 訊息:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L1191-1247
  2. func stepFollower(r *raft, m pb.Message) error {
  3.    switch m.Type {
  4.    case pb.MsgHeartbeat:
  5.        r.electionElapsed = 0
  6.        r.lead = m.From
  7.        r.handleHeartbeat(m)
  8.    // ...
  9.    }
  10.    return nil
  11. }
當 Follower 接受到了來自 Leader 的 RPC 訊息 MsgHeartbeat 時,會將當前節點的選舉超時時間重置並通過 handleHeartbeat 向 Leader 節點發出響應 —— 通知 Leader 當前節點能夠正常運行。
而 Candidate 節點對於 MsgHeartBeat 訊息的處理會稍有不同,它會先執行 becomeFollower 設置當前節點和 Raft 協議的配置:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L1146-1189
  2. func stepCandidate(r *raft, m pb.Message) error {
  3.  // ...
  4.    switch m.Type {
  5.    case pb.MsgHeartbeat:
  6.        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
  7.        r.handleHeartbeat(m)
  8.    }
  9.  // ...
  10.    return nil
  11. }
Follower 與 Candidate 會根據節點型別的不同做出不同的響應,兩者收到心跳請求時都會重置節點的選舉超時時間,不過後者會將節點的狀態直接轉變成 Follower: 
當 Leader 節點收到心跳的響應時就會將對應節點的狀態設置為 Active,如果 Follower 節點在一段時間內沒有收到來自 Leader 節點的訊息就會嘗試發起競選。
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L636-643
  2. func (r *raft) tickElection() {
  3.    r.electionElapsed++
  4.  
  5.    if r.promotable() && r.pastElectionTimeout() {
  6.        r.electionElapsed = 0
  7.        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  8.    }
  9. }
到了這裡,心跳機制就起到了作用開始發送 MsgHup 嘗試重置整個集群中的 Leader 節點,接下來我們就會開始分析 Raft 協議中的競選流程了。
 
競選流程
如果集群中的某一個 Follower 節點長時間內沒有收到來自 Leader 的心跳請求,當前節點就會通過 MsgHup 訊息進入預選舉或者選舉的流程。
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L785-927
  2. func (r *raft) Step(m pb.Message) error {
  3.  // ...
  4.  
  5.    switch m.Type {
  6.    case pb.MsgHup:
  7.        if r.state != StateLeader {
  8.            if r.preVote {
  9.                r.campaign(campaignPreElection)
  10.            } else {
  11.                r.campaign(campaignElection)
  12.            }
  13.        } else {
  14.            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
  15.        }
  16.    }
  17.  // ...
  18.  return nil
  19. }
如果收到 MsgHup 訊息的節點不是 Leader 狀態,就會根據當前集群的配置選擇進入 PreElection 或者 Election 階段,PreElection 階段並不會真正增加當前節點的 Term,它的主要作用是得到當前集群能否成功選舉出一個 Leader 的答案,如果當前集群中只有兩個節點而且沒有預選舉階段,那麼這兩個節點的 Term 會無休止的增加,預選舉階段就是為瞭解決這一問題而出現的。 
在這裡不會討論預選舉的過程,而是將目光主要放在選舉階段,具體瞭解一下使用 Raft 協議的 etcd 集群是如何從眾多節點中選出 Leader 節點的。
我們可以繼續來分析 campaign 方法的具體實現,下麵就是刪去預選舉相關邏輯後的代碼:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L730-766
  2. func (r *raft) campaign(t CampaignType) {
  3.    r.becomeCandidate()
  4.  
  5.    if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
  6.        r.becomeLeader()
  7.        return
  8.    }
  9.    for id := range r.prs {
  10.        if id == r.id {
  11.            continue
  12.        }
  13.  
  14.        r.send(pb.Message{Term: r.Term, To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
  15.    }
  16. }
當前節點會立刻呼叫 becomeCandidate 將當前節點的 Raft 狀態變成候選人;在這之後,它會將票投給自己,如果當前集群只有一個節點,該節點就會直接成為集群中的 Leader 節點。
如果集群中存在了多個節點,就會向集群中的其他節點發出 MsgVote 訊息,請求其他節點投票,在 Step 函式中包含不同狀態的節點接收到訊息時的響應:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L785-927
  2. func (r *raft) Step(m pb.Message) error {
  3.  // ...
  4.  
  5.    switch m.Type {
  6.    case pb.MsgVote, pb.MsgPreVote:
  7.        canVote := r.Vote == m.From || (r.Vote == None && r.lead == None)
  8.        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  9.            r.send(pb.Message{To: m.From, Term: m.Term, Type: pb.MsgVoteResp})
  10.            r.electionElapsed = 0
  11.            r.Vote = m.From
  12.        } else {
  13.            r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgVoteResp, Reject: true})
  14.        }
  15.  
  16.    }
  17.  // ...
  18.  return nil
  19. }
如果當前節點投的票就是訊息的來源或者當前節點沒有投票也沒有 Leader,那麼就會向來源的節點投票,否則就會通知該節點當前節點拒絕投票。 
在 stepCandidate 方法中,候選人節點會處理來自其他節點的投票響應訊息,也就是 MsgVoteResp:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L1146-1189
  2. func stepCandidate(r *raft, m pb.Message) error {
  3.    switch m.Type {
  4.    // ...
  5.    case pb.MsgVoteResp:
  6.        gr := r.poll(m.From, m.Type, !m.Reject)
  7.        switch r.quorum() {
  8.        case gr:
  9.            r.becomeLeader()
  10.            r.bcastAppend()
  11.        // ...
  12.        }
  13.    }
  14.    return nil
  15. }
每當收到一個 MsgVoteResp 型別的訊息時,就會設置當前節點持有的 votes 陣列,更新其中儲存的節點投票狀態並傳回投『同意』票的人數,如果獲得的票數大於法定人數 quorum,當前節點就會成為集群的 Leader 並向其他的節點發送當前節點當選的訊息,通知其餘節點更新 Raft 結構體中的 Term 等信息。
節點狀態
對於每一個節點來說,它們根據不同的節點狀態會對網絡層發來的訊息做出不同的響應,我們會分別介紹下麵的四種狀態在 Raft 中對於配置和訊息究竟是如何處理的。 
對於每一個 Raft 的節點狀態來說,它們分別有三個比較重要的區別,其中一個是在改變狀態時呼叫 becomeLeader、becomeCandidate、becomeFollower 和 becomePreCandidate 方法改變 Raft 狀態有比較大的不同,第二是處理訊息時呼叫 stepLeader、stepCandidate 和 stepFollower 時有比較大的不同,最後是幾種不同狀態的節點具有功能不同的定時任務。
對於方法的詳細處理,我們在這一節中不詳細介紹和分析,如果一個節點的狀態是 Follower,那麼當前節點切換到 Follower 一定會通過 becomeFollower 函式,在這個函式中會重置節點持有任期,並且設置處理訊息的函式為 stepFollower:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L671-678
  2. func (r *raft) becomeFollower(term uint64, lead uint64) {
  3.    r.step = stepFollower
  4.    r.reset(term)
  5.    r.tick = r.tickElection
  6.    r.lead = lead
  7.    r.state = StateFollower
  8. }
  9.  
  10. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L636-643
  11. func (r *raft) tickElection() {
  12.    r.electionElapsed++
  13.  
  14.    if r.promotable() && r.pastElectionTimeout() {
  15.        r.electionElapsed = 0
  16.        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  17.    }
  18. }
除此之外,它還會設置一個用於在 Leader 節點宕機時觸發選舉的定時器 tickElection。
Candidate 狀態的節點與 Follower 的配置差不了太多,只是在訊息處理函式 step、任期以及狀態上的設置有一些比較小的區別:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L680-691
  2. func (r *raft) becomeCandidate() {
  3.    r.step = stepCandidate
  4.    r.reset(r.Term + 1)
  5.    r.tick = r.tickElection
  6.    r.Vote = r.id
  7.    r.state = StateCandidate
  8. }
最後的 Leader 就與這兩者有其他的區別了,它不僅設置了處理訊息的函式 step 而且設置了與其他狀態完全不同的 tick 函式:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L708-728
  2. func (r *raft) becomeLeader() {
  3.    r.step = stepLeader
  4.    r.reset(r.Term)
  5.    r.tick = r.tickHeartbeat
  6.    r.lead = r.id
  7.    r.state = StateLeader
  8.  
  9.    r.pendingConfIndex = r.raftLog.lastIndex()
  10.    r.appendEntry(pb.Entry{Data: nil})
  11. }
這裡的 tick 函式 tickHeartbeat 每隔一段時間會通過 Step 方法向集群中的其他節點發送 MsgBeat 訊息:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/raft/raft.go#L646-669
  2. func (r *raft) tickHeartbeat() {
  3.    r.heartbeatElapsed++
  4.    r.electionElapsed++
  5.  
  6.    if r.electionElapsed >= r.electionTimeout {
  7.        r.electionElapsed = 0
  8.        if r.checkQuorum {
  9.            r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
  10.        }
  11.    }
  12.  
  13.    if r.heartbeatElapsed >= r.heartbeatTimeout {
  14.        r.heartbeatElapsed = 0
  15.        r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  16.    }
  17. }
上述代碼中的 MsgBeat 訊息會在 Step 中被轉換成 MsgHeartbeat 最終發送給其他的節點,Leader 節點超時之後的選舉流程我們在前兩節中也已經介紹過了,在這裡就不再重覆了。
儲存

 

etcd 目前支持 V2 和 V3 兩個大版本,這兩個版本在實現上有比較大的不同,一方面是對外提供接口的方式,另一方面就是底層的儲存引擎,V2 版本的實體是一個純記憶體的實現,所有的資料都沒有儲存在磁盤上,而 V3 版本的實體就支持了資料的持久化。 
在這一節中,我們會介紹 V3 版本的 etcd 究竟是通過什麼樣的方式儲存用戶資料的。
後端
在 V3 版本的設計中,etcd 通過 backend 後端這一設計,很好地封裝了儲存引擎的實現細節,為上層提供一個更一致的接口,對於 etcd 的其他模塊來說,它們可以將更多註意力放在接口中的約定上,不過在這裡,我們更關註的是 etcd 對 Backend 接口的實現。
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/backend.go#L51-69
  2. type Backend interface {
  3.    ReadTx() ReadTx
  4.    BatchTx() BatchTx
  5.  
  6.    Snapshot() Snapshot
  7.    Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
  8.    Size() int64
  9.    SizeInUse() int64
  10.    Defrag() error
  11.    ForceCommit()
  12.    Close() error
  13. }
etcd 底層預設使用的是開源的嵌入式鍵值儲存資料庫 bolt,但是這個專案目前的狀態已經是歸檔不再維護了,如果想要使用這個專案可以使用 CoreOS 的 bbolt 版本。 
這一小節中,我們會簡單介紹 etcd 是如何使用 BoltDB 作為底層儲存的,首先可以先來看一下 pacakge 內部的 backend 結構體,這是一個實現了 Backend 接口的結構:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/backend.go#L80-104
  2. type backend struct {
  3.    size int64
  4.    sizeInUse int64
  5.    commits int64
  6.  
  7.    mu sync.RWMutex
  8.    db *bolt.DB
  9.  
  10.    batchInterval time.Duration
  11.    batchLimit    int
  12.    batchTx       *batchTxBuffered
  13.  
  14.    readTx *readTx
  15.  
  16.    stopc chan struct{}
  17.    donec chan struct{}
  18.  
  19.    lg *zap.Logger
  20. }
從結構體的成員 db 我們就可以看出,它使用了 BoltDB 作為底層儲存,另外的兩個 readTx 和 batchTx 分別實現了 ReadTx 和 BatchTx 接口:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/read_tx.go#L30-36
  2. type ReadTx interface {
  3.    Lock()
  4.    Unlock()
  5.  
  6.    UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
  7.    UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
  8. }
  9.  
  10. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L28-38
  11. type BatchTx interface {
  12.    ReadTx
  13.    UnsafeCreateBucket(name []byte)
  14.    UnsafePut(bucketName []byte, key []byte, value []byte)
  15.    UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
  16.    UnsafeDelete(bucketName []byte, key []byte)
  17.    Commit()
  18.    CommitAndStop()
  19. }
從這兩個接口的定義,我們不難發現它們能夠對外提供資料庫的讀寫操作,而 Backend 就能對這兩者提供的方法進行封裝,為上層屏蔽儲存的具體實現: 
每當我們使用 newBackend 創建一個新的 backend 結構時,都會創建一個 readTx 和 batchTx 結構體,這兩者一個負責處理只讀請求,一個負責處理讀寫請求:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/backend.go#L137-176
  2. func newBackend(bcfg BackendConfig) *backend {
  3.    bopts := &bolt.Options{}
  4.    bopts.InitialMmapSize = bcfg.mmapSize()
  5.    db, _ := bolt.Open(bcfg.Path, 0600, bopts)
  6.  
  7.    b := &backend{
  8.        db: db,
  9.        batchInterval: bcfg.BatchInterval,
  10.        batchLimit:    bcfg.BatchLimit,
  11.        readTx: &readTx{
  12.            buf: txReadBuffer{
  13.                txBuffer: txBuffer{make(map[string]*bucketBuffer)},
  14.            },
  15.            buckets: make(map[string]*bolt.Bucket),
  16.        },
  17.        stopc: make(chan struct{}),
  18.        donec: make(chan struct{}),
  19.    }
  20.    b.batchTx = newBatchTxBuffered(b)
  21.    go b.run()
  22.    return b
  23. }
當我們在 newBackend 中進行了初始化 BoltDB、事務等工作後,就會開一個 goroutine 異步的對所有批量讀寫事務進行定時提交:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/backend.go#L289-305
  2. func (b *backend) run() {
  3.    defer close(b.donec)
  4.    t := time.NewTimer(b.batchInterval)
  5.    defer t.Stop()
  6.    for {
  7.        select {
  8.        case t.C:
  9.        case b.stopc:
  10.            b.batchTx.CommitAndStop()
  11.            return
  12.        }
  13.        if b.batchTx.safePending() != 0 {
  14.            b.batchTx.Commit()
  15.        }
  16.        t.Reset(b.batchInterval)
  17.    }
  18. }
對於上層來說,backend 其實只是對底層儲存的一個抽象,很多時候並不會直接跟它打交道,往往都是使用它持有的 ReadTx 和 BatchTx 與資料庫進行交互。
 
只讀事務
目前大多數的資料庫對於只讀型別的事務並沒有那麼多的限制,尤其是在使用了 MVCC 之後,所有的只讀請求幾乎不會被寫請求鎖住,這大大提升了讀的效率,由於在 BoltDB 的同一個 goroutine 中開啟兩個相互依賴的只讀事務和讀寫事務會發生死鎖,為了避免這種情況我們還是引入了 sync.RWLock 保證死鎖不會出現:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/read_tx.go#L38-47
  2. type readTx struct {
  3.    mu  sync.RWMutex
  4.    buf txReadBuffer
  5.  
  6.    txmu    sync.RWMutex
  7.    tx      *bolt.Tx
  8.    buckets map[string]*bolt.Bucket
  9. }
你可以看到在整個結構體中,除了用於保護 tx 的 txmu 讀寫鎖之外,還存在另外一個 mu 讀寫鎖,它的作用是保證 buf 中的資料不會出現問題,buf 和結構體中的 buckets 都是用於加速讀效率的快取。 
對於一個只讀事務來說,它對上層提供了兩個獲取儲存引擎中資料的接口,分別是 UnsafeRange 和 UnsafeForEach,在這裡會重點介紹前面方法的實現細節:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/read_tx.go#L52-90
  2. func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  3.    if endKey == nil {
  4.        limit = 1
  5.    }
  6.    keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
  7.    if int64(len(keys)) == limit {
  8.        return keys, vals
  9.    }
  10.  
  11.    bn := string(bucketName)
  12.    bucket, ok := rt.buckets[bn]
  13.    if !ok {
  14.        bucket = rt.tx.Bucket(bucketName)
  15.        rt.buckets[bn] = bucket
  16.    }
  17.  
  18.    if bucket == nil {
  19.        return keys, vals
  20.    }
  21.    c := bucket.Cursor()
  22.  
  23.    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
  24.    return append(k2, keys...), append(v2, vals...)
  25. }
上述代碼中省略了加鎖保護讀快取以及 Bucket 中儲存資料的合法性,也省去了一些引數的檢查,不過方法的整體接口還是沒有太多變化,UnsafeRange 會先從自己持有的快取 txReadBuffer 中讀取資料,如果資料不能夠滿足呼叫者的需求,就會從 buckets 快取中查找對應的 BoltDB bucket 並從 BoltDB 資料庫中讀取。
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L121-141
  2. func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
  3.    var isMatch func(b []byte) bool
  4.    if len(endKey) > 0 {
  5.        isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
  6.    } else {
  7.        isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
  8.        limit = 1
  9.    }
  10.  
  11.    for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
  12.        vs = append(vs, cv)
  13.        keys = append(keys, ck)
  14.        if limit == int64(len(keys)) {
  15.            break
  16.        }
  17.    }
  18.    return keys, vs
  19. }
這個包內部的函式 unsafeRange 實際上通過 BoltDB 中的游標來遍歷滿足查詢條件的鍵值對。
到這裡為止,整個只讀事務提供的接口就基本介紹完了,在 etcd 中無論我們想要後去單個 Key 還是一個範圍內的 Key 最終都是通過 Range 來實現的,這其實也是只讀事務的最主要功能。
 
讀寫事務
只讀事務只提供了讀資料的能力,包括 UnsafeRange 和 UnsafeForeach,而讀寫事務 BatchTx 提供的就是讀和寫資料的能力了:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L40-46
  2. type batchTx struct {
  3.    sync.Mutex
  4.    tx      *bolt.Tx
  5.    backend *backend
  6.  
  7.    pending int
  8. }
讀寫事務同時提供了不帶快取的 batchTx 實現以及帶快取的 batchTxBuffered 實現,後者其實『繼承了』前者的結構體,並額外加入了快取 txWriteBuffer 加速讀請求:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L243-246
  2. type batchTxBuffered struct {
  3.    batchTx
  4.    buf txWriteBuffer
  5. }
後者在實現接口規定的方法時,會直接呼叫 batchTx 的同名方法,並將操作造成的副作用的寫入的快取中,在這裡我們並不會展開介紹這一版本的實現,還是以分析 batchTx 的方法為主。
當我們向 etcd 中寫入資料時,最終都會呼叫 batchTx 的 unsafePut 方法將資料寫入到 BoltDB 中:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L65-67
  2. func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
  3.    t.unsafePut(bucketName, key, value, false)
  4. }
  5.  
  6. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L74-103
  7. func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
  8.    bucket := t.tx.Bucket(bucketName)
  9.    if err := bucket.Put(key, value); err != nil {
  10.        plog.Fatalf("cannot put key into bucket (%v)", err)
  11.    }
  12.    t.pending++
  13. }
這兩個方法的實現非常清晰,作者覺得他們都並不值得展開詳細介紹,只是呼叫了 BoltDB 提供的 API 操作一下 bucket 中的資料,而另一個刪除方法的實現與這個也差不多:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L144-169
  2. func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
  3.    bucket := t.tx.Bucket(bucketName)
  4.    err := bucket.Delete(key)
  5.    if err != nil {
  6.        plog.Fatalf("cannot delete key from bucket (%v)", err)
  7.    }
  8.    t.pending++
  9. }
它們都是通過 Bolt.Tx 找到對應的 Bucket,然後做出相應的增刪操作,但是這寫請求在這兩個方法執行後其實並沒有提交,我們還需要手動或者等待 etcd 自動將請求提交:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L184-188
  2. func (t *batchTx) Commit() {
  3.    t.Lock()
  4.    t.commit(false)
  5.    t.Unlock()
  6. }
  7.  
  8. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/backend/batch_tx.go#L210-241
  9. func (t *batchTx) commit(stop bool) {
  10.    if t.tx != nil {
  11.        if t.pending == 0 && !stop {
  12.            return
  13.        }
  14.  
  15.        start := time.Now()
  16.  
  17.        err := t.tx.Commit()
  18.  
  19.        rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
  20.        spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
  21.        writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
  22.        commitSec.Observe(time.Since(start).Seconds())
  23.        atomic.AddInt64(&t.backend.commits, 1)
  24.  
  25.        t.pending = 0
  26.    }
  27.    if !stop {
  28.        t.tx = t.backend.begin(true)
  29.    }
  30. }
在每次呼叫 Commit 對讀寫事務進行提交時,都會先檢查是否有等待中的事務,然後會將資料上報至 Prometheus 中,其他的服務就可以將 Prometheus 作為資料源對 etcd 的執行狀況進行監控了。
索引
經常使用 etcd 的開發者可能會瞭解到,它本身對於每一個鍵值對都有一個 revision 的概念,鍵值對的每一次變化都會被 BoltDB 單獨記錄下來,所以想要在儲存引擎中獲取某一個 Key 對應的值,要先獲取 revision,再通過它才能找到對應的值,在里我們想要介紹的其實是 etcd 如何管理和儲存一個 Key 的多個 revision 記錄。 
在 etcd 服務中有一個用於儲存所有的鍵值對 revision 信息的 btree,我們可以通過 index 的 Get 接口獲取一個 Key 對應 Revision 的值:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/index.go#L68-76
  2. func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
  3.    keyi := &keyIndex{key: key}
  4.    if keyi = ti.keyIndex(keyi); keyi == nil {
  5.        return revision{}, revision{}, 0, ErrRevisionNotFound
  6.    }
  7.    return keyi.get(ti.lg, atRev)
  8. }
上述方法通過 keyIndex 方法查找 Key 對應的 keyIndex 結構體,這裡使用的記憶體結構體 btree 是 Google 實現的一個版本:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/index.go#L84-89
  2. func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
  3.    if item := ti.tree.Get(keyi); item != nil {
  4.        return item.(*keyIndex)
  5.    }
  6.    return nil
  7. }
可以看到這裡的實現非常簡單,只是從 treeIndex 持有的成員 btree 中查找 keyIndex,將結果強制轉換成 keyIndex 型別後傳回;獲取 Key 對應 revision 的方式也非常簡單:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/key_index.go#L149-171
  2. func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
  3.    g := ki.findGeneration(atRev)
  4.    if g.isEmpty() {
  5.        return revision{}, revision{}, 0, ErrRevisionNotFound
  6.    }
  7.  
  8.    n := g.walk(func(rev revision) bool { return rev.main > atRev })
  9.    if n != -1 {
  10.        return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
  11.    }
  12.  
  13.    return revision{}, revision{}, 0, ErrRevisionNotFound
  14. }
 
KeyIndex
在我們具體介紹方法實現的細節之前,首先我們需要理解 keyIndex 包含的欄位以及管理同一個 Key 不同版本的方式:
 
每一個 keyIndex 結構體中都包含當前鍵的值以及最後一次修改對應的 revision 信息,其中還儲存了一個 Key 的多個 generation,每一個 generation 都會記錄當前 Key『從生到死』的全部過程,每當一個 Key 被刪除時都會呼叫 timestone 方法向當前的 generation 中追加一個新的墓碑版本:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/key_index.go#L127-145
  2. func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
  3.    if ki.generations[len(ki.generations)-1].isEmpty() {
  4.        return ErrRevisionNotFound
  5.    }
  6.    ki.put(lg, main, sub)
  7.    ki.generations = append(ki.generations, generation{})
  8.    return nil
  9. }
這個 tombstone 版本標識這當前的 Key 已經被刪除了,但是在每次刪除一個 Key 之後,就會在當前的 keyIndex 中創建一個新的 generation 結構用於儲存新的版本信息,其中 ver 記錄當前 generation 包含的修改次數,created 記錄創建 generation 時的 revision 版本,最後的 revs 用於儲存所有的版本信息。
 
讀操作
etcd 中所有的查詢請求,無論是查詢一個還是多個、是數量還是鍵值對,最終都會呼叫 rangeKeys 方法:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/kvstore_txn.go#L112-165
  2. func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
  3.    rev := ro.Rev
  4.  
  5.    revpairs := tr.s.kvindex.Revisions(key, end, rev)
  6.    if len(revpairs) == 0 {
  7.        return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
  8.    }
  9.  
  10.    kvs := make([]mvccpb.KeyValue, int(ro.Limit))
  11.    revBytes := newRevBytes()
  12.    for i, revpair := range revpairs[:len(kvs)] {
  13.        revToBytes(revpair, revBytes)
  14.        _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
  15.        kvs[i].Unmarshal(vs[0])
  16.    }
  17.    return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
  18. }
為了獲取一個範圍內的所有鍵值對,我們首先需要通過 Revisions 函式從 btree 中獲取範圍內所有的 keyIndex:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/index.go#L106-120
  2. func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
  3.    if end == nil {
  4.        rev, _, _, err := ti.Get(key, atRev)
  5.        if err != nil {
  6.            return nil
  7.        }
  8.        return []revision{rev}
  9.    }
  10.    ti.visit(key, end, func(ki *keyIndex) {
  11.        if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
  12.            revs = append(revs, rev)
  13.        }
  14.    })
  15.    return revs
  16. }
如果只需要獲取一個 Key 對應的版本,就是直接使用 treeIndex 的方法,但是當上述方法會從 btree 索引中獲取一個連續多個 revision 值時,就會呼叫 keyIndex.get 來遍歷整顆樹並選取合適的版本:
  1. func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
  2.    g := ki.findGeneration(atRev)
  3.    if g.isEmpty() {
  4.        return revision{}, revision{}, 0, ErrRevisionNotFound
  5.    }
  6.  
  7.    n := g.walk(func(rev revision) bool { return rev.main > atRev })
  8.    if n != -1 {
  9.        return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
  10.    }
  11.  
  12.    return revision{}, revision{}, 0, ErrRevisionNotFound
  13. }
因為每一個 Key 的 keyIndex 中其實都儲存著多個 generation,我們需要根據傳入的引數傳回合適的 generation 並從其中傳回主版本大於 atRev 的 revision 結構。
對於上層的鍵值儲存來說,它會利用這裡傳回的 revision 從真正儲存資料的 BoltDB 中查詢當前 Key 對應 revision 的結果。
 
寫操作
當我們向 etcd 中插入資料時,會使用傳入的 key 構建一個 keyIndex 結構體並從樹中獲取相關版本等信息:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/index.go#L53-66
  2. func (ti *treeIndex) Put(key []byte, rev revision) {
  3.    keyi := &keyIndex{key: key}
  4.  
  5.    item := ti.tree.Get(keyi)
  6.    if item == nil {
  7.        keyi.put(ti.lg, rev.main, rev.sub)
  8.        ti.tree.ReplaceOrInsert(keyi)
  9.        return
  10.    }
  11.    okeyi := item.(*keyIndex)
  12.    okeyi.put(ti.lg, rev.main, rev.sub)
  13. }

 

treeIndex.Put 在獲取記憶體中的 keyIndex 結構之後會通過 keyIndex.put 其中加入新的 revision:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/key_index.go#L77-104
  2. func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
  3.    rev := revision{main: main, sub: sub}
  4.  
  5.    if len(ki.generations) == 0 {
  6.        ki.generations = append(ki.generations, generation{})
  7.    }
  8.    g := &ki.generations[len(ki.generations)-1]
  9.    if len(g.revs) == 0 {
  10.        g.created = rev
  11.    }
  12.    g.revs = append(g.revs, rev)
  13.    g.ver++
  14.    ki.modified = rev
  15. }
每一個新 revision 結構體寫入 keyIndex 時,都會改變當前 generation 的 created 和 ver 等引數,從這個方法中我們就可以瞭解到 generation 中的各個成員都是如何被寫入的。
寫入的操作除了增加之外,刪除某一個 Key 的函式也會經常被呼叫:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/kvstore_txn.go#L252-309
  2. func (tw *storeTxnWrite) delete(key []byte) {
  3.    ibytes := newRevBytes()
  4.    idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
  5.    revToBytes(idxRev, ibytes)
  6.  
  7.    ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
  8.  
  9.    kv := mvccpb.KeyValue{Key: key}
  10.  
  11.    d, _ := kv.Marshal()
  12.  
  13.    tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
  14.    tw.s.kvindex.Tombstone(key, idxRev)
  15.    tw.changes = append(tw.changes, kv)
  16. }
正如我們在文章前面所介紹的,刪除操作會向結構體中的 generation 追加一個新的 tombstone 標記,用於標識當前的 Key 已經被刪除;除此之外,上述方法還會將每一個更新操作的 revision 存到單獨的 keyBucketName 中。
 
索引的恢復
因為在 etcd 中,所有的 keyIndex 都是在記憶體的 btree 中儲存的,所以在啟動服務時需要從 BoltDB 中將所有的資料都加載到記憶體中,在這裡就會初始化一個新的 btree 索引,然後呼叫 restore 方法開始恢復索引:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/kvstore.go#L321-433
  2. func (s *store) restore() error {
  3.    min, max := newRevBytes(), newRevBytes()
  4.    revToBytes(revision{main: 1}, min)
  5.    revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
  6.  
  7.    tx := s.b.BatchTx()
  8.  
  9.    rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
  10.    for {
  11.        keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
  12.        if len(keys) == 0 {
  13.            break
  14.        }
  15.        restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
  16.        newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
  17.        newMin.sub++
  18.        revToBytes(newMin, min)
  19.    }
  20.    close(rkvc)
  21.    s.currentRev = revc
  22.  
  23.    return nil
  24. }
在恢復索引的過程中,有一個用於遍歷不同鍵值的『生產者』迴圈,其中由 UnsafeRange 和 restoreChunk 兩個方法構成,這兩個方法會從 BoltDB 中遍歷資料,然後將鍵值對傳到 rkvc 中,交給 restoreIntoIndex 方法中創建的 goroutine 處理:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/kvstore.go#L486-506
  2. func restoreChunk(lg *zap.Logger, kvc chan revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
  3.    for i, key := range keys {
  4.        rkv := r evKeyValue{key: key}
  5.        _ := rkv.kv.Unmarshal(vals[i])
  6.        rkv.kstr = string(rkv.kv.Key)
  7.        if isTombstone(key) {
  8.            delete(keyToLease, rkv.kstr)
  9.        } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
  10.            keyToLease[rkv.kstr] = lid
  11.        } else {
  12.            delete(keyToLease, rkv.kstr)
  13.        }
  14.        kvc rkv
  15.    }
  16. }
先被呼叫的 restoreIntoIndex 方法會創建一個用於接受鍵值對的 Channel,在這之後會在一個 goroutine 中處理從 Channel 接收到的資料,並將這些資料恢復到記憶體里的 btree 中:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/kvstore.go#L441-484
  2. func restoreIntoIndex(lg *zap.Logger, idx index) (chan revKeyValue, chan int64) {
  3.    rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
  4.    go func() {
  5.        currentRev := int64(1)
  6.        defer func() { revc currentRev }()
  7.        for rkv := range rkvc {
  8.            ki = &keyIndex{key: rkv.kv.Key}
  9.            ki := idx.KeyIndex(ki)
  10.  
  11.            rev := bytesToRev(rkv.key)
  12.            currentRev = rev.main
  13.            if ok {
  14.                if isTombstone(rkv.key) {
  15.                    ki.tombstone(lg, rev.main, rev.sub)
  16.                    continue
  17.                }
  18.                ki.put(lg, rev.main, rev.sub)
  19.            } else if !isTombstone(rkv.key) {
  20.                ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
  21.                idx.Insert(ki)
  22.            }
  23.        }
  24.    }()
  25.    return rkvc, revc
  26. }

恢復記憶體索引的相關代碼在實現上非常值得學習,兩個不同的函式通過 Channel 進行通信並使用 goroutine 處理任務,能夠很好地將訊息的『生產者』和『消費者』進行分離。

Channel 作為整個恢復索引邏輯的一個訊息中心,它將遍歷 BoltDB 中的資料和恢復索引兩部分代碼進行了分離。
儲存
etcd 的 mvcc 模塊對外直接提供了兩種不同的訪問方式,一種是鍵值儲存 kvstore,另一種是 watchableStore 它們都實現了包內公開的 KV 接口:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/kv.go#L100-125
  2. type KV interface {
  3.    ReadView
  4.    WriteView
  5.  
  6.    Read() TxnRead
  7.    Write() TxnWrite
  8.  
  9.    Hash() (hash uint32, revision int64, err error)
  10.    HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
  11.  
  12.    Compact(rev int64) (chan struct{}, error)
  13.    Commit()
  14.    Restore(b backend.Backend) error
  15.    Close() error
  16. }
 
kvstore
 
對於 kvstore 來說,其實沒有太多值得展開介紹的地方,它利用底層的 BoltDB 等基礎設施為上層提供最常見的增傷改查,它組合了下層的 readTx、batchTx 等結構體,將一些執行緒不安全的操作變成執行緒安全的。
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/kvstore_txn.go#L32-40
  2. func (s *store) Read() TxnRead {
  3.    s.mu.RLock()
  4.    tx := s.b.ReadTx()
  5.    s.revMu.RLock()
  6.    tx.Lock()
  7.    firstRev, rev := s.compactMainRev, s.currentRev
  8.    s.revMu.RUnlock()
  9.    return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
  10. }
它也負責對記憶體中 btree 索引的維護以及壓縮一些無用或者不常用的資料,幾個對外的接口 Read、Write 就是對 readTx、batchTx 等結構體的組合併將它們的接口暴露給其他的模塊。
 
watchableStore
另外一個比較有意思的儲存就是 watchableStore 了,它是 mvcc 模塊為外界提供 Watch 功能的接口,它負責了註冊、管理以及觸發 Watcher 的功能,我們先來看一下這個結構體的各個欄位:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/watchable_store.go#L45-65
  2. type watchableStore struct {
  3.    *store
  4.  
  5.    mu sync.RWMutex
  6.  
  7.    unsynced watcherGroup
  8.    synced watcherGroup
  9.  
  10.    stopc chan struct{}
  11.    wg    sync.WaitGroup
  12. }
每一個 watchableStore 其實都組合了來自 store 結構體的欄位和方法,除此之外,還有兩個 watcherGroup 型別的欄位,其中 unsynced 用於儲存未同步完成的實體,synced 用於儲存已經同步完成的實體。
在初始化一個新的 watchableStore 時,我們會創建一個用於同步watcherGroup 的 Goroutine,在 syncWatchersLoop 這個迴圈中會每隔 100ms 呼叫一次 syncWatchers 方法,將所有未通知的事件通知給所有的監聽者,這可以說是整個模塊的核心:
  1. func (s *watchableStore) syncWatchers() int {
  2.    curRev := s.store.currentRev
  3.    compactionRev := s.store.compactMainRev
  4.  
  5.    wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
  6.    minBytes, maxBytes := newRevBytes(), newRevBytes()
  7.    revToBytes(revision{main: minRev}, minBytes)
  8.    revToBytes(revision{main: curRev + 1}, maxBytes)
  9.  
  10.    tx := s.store.b.ReadTx()
  11.    revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
  12.    evs := kvsToEvents(nil, wg, revs, vs)
  13.  
  14.    wb := newWatcherBatch(wg, evs)
  15.    for w := range wg.watchers {
  16.        w.minRev = curRev + 1
  17.  
  18.        eb, ok := wb[w]
  19.        if !ok {
  20.            s.synced.add(w)
  21.            s.unsynced.delete(w)
  22.            continue
  23.        }
  24.  
  25.        w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev})
  26.  
  27.        s.synced.add(w)
  28.        s.unsynced.delete(w)
  29.    }
  30.  
  31.    return s.unsynced.size()
  32. }
簡化後的 syncWatchers 方法中總共做了三件事情,首先是根據當前的版本從未同步的 watcherGroup 中選出一些待處理的任務,然後從 BoltDB 中後去當前版本範圍內的資料變更並將它們轉換成事件,事件和 watcherGroup 在打包之後會通過 send 方法發送到每一個 watcher 對應的 Channel 中。 
上述圖片中展示了 mvcc 模塊對於向外界提供的監聽某個 Key 和範圍的接口,外部的其他模塊會通過 watchStream.watch 函式與模塊內部進行交互,每一次呼叫 watch 方法最終都會向 watchableStore 持有的 watcherGroup 中添加新的 watcher 結構。
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/watcher.go#L108-135
  2. func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
  3.    if id == AutoWatchID {
  4.        for ws.watchers[ws.nextID] != nil {
  5.            ws.nextID++
  6.        }
  7.        id = ws.nextID
  8.        ws.nextID++
  9.    }
  10.  
  11.    w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
  12.  
  13.    ws.cancels[id] = c
  14.    ws.watchers[id] = w
  15.    return id, nil
  16. }
  17.  
  18. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/mvcc/watchable_store.go#L111-142
  19. func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
  20.    wa := &watcher{
  21.        key:    key,
  22.        end:    end,
  23.        minRev: startRev,
  24.        id:     id,
  25.        ch:     ch,
  26.        fcs:    fcs,
  27.    }
  28.  
  29.    synced := startRev > s.store.currentRev || startRev == 0
  30.    if synced {
  31.        s.synced.add(wa)
  32.    } else {
  33.        s.unsynced.add(wa)
  34.    }
  35.  
  36.    return wa, func() { s.cancelWatcher(wa) }
  37. }
當 etcd 服務啟動時,會在服務端運行一個用於處理監聽事件的 watchServer gRPC 服務,客戶端的 Watch 請求最終都會被轉發到這個服務的 Watch 函式中:
  1. // https://sourcegraph.com/github.com/etcd-io/[email protected]/-/blob/etcdserver/api/v3rpc/watch.go#L136-206
  2. func (ws *watchServer) Watch(