RocketMQ 事務消息解析!
單體架構下的事務
在單體系統的開發過程中,假如某個場景下需要對數據庫的多張表進行操作,爲了保證數據的一致性,一般會使用事務,將所有的操作全部提交或者在出錯的時候全部回滾。
以創建訂單爲例,假設下單後需要做兩個操作:
在訂單表生成訂單。
在積分表增加本次訂單增加的積分記錄。
在單體架構下只需使用@Transactional
開啓事務,就可以保證數據的一致性。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 生成訂單
orderService.createOrder(orderId);
// 增加積分
creditService.addCredits(orderId);
}
但在分佈式架構下,訂單系統和積分系統可能是兩個獨立的服務,此時就不能使用上述的方法開啓事務了,因爲它們不處於同一個事務中。
- 在出錯的情況下,無法進行全部回滾,只能對當前服務的事務進行回滾。
所以就有可能出現訂單生成成功但是積分服務增加積分失敗的情況(也可能相反),此時數據處於不一致的狀態。
分佈式架構下的事務
以下單流程爲例,在分佈式架構下的處理流程如下:
訂單服務生成訂單。
發送訂單生成的
MQ
消息,積分服務訂閱消息,有新的訂單生成之後消費消息,增加對應的積分記錄。
普通 MQ 消息存在的問題
假如訂單創建成功,
MQ
消息發送成功,但是order
方法在返回的前一刻,服務突然宕機。由於開啓了事務,事務還未提交(方法結束後纔會正常提交)。
所以訂單表並未生成記錄,但是
MQ
卻已經發送成功並且被積分服務消費,此時就會存在訂單未創建但是積分記錄增加的情況。假如先發送
MQ
消息再創建訂單,如果 MQ 消息發送成功,創建訂單失敗,那麼同樣處於不一致的狀態。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 創建訂單
Order order = orderService.createOrder(orderDTO.getOrderId());
// 發送訂單創建的MQ消息
sendOrderMessge(order);
return;
}
可以使用RocketMQ
事務消息解決上述問題。
RocketMQ 事務消息基礎流程
Apache RocketMQ
在4.3.0
版中已經支持分佈式事務消息。事務消息是
RocketMQ
提供的一種消息類型,支持在分佈式場景下保障消息生產和本地事務的最終一致性。
RocketMQ
採用了2PC
的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息。
基本流程
第一階段:
發送 Message,
Half Message
,即半事務消息。此類型的
Message
是不會被Consumer
消費。第二階段:如果半事務消息投遞成功,則會開始執行本地事務。
分爲如下三種
Case
:
本地事務執行成功:
會向
Broker
發送commit
消息,被commit
過後的Message
才能被Consumer
消費到。本地事務執行失敗:
會向
Broker
發送rollback
消息,Broker
則會將剛剛投遞的半事務消息刪除,從而保證上下游數據的一致性。如果
Producer
實例或者網絡出現了問題,Producer
沒能及時地將本地事務執行的結果通知Broker
。
Broker
會通過掃描發現某條Message
長時間處於半事務消息狀態。
Broker
會主動地向Producer
詢問此Message
對應的事務狀態。
值得注意的是:
RocketMQ
並不會無休止的的信息事務狀態回查,默認回查 15 次。如果 15 次回查還是無法得知事務狀態,
RocketMQ
默認回滾該消息。
RocketMQ 事務消息使用限制
事務消息不支持延時消息和批量消息。
事務性消息可能不止一次被檢查或消費,所以消費者端需要做好消費冪等。
事務消息的生產者
ID
不能與其他類型消息的生產者ID
共享。
- 與其他類型的消息不同,事務消息允許反向查詢、
MQ
服務器能通過它們的生產者ID
查詢到消費者。
RocketMQ 事務消息基本原理
採用2PC
兩階段設計。
將
Message
原本真實的Topic
和MessageQueue
進行備份。
- 放入到
PROPERTY_REAL_TOPIC
、PROPERTY_REAL_QUEUE_ID
中保存。將消息投遞到一個內部
Topic
中RMQ_SYS_TRANS_HALF_TOPIC
,該隊列專門存儲事務消息。所有的
Half Message
全部都寫入到queueId
爲 0 的MessageQueue
。因爲一個
Topic
下只有 1 個MessageQueue
:
- 這個
Topic
下的所有Message
就是全局有序的,它們會按照先來後到的順序被消費。如果本地事務執行成功進行
Commit
,則將RMQ_SYS_TRANS_HALF_TOPIC
隊列中的消息投遞到真實的Topic
中,供後續流程執行。
- 並刪除這條
Half Message
,但刪除也是假刪除,只是給Message
打上一個刪除的Tag
。如果本地事務執行失敗進行
rollback
,則直接刪除這條Half Message
,但刪除也是假刪除。如果本地事務遲遲沒有返回結果 (默認時間是 6s),則會觸發事務回查機制
執行回查之前需要校驗檢查次數是否到達了最大值(需要手動設置,沒有默認值)。
或者是當前
Half Message
存在是否超過了Message
保存的上限,即 3 天。如果滿足上面條件中的一種
Half Message
會被放進TRANS_CHECK_MAX_TIME_TOPIC
Topic 當中。一旦判定爲需要執行事務回查邏輯,那麼當前這條
Half Message
就算已經被消費了。在沒達到最大的校驗次數之前,都還需要將其投遞到事務隊列當中,以便下次重試時再次執行
Check
邏輯。如果回查成功則刪除投遞的
Half Message
。
源碼解讀
發送事務消息調用的是TransactionMQProducer
的sendMessageInTransaction
方法:
主要有以下幾個步驟:
獲取事務監聽器
TransactionListener
,如果獲取爲空或者本地事務執行器LocalTransactionExecuter
爲空將拋出異常。因爲需要通過
TransactionListener
或者LocalTransactionExecuter
來執行本地事務,所以不能爲空。在消息中設置
prepared
屬性,此時與普通消息(非事務消息)相比多了PROPERTY_TRANSACTION_PREPARED
屬性。調用
send
方法發送prepared
消息也就是half
消息,發送消息的流程與普通消息一致。根據消息的發送結果判斷:
如果發送成功執行本地事務,並返回本地事務執行結果狀態,如果返回的執行狀態結果爲空,將本地事務狀態設置爲
UNKNOW
。發送成功之外的其他情況,包括
FLUSH_DISK_TIMEOUT
刷盤超時、FLUSH_SLAVE_TIMEOUT
和SLAVE_NOT_AVAILABLE
從節點不可用三種情況。此時意味着
half
消息發送失敗,本地事務狀態置爲ROLLBACK_MESSAGE
回滾消息。調用
endTransaction
方法結束事務。
參考
《RocketMQ 技術內幕》
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
https://github.com/apache/rocketmq/blob/master/docs/cn/design.md
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/wdb2JP7n0TuOLnrIlr6SUQ