Golang 實現異步隊列

源代碼已託管至 Github: https://github.com/gofish2020/easyqueue

爲什麼需要異步隊列?

在高併發系統中,如果同時有大量的請求 / 任務需要進行處理,由於系統的負載能力有限,一次性能夠同時處理的請求 / 任務是有限的。那麼這些任務就需要進行排隊,先來的先處理,後來的後處理。如果超過了整個隊列能夠容納的請求 / 任務,那麼後續的請求 / 任務直接拒絕掉(丟棄,是不是覺得這裏說的不對?),保證整個業務系統的正常運行。

舉個例子:比如我們要做一個秒殺的項目,有個商品數量只有 300 個,但是同一時刻,有 300w 個用戶來搶商品。對於系統來說,我們需要這 300w 個用戶同時進入到數據庫中搶商品嗎?肯定是不現實的,數據庫是會承受不了的(呀咩爹)。因爲只有 300 個商品,理論上我們只需要讓 300 個(不同的)用戶進來即可,剩下的 299.9700w 個用戶直接拒絕掉。將這 300 個用戶處理完以後,可能商品還有剩餘(比如有人退款了),搶購肯定是一直有人在搶,那麼繼續再來 300 個用戶,繼續搶購剩下的商品,直到商品全部被搶購完。

這裏的 300 只是個比喻,比如你有 500 個商品,每次讓 100 個用戶來搶,每搶完一批,再放 100 用戶進來,直到商品搶完(或搶購到期自動結束)。

我們一般的http請求,來 300w 個請求就會直接處理 300w 請求,並不能做到讓請求阻塞排隊,並異步處理的效果。所以,需要設計一個隊列,讓請求阻塞,並通過異步處理的方式處理http請求,這樣我們就可以控制同時讀寫數據庫的請求數量,不至於把數據庫打崩。

邏輯架構

整個的模型就是【生產者】【消費者】模型。這種模型真的好通用,如果看過之前的EasyRedis系列,裏面的很多設計都是類似的。

隊列數量和 worker 數量都是可配置的

測試 Demo

用 gin 框架演示異步處理http請求:

說明:這裏的測試 demo 只是單機版的,所以庫存放在本地。如果是實際的業務系統(會啓動多個 http 服務器)每個請求會負載到不同的 web 服務器上處,庫存信息是要放到Redis or Mysql中,那就需要分佈式鎖保證扣減庫存不會存在超賣問題。

package main

import (
	"net/http"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gofish2020/easyqueue"
)

// 創建隊列(1個隊列,容量 100,消費協程 1個)
var g_Queue = easyqueue.CreateEasyQueue(easyqueue.SetQueueParttion(1), easyqueue.SetQueueCapacity(100), easyqueue.SetWorkerNum(1))

var cacheNum atomic.Int64
var errCount atomic.Int64
var sucessCount atomic.Int64
var selloutCount atomic.Int64

var pingCount atomic.Int64

func main() {

	// 20個庫存的商品
	cacheNum.Store(20)

	r := gin.Default()

	// 結果統計
	r.GET("/cache", func(c *gin.Context) {

		c.JSON(http.StatusOK, gin.H{
			"商品剩餘數量":      strconv.Itoa(int(cacheNum.Load())),
			"售罄->沒買到的用戶":  strconv.Itoa(int(selloutCount.Load())),
			"隊列滿->丟棄的用戶數": strconv.Itoa(int(errCount.Load())),
			"成功->搶購的用戶數":  strconv.Itoa(int(sucessCount.Load())),
			"搶購鏈接->總調用次數": strconv.Itoa(int(pingCount.Load())),
		})
	})

	// 增加商品庫存
	r.GET("/add", func(c *gin.Context) {
		cacheNum.Add(20)

		c.JSON(http.StatusOK, gin.H{
			"message": "success",
		})
	})

	// 秒殺商品
	r.GET("/ping", func(c *gin.Context) {

		pingCount.Add(1)

		// 如果已經售罄直接返回
		if cacheNum.Load() == 0 {
			selloutCount.Add(1)
			c.JSON(http.StatusOK, gin.H{
				"message": "商品售罄",
			})

			return
		}

		// 進入隊列
		waitJob := g_Queue.Push(func() {

			// 寫http請求中自己的業務邏輯....

			//1. 先判斷是否售罄
			if cacheNum.Load() == 0 {
				selloutCount.Add(1)
				c.JSON(http.StatusOK, gin.H{
					"message": "商品售罄",
				})
			} else { //2.未售罄

				// 3.扣減庫存
				cacheNum.Add(-1)
				// 4.下單邏輯..(模擬業務有點慢)
				time.Sleep(500 * time.Millisecond)
				// 5.告訴客戶搶購成功
				sucessCount.Add(1)
				c.JSON(http.StatusOK, gin.H{
					"message": "搶購成功",
				})
			}

		})

		// 阻塞等待
		<-waitJob.Done()

		// 如果阻塞返回,err不爲nil,說明上面的隊列任務沒有執行,這裏做補償邏輯
		if waitJob.Err() == easyqueue.ErrOverFlow { // 表示進入隊列失敗(隊列滿了)
			errCount.Add(1)
			c.JSON(http.StatusOK, gin.H{
				"message": "搶購人數過多,請重試...",
			})
		}
	})
	r.Run()
}

ab 壓測命令:

ab是一個壓測工具,MacOS系統自帶該工具,你可以本地直接ab -V查看是否存在。

-c 表示併發請求數,這裏是 100(如果想更大的併發請求數,需要調高操作系統的文件句柄數,不然 ab 工具會報錯 too many files) 

-n 表示總共發送多少請求,這裏是 1w

ab -c 100 -n 10000 http://127.0.0.1:8080/ping

然後瀏覽器訪問 http://127.0.0.1:8080/cache查看秒殺統計結果

你也可以繼續調用http://127.0.0.1:8080/add增加庫存,繼續重複上面【秒殺 + 查看秒殺結果】

代碼邏輯概述

EasyQueue結構體,是對 queue Queuewg *workerGroup的一個額外的包裝(目的讓使用的時候更方便),本身唯一的一個函數Push也是調用的隊列eq.queue.Push(job)Push方法。

所以只需要看 createMultiJobQueuecreateWorkerMange方法即可。

type EasyQueue struct {
	config Config       // 配置
	queue  Queue        // 隊列
	wg     *workerGroup // 消費組
}

func (eq *EasyQueue) Push(fn func()) WaitJob {
	job := newJob(fn)
	eq.queue.Push(job)
	return job
}

func CreateEasyQueue(cfs ...configFunc) *EasyQueue {

	conf := Config{}
	for _, cf := range cfs {
		cf(&conf)
	}

	eq := EasyQueue{
		config: conf,
	}

	eq.queue = createMultiJobQueue(conf.QueuePartition, conf.QueueCapacity)
	eq.wg = createWorkerMange(eq.queue, conf.WorkersNum)

	return &eq
}

createMultiJobQueue

該函數本質就是初始化一個切片queues []*jobQueue。基於配置的隊列個數,創建partition個隊列createJobQueue(queueCapacity),每個隊列有個容量。所以,總容量就是【隊列個數 * 單個隊列的容量】

Push方法基於自增 id 對隊列個數取模,將job按照輪詢的方式放入隊列中。保證隊列數據的負載均衡

// 多隊列
type multiJobQueue struct {
	queues   []*jobQueue
	parition int

	pushIdx *idGenerator
	popIdx  *idGenerator
}

// partition * perCap = All Capacity
func createMultiJobQueue(partition, queueCapacity int) *multiJobQueue {

	if partition < 1 {
		panic("partition must bigger than 0")
	}

	multi := &multiJobQueue{
		parition: partition,
		pushIdx:  newIDGenerator(),
		popIdx:   newIDGenerator(),
	}

	for i := 0; i < partition; i++ {
		multi.queues = append(multi.queues, createJobQueue(queueCapacity))
	}
	return multi
}

// 按照 Round-Robin的方式選擇隊列,並放入任務
func (mjq *multiJobQueue) Push(jb JobInterface) {
	mjq.queues[mjq.pushIdx.Next()%uint64(mjq.parition)].Push(jb)
}

createWorkerMange

基於配置的工作協程數量,創建workerNum個工作協程createWorker並啓動worker.Run()消費邏輯。將所有的協程都保存到切片workers []*worker

Adjust目的想動態的調整工作協程的數量,加鎖(避免競態)修改切片 workers    []*worker

停止協程只需調用wm.workers[i].Stop()方法給協程打個標記,協程會自動停止。

// 消費組
type workerGroup struct {
	mu         sync.Mutex
	workers    []*worker
	workersNum int
	queue      Queue
}

func createWorkerMange(queue Queue, workerNum int) *workerGroup {
	mange := workerGroup{
		workersNum: workerNum,
		queue:      queue,
	}
	for i := 0; i < workerNum; i++ {
		worker := createWorker(queue)
		worker.Run()
		mange.workers = append(mange.workers, worker)
	}
	return &mange
}


// 動態調整消費者的數量
func (wm *workerGroup) Adjust(workerNum int) {
	if workerNum == wm.workersNum {
		return// do nothing
	}

	wm.mu.Lock()
	defer wm.mu.Unlock()

	if workerNum > wm.workersNum { // 增加
		for i := wm.workersNum; i < workerNum; i++ {
			worker := createWorker(wm.queue)
			worker.Run()
			wm.workers = append(wm.workers, worker)
		}

	} else { // 減少
		for i := workerNum; i < wm.workersNum; i++ {
			wm.workers[i].Stop()
		}
		wm.workers = wm.workers[:wm.workersNum-workerNum]
	}

	wm.workersNum = workerNum
}

消費邏輯

隊列和工作組都創建完成後,只需要看func (w *worker) Run()方法即可知道協程是怎麼消費的。

本質就是調用隊列的w.queue.PopTimeout方法,彈出任務然後執行任務job.DoJob()

這裏之所以調用帶超時時間的。因爲如果本協程消費的隊列中一直沒有任務,那麼本協程會一直阻塞不返回,但是其他隊列中可能有任務,本協程因爲阻塞在隊列上,是不可能去消費其他隊列的。那就會出現假死的現象。

所以,我們需要在本隊列沒有任務的時候,延遲一會放棄本隊列,然後通過 Round-Robin 的策略選擇下一個隊列去消費。

type worker struct {
	queue  Queue        // 工作協程監視的隊列
	closed atomic.Int32 // 是否停止工作協程標識
}

func createWorker(queue Queue) *worker {
	return &worker{
		queue:  queue,
		closed: atomic.Int32{},
	}
}

func (w *worker) Run() {

	w.closed.Store(0)
	gofunc() {
		for {

			if w.closed.Load() > 0 { // 關閉worker
				break
			}

			job := w.queue.PopTimeout(1 * time.Millisecond)
			//job := w.queue.Pop()
			if job != nil {
				job.DoJob()
			}
		}
	}()
}

func (w *worker) Stop() {
	w.closed.Store(1)
}

擴展思考

上面的這種方案是針對請求是可以丟棄的。如果你做的系統比如微博 / 論壇類型,有個發帖功能,做了一個活動,發帖有獎金,那麼可能會有 300w 個人同時在線發帖,那麼這些 http 請求都是有效的請求,一般情況下我們是不能丟棄的。在高併發系統中,我們可以做限流,比如用戶 5s 內只能發 1 篇帖子。但是對個人限流完以後,可能還會存在 200w 的有效請求,此時我們可以這麼做(如下圖),也是通過異步的方式(解耦)。如果採用同步的方式保存到數據庫中:

都需要耗費大量的時間,對用戶的體驗就不好,會感覺系統很卡頓。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/j21kENP0RUYlS2kchH8d2w