探索 MySQL Binlog 的奧祕與應用

在 MySQL 的廣袤世界中,有一個至關重要的存在,它宛如數據庫運行軌跡的忠實記錄者,默默見證着每一次數據的變更與操作,它就是 binlog。Binlog 如同一個神祕而強大的寶庫,承載着數據庫操作的關鍵信息,爲數據的恢復、複製以及系統的穩定性提供着堅實的支撐。當我們深入探索 MySQL 的奧祕時,binlog 無疑是其中閃耀着獨特光芒的關鍵一環

Hi,我是 sharkChili ,是個不斷在硬核技術上作死的技術人,是 CSDN 的博客專家 ,也是開源項目 Java Guide 的維護者之一,熟悉 Java 也會一點 Go ,偶爾也會在 C 源碼 邊緣徘徊。寫過很多有意思的技術博客,也還在研究並輸出技術的路上,希望我的文章對你有幫助,非常歡迎你關注我的公衆號: 寫代碼的 SharkChili 。

同時也非常歡迎你 star 我的開源項目 mini-redis:https://github.com/shark-ctrl/mini-redis

因爲近期收到很多讀者的私信,所以也專門創建了一個交流羣,感興趣的讀者可以通過上方的公衆號獲取筆者的聯繫方式完成好友添加,點擊備註  “加羣”  即可和筆者和筆者的朋友們進行深入交流。

詳解 MySQL 中的 binlog

bin log 是什麼?作用是什麼呢?

bin log實際上是一個物理日誌,當我們對某個數據頁進行修改操作時我們就會將這個操作寫到bin log中,當我們數據庫需要進行主備、主從複製等操作時,都可以基於bin log保證數據一致性。

bin log 緩衝區

bin log緩衝區和我們的redo logundo log緩衝區有所不同,redo logundo log緩存都在存儲引擎的共享緩衝區緩衝區buffer pool中,而bin log則是爲每個工作線程獨立分配一個內存作爲bin log緩衝區:

bin log之所以是在每個線程中,是爲保證不同存儲引擎的兼容性,bin loginnodb獨有的,如果將bin log放到共享緩衝區時很可能導致兼容性問題,將bin log緩衝區設置爲每個線程獨享也保證了事務併發的安全性。

bin.log 對應的 3 種記錄格式

row:這種格式主要用於保證數據實時性的,例如我們執行下面這段SQL

update table set time=now() where id=1;

如果我們將其存到bin log之後很長一段時間才提交事務,那麼時間就會有所延遲,所以MySQL爲了保證數據實時性,就會將寫入bin log中的SQLrow格式,如下圖所示,可以看到row格式的SQL語句時間是當前時間的具體值,並且where條件寫死了當前條件列,確保數據實時一致性:

當然這樣做的缺點也很明顯,如果涉及大批量操作,那麼針對每條數據對應的都會生成對應的 row 語句,那麼對於內存的佔用就很高,進行恢復和同步時的 IO 和 SQL 執行時間也是非常不友好的。

stament:這種同步策略即執行的 SQL 是什麼,對應傳輸過去的時對應的語句就是什麼樣的,這就會導致我們上文所說的一致性問題:

mixed:這種格式就是爲了上述兩種方案的混合體,如果操作可能出現數據不一致問題則用row格式,反之使用stament格式。

bin log 文件日誌格式

我們可以通過下面這條SQL語句看到我們本地的bin log文件:

show binary logs;

輸出結果如下所示,可以看到 bin log 的格式基本都是mysql-bin.0000xxx:

mysql-bin.001606 440052 No
mysql-bin.001607 111520 No

bin log 是如何完成寫入

當我們開始事務時,將修改寫入bin log cache中,一旦事務提交,就會將bin log通過write寫入到文件系統緩存的page cache中,然後根據我們配置的刷盤參數將cache內容調用操作系統內核方法fsync將結果寫入到bin log 物理文件中:

而調用系統函數fsync的實際是根據MySQL系統參數決定的,這個系統變量查詢SQL如下

SHOW VARIABLES LIKE 'sync_binlog';

sync_binlog值分別三種:

  1. 當配置爲了 0 時,每次事務提交都只會 write,fsync 調用時機是由系統決定的。 1. 當配置設置爲 1 時,每次事務提交都會調用 fsync。 N. 當配置爲 N,代表提交了 N 個事務之後就會將 page cache 中的數據通過 fsync 進行刷盤。

bin log 和 redo log 的區別

這個問題我們可以從以下幾個場景來表述一下:

從使用場景來說:bin log常用於數據災備或數據同步到其他異構程序中的場景。redo log常用於故障恢復保證數據持久性。

從數據內容來說:redo log存儲的物理日誌,即修改的數據內容,對應的 redo block 結構體針對各種偏移量和修改涉及的頁都有及其複雜的涉及,這裏就不多做贅述。 而bin log則是記錄可以是statment語句也可以是原生修改的row,具體可以通過查看binlog_format知曉。

生成範圍:bin logMySQL server生成的事務日誌,任何存儲引擎都可以使用redo log只有innodb這個存儲引擎支持。

接下來我們就基於spring boot演示一下如何基於flink cdc訂閱bin.log完成 db 庫中的tb_1tb_2的數據訂閱和同步:

之所以筆者使用flink cdc而不是canel大體有以下幾個原因:

  1. flink cdc 支持全量和增量同步以及斷點續傳等功能,尤其是斷點續傳這一點對於需要保證異構數據庫的數據一致性是非常好的。

  2. 性能表現更出色,按照阿里雲的說法:

我們將全增量一體化框架與 Debezium 1.6 版本做 簡單的 TPC-DS 讀取測試對比,customer 單表數據量 6500 萬,在 Flink CDC 用 8 個併發的情況下,吞吐提升了 6.8 倍,耗時僅 13 分鐘,得益於併發讀取的支持,如果用戶需要更快的讀取速度,用戶可以增加併發實現。

話不說我們給出基礎的集成步驟,首先是引入 flink cdc 和 MySQL 的依賴,這裏筆者爲了文章的簡練只給出的 flink cdc 相關的 pom 依賴:

 <properties>
        <flink.version>1.13.6</flink.version>
    </properties>


<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--mysql -cdc-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>

然後我們在 yml 或者 properties 文件中給出 MySQL 配置即可,然後我們聲明一個 CdcInfo 記錄從 bin.log 中同步的數據:

@Data
publicclass CdcInfo {
    /**
     * 變更前數據
     */
    private JSONObject beforeData;
    /**
     * 變更後數據
     */
    private JSONObject afterData;

    private String operation;
    /**
     * binlog 文件名
     */
    private String binLogName;
    /**
     * binlog當前讀取點位
     */
    private Integer filePos;
    /**
     * 數據庫名
     */
    private String dbName;
    /**
     * 表名
     */
    private String tbName;
    /**
     * 變更時間
     */
    private Long changeTime;

}

然後我們編寫一個關於bin.log通知事件的監聽,針對 flink cdc 配置筆者都基於 CommandLineRunner 這個拓展點完成配置,這裏面涉及衆多的flink cdc配置參數,可以看到筆者的程序同步模式配置的是 initial 即啓動後會進行全量同步再進行增量同步,同時通過表達式 db.tb_[1-2]+ 指明僅僅處理 tb_1 和 tb_2 表的數據更新變化。

@Component
publicclass MysqlCdcEventListener implements CommandLineRunner {

    //數據接收器用於應用架構更改和將更改數據寫入外部系統
    privatefinal CdcSink cdcSink;

    public MysqlCdcEventListener(CdcSink cdcSink) {
        this.cdcSink = cdcSink;
    }


    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置並行度
        env.setParallelism(Runtime.getRuntime().availableProcessors());


        DebeziumSourceFunction<CdcInfo> debeziumSource = buildDebeziumSource();


        DataStream<CdcInfo> streamSource = env
                .addSource(debeziumSource, "mysql-source")
                .setParallelism(1);
        //將流數據交給
        streamSource.addSink(cdcSink);
        env.execute("mysql-stream-cdc");
    }


    /**
     * 構造變更數據源
     */
    private DebeziumSourceFunction<CdcInfo> buildDebeziumSource() {

        Properties debeziumProperties = new Properties();
        //設置快照爲無鎖
        debeziumProperties.put("snapshot.locking.mode""none");

        return MySqlSource.<CdcInfo>builder()
                .hostname("xxxx")
                .port(3306)
                .databaseList("db")
                //監聽db庫中的[1-2]表
                .tableList("db.tb_[1-2]+")
                .username("xxxx")
                .password("xxxx")
                //設置爲 initial:在第一次啓動時對受監視的數據庫表執行初始快照,並繼續讀取最新的 binlog
                .startupOptions(StartupOptions.initial())
                //設置序列化配置
                .deserializer(new MysqlDeserialization())
                .serverTimeZone("GMT+8")
                .debeziumProperties(debeziumProperties)
                .build();
    }


}

關於這些配置的信息建議讀者移步官方文檔的說明:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/

相應的使用配置示例讀者也可以參考 flink cdc 對應的 GitHub 上的說明:https://github.com/gunnarmorling/flink-cdc-connectors

以及一些配置參數上的建議,也建議讀者參考這篇文章:https://blog.csdn.net/weixin_43753599/article/details/144567006

上文代碼示例中給出一個涉及反序列化生產 CdcInfo 的操作,筆者指明瞭 MysqlDeserialization 這裏也給出對應的源碼示例:

public class MysqlDeserialization implements DebeziumDeserializationSchema<CdcInfo> {

    publicstaticfinal String TS_MS = "ts_ms";
    publicstaticfinal String BIN_FILE = "file";
    publicstaticfinal String POS = "pos";
    publicstaticfinal String CREATE = "CREATE";
    publicstaticfinal String BEFORE = "before";
    publicstaticfinal String AFTER = "after";
    publicstaticfinal String SOURCE = "source";
    publicstaticfinal String UPDATE = "UPDATE";


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<CdcInfo> collector) {
        //獲取bin.log訂閱到的信息
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        CdcInfo tbCdcInfo = new CdcInfo();
        //獲取前後變化數據
        tbCdcInfo.setBeforeData(convert2JsonObj(struct, BEFORE));
        tbCdcInfo.setAfterData(convert2JsonObj(struct, AFTER));
        //5.獲取操作類型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();

        tbCdcInfo.setOperation(type);
        tbCdcInfo.setBinLogName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        tbCdcInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        tbCdcInfo.setDbName(database);
        tbCdcInfo.setTbName(tableName);
        tbCdcInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
        //7.輸出數據
        collector.collect(tbCdcInfo);
    }

    /**
     * 從原始數據獲取出變更之前或之後的數據
     */
    private JSONObject convert2JsonObj(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }


    @Override
    public TypeInformation<CdcInfo> getProducedType() {
        return TypeInformation.of(CdcInfo.class);
    }
}

此時我們啓動程序後針對數據表進行修改操作就會收到數據消息的訂閱了:

訂閱到的數據:CdcInfo(beforeData={"id":1,"name":"xiaoming"}, afterData={"id":1,"name":"xiaoming1"}, operation=UPDATE, binLogName=binlog.000156, filePos=1256, dbName=db, tbName=tb_2, changeTime=1734622269654)

小結

MySQL Binlog(二進制日誌)作爲 MySQL 數據庫中至關重要的組成部分,蘊含着衆多奧祕且具備豐富多樣的應用場景。 從原理層面來看,Binlog 以二進制的形式記錄了數據庫中數據變更的相關事件,如 INSERT、UPDATE、DELETE 等操作。它採用順序追加的方式寫入,這種特性不僅保證了日誌記錄的完整性和連續性,還爲後續的恢復和複製提供了堅實基礎。不同的日誌格式(STATEMENT、ROW、MIXED)各有優劣,開發人員和數據庫管理員可以根據實際需求進行靈活選擇,以平衡數據一致性、性能和存儲空間等多方面因素。 在應用領域,Binlog 展現出了巨大的價值。在數據恢復場景中,基於全量備份結合 Binlog 可以實現精準的時間點恢復(PITR),確保在面對數據丟失或損壞時,能夠將數據庫還原到指定的歷史時刻,最大程度減少數據損失。在主從複製方面,主庫將 Binlog 發送給從庫,從庫通過重放這些日誌來同步數據,從而實現數據的多副本存儲和讀寫分離,提升系統的可用性和性能。此外,Binlog 還在數據遷移、數據審計以及實時數據處理等領域發揮着重要作用。例如,通過解析 Binlog 可以獲取數據的實時變化,將這些變化推送至其他系統進行進一步處理,實現系統間的數據同步和業務邏輯的聯動。 然而,在使用 Binlog 的過程中,也需要關注一些問題。例如,Binlog 的記錄會佔用一定的磁盤空間,需要合理規劃存儲空間和清理策略;同時,在進行主從複製時,Binlog 的傳輸和重放可能會受到網絡延遲、服務器性能等因素的影響,導致數據同步延遲或出現錯誤,這就需要建立有效的監控和故障處理機制。 總之,深入理解 MySQL Binlog 的奧祕,併合理運用其各項特性,對於保障數據庫的高可用性、數據一致性以及實現多樣化的業務需求都具有重要意義。無論是數據庫管理員進行日常運維管理,還是開發人員設計架構和開發應用程序,都應該充分認識到 Binlog 的價值,並謹慎處理與之相關的各種問題。

我是 sharkchili ,CSDN Java 領域博客專家mini-redis 的作者,我想寫一些有意思的東西,希望對你有幫助,如果你想實時收到我寫的硬核的文章也歡迎你關注我的公衆號: 寫代碼的 SharkChili 。

同時也非常歡迎你 star 我的開源項目 mini-redis:https://github.com/shark-ctrl/mini-redis

參考

Flink CDC 全量和增量同步數據如何保證數據的一致性:https://developer.aliyun.com/ask/588669

數據同步工具之 FlinkCDC/Canal/Debezium 對比:https://cloud.tencent.com/developer/article/1893807

springboot 集成 flink-cdc:https://blog.csdn.net/leilei1366615/article/details/126528452

SpringBoot 集成 Flink-CDC,實現對數據庫數據的監聽 :https://blog.csdn.net/qq_43479493/article/details/137226875

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/nysZiQYWUwYskhdEtmDiUw