命令分發模式
命令分發模式 (command dispatcher pattern) 不屬於 23 種經典的設計模式。它是一種不太爲人所知的設計模式, 它主要用於構建可擴展、可插拔的系統架構, 將請求與執行請求的操作對象解耦。它類似於命令模式 (Command Pattern), 但更加靈活和動態。
雖然 Command Dispatch Pattern 不屬於那 23 種經典模式, 但它確實是一種很有價值的模式, 可以應用於需要在運行時動態添加、修改或刪除操作的系統中, 使系統更加靈活和可擴展。
這種模式通過允許方便的添加、替換或移除任何命令處理器, 非常的靈活,將命令調用和命令處理解耦。而且每個命令可以由單獨的命令處理器處理,代碼組織和維護也很方便。
實際上,對於 Gopher 來講,必然已經接觸到這個模式了,只不過少有人指出或者梳理這種模式,但是在標準庫和一些知名的項目中,其實已經自然的應用了,而且看起來整個架構也非常的清爽。
讓我們看一個標準庫實現 web 服務的例子:
package main
import (
"fmt"
"net/http"
)
// 定義 HTTP 處理程序函數
func homeHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "Welcome to the home page!")
}
func aboutHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "This is the about page.")
}
func main() {
// 創建一個新的 ServeMux 實例
mux := http.NewServeMux()
// 註冊處理程序函數
mux.HandleFunc("/", homeHandler)
mux.HandleFunc("/about", aboutHandler)
// 啓動 HTTP 服務器
fmt.Println("Starting server at :8080")
http.ListenAndServe(":8080", mux)
}
在這個示例中, 我們可以把 http.ServeMux
看作是 Command Dispatcher, 而 HTTP 請求就是一個命令。
通過 mux.HandleFunc
,將命令 (path,請求路徑) 與命令處理程序 (handler, 處理程序函數) 進行綁定的過程。
新增一個命令,就註冊一個新的處理程序。 移除一條命令,就刪除那一條綁定語句。 修改一條命令,就替換對應的處理程序。
Http server 收到 HTTP 請求後,解析出路徑信息,然後從註冊的信息中找到這個路徑對應的處理程序,然後調用這個處理程序。
這是一個經典的命令分發模式的應用。對於貝爾實驗室出來的 Rob Pike、Russ Cox 來說,它們和 GoF 設計模式的這一派屬於兩個門派,所以在 Go 語言中 Rob Pike、Russ Cox 很少會講到面向對象的設計模式,但是這種 HTTP 的這種實現方法我們可以把它歸類爲命令分發模式,而且是一個標準的模式實現。
接下來,我再給你介紹幾種知名項目中使用這種模式的例子。畢竟,這種設計模式應用的場景之一就是微服務:
-
分佈式系統:在分佈式系統中,命令分發模式可以用於將命令分配到不同的服務或節點進行處理。
-
微服務架構:在微服務架構中,命令分發模式可以用於協調不同微服務之間的命令處理。
-
複雜應用:在複雜應用中,命令分發模式可以用於解耦命令的發出者和處理者,提高系統的靈活性和可維護性。
Rpc 中處理
Go 生態圈中知名的微服務框架之一 rpcx[1] 實現了兩種命令分發方式:
-
類似 Go 標準庫的 rpc, 通過發射的方式找到對應的微服務方法,然後調用
-
第二種是類似 Go 標準庫這種路由綁定的方法,通過綁定 handler 方式
這兩種方式都屬於命令分發的設計模式,但是第二種更直觀。比如下面一個微服務的例子:
package main
import (
"flag"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func mul(ctx *server.Context) error {
var args example.Args
err := ctx.Bind(&args)
if err != nil {
return err
}
var reply example.Reply
reply.C = args.A * args.B
ctx.Write(reply)
return nil
}
func main() {
flag.Parse()
s := server.NewServer()
s.AddHandler("Arith", "Mul", mul)
err := s.Serve("tcp", *addr)
if err != nil {
panic(err)
}
}
這裏例子以一個 乘法
的微服務做例子。函數 mul
是命令 (微服務) 處理函數。s.AddHandler
將微服務和命令處理函數進行綁定。這和 HTTP 的例子非常的類似。
server
就是一個分發器,它收到客戶端的微服務請求,解析出微服務的名稱 (名字和方法名),找到註冊的 handler, 把參數傳給 handler 去處理。
lesismal 實現的一個高性能的 Go 微服務框架 arpc[2] 也是採用的這種方式:
package main
import (
"log"
"github.com/lesismal/arpc"
)
func main() {
svr := arpc.NewServer()
// register router
svr.Handler.Handle("/echo/sync", func(ctx *arpc.Context) {
str := ""
err := ctx.Bind(&str)
ctx.Write(str)
log.Printf("/echo/sync: \"%v\", error: %v", str, err)
})
// register router
svr.Handler.Handle("/echo/async", func(ctx *arpc.Context) {
str := ""
err := ctx.Bind(&str)
go ctx.Write(str)
log.Printf("/echo/async: \"%v\", error: %v", str, err)
})
svr.Run("localhost:8888")
}
處理微服務、web 路由等這些場景。在基礎架構的軟件開發中,這種模式對於實現命令式的基礎服務組件,也非常的合適,接下來我給你介紹實現自研 memcached 和 redis 服務的基於命令分發模式實現的架構。
在自研基礎架構產品中的應用
自研類 memcached 的緩存系統
Memcached 是一種分佈式內存對象緩存系統, 用於加速動態 Web 應用程序的響應速度。它基於一種高效的基於內存的鍵值對存儲, 設計用於緩存小的數據塊。
Memcached 的主要優勢包括:
-
高性能: 基於內存操作, 能夠提供非常高的讀寫性能。
-
減少服務器負載: 通過緩存數據減輕了數據庫的訪問壓力。
-
可擴展性: 支持分佈式集羣部署, 能夠線性擴展。
Memcached 的作者是 Brad Fitzpatrick。也曾是 Go 開發團隊成員之一,維護 Go HTTP 庫等。也是 Go memcached client 庫 bradfitz/gomemcache:[3] 的作者。
這裏我們介紹的是 Go memcached 服務端的庫,在你實現類似 Memcached 服務的時候很有用。
Memcached 有文本和二進制兩種協議,這裏我們介紹的是文本協議,它比較簡單,而且也方便使用 telenet 等命令測試。下面是一些常用的 Memcached 命令:
- 存儲命令:
-
set key flags exptime bytes [noreply]
: 存儲一個鍵值對, 並設置可選的標誌、過期時間、數據長度和 noreply。 -
add key flags exptime bytes [noreply]
: 僅當鍵不存在時添加一個新的鍵值對。 -
replace key flags exptime bytes [noreply]
: 僅當鍵已存在時替換鍵的值。
- 檢索命令:
-
get key [key ...]
: 獲取一個或多個鍵的值。 -
gets key
: 獲取帶有 CAS 令牌的鍵值對, 用於檢查並設置操作。
- 操作命令:
-
incr key value [noreply]
: 將鍵的數值增加給定的值。 -
decr key value [noreply]
: 將鍵的數值減少給定的值。 -
append key flags exptime bytes [noreply]
: 將數據追加到一個已存在的鍵的值中。 -
prepend key flags exptime bytes [noreply]
: 將數據添加到一個現有鍵的值的開始部分。 -
cas keyflags exptime bytes unique-cas-token [noreply]
: 使用 CAS 令牌實現檢查並設置操作。
- 刪除命令:
delete key [noreply]
: 刪除一個鍵值對。
- 統計命令:
-
stats
: 獲取 Memcached 服務器統計信息。 -
stats reset
: 重置 Memcached 服務器統計信息。
- 其他命令:
-
flush_all [delay] [noreply]
: 清空整個 Memcached 服務器中的所有鍵值對數據。 -
version
: 獲取 Memcached 服務器版本信息。 -
quit
: 關閉 Memcached 連接。
這些命令通過 TCP 連接以文本形式發送給 Memcached 服務器, 服務器也以文本形式返回響應結果。例如, 成功的響應以 "OK" 開頭, 錯誤響應以 "ERROR" 或 "SERVER_ERROR" 開頭。
smallnest/gomemcached[4] 是實現 memcache server 端的一個庫,我們來看它的一個簡單例子:
mockServer = NewServer(addr)
mockServer.RegisterFunc("get", DefaultGet)
mockServer.RegisterFunc("gets", DefaultGet)
mockServer.RegisterFunc("set", DefaultSet)
mockServer.RegisterFunc("delete", DefaultDelete)
mockServer.RegisterFunc("incr", DefaultIncr)
mockServer.RegisterFunc("flush_all", DefaultFlushAll)
mockServer.RegisterFunc("version", DefaultVersion)
mockServer.Start()
Server 是一個命令分發器,你可以註冊你實現的命令處理函數。 你甚至擴展,爲你的緩存產品增加 memcached 不支持的命令,比如 auth
等等。
自研類 Redis 的系統
如果你要開發類似 Redis 的服務,也有一個非常知名的庫,甚至可以說是 Go 生態圈的首選,就是 tidwall/redcon[5]
它基於命令分發模式,提供了一個通用的 Redis 服務端框架,下面是一個它的例子:
package main
import (
"log"
"strings"
"sync"
"github.com/tidwall/redcon"
)
var addr = ":6380"
func main() {
var mu sync.RWMutex
var items = make(map[string][]byte)
var ps redcon.PubSub
go log.Printf("started server at %s", addr)
err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.Lock()
items[string(cmd.Args[1])] = cmd.Args[2]
mu.Unlock()
conn.WriteString("OK")
case "get":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.RLock()
val, ok := items[string(cmd.Args[1])]
mu.RUnlock()
if !ok {
conn.WriteNull()
} else {
conn.WriteBulk(val)
}
case "del":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.Lock()
_, ok := items[string(cmd.Args[1])]
delete(items, string(cmd.Args[1]))
mu.Unlock()
if !ok {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
case "publish":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
conn.WriteInt(ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])))
case "subscribe", "psubscribe":
if len(cmd.Args) < 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
command := strings.ToLower(string(cmd.Args[0]))
for i := 1; i < len(cmd.Args); i++ {
if command == "psubscribe" {
ps.Psubscribe(conn, string(cmd.Args[i]))
} else {
ps.Subscribe(conn, string(cmd.Args[i]))
}
}
}
},
func(conn redcon.Conn) bool {
// Use this function to accept or deny the connection.
// log.Printf("accept: %s", conn.RemoteAddr())
return true
},
func(conn redcon.Conn, err error) {
// This is called when the connection has been closed
// log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
},
)
if err != nil {
log.Fatal(err)
}
}
redcon.ListenAndServe 是一個命令分發器,你可以註冊你實現的命令處理函數。 在上面的例子中,每一個switch case
是一種命令的處理,這裏的例子很簡單,就沒有把每種命令的處理邏輯寫成一個個獨立的 handler, 而是直接在 case 中直接處理。如果我們要實現一個類 Redis 系統,最好的方式是把每種命令的處理邏輯寫成一個個獨立的 handler,這樣代碼更清晰,更易維護。 這些 handler 最好按照 redis 命令的分類,分成幾個文件,分別處理 string、set 等類型。
這樣的設計,不僅符合命令分發模式,而且符合單一職責原則,代碼更易維護。
從上面的幾個例子來看,命令分發模式在實現類似 Memcached、Redis 這種基礎架構產品中非常適用,它可以幫助我們實現一個靈活、可擴展的系統架構。本週的另外一篇文章,介紹一個基於 SQLite 的類 Redisa 的實現,也是採用了命令分發模式, 請關注 “鳥窩聊技術” 公衆號,及時獲取最新的文章。
參考資料
[1]
rpcx: https://rpcx.io
[2]
arpc: https://github.com/lesismal/arpc
[3]
bradfitz/gomemcache:: https://github.com/bradfitz/gomemcache
[4]
smallnest/gomemcached: https://github.com/smallnest/gomemcached
[5]
tidwall/redcon: https://github.com/tidwall/redcon
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/3E2UvCIx9_XpLbmBmu-yUg