Go 通過 ETCD 實現應用選主
原理
Go 服務通過 ETCD client 實現有狀態服務的選主,A、B 兩個服務通過生成相同前綴的 key,並且把自己的 IP 值上傳給 ETCD,由於多個服務間會同時上傳,所以按照相同前綴去獲取創建的 key-value,然後通過時間進行排序,最早創建的則成爲 master。
如果成爲 master 的服務一直在線的話,會對 key-value 進行 續約, 當 master 下線不能正常續約的時候,key 會被刪除,後面創建的 key 就變成了最早創建的,從而接任成爲 master.
整體選主流程如下圖:
ETCD Client 選主代碼
Campaign
方法是 ETCD client 封裝的一個選主方法,如果競選成爲主節點成功則正常返回,如果不成功則會阻塞在這個方法裏面,等待前面的主節點失效刪除 key-value,然後自己成功才從方法退出。
func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client()
// 生成相同前綴的Key進行選主
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
// 將續約跟session的lease進行綁定,Session不過期就一直是master
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
// 如果不是首次創建那麼獲取對應key的值用於後續值更新
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
// 如果事務提交沒有成功則通過 Proclaim來宣佈一個最新的值
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
e.leaderRev = kv.CreateRevision
if string(kv.Value) != val {
if err = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
return err
}
}
}
// 如果自己不是主節點則會阻塞,等待前面的key被刪除後自己成爲主節點
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.Resign(client.Ctx()) // context被取消的發起新一輪選主
default:
e.leaderSession = nil
}
return err
}
e.hdr = resp.Header
return nil
}
Proclaim
在不進行新的一輪選舉下宣佈一個新值,這裏調用主要是用來把最新的值修改成競選的時候傳入的 value
,我們傳入的是 ip,即保持最新的 IP 值,避免選舉成功後 IP
不是最新的值
func (e *Election) Proclaim(ctx context.Context, val string) error {
// ...
cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
txn := client.Txn(ctx).If(cmp)
txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
tresp, terr := txn.Commit()
// ...
}
Resign
放棄 leader,將 leader 的 key-value 刪除,重新開始新一輪的選舉
func (e *Election) Resign(ctx context.Context) (err error) {
//...
client := e.session.Client()
cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
//...
}
watchDeletes
等待前面的 key 都被刪除,則自己成爲最新的 key 時便成爲 master
代碼如下:
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
// 已經成爲最新的key,直接返回
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
// watch 該key,等待它刪除後再繼續進入下一次循環
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
// ...
// 循環監聽所有事件,如果是delete則直接進行返回,證明key已經被刪除
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
// ...
}
選主實現
-
創建一個 session,並對鏈接進行保活
-
參與選主,沒有成功則阻塞
-
成爲主節點後寫入數據到
notify chan
,外部讀取該管道的數據來執行主節點需要的任務。
func campaign(ctx context.Context, etcdClient *clientv3.Client, notify chan struct{}) (err error) {
session, err := concurrency.NewSession(etcdClient)
if err != nil {
return
}
defer func() {
if err != nil {
fmt.Println(err)
}
if session != nil {
_ = session.Close()
}
}()
election := concurrency.NewElection(session, "/etcd-campaign-demo")
ip, err := net_helper.GetLocalIP()
if err != nil {
return
}
// 成爲leader節點會運行出來,沒選舉成功則會阻塞在這裏
err = election.Campaign(ctx, ip)
if err != nil {
return
}
// 通知程序已經成爲leader,可以避免輪訓leader狀態
for {
select {
case notify <- struct{}{}: // 發送通知說明已經是leader了
case <-session.Done(): // session斷開要重新進行選舉
err = errors.New("session is done")
return
case <-ctx.Done():
// 給context設定了超時時間,避免放棄leader的時間超時
resignCtx, _ := context.WithTimeout(context.Background(), time.Second)
_ = election.Resign(resignCtx)
err = errors.New("ctx is done")
return
}
}
}
成功競選後自動續約如何實現
通過 lease ID 將 key 跟 session 的租約進行綁定,只要 session 不過期,那麼對應的 key 也就不會過期
我們來分析下 session 如何通過 keepalive 機制來進行保活。
整體的流程如下:
NewSession
的代碼邏輯如下:
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
// ...
// 創建租約
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = resp.ID
}
// 調用keepalive 進行保活
keepAlive, err := client.KeepAlive(ctx, id)
return s, nil
}
KeepAlive
代碼邏輯如下
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
// 將需要保活的id進行保存
l.mu.Lock()
ka, ok := l.keepAlives[id]
if !ok {
ka = &keepAlive{
chs: []chan<- *LeaseKeepAliveResponse{ch},
ctxs: []context.Context{ctx},
deadline: time.Now().Add(l.firstKeepAliveTimeout),
nextKeepAlive: time.Now(),
donec: make(chan struct{}),
}
l.keepAlives[id] = ka
}
l.mu.Unlock()
// 第一次進入的時候會初始化循環進行保活
l.firstKeepAliveOnce.Do(func() {
go l.recvKeepAliveLoop()
})
return ch, nil
}
receKeepAliveLook
代碼邏輯
func (l *lessor) recvKeepAliveLoop() (gerr error) {
for {
// 這裏面會啓動 sendKeepAliveLoop 用於持續續約
stream, err := l.resetRecv()
if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}
} else {
for {
// 循環接收 keepAlive 的請求,用於更新需要下次保活的時間等信息
resp, err := stream.Recv()
l.recvKeepAlive(resp)
}
}
}
}
sendKeepAliveLoop
用來給 ETCD server 發送續約請求
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
for {
var tosend []LeaseID
now := time.Now()
// 獲取所有需要發送保活的id
l.mu.Lock()
for id, ka := range l.keepAlives {
if ka.nextKeepAlive.Before(now) {
tosend = append(tosend, id)
}
}
l.mu.Unlock()
// 流式發送數據
for _, id := range tosend {
r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
if err := stream.Send(r); err != nil {
}
}
// ...
}
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/C0b-VqlKtrwApVHtVGy6yA