Golang 狀態機設計模式

本文介紹了 Golang 狀態機模式的一個實現示例,通過該模式,可以解耦調用鏈,有助於實現測試友好的代碼,提高代碼質量。原文: Go State Machine Patterns[1]

導言

在我們開發的許多項目中,都需要依賴某種運行狀態從而實現連續操作。

這方面的例子包括:

很久以前,Rob Pike 有一個關於 Go 中詞法掃描 [2] 的演講,內容很講座,我看了好幾遍才真正理解。但演講中介紹的最基本知識之一就是某個版本的 Go 狀態機。

該狀態機利用了 Go 的能力,即從函數中創建類型並將函數賦值給變量。

他在演講中介紹的狀態機功能強大,打破了讓函數執行 if/else 並調用下一個所需函數的邏輯。取而代之的是,每個狀態都會返回下一個需要調用的函數。

這樣就能將調用鏈分成更容易測試的部分。

調用鏈

下面是一個用簡單的調用鏈來完成任務的例子:

func Caller(args Args) {
  callA(args)
  callB(args)
}

func Caller(args Args) {
  callA(args)
}

func callA(args Args) {
  callB(args)
}

func callB(args Args) {
  return
}

兩種方法都表示調用鏈,其中 Caller() 調用 callA(),並最終調用 callB(),從中可以看到這一系列調用是如何執行的。

當然,這種設計沒有任何問題,但當調用者遠程調用其他系統時,必須對這些遠程調用進行模擬 / 打樁,以提供密封測試。

你可能還想實現條件調用鏈,即根據某些參數或狀態,在特定條件下通過 if/else 調用不同函數。

這就意味着,要對 Caller() 進行密封測試,可能需要處理整個調用鏈中的樁函數。如果有 50 個調用層級,則可能需要對被測函數下面每個層級的所有函數進行模擬 / 打樁。

這正是 Pike 的狀態機設計大顯身手的地方。

狀態機模式

首先定義狀態:

type State[T any] func(ctx context.Context, args T) (T, State[T], error)

狀態表示爲函數 / 方法,接收一組參數(任意類型 T),並返回下一個狀態及其參數或錯誤信息。

如果返回的狀態爲 nil,那麼狀態機將停止運行。如果設置了 error,狀態機也將停止運行。因爲返回的是下一個要運行的狀態,所以根據不同的條件,會有不同的下一個狀態。

這個版本與 Pike 的狀態機的不同之處在於這裏包含了泛型並返回 T。這樣我們就可以創建純粹的函數式狀態機(如果需要的話),可以返回某個類型,並將其傳遞給下一個狀態。Pike 最初實現狀態機設計時還沒有使用泛型。

爲了實現這一目標,需要一個狀態驅動程序:

func Run[T any](ctx context.Context, args T, start State[T] "T any") (T, error) {
  var err error
  current := start
  for {
    if ctx.Err() != nil {
      return args, ctx.Err()
    }
    args, current, err = current(ctx, args)
    if err != nil {
      return args, err
    }
    if current == nil {
      return args, nil
    }
  }
}

寥寥幾行代碼,我們就有了一個功能強大的狀態驅動程序。

下面來看一個例子,在這個例子中,我們爲集羣中的服務關閉操作編寫了狀態機:

package remove

...

// storageClient provides the methods on a storage service
// that must be provided to use Remove().
type storageClient interface {
  RemoveBackups(ctx context.Context, service string, mustKeep int) error
  RemoveContainer(ctx context.Context, service string) error
}

// serviceClient provides methods to do operations for services 
// within a cluster.
type servicesClient interface {
  Drain(ctx context.Context, service string) error
  Remove(ctx context.Context, service string) error
  List(ctx context.Context) ([]string, error)
  HasStorage(ctx context.Context, service string) (bool, error)
}

這裏定義了幾個需要客戶實現的私有接口,以便從集羣中移除服務。

我們定義了私有接口,以防止他人使用我們的定義,但會通過公有變量公開這些接口。這樣,我們就能與客戶保持松耦合,保證只使用我們需要的方法。

// Args are arguments to Service().
type Args struct {
  // Name is the name of the service.
  Name string
  
  // Storage is a client that can remove storage backups and storage
  // containers for a service.
  Storage storageClient
  // Services is a client that allows the draining and removal of
  // a service from the cluster.
  Services servicesClient
}

func (a Args) validate(ctx context.Context) error {
  if a.Name == "" {
    return fmt.Errorf("Name cannot be an empty string")
  }

  if a.Storage == nil {
    return fmt.Errorf("Storage cannot be nil")
  }
  if a.Services == nil {
    return fmt.Errorf("Services cannot be nil")
  }
  return nil
}

這裏設置了要通過狀態傳遞的參數,可以將在一個狀態中設置並傳遞到另一個狀態的私有字段包括在內。

請注意,Args 並非指針。

由於我們修改了 Args 並將其傳遞給每個狀態,因此不需要給垃圾回收器增加負擔。對於像這樣操作來說,這點節約微不足道,但在工作量大的 ETL 管道中,節約的時間可能就很明顯了。

實現中包含 validate() 方法,用於測試參數是否滿足使用的最低基本要求。

// Service removes a service from a cluster and associated storage.
// The last 3 storage backups are retained for whatever the storage retainment
// period is.
func Service(ctx context.Context, args Args) error {
  if err := args.validate(); err != nil {
    return err
  }
  
  start := drainService
  _, err := Run[Args](ctx, args, start "Args")
  if err != nil {
    return fmt.Errorf("problem removing service %q: %w", args.Name, err)
  }
  return nil
}

用戶只需調用 Service(),傳入 Args,如果出錯就會收到錯誤信息。用戶不需要看到狀態機模式,也不需要理解狀態機模式就能執行操作。

我們只需驗證 Args 是否正確,將狀態機的起始狀態設置爲名爲 drainService 的函數,然後調用上面定義的 Run() 函數即可。

func drainService(ctx context.Context, args Args) (Args, State[Args], error) {
  l, err := args.Services.List(ctx)
  if err != nil {
    return args, nil, err
  }

  found := false
  for _, entry := range l {
    if entry == args.Name {
      found = true
      break
    }
  }
  if !found {
    return args, nil, fmt.Errorf("the service was not found")
  }

  if err := args.Services.Drain(ctx, args.Name); err != nil {
    return args, nil, fmt.Errorf("problem draining the service: %w", err)
  }

  return args, removeService, nil
}

我們的第一個狀態叫做 drainService(),實現了上面定義的狀態類型。

它使用 Args 中定義的 Services 客戶端列出集羣中的所有服務,如果找不到服務,就會返回錯誤並結束狀態機。

如果找到服務,就會對服務執行關閉。一旦完成,就進入下一個狀態,即 removeService()

func removeService(ctx context.Context, args Args) (Args, State[Args], error) {
  if err := args.Services.Remove(ctx, args.Name); err != nil {
    return args, nil, fmt.Errorf("could not remove the service: %w", err)
  }

  hasStorage, err := args.Services.HasStorage(ctx, args.Name)
  if err != nil {
    return args, nil, fmt.Errorf("HasStorage() failed: %w", err)
  }
  if hasStorage{
    return args, removeBackups, nil
  }

  return args, nil, nil
}

removeService() 使用我們的 Services 客戶端將服務從羣集中移除。

調用 HasStorage() 方法確定是否有存儲,如果有,就會進入 removeBackups() 狀態,否則就會返回 args, nil, nil,這將導致狀態機在無錯誤的情況下退出。

這個示例說明如何根據 Args 中的信息或代碼中的遠程調用在狀態機中創建分支。

其他狀態調用由你自行決定。我們看看這種設計如何更適合測試此類操作。

測試優勢

這種模式首先鼓勵的是小塊的可測試代碼,模塊變得很容易分割,這樣當代碼塊變得太大時,只需創建新的狀態來隔離代碼塊。

但更大的優勢在於無需進行大規模端到端測試。由於操作流程中的每個階段都需要調用下一階段,因此會出現以下情況:

兩者都會導致某種類型的端到端測試,而這種測試本不需要。

如果我們對頂層調用者方法進行編碼,可能看起來像這樣:

func Service(ctx context.Context, args Args) error {
  ...
  if err := drainService(ctx, args); err != nil {
    return err
  }

  if err := removeService(ctx, args); err != nil {
    return err
  }

  hasStorage, err := args.Services.HasStorage(ctx, args.Name)
  if err != nil {
    return err
  }

  if hasStorage{
    if err := removeBackups(ctx, args); err != nil {
      return err
    }
    if err := removeStorage(ctx, args); err != nil {
      return err
    }
  }
  return nil
}

如你所見,可以爲所有子函數編寫單獨的測試,但要測試 Service(),現在必須對調用的所有客戶端或方法打樁。這看起來就像是端到端測試,而對於這類代碼來說,通常不是好主意。

如果轉到功能調用鏈,情況也不會好到哪裏去:

func Service(ctx context.Context, args Args) error {
  ...
  return drainService(ctx, args)
}

func drainService(ctx context.Context, args Args) (Args, error) {
  ...
  return removeService(ctx, args)
}

func removeService(ctx context.Context, args Args) (Args, error) {
  ...
  hasStorage, err := args.Services.HasStorage(ctx, args.Name)
  if err != nil {
    return args, fmt.Errorf("HasStorage() failed: %w", err)
  }
  
  if hasStorage{
    return removeBackups(ctx, args)
  }

  return nil
}
...

當我們測試時,越接近調用鏈的頂端,測試的實現就變得越困難。在 Service() 中,必須測試 drainService()removeService() 以及下面所有調用。

有幾種方法可以做到,但都不太好。

如果使用狀態機,只需測試每個階段是否按要求運行,並返回想要的下一階段。

頂層調用者甚至不需要測試,它只是調用 validate() 方法,並調用應該能夠被測試的 Run() 函數。

我們爲 drainService() 編寫一個表驅動測試,這裏會拷貝一份 drainService() 代碼,這樣就不用返回到前面看代碼了。

func drainService(ctx context.Context, args Args) (Args, State[Args], error) {
  l, err := args.Services.List(ctx)
  if err != nil {
    return args, nil, err
  }

  found := false
  for _, entry := range l {
    if entry == args.Name {
      found = true
      break
    }
  }
  if !found {
    return args, nil, fmt.Errorf("the service was not found")
  }

  if err := args.Services.Drain(ctx, args.Name); err != nil {
    return args, nil, fmt.Errorf("problem draining the service: %w", err)
  }

  return args, removeService, nil
}

func TestDrainSerivce(t *testing.T) {
  t.Parallel()

  tests := []struct {
    name      string
    args      Args
    wantErr   bool
    wantState State[Args]
  }{
    {
      name: "Error: Services.List() returns an error",
      args: Args{
        Services: &fakeServices{
          list: fmt.Errorf("error"),
        },
      },
      wantErr: true,
    },
    {
      name: "Error: Services.List() didn't contain our service name",
      args: Args{
        Name: "myService",
        Services: &fakeServices{
          list: []string{"nope""this""isn't""it"},
        },
      },
      wantErr: true,
    },
    {
      name: "Error: Services.Drain() returned an error",
      args: Args{
        Name: "myService",
        Services: &fakeServices{
          list:  []string{"yes""mySerivce""is""here"},
          drain: fmt.Errorf("error"),
        },
      },
      wantErr: true,
    },
    {
      name: "Success",
      args: Args{
        Name: "myService",
        Services: &fakeServices{
          list:  []string{"yes""myService""is""here"},
          drain: nil,
        },
      },
      wantState: removeService,
    },
  }

  for _, test := range tests {
    _, nextState, err := drainService(context.Background(), test.args)
    switch {
    case err == nil && test.wantErr:
      t.Errorf("TestDrainService(%s): got err == nil, want err != nil", test.name)
      continue
    case err != nil && !test.wantErr:
      t.Errorf("TestDrainService(%s): got err == %s, want err == nil", test.name, err)
      continue
    case err != nil:
      continue
    }
  
    gotState := methodName(nextState)
    wantState := methodName(test.wantState)
    if gotState != wantState {
      t.Errorf("TestDrainService(%s): got next state %s, want %s", test.name, gotState, wantState)
    }
  }
}

可以在 Go Playground[3] 玩一下。

如你所見,這避免了測試整個調用鏈,同時還能確保測試調用鏈中的下一個函數。

這些測試很容易劃分,維護人員也很容易遵循。

其他可能性

這種模式也有變種,即根據 Args 中設置的字段確定狀態,並跟蹤狀態的執行以防止循環。

在第一種情況下,狀態機軟件包可能是這樣的:

type State[T any] func(ctx context.Context, args T) (T, State[T], error)

type Args[T] struct {
  Data T

  Next State
}


func Run[T any](ctx context.Context, args Args[T], start State[T] "T any") (T, error) {
  var err error
  current := start
  for {
    if ctx.Err() != nil {
      return args, ctx.Err()
    }
    args, current, err = current(ctx, args)
    if err != nil {
      return args, err
    }
    current = args.Next // Set our next stage
    args.Next = nil // Clear this so to prevent infinite loops

    if current == nil {
      return args, nil
    }
  }
}

可以很容易的將分佈式跟蹤或日誌記錄集成到這種設計中。

如果希望推送大量數據並利用併發優勢,不妨試試 stagedpipe 軟件包 [4],其內置了大量高級功能,可以看視頻和 README 學習如何使用。

希望這篇文章能讓你充分了解 Go 狀態機設計模式,現在你的工具箱裏多了一個強大的新工具。


你好,我是俞凡,在 Motorola 做過研發,現在在 Mavenir 做技術工作,對通信、網絡、後端架構、雲原生、DevOps、CICD、區塊鏈、AI 等技術始終保持着濃厚的興趣,平時喜歡閱讀、思考,相信持續學習、終身成長,歡迎一起交流學習。爲了方便大家以後能第一時間看到文章,請朋友們關注公衆號 "DeepNoMind",並設個星標吧,如果能一鍵三連 (轉發、點贊、在看),則能給我帶來更多的支持和動力,激勵我持續寫下去,和大家共同成長進步!

參考資料

[1]

Go State Machine Patterns: https://medium.com/@johnsiilver/go-state-machine-patterns-3b667f345b5e

[2]

Rob Pike 關於詞法掃描的演講: https://www.youtube.com/watch?v=HxaD_trXwRE,

[3]

Go Playground: https://go.dev/play/p/HcgYkQOjeIz

[4]

stagedpipe 軟件包: https://github.com/gostdlib/concurrency/tree/main/pipelines/stagedpipe

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/-OYJKu7PBqmaM-uPTgqa3Q