Go 命令模式實戰

隨着分佈式應用越來越廣泛,數據操作的一致性需要由業務方來保證,兩個操作不在同一個事務操作失敗時,無法依靠數據庫事務直接回滾,這時需要業務方手動進行回滾。

通過命令模式提供新舊數據初始化後,如果執行後有後續操作失敗,業務方可以通過調用 Undo方法進行回滾,無需再編寫數據庫操作代碼

優點是減少了回滾代碼的重複編寫,只要提供新舊數據,通過 Do和 UnDo就可以完成對應操作。

並且有些時候可能 Undo 方法不能保證一定執行成功的話,可以通過結果由業務方決定是否要發送 MQ 來保證數據最終一致性。

並且如果操作之間沒有相互依賴關係,可以通過併發操作來提高運行效率,執行流程圖如下

具體實現

TransactionCommand 接口定義

type TransactionCommand interface {
 Do() (define.ExecuteResult, error)
 UnDo() (define.ExecuteResult, error)
 GetContext() context.Context
}

目前實現了 DB 操作的 Do 和 Undo ,其他存儲類型的正逆向操作同樣進行 接口實現即可

type transactionCommandDBImpl struct {
 OriginData     define.TransactionUpdateData
 NewData        define.TransactionUpdateData
 WhereCondition define.FilterCondition
 ctx            context.Context
 maxRetryCount  int
}

func NewTransactionCommandUpdateDBImpl(
 originData define.TransactionUpdateData,
 newData define.TransactionUpdateData,
 whereCondition define.FilterCondition,
 ctx context.Context,
 maxRetryCnt int,
) *transactionCommandDBImpl {
 return &transactionCommandDBImpl{
  OriginData:     originData,
  NewData:        newData,
  WhereCondition: whereCondition,
  ctx:            ctx,
  maxRetryCount:  maxRetryCnt,
 }
}

func (t *transactionCommandDBImpl) GetContext() context.Context {
 return t.ctx
}

這裏的具體實現是數據庫的操作,如果需要有其他操作,則實現的時候替換成更新其他內容即可

func (t *transactionCommandDBImpl) Do() (define.ExecuteResult, error) {
 engine := t.getDbClient()
 // 通過反射注入where 條件
 engine = sql_utils.QuerySQLCommonWhere(engine, t.WhereCondition)
 res := engine.Table(t.WhereCondition.GetTableName()).Updates(t.NewData.GetUpdateFields())
 if err := res.GetError(); err != nil {
  return nil, errors.WithStack(err)
 }
 executeResult := define.NewUpdateExecuteResult(res.RowsAffected())
 return executeResult, nil
}

func (t *transactionCommandDBImpl) UnDo() (define.ExecuteResult, error) {
 engine := t.getDbClient()
 // 通過反射注入where 條件
 engine = sql_utils.QuerySQLCommonWhere(engine, t.WhereCondition)
 res := engine.Table(t.WhereCondition.GetTableName()).Updates(t.OriginData.GetUpdateFields())
 if err := res.GetError(); err != nil {
  return nil, errors.WithStack(err)
 }
 executeResult := define.NewUpdateExecuteResult(res.RowsAffected())
 return executeResult, nil
}

命令執行器 TransactionCommandExecutor

type TransactionCommandExecutor struct {
 commands        []TransactionCommand
 successCommands []TransactionCommand
}

func NewTransactionCommandExecutor(commands []TransactionCommand) *TransactionCommandExecutor {
 return &TransactionCommandExecutor{
  commands:        commands,
  successCommands: make([]TransactionCommand, 0, len(commands)),
 }
}

自動執行所有命令,如果有錯誤則進行返回

func (e *TransactionCommandExecutor) AutoExecute(ctx context.Context) error {
 err := e.Execute(ctx)
 if err != nil || len(e.failCommands) > 0 {
  logger.CtxLogErrorf(ctx, "err : %+v", err)
  if unDoErr := e.UndoSuccessCommand(ctx); unDoErr != nil {
   logger.CtxLogErrorf(ctx, "undo success command err : %+v", unDoErr)
  }
  return err
 }
 return nil
}

併發執行傳入的命令

// Execute
// @Description: 併發執行更新操作,一個發生錯誤則全部undo
func (e *TransactionCommandExecutor) Execute(ctx context.Context) error {
 if len(e.commands) == 0 {
  return nil}

 e.wg.Add(len(e.commands))
 for _, c := range e.commands {
  go func(transactionCommand TransactionCommand) {
   defer e.wg.Done()
   // 如果已經有失敗的命令則立即返回
   if len(e.failCommands) > 0 {
    return
   }
   executeResult, err := transactionCommand.Do()
   if err != nil {
    logger.CtxLogErrorf(transactionCommand.GetContext()"execute : %+v", err)
    e.appendFail(transactionCommand)
    return
   }
   if !executeResult.IsSuccess() {
    e.appendFail(transactionCommand)
    return
   }
   e.appendSuccess(transactionCommand)
  }(c)
 }
 e.wg.Wait()
 return nil
}

將失敗和成功的命令保存下來,用於接下來的回滾操作

func (e *TransactionCommandExecutor) appendSuccess(command TransactionCommand) {
 e.Lock()
 e.successCommands = append(e.successCommands, command)
 e.Unlock()
}

func (e *TransactionCommandExecutor) appendFail(command TransactionCommand) {
 e.Lock()
 e.failCommands = append(e.failCommands, command)
 e.Unlock()
}

回滾所有未成功的命令,這裏假設了各個命令之間沒有先後順序,可以併發執行

由於上面已經保存了成功的命令,所以只需要回滾成功的命令即可,失敗的命令則不需要回滾。

// UndoSuccessCommand
// @Description: 併發回滾操作
func (e *TransactionCommandExecutor) UndoSuccessCommand(ctx context.Context) error {
 if len(e.successCommands) == 0 {
  return nil
 }
 if len(e.failCommands) == 0 {
  return nil
 }
 // 併發進行回滾
 e.wg.Add(len(e.commands))
 for _, c := range e.successCommands {
  go func(transactionCommand TransactionCommand) {
   defer e.wg.Done()
   executeResult, err := transactionCommand.UnDo()
   if err != nil {
    logger.CtxLogErrorf(transactionCommand.GetContext()"execute : %+v", err)
    return
   }
   if !executeResult.IsSuccess() {
    // log or return*
    return
   }
  }(c)
 }
 e.wg.Wait()
 return nil
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/S5YqeiFb4T9q-9DRpBtlRw