POSTGRESQL 事務控制 -一- -寫着費力,看着費勁系列-

最近發現一個問題, 最近寫的關於感性的文字如 DBA 職業迷茫何去何從, 和另外一篇都是較高的用戶讀取量, 而反觀到技術性的文字, 基本上都不太高, 能到 400 以上就屬於 "上帝幫助" 了

原因我是明白的, 大衆化的東西受衆必然很多, 反而純技術性的文字實撰寫困難, 首先自己要理解, 然後在轉化. 很難摻雜個人感情, 耗費精力大. 縱然如此個人還是喜歡去搞技術性的東西, 主要有兩點  1 要靠這個喫飯, 2 個人興趣. 

這邊準備一個深入系列, 其實這也是費力不討好, 自己寫的費勁, 大家看的也比較費勁, 估計閱讀數也不會太高。


本期主要從事務入手, 從 POSTGRESQL 的事務的原理由淺入深的開始搞搞更深入的東西, 這一個系列注重原理, 源碼, 少操作.

PostgreSQL 的事務的形成和處理是通過 Transaction Block 塊, 這個 Transaction Block 塊中會包含, 一條 SQL , 或者 N 條 SQL . 

下面是 postgresql 的在事務處理中的事務可能處於的狀態, (爲後面和代碼連接做準備)

事務處理的分爲 begin commit  rollback 三個過程, 這裏分別有幾個函數來代表功能的完成, 在事務處理中, 主要分爲上層, 中層, 下層三種函數

上層

1  BeginTransacionBlock

2  EndTransactionBlock

3  UserAbortTransactionBlock

4  DefineSavepoint

5  RollbackToSavepoint

6  ReleaseSavepoint

中層

1 StartTransactionCommand

2 CommitTransactionCommand

3 AbortTransactionCommand

底層

1 StartTransaction

2 CommitTransation

3 AbortTransaction

4 Cleanup Transaction

5 StartSubTransaction

6 CommitSubTransaction

7 AbortSub Transaction

8 CleanupsSub Transaction

下面通過一個事務的實例來看上面的函數和狀態如何應用到事務的處理當中

typedef struct TransactionStateData

{

    TransactionId transactionId;    

    SubTransactionId subTransactionId;     

    char       *name;        

    int         savepointLevel; 

    TransState  state;         

    TBlockState blockState;     

    int         nestingLevel; 

    int         gucNestLevel;   

   MemoryContext curTransactionContext;   

    ResourceOwner curTransactionOwner;  

    TransactionId *childXids;   

    int         nChildXids;    

    int         maxChildXids;   

    Oid         prevUser;      

    int         prevSecContext; 

    bool        prevXactReadOnly;   

    bool        startedInRecovery; 

    bool        didLogXid;     

    int         parallelModeLevel;   

    struct TransactionStateData *parent;    

} TransactionStateData;

以一個最簡單得事務, 會使用如下的流程和相關的函數.

看上圖, 在事務的開始會在 typedef struct TransactionStateData 結構體內修改 transState  狀態, 默認值爲 trans_default,  在執行了 Begin 後, 會開始獲取 transactionID, 然後開始變化結構體的狀態, 將狀態變爲 Trans_start, 然後馬上執行語句, 分配 transactionID, 在將事務的狀態變爲 Trans_inprogress,

在事務運行完畢, 並提交是將事務的狀態轉爲 trans_abort.   

在期間調用 heap_insert 函數, 將數據插入到數據頁面中. 

下圖是證明產生事務後, 也不見得產生事務 ID, 只要整體的事務中沒有任何的 DML 操作, Insert 操作, 則是不會分配事務 ID 的. 

txid_current_if_assigned()

上面的只是非常簡單的事務, 而複雜的事務, 都會包含 子事務, 以及一些回滾點, 如在事務中加載了 save point . 則分配事務 SubTransactionId subTransactionId;  

上面這段代碼的就是爲事務, 以及子事務分配事務號的

if (isSubXact && !TransactionIdIsValid(s->parent->transactionId)

如果是子事務, 並且結構體中中沒有父的事務 ID  則設置初始值

則根據結構體中的 nestingLevel 的級別來分配數組, 

parents = palloc(sizeof(TransactionState) * s->nestingLevel);

然後通過循環得方式, 爲每一個子事務來分配事務 ID

while (p != NULL && !TransactionIdIsValid(p->transactionId))
        {
            parents[parentOffset++] = p;
            p = p->parent;
        }

PG 獲取事務 ID 主要是通過無符號整型事務 ID 的計數器來分配事務 ID ,ID 是一個 32 位的整型遞增的趨勢, 通過

1  結構體

2  緩存計數器

3  分配函數

三個部分組成事務 ID 的分配的任務.

同時由於 FREEZEING 的問題, 在分配事務 ID 的時候還要進行相關的判斷, 判斷當前的分配得事務號, 是否已經到了警戒線.

下面是這段分配事務 ID 的代碼, 及個人理解註解

/*

TransactionId

GetNewTransactionId(bool isSubXact)

{

    TransactionId xid;

    if (IsInParallelMode())

        elog(ERROR, "cannot assign TransactionIds during a parallel operation");

#如果在並行模式, 則不會進行分配事務 ID

    if (IsBootstrapProcessingMode())

    {

        Assert(!isSubXact);

        MyPgXact->xid = BootstrapTransactionId;

        return BootstrapTransactionId;//--> 1

    }

**    #事務初始化之初先進行事務的 ID 的初**始化

    if (RecoveryInProgress())

        elog(ERROR, "cannot assign TransactionIds during recovery");

    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

#如果事務在運行的模式, 則不能分配事務 ID, 否則獲取 LW 鎖, 準備進行事務 ID 的分配

    xid = ShmemVariableCache->nextXid;

# 這裏通過內存中的共享結構分配一個事務 ID

#此時爲了防止 PG 數據庫事務 ID 回捲, 需要開始對分配事務 ID 與現存最大的事務 ID 進行比較

#這裏有三個設置,

#1 xidVacLimit

#2 xidWarnLimit

#3 xidStopLimit

#三個值分別代表, 觸發 xidVacLimit , 系統則自動開始進行 autovacuum 的操作

#xidwarnLimit 則系統開始發出警告

#xidStopLimit 則系統開始停止工作, 進入單用戶模式

    if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))

    {

**        TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;**

**        TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;**

**        TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;**

        Oid         oldest_datoid = ShmemVariableCache->oldestXidDB;

        LWLockRelease(XidGenLock);

#噹噹前的事務 ID 除以 65536 餘數爲 0 則觸發 autovacuum 的機制

        if (IsUnderPostmaster && (xid % 65536) == 0)

            SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

#如果觸發了 xidStgopLimit

        if (IsUnderPostmaster &&

            TransactionIdFollowsOrEquals(xid, xidStopLimit))

        {

        #獲取這個數據庫的 oid 並轉換爲數據庫名, 然後就開始瘋狂的發送告警了

            char       *oldest_datname = get_database_name(oldest_datoid);

            if (oldest_datname)

                ereport(ERROR,

                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

                         errmsg("database is not accepting commands to avoid wraparound data loss in database "%s"",

                                oldest_datname),

                         errhint("Stop the postmaster and vacuum that database in single-user mode.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

            else

                ereport(ERROR,

                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

                         errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",

                                oldest_datoid),

                         errhint("Stop the postmaster and vacuum that database in single-user mode.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

        }

        else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))

        {

            char       *oldest_datname = get_database_name(oldest_datoid);

            if (oldest_datname)

                ereport(WARNING,

                        (errmsg("database "%s"must be vacuumed within %u transactions",

                                oldest_datname,

                                xidWrapLimit - xid),

                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

            else

                ereport(WARNING,

                        (errmsg("database with OID %u must be vacuumed within %u transactions",

                                oldest_datoid,

                                xidWrapLimit - xid),

                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"

                                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));

        }

        /* Re-acquire lock and start over */

  # 在每一次獲取事務 ID 都會觸發一次報警, 警告信息在上面完成

LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

        xid = ShmemVariableCache->nextXid;

    }

    #相關的事務 ID 需要在 CommitTs, WAL LOG 中進行更新

    ExtendCLOG(xid);

    ExtendCommitTs(xid);

    ExtendSUBTRANS(xid);

    #然後將 xid 在緩衝結構中更新爲下一次其他事務獲取 ID 做準備

    TransactionIdAdvance(ShmemVariableCache->nextXid);

#同時需要判斷是否是子事務

if (!isSubXact)

    #不是將就 XID 放入緩衝共享體中, 讓事務開始對其他事務的可見性起作用

        MyPgXact->xid = xid;    

    else               

#如果不是則就是子事務, 那麼就需要進行循環, 爲每一個子事務, 在這個事務上 + 1

    {

        int         nxids = MyPgXact->nxids;

        if (nxids < PGPROC_MAX_CACHED_SUBXIDS)

        {

            MyProc->subxids.xids[nxids] = xid;

            pg_write_barrier();

            MyPgXact->nxids = nxids + 1;

        }

        else

            MyPgXact->overflowed = true;

    }

    LWLockRelease(XidGenLock);

    return xid;

#此時一個整體的事務分配才完成

以下是 shmemVariableCache 的緩存結構

typedef struct VariableCacheData

{

    Oid         nextOid;        

    uint32      oidCount;      

    TransactionId nextXid;     

    TransactionId oldestXid;    #當前最小的 xid

    TransactionId xidVacLimit;  #存儲觸發 autovacuum 的 xid 預設值

    TransactionId xidWarnLimit; #告警的 xid 預設值

    TransactionId xidStopLimit; #設置停止工作的 xid 預設值

   TransactionId xidWrapLimit;  #設置整體系統凍結時的 XID 預設值

    Oid         oldestXidDB;    #當前擁有最老的事務的數據庫 OID

    TransactionId oldestCommitTsXid;  最老的 commit id

    TransactionId newestCommitTsXid;  最新的 commit xid

    TransactionId latestCompletedXid;   

    TransactionId oldestClogXid;    /* oldest it's safe to look up in clog */

} VariableCacheData;

typedef VariableCacheData *VariableCache;

VariableCache ShmemVariableCache = NULL;

通過閱讀代碼可以今天可以瞭解如下

1  數據庫 freeze 的觸發機制是在分配事務 ID 時觸發的

2  數據庫從強制 autovacuum 到 FREEZE 是有一個過程的, 相關的的警告信息會不斷在這期間進行發送

3  觸發事務 autovacuum 的機制          if (IsUnderPostmaster && (xid % 65536) == 0)

**4   儘量不要建立太多的子事務, 原因從分配事務 ID 也可以看出來, save point 的功能怎麼使用心理的有點數
**

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/HFDuWUawjoyVqjIuheefJA