RabbitMQ 實現多系統間的分佈式事務,保證數據一致性
一、實驗環境
-
Lunix 系統:Centos7.5
-
安裝軟件:rabbitmq
-
開發工具:IDEA
二、實驗目的
Rabbitmq 實現多系統間的分佈式事務,保證數據一致性
三、實驗方案
rabbitmq 作爲消息中間件
訂單中心和運單中心分別作爲消息的生產者和消息的消費者,通過 rabbitmq 傳遞消息
訂單中心作爲生產者,模擬用戶創建訂單,在本地持久化訂單信息,記錄消息的狀態信息,並將消息發送到 rabbitmq,同時開啓 confirm 機制,接收消息中間件 rabbitmq 的響應信息,更新本地消息發送狀態(定時任務輪訓消息狀態信息表,一定時間內未發送成功的數據將再次發起推送,保證 atlestonce.
運單中心作爲消費者,消費 rabbitmq 中的訂單信息,開啓 ack 確認機制,確保不遺漏訂單。並通過消息全局唯一 ID 保證數據的唯一性,不重複處理訂單。
四、實驗步驟
1、消息隊列
1.1 rabbitmq 安裝過程略過。。。。
1.2 創建訂單交換器:orderExchange
1.3 創建訂單隊列:orderQueue
1.4 綁定
2、數據庫準備 2.1 訂單表
2.2 消息發送狀態表
2.3 運單表
3、訂單中心 3.1 訂單中心分析
利用 Rabbitmq 發佈確認機制 (confirm),確保發送成功的數據能被通知到 做個定時任務輪訓發送失敗以及發送後未響應的訂單信息,重新發送。推薦:Java 進階視頻資源
3.2 編寫代碼
3.2.1 Springboot 整合 rabbitmq 和 mysql 數據庫
3.2.1.1 依賴如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.17</version>
</dependency>
3.2.1.2 配置文件內容:
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root
password: root123
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
#必須配置這個,生產者纔會確認回調
publisher-confirm-type: correlated
publisher-returns: true
#重要,手動開啓消費者ACK,控制消息在MQ中的刪除、重發
listener:
simple:
acknowledge-mode: MANUAL
3.2.2 訂單中心代碼
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.beans.Transient;
/**
* @Author Lee
* @Description 訂單中心
* @Date 2020/1/30 16:57
* @Version 1.0
*/
@Slf4j
@Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void setup(){
//消息發送完成後,則回調此方法,ack代表此方法是否發送成功
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//ack爲true,代表MQ已經準確收到消息
if(!ack){
return;
}
try{
String sql = "update tb_msgstatus set status = 1 where msgid = ?";
int count = jdbcTemplate.update(sql,correlationData.getId());
if(count != 1){
log.warn("本地消息表狀態修改失敗");
}
}catch (Exception e){
log.warn("本息消息表狀態修改異常",e);
}
}
});
}
/**
* 創建訂單信息
* @param order 訂單信息
* @throws Exception
*/
public void createOrder(JSONObject order) throws Exception {
//保存訂單信息
saveOrder(order);
//發送MQ消息,直接發送時不可靠,可能會失敗(發送後根據回執修改狀態表,定時任務掃表讀取失敗數據重新發送)
sendMsg(order);
}
/**
* 發送訂單信息至MQ
* @param order 訂單信息
*/
private void sendMsg(JSONObject order) {
//發送消息到MQ,CorrelationData作用:當收到消息回執時會帶上這個參數
rabbitTemplate.convertAndSend("orderExchange","",order.toJSONString(),new CorrelationData((String) order.get("orderid")));
}
/**
* 保存訂單信息
* @param order 訂單信息
* @throws Exception
*/
@Transient
private void saveOrder(JSONObject order) throws Exception {
String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (? , ? , ? , now())";
//保存訂單信息
int count = jdbcTemplate.update(sql,order.get("orderid"),order.get("userid"),order.get("goodsid"));
if(count != 1){
throw new Exception("訂單創建失敗");
}
//保存消息發送狀態
saveLocalMsg(order);
}
/**
* 記錄消息發送狀態
* @param order 訂單信息
* @throws Exception
*/
private void saveLocalMsg(JSONObject order) throws Exception {
String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (? , ? , 0 , now())";
//記錄消息發送狀態
int count = jdbcTemplate.update(sql,order.get("orderid"),order.toJSONString());
if(count != 1){
throw new Exception("記錄消息發送狀態失敗");
}
}
}
3.3 訂單中心測試
3.3.1 測試代碼
@Autowired
private OrderService orderService;
@Test
public void orderServiceTest() throws Exception {
//生成訂單信息
JSONObject orderinfo = new JSONObject();
orderinfo.put("orderid",UUID.randomUUID().toString());
orderinfo.put("userid",UUID.randomUUID().toString());
orderinfo.put("goodsid",UUID.randomUUID().toString());
orderService.createOrder(orderinfo);
}
3.3.2 測試驗證結果
orderQueue 消息隊列中已經接收到數據
訂單表裏的數據
狀態表數據:
4、運單中心 4.1 運單中心分析
消費者收到消息進行處理,處理成功則發送 ACK 消息通知 MQ 清除該條記錄,否則通知 MQ 重發或者等待 MQ 自動重發。本地維護一個處理次數,如果多次處理仍然失敗,則將該消息丟棄或者加入到死信隊列(DLQ)中。死信隊列中的數據可以人工干預。推薦:Java 進階視頻資源
4.2 編寫代碼
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.beans.Transient;
import java.io.IOException;
/**
* @Author Lee
* @Description 運單系統
* @Date 2020/1/30 21:58
* @Version 1.0
*/
@Slf4j
@Service
public class DispatchService {
@Autowired
private JdbcTemplate jdbcTemplate;
@RabbitListener(queues = "orderQueue")
public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try{
//MQ裏面的數據轉換成JSON數據
JSONObject orderInfo = JSONObject.parseObject(message);
log.warn("收到MQ裏面的消息:" + orderInfo.toJSONString());
Thread.sleep(1000L);
//執行業務操作,同一個數據不能處理兩次,根據業務情況去重,保證冪等性
String orderid = orderInfo.getString("orderid");
//分配快遞員配送
dispatch(orderid);
//ack 通知MQ數據已經收到
channel.basicAck(tag,false);
}catch (Exception e){
//異常情況,需要根據需求去重發或者丟棄
//重發一定次數後丟棄,日誌告警(rabbitmq沒有設置重發次數功能,重發時需要代碼實現,比如使用redis記錄重發次數,)
channel.basicNack(tag,false,false);
//系統關鍵數據異常,需要人工干預
}
//如果不給確認回覆,就等這個consumer斷開連接後,MQ會繼續推送
}
/**
* 分配快遞員
* @param orderid 訂單編號
*/
@Transient
private void dispatch(String orderid) throws Exception {
String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
int count = jdbcTemplate.update(sql,orderid,"東哥","配送中");
if(count != 1){
throw new Exception("調度數據插入失敗,原因[數據庫操作]");
}
}
}
4.3 訂單中心測試
啓動 springboot 後自動監聽 MQ 中的消息隊列,自動處理
測試結果如下:
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/eKc2WlnAUm4ZX0qzDqXkPA