


- 
etcd is a distributed, reliable key-value store for the most critical data of a distributed system.
- 
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. 
- service KV {
- rpc Range(RangeRequest) returns (RangeResponse) {
- option (google.api.http) = {
- post: "/v3beta/kv/range"
- body: "*"
- };
- }
- rpc Put(PutRequest) returns (PutResponse) {
- option (google.api.http) = {
- post: "/v3beta/kv/put"
- body: "*"
- };
- }
- }





- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L190-225
- func StartNode(c *Config, peers []Peer) Node {
- r := newRaft(c)
- r.becomeFollower(1, None)
- r.raftLog.committed = r.raftLog.lastIndex()
- for _, peer := range peers {
- r.addNode(peer.ID)
- }
- n := newNode()
- go n.run(r)
- return &n
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L291-423
- func (n *node) run(r *raft) {
- lead := None
- for {
- if lead != r.lead {
- lead = r.lead
- }
- select {
- case m := <-n.recvc:
- r.Step(m)
- case <-n.tickc:
- r.tick()
- case <-n.stop:
- close(n.done)
- return
- }
- }
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
- func (r *raft) tickElection() {
- r.electionElapsed++
- if r.promotable() && r.pastElectionTimeout() {
- r.electionElapsed = 0
- r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
- }
- }
- func (r *raft) tickHeartbeat() {
- r.heartbeatElapsed++
- r.electionElapsed++
- if r.heartbeatElapsed >= r.heartbeatTimeout {
- r.heartbeatElapsed = 0
- r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
- }
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L931-1142
- func stepLeader(r *raft, m pb.Message) error {
- switch m.Type {
- case pb.MsgBeat:
- r.bcastHeartbeat()
- return nil
- // ...
- }
- //...
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L518-534
- func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
- commit := min(r.getProgress(to).Match, r.raftLog.committed)
- m := pb.Message{
- To: to,
- Type: pb.MsgHeartbeat,
- Commit: commit,
- Context: ctx,
- }
- r.send(m)
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1191-1247
- func stepFollower(r *raft, m pb.Message) error {
- switch m.Type {
- case pb.MsgHeartbeat:
- r.electionElapsed = 0
- r.lead = m.From
- r.handleHeartbeat(m)
- // ...
- }
- return nil
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189
- func stepCandidate(r *raft, m pb.Message) error {
- // ...
- switch m.Type {
- case pb.MsgHeartbeat:
- r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
- r.handleHeartbeat(m)
- }
- // ...
- return nil
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
- func (r *raft) tickElection() {
- r.electionElapsed++
- if r.promotable() && r.pastElectionTimeout() {
- r.electionElapsed = 0
- r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
- }
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927
- func (r *raft) Step(m pb.Message) error {
- // ...
- switch m.Type {
- case pb.MsgHup:
- if r.state != StateLeader {
- if r.preVote {
- r.campaign(campaignPreElection)
- } else {
- r.campaign(campaignElection)
- }
- } else {
- r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
- }
- }
- // ...
- return nil
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L730-766
- func (r *raft) campaign(t CampaignType) {
- r.becomeCandidate()
- if r.quorum() == r.poll(r.id, voteRespMsgType(pb.MsgVote), true) {
- r.becomeLeader()
- return
- }
- for id := range r.prs {
- if id == r.id {
- continue
- }
- r.send(pb.Message{Term: r.Term, To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
- }
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927
- func (r *raft) Step(m pb.Message) error {
- // ...
- switch m.Type {
- case pb.MsgVote, pb.MsgPreVote:
- canVote := r.Vote == m.From || (r.Vote == None && r.lead == None)
- if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
- r.send(pb.Message{To: m.From, Term: m.Term, Type: pb.MsgVoteResp})
- r.electionElapsed = 0
- r.Vote = m.From
- } else {
- r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgVoteResp, Reject: true})
- }
- }
- // ...
- return nil
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189
- func stepCandidate(r *raft, m pb.Message) error {
- switch m.Type {
- // ...
- case pb.MsgVoteResp:
- gr := r.poll(m.From, m.Type, !m.Reject)
- switch r.quorum() {
- case gr:
- r.becomeLeader()
- r.bcastAppend()
- // ...
- }
- }
- return nil
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L671-678
- func (r *raft) becomeFollower(term uint64, lead uint64) {
- r.step = stepFollower
- r.reset(term)
- r.tick = r.tickElection
- r.lead = lead
- r.state = StateFollower
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
- func (r *raft) tickElection() {
- r.electionElapsed++
- if r.promotable() && r.pastElectionTimeout() {
- r.electionElapsed = 0
- r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
- }
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L680-691
- func (r *raft) becomeCandidate() {
- r.step = stepCandidate
- r.reset(r.Term + 1)
- r.tick = r.tickElection
- r.Vote = r.id
- r.state = StateCandidate
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L708-728
- func (r *raft) becomeLeader() {
- r.step = stepLeader
- r.reset(r.Term)
- r.tick = r.tickHeartbeat
- r.lead = r.id
- r.state = StateLeader
- r.pendingConfIndex = r.raftLog.lastIndex()
- r.appendEntry(pb.Entry{Data: nil})
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L646-669
- func (r *raft) tickHeartbeat() {
- r.heartbeatElapsed++
- r.electionElapsed++
- if r.electionElapsed >= r.electionTimeout {
- r.electionElapsed = 0
- if r.checkQuorum {
- r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
- }
- }
- if r.heartbeatElapsed >= r.heartbeatTimeout {
- r.heartbeatElapsed = 0
- r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
- }
- }


- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L51-69
- type Backend interface {
- ReadTx() ReadTx
- BatchTx() BatchTx
- Snapshot() Snapshot
- Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
- Size() int64
- SizeInUse() int64
- Defrag() error
- ForceCommit()
- Close() error
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L80-104
- type backend struct {
- size int64
- sizeInUse int64
- commits int64
- mu sync.RWMutex
- db *bolt.DB
- batchInterval time.Duration
- batchLimit int
- batchTx *batchTxBuffered
- readTx *readTx
- stopc chan struct{}
- donec chan struct{}
- lg *zap.Logger
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L30-36
- type ReadTx interface {
- Lock()
- Unlock()
- UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
- UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L28-38
- type BatchTx interface {
- ReadTx
- UnsafeCreateBucket(name []byte)
- UnsafePut(bucketName []byte, key []byte, value []byte)
- UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
- UnsafeDelete(bucketName []byte, key []byte)
- Commit()
- CommitAndStop()
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L137-176
- func newBackend(bcfg BackendConfig) *backend {
- bopts := &bolt.Options{}
- bopts.InitialMmapSize = bcfg.mmapSize()
- db, _ := bolt.Open(bcfg.Path, 0600, bopts)
- b := &backend{
- db: db,
- batchInterval: bcfg.BatchInterval,
- batchLimit: bcfg.BatchLimit,
- readTx: &readTx{
- buf: txReadBuffer{
- txBuffer: txBuffer{make(map[string]*bucketBuffer)},
- },
- buckets: make(map[string]*bolt.Bucket),
- },
- stopc: make(chan struct{}),
- donec: make(chan struct{}),
- }
- b.batchTx = newBatchTxBuffered(b)
- go b.run()
- return b
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L289-305
- func (b *backend) run() {
- defer close(b.donec)
- t := time.NewTimer(b.batchInterval)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- case <-b.stopc:
- b.batchTx.CommitAndStop()
- return
- }
- if b.batchTx.safePending() != 0 {
- b.batchTx.Commit()
- }
- t.Reset(b.batchInterval)
- }
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L38-47
- type readTx struct {
- mu sync.RWMutex
- buf txReadBuffer
- txmu sync.RWMutex
- tx *bolt.Tx
- buckets map[string]*bolt.Bucket
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L52-90
- func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
- if endKey == nil {
- limit = 1
- }
- keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
- if int64(len(keys)) == limit {
- return keys, vals
- }
- bn := string(bucketName)
- bucket, ok := rt.buckets[bn]
- if !ok {
- bucket = rt.tx.Bucket(bucketName)
- rt.buckets[bn] = bucket
- }
- if bucket == nil {
- return keys, vals
- }
- c := bucket.Cursor()
- k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
- return append(k2, keys...), append(v2, vals...)
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L121-141
- func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
- var isMatch func(b []byte) bool
- if len(endKey) > 0 {
- isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
- } else {
- isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
- limit = 1
- }
- for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
- vs = append(vs, cv)
- keys = append(keys, ck)
- if limit == int64(len(keys)) {
- break
- }
- }
- return keys, vs
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L40-46
- type batchTx struct {
- sync.Mutex
- tx *bolt.Tx
- backend *backend
- pending int
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L243-246
- type batchTxBuffered struct {
- batchTx
- buf txWriteBuffer
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L65-67
- func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
- t.unsafePut(bucketName, key, value, false)
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L74-103
- func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
- bucket := t.tx.Bucket(bucketName)
- if err := bucket.Put(key, value); err != nil {
- plog.Fatalf("cannot put key into bucket (%v)", err)
- }
- t.pending++
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L144-169
- func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
- bucket := t.tx.Bucket(bucketName)
- err := bucket.Delete(key)
- if err != nil {
- plog.Fatalf("cannot delete key from bucket (%v)", err)
- }
- t.pending++
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L184-188
- func (t *batchTx) Commit() {
- t.Lock()
- t.commit(false)
- t.Unlock()
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L210-241
- func (t *batchTx) commit(stop bool) {
- if t.tx != nil {
- if t.pending == 0 && !stop {
- return
- }
- start := time.Now()
- err := t.tx.Commit()
- rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
- spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
- writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
- commitSec.Observe(time.Since(start).Seconds())
- atomic.AddInt64(&t.backend.commits, 1)
- t.pending = 0
- }
- if !stop {
- t.tx = t.backend.begin(true)
- }
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L68-76
- func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
- keyi := &keyIndex{key: key}
- if keyi = ti.keyIndex(keyi); keyi == nil {
- return revision{}, revision{}, 0, ErrRevisionNotFound
- }
- return keyi.get(ti.lg, atRev)
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L84-89
- func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
- if item := ti.tree.Get(keyi); item != nil {
- return item.(*keyIndex)
- }
- return nil
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L149-171
- func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
- g := ki.findGeneration(atRev)
- if g.isEmpty() {
- return revision{}, revision{}, 0, ErrRevisionNotFound
- }
- n := g.walk(func(rev revision) bool { return rev.main > atRev })
- if n != -1 {
- return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
- }
- return revision{}, revision{}, 0, ErrRevisionNotFound
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L127-145
- func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
- if ki.generations[len(ki.generations)-1].isEmpty() {
- return ErrRevisionNotFound
- }
- ki.put(lg, main, sub)
- ki.generations = append(ki.generations, generation{})
- return nil
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L112-165
- func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
- rev := ro.Rev
- revpairs := tr.s.kvindex.Revisions(key, end, rev)
- if len(revpairs) == 0 {
- return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
- }
- limit := int(ro.Limit)
- if limit <= 0 || limit > len(revpairs) {
- limit = len(revpairs)
- }
- kvs := make([]mvccpb.KeyValue, limit)
- revBytes := newRevBytes()
- for i, revpair := range revpairs[:len(kvs)] {
- revToBytes(revpair, revBytes)
- _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
- kvs[i].Unmarshal(vs[0])
- }
- return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L106-120
- func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
- if end == nil {
- rev, _, _, err := ti.Get(key, atRev)
- if err != nil {
- return nil
- }
- return []revision{rev}
- }
- ti.visit(key, end, func(ki *keyIndex) {
- if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
- revs = append(revs, rev)
- }
- })
- return revs
- }
- func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
- g := ki.findGeneration(atRev)
- if g.isEmpty() {
- return revision{}, revision{}, 0, ErrRevisionNotFound
- }
- n := g.walk(func(rev revision) bool { return rev.main > atRev })
- if n != -1 {
- return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
- }
- return revision{}, revision{}, 0, ErrRevisionNotFound
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L53-66
- func (ti *treeIndex) Put(key []byte, rev revision) {
- keyi := &keyIndex{key: key}
- item := ti.tree.Get(keyi)
- if item == nil {
- keyi.put(ti.lg, rev.main, rev.sub)
- ti.tree.ReplaceOrInsert(keyi)
- return
- }
- okeyi := item.(*keyIndex)
- okeyi.put(ti.lg, rev.main, rev.sub)
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L77-104
- func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
- rev := revision{main: main, sub: sub}
- if len(ki.generations) == 0 {
- ki.generations = append(ki.generations, generation{})
- }
- g := &ki.generations[len(ki.generations)-1]
- if len(g.revs) == 0 {
- g.created = rev
- }
- g.revs = append(g.revs, rev)
- g.ver++
- ki.modified = rev
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L252-309
- func (tw *storeTxnWrite) delete(key []byte) {
- ibytes := newRevBytes()
- idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
- revToBytes(idxRev, ibytes)
- ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
- kv := mvccpb.KeyValue{Key: key}
- d, _ := kv.Marshal()
- tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
- tw.s.kvindex.Tombstone(key, idxRev)
- tw.changes = append(tw.changes, kv)
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L321-433
- func (s *store) restore() error {
- min, max := newRevBytes(), newRevBytes()
- revToBytes(revision{main: 1}, min)
- revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
- tx := s.b.BatchTx()
- rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
- for {
- keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
- if len(keys) == 0 {
- break
- }
- restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
- newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
- newMin.sub++
- revToBytes(newMin, min)
- }
- close(rkvc)
- s.currentRev = <-revc
- return nil
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L486-506
- func restoreChunk(lg *zap.Logger, kvc chan revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
- for i, key := range keys {
- rkv := revKeyValue{key: key}
- _ = rkv.kv.Unmarshal(vals[i])
- rkv.kstr = string(rkv.kv.Key)
- if isTombstone(key) {
- delete(keyToLease, rkv.kstr)
- } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
- keyToLease[rkv.kstr] = lid
- } else {
- delete(keyToLease, rkv.kstr)
- }
- kvc <- rkv
- }
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L441-484
- func restoreIntoIndex(lg *zap.Logger, idx index) (chan revKeyValue, chan int64) {
- rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
- go func() {
- currentRev := int64(1)
- defer func() { revc <- currentRev }()
- for rkv := range rkvc {
- ki := &keyIndex{key: rkv.kv.Key}
- ok := false
- if idxKey := idx.KeyIndex(ki); idxKey != nil {
- ki, ok = idxKey, true
- }
- rev := bytesToRev(rkv.key)
- currentRev = rev.main
- if ok {
- if isTombstone(rkv.key) {
- ki.tombstone(lg, rev.main, rev.sub)
- continue
- }
- ki.put(lg, rev.main, rev.sub)
- } else if !isTombstone(rkv.key) {
- ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
- idx.Insert(ki)
- }
- }
- }()
- return rkvc, revc
- }
恢復記憶體索引的相關程式碼在實現上非常值得學習:`restoreChunk` 負責從 BoltDB 讀出鍵值並寫入 Channel,`restoreIntoIndex` 則在獨立的 goroutine 中從同一個 Channel 讀取資料並重建記憶體索引,兩個函式只透過 Channel 通訊,很好地將訊息的『生產者』和『消費者』進行分離。

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kv.go#L100-125
- type KV interface {
- ReadView
- WriteView
- Read() TxnRead
- Write() TxnWrite
- Hash() (hash uint32, revision int64, err error)
- HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
- Compact(rev int64) (chan struct{}, error)
- Commit()
- Restore(b backend.Backend) error
- Close() error
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L32-40
- func (s *store) Read() TxnRead {
- s.mu.RLock()
- tx := s.b.ReadTx()
- s.revMu.RLock()
- tx.Lock()
- firstRev, rev := s.compactMainRev, s.currentRev
- s.revMu.RUnlock()
- return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L45-65
- type watchableStore struct {
- *store
- mu sync.RWMutex
- unsynced watcherGroup
- synced watcherGroup
- stopc chan struct{}
- wg sync.WaitGroup
- }
- func (s *watchableStore) syncWatchers() int {
- curRev := s.store.currentRev
- compactionRev := s.store.compactMainRev
- wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
- minBytes, maxBytes := newRevBytes(), newRevBytes()
- revToBytes(revision{main: minRev}, minBytes)
- revToBytes(revision{main: curRev + 1}, maxBytes)
- tx := s.store.b.ReadTx()
- revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
- evs := kvsToEvents(nil, wg, revs, vs)
- wb := newWatcherBatch(wg, evs)
- for w := range wg.watchers {
- w.minRev = curRev + 1
- eb, ok := wb[w]
- if !ok {
- s.synced.add(w)
- s.unsynced.delete(w)
- continue
- }
- w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev})
- s.synced.add(w)
- s.unsynced.delete(w)
- }
- return s.unsynced.size()
- }

- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watcher.go#L108-135
- func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
- if id == AutoWatchID {
- for ws.watchers[ws.nextID] != nil {
- ws.nextID++
- }
- id = ws.nextID
- ws.nextID++
- }
- w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
- ws.cancels[id] = c
- ws.watchers[id] = w
- return id, nil
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L111-142
- func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
- wa := &watcher{
- key: key,
- end: end,
- minRev: startRev,
- id: id,
- ch: ch,
- fcs: fcs,
- }
- synced := startRev > s.store.currentRev || startRev == 0
- if synced {
- s.synced.add(wa)
- } else {
- s.unsynced.add(wa)
- }
- return wa, func() { s.cancelWatcher(wa) }
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L136-206
- func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
- sws := serverWatchStream{
- // ...
- gRPCStream: stream,
- watchStream: ws.watchable.NewWatchStream(),
- ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
- }
- sws.wg.Add(1)
- go func() {
- sws.sendLoop()
- sws.wg.Done()
- }()
- go func() {
- sws.recvLoop()
- }()
- sws.wg.Wait()
- return err
- }
- // https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L220-334
- func (sws *serverWatchStream) recvLoop() error {
- for {
- req, err := sws.gRPCStream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- switch uv := req.RequestUnion.(type) {
- case *pb.WatchRequest_CreateRequest:
- creq := uv.CreateRequest
- filters := FiltersFromRequest(creq)
- wsrev := sws.watchStream.Rev()
- rev := creq.StartRevision
- id, _ := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(wsrev),
- WatchId: int64(id),
- Created: true,
- Canceled: err != nil,
- }
- select {
- case sws.ctrlStream <- wr:
- case <-sws.closec:
- return nil
- }
- case *pb.WatchRequest_CancelRequest: // ...
- case *pb.WatchRequest_ProgressRequest: // ...
- default:
- continue
- }
- }
- }
- func (sws *serverWatchStream) sendLoop() {
- for {
- select {
- case wresp, ok := <-sws.watchStream.Chan():
- if !ok {
- return
- }
- evs := wresp.Events
- events := make([]*mvccpb.Event, len(evs))
- for i := range evs {
- events[i] = &evs[i] }
- canceled := wresp.CompactRevision != 0
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(wresp.Revision),
- WatchId: int64(wresp.WatchID),
- Events: events,
- CompactRevision: wresp.CompactRevision,
- Canceled: canceled,
- }
- sws.gRPCStream.Send(wr)
- case c, ok := <-sws.ctrlStream: // ...
- case <-progressTicker.C: // ...
- case <-sws.closec:
- return
- }
- }
- }




 知識星球
知識星球
