Fix oldest xmin in logical decoding.

Offering: openGaussDev

More detail: Fix oldest xmin in logical decoding.

Match-id-d652068e86f96a843c7212f959f72c855635a2ed
This commit is contained in:
openGaussDev 2022-02-28 16:54:59 +08:00 committed by yanghao
parent b3ba5e5db9
commit d45634be91
6 changed files with 40 additions and 2 deletions

View File

@ -9237,6 +9237,7 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam
InvalidTransactionTimeline, ss_need_sync_wait_all);
/* quickly set my recent global xmin */
u_sess->utils_cxt.RecentGlobalXmin = GetOldestXmin(NULL, true);
u_sess->utils_cxt.RecentGlobalCatalogXmin = GetOldestCatalogXmin();
}
}
/* check gtm mode, remote should be false, local cannot be true */

View File

@ -442,6 +442,7 @@ static void knl_u_utils_init(knl_u_utils_context* utils_cxt)
utils_cxt->RecentDataXmin = FirstNormalTransactionId;
utils_cxt->RecentGlobalXmin = InvalidTransactionId;
utils_cxt->RecentGlobalDataXmin = InvalidTransactionId;
utils_cxt->RecentGlobalCatalogXmin = InvalidTransactionId;
utils_cxt->cn_xc_maintain_mode = false;
utils_cxt->snapshot_source = SNAPSHOT_UNDEFINED;

View File

@ -2373,6 +2373,15 @@ bool heap_fetch(
return false;
}
static TransactionId inline GetOldestXminForHot(Relation relation)
{
if (IsCatalogRelation(relation) || RelationIsAccessibleInLogicalDecoding(relation)) {
return u_sess->utils_cxt.RecentGlobalCatalogXmin;
} else {
return u_sess->utils_cxt.RecentGlobalXmin;
}
}
/*
* heap_hot_search_buffer - search HOT chain for tuple satisfying snapshot
*
@ -2403,6 +2412,7 @@ bool heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, S
bool at_chain_start = false;
bool valid = false;
bool skip = false;
TransactionId oldestXmin;
/* If this is not the first call, previous call returned a (live!) tuple */
if (all_dead != NULL) {
@ -2411,6 +2421,8 @@ bool heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, S
Assert(TransactionIdIsValid(u_sess->utils_cxt.RecentGlobalXmin));
oldestXmin = GetOldestXminForHot(relation);
Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
offnum = ItemPointerGetOffsetNumber(tid);
at_chain_start = first_call;
@ -2533,7 +2545,7 @@ bool heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, S
* request, check whether all chain members are dead to all
* transactions.
*/
if (all_dead && *all_dead && !HeapTupleIsSurelyDead(heap_tuple, u_sess->utils_cxt.RecentGlobalXmin)) {
if (all_dead && *all_dead && !HeapTupleIsSurelyDead(heap_tuple, oldestXmin)) {
*all_dead = false;
}

View File

@ -1627,6 +1627,21 @@ int GetMaxSnapshotSubxidCount(void)
return TOTAL_MAX_CACHED_SUBXIDS;
}
/*
* returns oldest transaction for catalog that was running when any current transaction was started.
* take replication slot into consideration. please make sure it's safe to read replication_slot_catalog_xmin
* before calling this func.
*/
TransactionId GetOldestCatalogXmin()
{
TransactionId res = u_sess->utils_cxt.RecentGlobalXmin;
TransactionId repSlotCatalogXmin = g_instance.proc_array_idx->replication_slot_catalog_xmin;
if (TransactionIdIsNormal(repSlotCatalogXmin) && TransactionIdPrecedes(repSlotCatalogXmin, res)) {
return repSlotCatalogXmin;
}
return res;
}
/*
* GetSnapshotData -- returns information about running transactions.
*
@ -1720,7 +1735,9 @@ RETRY:
goto RETRY;
}
u_sess->utils_cxt.RecentGlobalXmin = GetOldestXmin(NULL, true);
u_sess->utils_cxt.RecentGlobalCatalogXmin = GetOldestCatalogXmin();
}
return result;
}
}
@ -3683,6 +3700,7 @@ static bool GetSnapshotDataDataNode(Snapshot snapshot)
gtm_snapshot->csn, snapshot->gtm_snapshot_type, SNAPSHOT_DIRECT);
u_sess->utils_cxt.g_GTM_Snapshot->csn = snapshot->snapshotcsn;
u_sess->utils_cxt.RecentGlobalXmin = GetOldestXmin(NULL, true);
u_sess->utils_cxt.RecentGlobalCatalogXmin = GetOldestCatalogXmin();
return true;
}
}
@ -3778,6 +3796,7 @@ static bool GetSnapshotDataCoordinator(Snapshot snapshot)
u_sess->utils_cxt.g_GTM_Snapshot->csn = snapshot->snapshotcsn;
u_sess->utils_cxt.RecentGlobalXmin = GetOldestXmin(NULL, true);
u_sess->utils_cxt.RecentGlobalCatalogXmin = GetOldestCatalogXmin();
}
if (module_logging_is_on(MOD_TRANS_SNAPSHOT)) {
ereport(LOG, (errmodule(MOD_TRANS_SNAPSHOT),
@ -4332,6 +4351,7 @@ Snapshot GetLocalSnapshotData(Snapshot snapshot)
u_sess->utils_cxt.RecentGlobalXmin = replication_slot_xmin;
}
u_sess->utils_cxt.RecentGlobalCatalogXmin = GetOldestCatalogXmin();
u_sess->utils_cxt.RecentXmin = snapxid->xmin;
snapshot->xmin = snapxid->xmin;
snapshot->xmax = snapxid->xmax;

View File

@ -564,7 +564,7 @@ typedef struct knl_u_utils_context {
* for the convenience of TransactionIdIsInProgress: even in bootstrap
* mode, we don't want it to say that BootstrapTransactionId is in progress.
*
* RecentGlobalXmin and RecentGlobalDataXmin are initialized to
* RecentGlobalXmin, RecentGlobalDataXmin, RecentGlobalCatalogXmin are initialized to
* InvalidTransactionId, to ensure that no one tries to use a stale
* value. Readers should ensure that it has been set to something else
* before using it.
@ -579,6 +579,9 @@ typedef struct knl_u_utils_context {
TransactionId RecentGlobalDataXmin;
/* recent global catalog xmin, consider replication slot catalog xmin */
TransactionId RecentGlobalCatalogXmin;
/* Global snapshot data */
bool cn_xc_maintain_mode;
int snapshot_source;

View File

@ -74,6 +74,7 @@ extern bool ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourc
extern RunningTransactions GetRunningTransactionData(void);
extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestCatalogXmin();
extern TransactionId GetRecentGlobalXmin(void);
extern TransactionId GetOldestXmin(Relation rel, bool bFixRecentGlobalXmin = false,
bool bRecentGlobalXminNoCheck = false);