Go 通過 ETCD 實現應用選主

原理

Go 服務通過 ETCD client 實現有狀態服務的選主,A、B 兩個服務通過生成相同前綴的 key,並且把自己的 IP 值上傳給 ETCD,由於多個服務間會同時上傳,所以按照相同前綴去獲取創建的 key-value,然後通過時間進行排序,最早創建的則成爲 master。

如果成爲 master 的服務一直在線的話,會對 key-value 進行 續約, 當 master 下線不能正常續約的時候,key 會被刪除,後面創建的 key 就變成了最早創建的,從而接任成爲 master.

整體選主流程如下圖:

ETCD Client 選主代碼

Campaign 方法是 ETCD client 封裝的一個選主方法,如果競選成爲主節點成功則正常返回,如果不成功則會阻塞在這個方法裏面,等待前面的主節點失效刪除 key-value,然後自己成功才從方法退出。

func (e *Election) Campaign(ctx context.Context, val string) error {
 s := e.session
 client := e.session.Client()
 // 生成相同前綴的Key進行選主
 k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k)"=", 0))
 // 將續約跟session的lease進行綁定,Session不過期就一直是master
 txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
 // 如果不是首次創建那麼獲取對應key的值用於後續值更新
 txn = txn.Else(v3.OpGet(k))
 resp, err := txn.Commit()
 if err != nil {
  return err
 }
 e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
 
 // 如果事務提交沒有成功則通過 Proclaim來宣佈一個最新的值
 if !resp.Succeeded {
  kv := resp.Responses[0].GetResponseRange().Kvs[0]
  e.leaderRev = kv.CreateRevision
  if string(kv.Value) != val {
   if err = e.Proclaim(ctx, val); err != nil {
    e.Resign(ctx)
    return err
   }
  }
 }
 

 // 如果自己不是主節點則會阻塞,等待前面的key被刪除後自己成爲主節點
 _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
 if err != nil {
  // clean up in case of context cancel
  select {
  case <-ctx.Done():
   e.Resign(client.Ctx()) // context被取消的發起新一輪選主
  default:
   e.leaderSession = nil
  }
  return err
 }
 e.hdr = resp.Header

 return nil
}

Proclaim 在不進行新的一輪選舉下宣佈一個新值,這裏調用主要是用來把最新的值修改成競選的時候傳入的 value ,我們傳入的是 ip,即保持最新的 IP 值,避免選舉成功後 IP不是最新的值

func (e *Election) Proclaim(ctx context.Context, val string) error {
 // ...
 cmp := v3.Compare(v3.CreateRevision(e.leaderKey)"=", e.leaderRev)
 txn := client.Txn(ctx).If(cmp)
 txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
 tresp, terr := txn.Commit()
 // ...
}

Resign 放棄 leader,將 leader 的 key-value 刪除,重新開始新一輪的選舉

func (e *Election) Resign(ctx context.Context) (err error) {
 //...
 client := e.session.Client()
 cmp := v3.Compare(v3.CreateRevision(e.leaderKey)"=", e.leaderRev)
 resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
 //...
}

watchDeletes 等待前面的 key 都被刪除,則自己成爲最新的 key 時便成爲 master

代碼如下:

func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
 getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
 for {
  resp, err := client.Get(ctx, pfx, getOpts...)
  if err != nil {
   return nil, err
  }
  // 已經成爲最新的key,直接返回
  if len(resp.Kvs) == 0 {
   return resp.Header, nil
  }
  // watch 該key,等待它刪除後再繼續進入下一次循環
  lastKey := string(resp.Kvs[0].Key)
  if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
   return nil, err
  }
 }
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
 // ...
 // 循環監聽所有事件,如果是delete則直接進行返回,證明key已經被刪除
 wch := client.Watch(cctx, key, v3.WithRev(rev))
 for wr = range wch {
  for _, ev := range wr.Events {
   if ev.Type == mvccpb.DELETE {
    return nil
   }
  }
 }
 // ...
}

選主實現

  1. 創建一個 session,並對鏈接進行保活

  2. 參與選主,沒有成功則阻塞

  3. 成爲主節點後寫入數據到 notify chan,外部讀取該管道的數據來執行主節點需要的任務。

func campaign(ctx context.Context, etcdClient *clientv3.Client, notify chan struct{}) (err error) {
 session, err := concurrency.NewSession(etcdClient)
 if err != nil {
  return
 }
 defer func() {
  if err != nil {
   fmt.Println(err)
  }
  if session != nil {
   _ = session.Close()
  }
 }()
 election := concurrency.NewElection(session, "/etcd-campaign-demo")
 ip, err := net_helper.GetLocalIP()
 if err != nil {
  return
 }
 // 成爲leader節點會運行出來,沒選舉成功則會阻塞在這裏
 err = election.Campaign(ctx, ip)
 if err != nil {
  return
 }

 // 通知程序已經成爲leader,可以避免輪訓leader狀態
 for {
  select {
  case notify <- struct{}{}: // 發送通知說明已經是leader了
  case <-session.Done(): // session斷開要重新進行選舉
   err = errors.New("session is done")
   return
  case <-ctx.Done():
   // 給context設定了超時時間,避免放棄leader的時間超時
   resignCtx, _ := context.WithTimeout(context.Background(), time.Second)
   _ = election.Resign(resignCtx)
   err = errors.New("ctx is done")
   return
  }
 }
}

成功競選後自動續約如何實現

通過 lease ID 將 key 跟 session 的租約進行綁定,只要 session 不過期,那麼對應的 key 也就不會過期

我們來分析下 session 如何通過 keepalive 機制來進行保活。

整體的流程如下:

NewSession 的代碼邏輯如下:

func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
 // ...
 // 創建租約
 id := ops.leaseID
 if id == v3.NoLease {
  resp, err := client.Grant(ops.ctx, int64(ops.ttl))
  if err != nil {
   return nil, err
  }
  id = resp.ID
 }

 
 // 調用keepalive 進行保活
 keepAlive, err := client.KeepAlive(ctx, id)

 return s, nil
}

KeepAlive 代碼邏輯如下

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
 ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

 // 將需要保活的id進行保存
 l.mu.Lock()
 ka, ok := l.keepAlives[id]
 if !ok {
  ka = &keepAlive{
   chs:           []chan<- *LeaseKeepAliveResponse{ch},
   ctxs:          []context.Context{ctx},
   deadline:      time.Now().Add(l.firstKeepAliveTimeout),
   nextKeepAlive: time.Now(),
   donec:         make(chan struct{}),
  }
  l.keepAlives[id] = ka
 }
 l.mu.Unlock()

 // 第一次進入的時候會初始化循環進行保活
 l.firstKeepAliveOnce.Do(func() {
  go l.recvKeepAliveLoop()
 })

 return ch, nil
}

receKeepAliveLook 代碼邏輯

func (l *lessor) recvKeepAliveLoop() (gerr error) {
 for {
  // 這裏面會啓動 sendKeepAliveLoop 用於持續續約
  stream, err := l.resetRecv()
  if err != nil {
   if canceledByCaller(l.stopCtx, err) {
    return err
   }
  } else {
   for {
    // 循環接收 keepAlive 的請求,用於更新需要下次保活的時間等信息
    resp, err := stream.Recv()
    l.recvKeepAlive(resp)
   }
  }

 }
}

sendKeepAliveLoop 用來給 ETCD server 發送續約請求

func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 for {
  var tosend []LeaseID

  now := time.Now()

  // 獲取所有需要發送保活的id
  l.mu.Lock()
  for id, ka := range l.keepAlives {
   if ka.nextKeepAlive.Before(now) {
    tosend = append(tosend, id)
   }
  }
  l.mu.Unlock()

  // 流式發送數據
  for _, id := range tosend {
   r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
   if err := stream.Send(r); err != nil {
   
   }
  }
  // ...
 }
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/C0b-VqlKtrwApVHtVGy6yA