RabbitMQ 實現多系統間的分佈式事務,保證數據一致性

一、實驗環境

二、實驗目的

Rabbitmq 實現多系統間的分佈式事務,保證數據一致性

三、實驗方案

rabbitmq 作爲消息中間件

訂單中心和運單中心分別作爲消息的生產者和消息的消費者,通過 rabbitmq 傳遞消息

訂單中心作爲生產者,模擬用戶創建訂單,在本地持久化訂單信息,記錄消息的狀態信息,並將消息發送到 rabbitmq,同時開啓 confirm 機制,接收消息中間件 rabbitmq 的響應信息,更新本地消息發送狀態(定時任務輪訓消息狀態信息表,一定時間內未發送成功的數據將再次發起推送,保證 atlestonce.

運單中心作爲消費者,消費 rabbitmq 中的訂單信息,開啓 ack 確認機制,確保不遺漏訂單。並通過消息全局唯一 ID 保證數據的唯一性,不重複處理訂單。

四、實驗步驟

1、消息隊列

1.1 rabbitmq 安裝過程略過。。。。

1.2 創建訂單交換器:orderExchange

1.3 創建訂單隊列:orderQueue

1.4 綁定

2、數據庫準備 2.1 訂單表

2.2 消息發送狀態表

2.3 運單表

3、訂單中心 3.1 訂單中心分析

利用 Rabbitmq 發佈確認機制 (confirm),確保發送成功的數據能被通知到 做個定時任務輪訓發送失敗以及發送後未響應的訂單信息,重新發送。推薦:Java 進階視頻資源

3.2 編寫代碼

3.2.1 Springboot 整合 rabbitmq 和 mysql 數據庫

3.2.1.1 依賴如下:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--mysql-->
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <scope>runtime</scope>
</dependency>
<!--lombok-->
<dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 <optional>true</optional>
</dependency>
<!--jdbc-->
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--fastjson-->
<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.17</version>
</dependency>

3.2.1.2 配置文件內容:

server:
  port: 8080

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
    username: root
    password: root123
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin123
    virtual-host: /
    #必須配置這個,生產者纔會確認回調
    publisher-confirm-type: correlated
    publisher-returns: true
    #重要,手動開啓消費者ACK,控制消息在MQ中的刪除、重發
    listener:
      simple:
        acknowledge-mode: MANUAL

3.2.2 訂單中心代碼

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.beans.Transient;

/**
 * @Author Lee
 * @Description 訂單中心
 * @Date 2020/1/30 16:57
 * @Version 1.0
 */
@Slf4j
@Service
public class OrderService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setup(){
        //消息發送完成後,則回調此方法,ack代表此方法是否發送成功
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //ack爲true,代表MQ已經準確收到消息
                if(!ack){
                    return;
                }

                try{
                    String sql = "update tb_msgstatus set status = 1 where msgid = ?";
                    int count = jdbcTemplate.update(sql,correlationData.getId());
                    if(count != 1){
                        log.warn("本地消息表狀態修改失敗");
                    }
                }catch (Exception e){
                    log.warn("本息消息表狀態修改異常",e);
                }
            }
        });
    }

    /**
     * 創建訂單信息
     * @param order 訂單信息
     * @throws Exception
     */
    public void createOrder(JSONObject order) throws Exception {
        //保存訂單信息
        saveOrder(order);

        //發送MQ消息,直接發送時不可靠,可能會失敗(發送後根據回執修改狀態表,定時任務掃表讀取失敗數據重新發送)
        sendMsg(order);
    }

    /**
     * 發送訂單信息至MQ
     * @param order 訂單信息
     */
    private void sendMsg(JSONObject order) {
        //發送消息到MQ,CorrelationData作用:當收到消息回執時會帶上這個參數
        rabbitTemplate.convertAndSend("orderExchange","",order.toJSONString(),new CorrelationData((String) order.get("orderid")));
    }

    /**
     * 保存訂單信息
     * @param order 訂單信息
     * @throws Exception
     */
    @Transient
    private void saveOrder(JSONObject order) throws Exception {
        String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (? , ? , ? , now())";

        //保存訂單信息
        int count = jdbcTemplate.update(sql,order.get("orderid"),order.get("userid"),order.get("goodsid"));
        if(count != 1){
            throw new Exception("訂單創建失敗");
        }

        //保存消息發送狀態
        saveLocalMsg(order);
    }

    /**
     * 記錄消息發送狀態
     * @param order 訂單信息
     * @throws Exception
     */
    private void saveLocalMsg(JSONObject order) throws Exception {
        String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (? , ? , 0 , now())";

        //記錄消息發送狀態
        int count = jdbcTemplate.update(sql,order.get("orderid"),order.toJSONString());
        if(count != 1){
            throw new Exception("記錄消息發送狀態失敗");
        }
    }
}

3.3 訂單中心測試

3.3.1 測試代碼

@Autowired
private OrderService orderService;

@Test
public void orderServiceTest() throws Exception {
    //生成訂單信息
    JSONObject orderinfo = new JSONObject();
    orderinfo.put("orderid",UUID.randomUUID().toString());
    orderinfo.put("userid",UUID.randomUUID().toString());
    orderinfo.put("goodsid",UUID.randomUUID().toString());
    orderService.createOrder(orderinfo);
}

3.3.2 測試驗證結果

orderQueue 消息隊列中已經接收到數據

訂單表裏的數據

狀態表數據:

4、運單中心 4.1 運單中心分析

消費者收到消息進行處理,處理成功則發送 ACK 消息通知 MQ 清除該條記錄,否則通知 MQ 重發或者等待 MQ 自動重發。本地維護一個處理次數,如果多次處理仍然失敗,則將該消息丟棄或者加入到死信隊列(DLQ)中。死信隊列中的數據可以人工干預。推薦:Java 進階視頻資源

4.2 編寫代碼

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import java.beans.Transient;
import java.io.IOException;


/**
 * @Author Lee
 * @Description 運單系統
 * @Date 2020/1/30 21:58
 * @Version 1.0
 */
@Slf4j
@Service
public class DispatchService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RabbitListener(queues = "orderQueue")
    public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try{
            //MQ裏面的數據轉換成JSON數據
            JSONObject orderInfo = JSONObject.parseObject(message);
            log.warn("收到MQ裏面的消息:" + orderInfo.toJSONString());
            Thread.sleep(1000L);

            //執行業務操作,同一個數據不能處理兩次,根據業務情況去重,保證冪等性
            String orderid = orderInfo.getString("orderid");
            //分配快遞員配送
            dispatch(orderid);

            //ack 通知MQ數據已經收到
            channel.basicAck(tag,false);
        }catch (Exception e){
            //異常情況,需要根據需求去重發或者丟棄
            //重發一定次數後丟棄,日誌告警(rabbitmq沒有設置重發次數功能,重發時需要代碼實現,比如使用redis記錄重發次數,)
            channel.basicNack(tag,false,false);
            //系統關鍵數據異常,需要人工干預
        }
        //如果不給確認回覆,就等這個consumer斷開連接後,MQ會繼續推送
    }

    /**
     * 分配快遞員
     * @param orderid 訂單編號
     */
    @Transient
    private void dispatch(String orderid) throws Exception {
        String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
        int count = jdbcTemplate.update(sql,orderid,"東哥","配送中");
        if(count != 1){
            throw new Exception("調度數據插入失敗,原因[數據庫操作]");
        }
    }
}

4.3 訂單中心測試

啓動 springboot 後自動監聽 MQ 中的消息隊列,自動處理

測試結果如下:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/eKc2WlnAUm4ZX0qzDqXkPA