Go 命令模式實戰
隨着分佈式應用越來越廣泛,數據操作的一致性需要由業務方來保證,兩個操作不在同一個事務操作失敗時,無法依靠數據庫事務直接回滾,這時需要業務方手動進行回滾。
通過命令模式提供新舊數據初始化後,如果執行後有後續操作失敗,業務方可以通過調用 Undo
方法進行回滾,無需再編寫數據庫操作代碼
優點是減少了回滾代碼的重複編寫,只要提供新舊數據,通過 Do
和 UnDo
就可以完成對應操作。
並且有些時候可能 Undo
方法不能保證一定執行成功的話,可以通過結果由業務方決定是否要發送 MQ 來保證數據最終一致性。
並且如果操作之間沒有相互依賴關係,可以通過併發操作來提高運行效率,執行流程圖如下
具體實現
TransactionCommand
接口定義
type TransactionCommand interface {
Do() (define.ExecuteResult, error)
UnDo() (define.ExecuteResult, error)
GetContext() context.Context
}
目前實現了 DB 操作的 Do 和 Undo ,其他存儲類型的正逆向操作同樣進行 接口實現即可
type transactionCommandDBImpl struct {
OriginData define.TransactionUpdateData
NewData define.TransactionUpdateData
WhereCondition define.FilterCondition
ctx context.Context
maxRetryCount int
}
func NewTransactionCommandUpdateDBImpl(
originData define.TransactionUpdateData,
newData define.TransactionUpdateData,
whereCondition define.FilterCondition,
ctx context.Context,
maxRetryCnt int,
) *transactionCommandDBImpl {
return &transactionCommandDBImpl{
OriginData: originData,
NewData: newData,
WhereCondition: whereCondition,
ctx: ctx,
maxRetryCount: maxRetryCnt,
}
}
func (t *transactionCommandDBImpl) GetContext() context.Context {
return t.ctx
}
這裏的具體實現是數據庫的操作,如果需要有其他操作,則實現的時候替換成更新其他內容即可
func (t *transactionCommandDBImpl) Do() (define.ExecuteResult, error) {
engine := t.getDbClient()
// 通過反射注入where 條件
engine = sql_utils.QuerySQLCommonWhere(engine, t.WhereCondition)
res := engine.Table(t.WhereCondition.GetTableName()).Updates(t.NewData.GetUpdateFields())
if err := res.GetError(); err != nil {
return nil, errors.WithStack(err)
}
executeResult := define.NewUpdateExecuteResult(res.RowsAffected())
return executeResult, nil
}
func (t *transactionCommandDBImpl) UnDo() (define.ExecuteResult, error) {
engine := t.getDbClient()
// 通過反射注入where 條件
engine = sql_utils.QuerySQLCommonWhere(engine, t.WhereCondition)
res := engine.Table(t.WhereCondition.GetTableName()).Updates(t.OriginData.GetUpdateFields())
if err := res.GetError(); err != nil {
return nil, errors.WithStack(err)
}
executeResult := define.NewUpdateExecuteResult(res.RowsAffected())
return executeResult, nil
}
命令執行器 TransactionCommandExecutor
type TransactionCommandExecutor struct {
commands []TransactionCommand
successCommands []TransactionCommand
}
func NewTransactionCommandExecutor(commands []TransactionCommand) *TransactionCommandExecutor {
return &TransactionCommandExecutor{
commands: commands,
successCommands: make([]TransactionCommand, 0, len(commands)),
}
}
自動執行所有命令,如果有錯誤則進行返回
func (e *TransactionCommandExecutor) AutoExecute(ctx context.Context) error {
err := e.Execute(ctx)
if err != nil || len(e.failCommands) > 0 {
logger.CtxLogErrorf(ctx, "err : %+v", err)
if unDoErr := e.UndoSuccessCommand(ctx); unDoErr != nil {
logger.CtxLogErrorf(ctx, "undo success command err : %+v", unDoErr)
}
return err
}
return nil
}
併發執行傳入的命令
// Execute
// @Description: 併發執行更新操作,一個發生錯誤則全部undo
func (e *TransactionCommandExecutor) Execute(ctx context.Context) error {
if len(e.commands) == 0 {
return nil}
e.wg.Add(len(e.commands))
for _, c := range e.commands {
go func(transactionCommand TransactionCommand) {
defer e.wg.Done()
// 如果已經有失敗的命令則立即返回
if len(e.failCommands) > 0 {
return
}
executeResult, err := transactionCommand.Do()
if err != nil {
logger.CtxLogErrorf(transactionCommand.GetContext(), "execute : %+v", err)
e.appendFail(transactionCommand)
return
}
if !executeResult.IsSuccess() {
e.appendFail(transactionCommand)
return
}
e.appendSuccess(transactionCommand)
}(c)
}
e.wg.Wait()
return nil
}
將失敗和成功的命令保存下來,用於接下來的回滾操作
func (e *TransactionCommandExecutor) appendSuccess(command TransactionCommand) {
e.Lock()
e.successCommands = append(e.successCommands, command)
e.Unlock()
}
func (e *TransactionCommandExecutor) appendFail(command TransactionCommand) {
e.Lock()
e.failCommands = append(e.failCommands, command)
e.Unlock()
}
回滾所有未成功的命令,這裏假設了各個命令之間沒有先後順序,可以併發執行
由於上面已經保存了成功的命令,所以只需要回滾成功的命令即可,失敗的命令則不需要回滾。
// UndoSuccessCommand
// @Description: 併發回滾操作
func (e *TransactionCommandExecutor) UndoSuccessCommand(ctx context.Context) error {
if len(e.successCommands) == 0 {
return nil
}
if len(e.failCommands) == 0 {
return nil
}
// 併發進行回滾
e.wg.Add(len(e.commands))
for _, c := range e.successCommands {
go func(transactionCommand TransactionCommand) {
defer e.wg.Done()
executeResult, err := transactionCommand.UnDo()
if err != nil {
logger.CtxLogErrorf(transactionCommand.GetContext(), "execute : %+v", err)
return
}
if !executeResult.IsSuccess() {
// log or return*
return
}
}(c)
}
e.wg.Wait()
return nil
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/S5YqeiFb4T9q-9DRpBtlRw