!2971 【共享存储】适配极致RTO

Merge pull request !2971 from 陈栋/rto
This commit is contained in:
opengauss-bot 2023-02-27 12:57:41 +00:00 committed by Gitee
commit 3fee79a080
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
12 changed files with 207 additions and 79 deletions

View File

@ -154,14 +154,14 @@ set(CHECK_OPTIONS -Wmissing-format-attribute -Wno-attributes -Wno-unused-but-set
set(MACRO_OPTIONS -D_GLIBCXX_USE_CXX11_ABI=0 -DENABLE_GSTRACE -D_GNU_SOURCE -DPGXC -D_POSIX_PTHREAD_SEMANTICS -D_REENTRANT -DSTREAMPLAN -D_THREAD_SAFE ${DB_COMMON_DEFINE})
# Set MAX_ALLOC_SEGNUM size in extreme_rto
if(${WAL_SEGSIZE} LESS 512)
if(${WAL_SEGSIZE} LESS 256)
set(MAX_ALLOC_SEGNUM 4)
elseif(${WAL_SEGSIZE} GREATER_EQUAL 512 AND ${WAL_SEGSIZE} LESS 1024)
elseif(${WAL_SEGSIZE} GREATER_EQUAL 256 AND ${WAL_SEGSIZE} LESS 512)
set(MAX_ALLOC_SEGNUM 2)
elseif(${WAL_SEGSIZE} GREATER_EQUAL 1024)
elseif(${WAL_SEGSIZE} GREATER_EQUAL 512)
set(MAX_ALLOC_SEGNUM 1)
else()
message(FATAL_ERROR "error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024.")
message(FATAL_ERROR "error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512.")
endif()
# libraries need secure options during compling

9
configure vendored
View File

@ -3718,9 +3718,8 @@ case ${wal_segsize} in
128) ;;
256) ;;
512) ;;
1024) ;;
*) { { $as_echo "$as_me:$LINENO: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024." >&5
$as_echo "$as_me: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024." >&2;}
*) { { $as_echo "$as_me:$LINENO: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512." >&5
$as_echo "$as_me: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512." >&2;}
{ (exit 1); exit 1; }; }
esac
{ $as_echo "$as_me:$LINENO: result: ${wal_segsize}MB" >&5
@ -3732,9 +3731,9 @@ cat >>confdefs.h <<_ACEOF
_ACEOF
# Set number of MAX_ALLOC_SEGNUM in extreme_rto
if test "${wal_segsize}" -lt 512; then
if test "${wal_segsize}" -lt 256; then
MAX_ALLOC_SEGNUM=4
elif test "${wal_segsize}" -ge 512 -a "${wal_segsize}" -lt 1024; then
elif test "${wal_segsize}" -ge 256 -a "${wal_segsize}" -lt 512; then
MAX_ALLOC_SEGNUM=2
else
MAX_ALLOC_SEGNUM=1

View File

@ -384,8 +384,7 @@ case ${wal_segsize} in
128) ;;
256) ;;
512) ;;
1024) ;;
*) AC_MSG_ERROR([Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024.])
*) AC_MSG_ERROR([Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512.])
esac
AC_MSG_RESULT([${wal_segsize}MB])
@ -398,9 +397,9 @@ AC_DEFINE_UNQUOTED([XLOG_SEG_SIZE], [(${wal_segsize} * 1024 * 1024)], [
])
# Set number of MAX_ALLOC_SEGNUM in extreme_rto
if test "${wal_segsize}" -lt 512; then
if test "${wal_segsize}" -lt 256; then
MAX_ALLOC_SEGNUM=4
elif test "${wal_segsize}" -ge 512 -a "${wal_segsize}" -lt 1024; then
elif test "${wal_segsize}" -ge 256 -a "${wal_segsize}" -lt 512; then
MAX_ALLOC_SEGNUM=2
else
MAX_ALLOC_SEGNUM=1

View File

@ -1186,7 +1186,7 @@ su - gaussdb
the size of each individual file in the WAL log. It may be useful
to adjust this size to control the granularity of WAL log shipping.
The default size is 16 megabytes.
The value must be a power of 2 between 1 and 1024 (megabytes).
The value must be a power of 2 between 1 and 512 (megabytes).
Note that changing this value requires an gs_initdb.
</para>
</listitem>

View File

@ -25,6 +25,7 @@
#include "postgres.h"
#include "access/xlog.h"
#include "access/multi_redo_api.h"
#include "postmaster/postmaster.h"
#include "storage/smgr/fd.h"
#include "storage/dss/fio_dss.h"
@ -114,8 +115,9 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int
{
/* Load reader private data */
XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data;
int emode = readprivate->emode;
bool randAccess = readprivate->randAccess;
int emode = IsExtremeRedo() ? LOG : readprivate->emode;
bool randAccess = IsExtremeRedo() ? false : readprivate->randAccess;
XLogRecPtr RecPtr = targetPagePtr;
uint32 targetPageOff;
#ifdef USE_ASSERT_CHECKING
@ -136,6 +138,7 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int
}
XLByteToSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo);
XLByteAdvance(RecPtr, expectReadLen);
/* In archive or crash recovery. */
if (t_thrd.xlog_cxt.readFile < 0) {
@ -176,7 +179,7 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int
static_cast<uint32>(targetPagePtr >> BIT_NUM_INT32),
static_cast<uint32>(targetPagePtr), targetPageOff,
expectReadLen)));
ereport(emode_for_corrupt_record(emode, targetPagePtr),
ereport(emode_for_corrupt_record(emode, RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log file %s to offset %u: %m",
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo),

View File

@ -1,2 +1,2 @@
dms_commit_id=739b732fdefe65f110849353773b4b54a33731a9
dms_commit_id=662fc4c3f44761009a572e3900b38da5b2886d4a
dss_commit_id=1b532395350e34db8cd9584597dcb022bed9dd35

View File

@ -2246,12 +2246,6 @@ int PostmasterMain(int argc, char* argv[])
ExitPostmaster(1);
}
if (ENABLE_DSS) {
if (g_instance.attr.attr_storage.recovery_parse_workers > 1 ||
g_instance.attr.attr_storage.recovery_redo_workers_per_paser_worker > 1) {
write_stderr("Not support extreme RTO while DMS and DSS enabled, please cancel rto parameter\n");
ExitPostmaster(1);
}
if (u_sess->attr.attr_common.XLogArchiveMode || strlen(u_sess->attr.attr_storage.XLogArchiveCommand) != 0) {
write_stderr("Not support archive function while DMS and DSS enabled\n");
ExitPostmaster(1);
@ -6833,7 +6827,7 @@ static void reaper(SIGNAL_ARGS)
g_instance.pid_cxt.AutoVacPID = initialize_util_thread(AUTOVACUUM_LAUNCHER);
if (SS_REFORM_PARTNER) {
write_stderr("%s LOG: I'm still a reform partner waiting for refom finished\n",
write_stderr("%s LOG: I'm still a reform partner waiting for reform finished\n",
GetReaperLogPrefix(logBuf, ReaperLogBufSize));
continue;
}

View File

@ -53,6 +53,7 @@
#include "access/twophase.h"
#include "access/redo_common.h"
#include "access/extreme_rto/page_redo.h"
#include "ddes/dms/ss_dms_bufmgr.h"
THR_LOCAL RedoParseManager *g_parseManager = NULL;
THR_LOCAL RedoBufferManager *g_bufferManager = NULL;
@ -1689,25 +1690,42 @@ void ExtremeRtoFlushBuffer(RedoBufferInfo *bufferinfo, bool updateFsm)
if (bufferinfo->pageinfo.page != NULL) {
BufferDesc *bufDesc = GetBufferDescriptor(bufferinfo->buf - 1);
if (bufferinfo->dirtyflag || XLByteLT(bufDesc->extra->lsn_on_disk, PageGetLSN(bufferinfo->pageinfo.page))) {
MarkBufferDirty(bufferinfo->buf);
if (!bufferinfo->dirtyflag && bufferinfo->blockinfo.forknum == MAIN_FORKNUM) {
int mode = WARNING;
if (ENABLE_DMS) {
if ((GetDmsBufCtrl(bufDesc->buf_id)->state & BUF_DIRTY_NEED_FLUSH) &&
!bufferinfo->dirtyflag && bufferinfo->blockinfo.forknum == MAIN_FORKNUM) {
const uint32 shiftSz = 32;
ereport(PANIC, (errmsg("[SS] extreme_rto not mark dirty:lsn %X/%X, lsn_disk %X/%X, \
lsn_page %X/%X, page %u/%u/%u %u",
(uint32)(bufferinfo->lsn >> shiftSz), (uint32)(bufferinfo->lsn),
(uint32)(bufDesc->extra->lsn_on_disk >> shiftSz),
(uint32)(bufDesc->extra->lsn_on_disk),
(uint32)(PageGetLSN(bufferinfo->pageinfo.page) >> shiftSz),
(uint32)(PageGetLSN(bufferinfo->pageinfo.page)),
bufferinfo->blockinfo.rnode.spcNode, bufferinfo->blockinfo.rnode.dbNode,
bufferinfo->blockinfo.rnode.relNode, bufferinfo->blockinfo.blkno)));
}
} else {
MarkBufferDirty(bufferinfo->buf);
if (!bufferinfo->dirtyflag && bufferinfo->blockinfo.forknum == MAIN_FORKNUM) {
int mode = WARNING;
#ifdef USE_ASSERT_CHECKING
mode = PANIC;
mode = PANIC;
#endif
const uint32 shiftSz = 32;
ereport(mode, (errmsg("extreme_rto not mark dirty:lsn %X/%X, lsn_disk %X/%X, \
lsn_page %X/%X, page %u/%u/%u %u",
(uint32)(bufferinfo->lsn >> shiftSz), (uint32)(bufferinfo->lsn),
(uint32)(bufDesc->extra->lsn_on_disk >> shiftSz),
(uint32)(bufDesc->extra->lsn_on_disk),
(uint32)(PageGetLSN(bufferinfo->pageinfo.page) >> shiftSz),
(uint32)(PageGetLSN(bufferinfo->pageinfo.page)),
bufferinfo->blockinfo.rnode.spcNode, bufferinfo->blockinfo.rnode.dbNode,
bufferinfo->blockinfo.rnode.relNode, bufferinfo->blockinfo.blkno)));
}
#ifdef USE_ASSERT_CHECKING
bufDesc->lsn_dirty = PageGetLSN(bufferinfo->pageinfo.page);
#endif
const uint32 shiftSz = 32;
ereport(mode, (errmsg("extreme_rto not mark dirty:lsn %X/%X, lsn_disk %X/%X, \
lsn_page %X/%X, page %u/%u/%u %u",
(uint32)(bufferinfo->lsn >> shiftSz), (uint32)(bufferinfo->lsn),
(uint32)(bufDesc->extra->lsn_on_disk >> shiftSz), (uint32)(bufDesc->extra->lsn_on_disk),
(uint32)(PageGetLSN(bufferinfo->pageinfo.page) >> shiftSz),
(uint32)(PageGetLSN(bufferinfo->pageinfo.page)),
bufferinfo->blockinfo.rnode.spcNode, bufferinfo->blockinfo.rnode.dbNode,
bufferinfo->blockinfo.rnode.relNode, bufferinfo->blockinfo.blkno)));
}
#ifdef USE_ASSERT_CHECKING
bufDesc->lsn_dirty = PageGetLSN(bufferinfo->pageinfo.page);
#endif
}
UnlockReleaseBuffer(bufferinfo->buf); /* release buffer */

View File

@ -115,6 +115,7 @@ static void GetSlotIds(XLogReaderState *record);
static void GetUndoSlotIds(XLogReaderState *record);
STATIC LogDispatcher *CreateDispatcher();
static void DestroyRecoveryWorkers();
static void SSDestroyRecoveryWorkers();
static void DispatchRecordWithPages(XLogReaderState *, List *);
static void DispatchRecordWithoutPage(XLogReaderState *, List *);
@ -353,6 +354,45 @@ void AllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen)
#endif
}
void SSAllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen)
{
XLogReaderState *initreader;
errno_t errorno = EOK;
initreader = GetXlogReader(xlogreader);
initreader->isPRProcess = true;
g_dispatcher->rtoXlogBufState.readWorkerState = WORKER_STATE_STOP;
g_dispatcher->rtoXlogBufState.readPageWorkerState = WORKER_STATE_STOP;
g_dispatcher->rtoXlogBufState.readSource = 0;
g_dispatcher->rtoXlogBufState.failSource = 0;
g_dispatcher->rtoXlogBufState.xlogReadManagerState = READ_MANAGER_RUN;
g_dispatcher->rtoXlogBufState.targetRecPtr = InvalidXLogRecPtr;
g_dispatcher->rtoXlogBufState.expectLsn = InvalidXLogRecPtr;
g_dispatcher->rtoXlogBufState.waitRedoDone = 0;
g_dispatcher->rtoXlogBufState.readBuf = (char *)palloc0(XLOG_BLCKSZ);
g_dispatcher->rtoXlogBufState.readprivate = (void *)palloc0(MAXALIGN(privateLen));
errorno = memset_s(g_dispatcher->rtoXlogBufState.readprivate, MAXALIGN(privateLen), 0, MAXALIGN(privateLen));
securec_check(errorno, "", "");
g_dispatcher->rtoXlogBufState.errormsg_buf = (char *)palloc0(MAX_ERRORMSG_LEN + 1);
g_dispatcher->rtoXlogBufState.errormsg_buf[0] = '\0';
initreader->readBuf = g_dispatcher->rtoXlogBufState.readBuf;
errorno = memcpy_s(initreader->readBuf, XLOG_BLCKSZ, xlogreader->readBuf, xlogreader->readLen);
securec_check(errorno, "", "");
initreader->errormsg_buf = g_dispatcher->rtoXlogBufState.errormsg_buf;
initreader->private_data = g_dispatcher->rtoXlogBufState.readprivate;
CopyDataFromOldReader(initreader, xlogreader);
g_dispatcher->rtoXlogBufState.initreader = initreader;
g_recordbuffer = &g_dispatcher->rtoXlogBufState;
g_startupTriggerState = TRIGGER_NORMAL;
g_readManagerTriggerFlag = TRIGGER_NORMAL;
#ifdef USE_ASSERT_CHECKING
InitLsnCheckCtl(xlogreader->ReadRecPtr);
#endif
}
void HandleStartupInterruptsForExtremeRto()
{
Assert(AmStartupProcess());
@ -400,7 +440,11 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen)
g_dispatcher->maxItemNum = (get_batch_redo_num() + 4) * PAGE_WORK_QUEUE_SIZE *
ITEM_QUQUE_SIZE_RATIO; // 4: a startup, readmanager, txnmanager, txnworker
/* alloc for record readbuf */
AllocRecordReadBuffer(xlogreader, privateLen);
if (ENABLE_DMS && ENABLE_DSS) {
SSAllocRecordReadBuffer(xlogreader, privateLen);
} else {
AllocRecordReadBuffer(xlogreader, privateLen);
}
StartPageRedoWorkers(get_real_recovery_parallelism());
ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
@ -620,7 +664,11 @@ static void StopRecoveryWorkers(int code, Datum arg)
pg_atomic_write_u32(&g_dispatcher->rtoXlogBufState.readWorkerState, WORKER_STATE_EXIT);
ShutdownWalRcv();
FreeAllocatedRedoItem();
DestroyRecoveryWorkers();
if (ENABLE_DSS && ENABLE_DMS) {
SSDestroyRecoveryWorkers();
} else {
DestroyRecoveryWorkers();
}
g_startupTriggerState = TRIGGER_NORMAL;
g_readManagerTriggerFlag = TRIGGER_NORMAL;
ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), errmsg("parallel redo(startup) thread exit")));
@ -667,6 +715,45 @@ static void DestroyRecoveryWorkers()
}
}
static void SSDestroyRecoveryWorkers()
{
if (g_dispatcher != NULL) {
SpinLockAcquire(&(g_instance.comm_cxt.predo_cxt.destroy_lock));
for (uint32 i = 0; i < g_dispatcher->pageLineNum; i++) {
DestroyPageRedoWorker(g_dispatcher->pageLines[i].batchThd);
DestroyPageRedoWorker(g_dispatcher->pageLines[i].managerThd);
for (uint32 j = 0; j < g_dispatcher->pageLines[i].redoThdNum; j++) {
DestroyPageRedoWorker(g_dispatcher->pageLines[i].redoThd[j]);
}
if (g_dispatcher->pageLines[i].chosedRTIds != NULL) {
pfree(g_dispatcher->pageLines[i].chosedRTIds);
}
}
DestroyPageRedoWorker(g_dispatcher->trxnLine.managerThd);
DestroyPageRedoWorker(g_dispatcher->trxnLine.redoThd);
DestroyPageRedoWorker(g_dispatcher->readLine.managerThd);
DestroyPageRedoWorker(g_dispatcher->readLine.readThd);
pfree(g_dispatcher->rtoXlogBufState.readBuf);
pfree(g_dispatcher->rtoXlogBufState.errormsg_buf);
pfree(g_dispatcher->rtoXlogBufState.readprivate);
#ifdef USE_ASSERT_CHECKING
if (g_dispatcher->originLsnCheckAddr != NULL) {
pfree(g_dispatcher->originLsnCheckAddr);
g_dispatcher->originLsnCheckAddr = NULL;
g_dispatcher->lsnCheckCtl = NULL;
}
#endif
if (get_real_recovery_parallelism() > 1) {
(void)MemoryContextSwitchTo(g_dispatcher->oldCtx);
MemoryContextDelete(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
g_instance.comm_cxt.predo_cxt.parallelRedoCtx = NULL;
}
g_dispatcher = NULL;
SpinLockRelease(&(g_instance.comm_cxt.predo_cxt.destroy_lock));
}
}
static bool RmgrRecordInfoValid(XLogReaderState *record, uint8 minInfo, uint8 maxInfo)
{
uint8 info = (XLogRecGetInfo(record) & (~XLR_INFO_MASK));
@ -1680,6 +1767,8 @@ void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState,
newState->currRecPtr = oldState->currRecPtr;
newState->readBuf = oldState->readBuf;
newState->readLen = oldState->readLen;
newState->preReadStartPtr = oldState->preReadStartPtr;
newState->preReadBuf = oldState->preReadBuf;
newState->decoded_record = NULL;
newState->main_data = NULL;

View File

@ -9912,6 +9912,12 @@ void StartupXLOG(void)
}
}
if (SSSKIP_REDO_REPLAY && t_thrd.xlog_cxt.InRecovery == true) {
/* do not need replay anything in SS standby mode */
ereport(LOG, (errmsg("[SS] Skip redo replay in standby mode")));
t_thrd.xlog_cxt.InRecovery = false;
}
ReadRemainSegsFile();
/* Determine whether it is currently in the switchover of streaming disaster recovery */
checkHadrInSwitchover();
@ -10324,10 +10330,6 @@ void StartupXLOG(void)
}
}
if (ENABLE_DMS && SSSKIP_REDO_REPLAY) {
break;
}
/*
* ShmemVariableCache->nextXid must be beyond record's xid.
*
@ -11880,6 +11882,15 @@ void CreateCheckPoint(int flags)
/* allow standby do checkpoint only after it has promoted AND has finished recovery. */
if (ENABLE_DMS && SS_STANDBY_MODE) {
if (shutdown) {
START_CRIT_SECTION();
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
t_thrd.shemem_ptr_cxt.ControlFile->state = DB_SHUTDOWNED;
t_thrd.shemem_ptr_cxt.ControlFile->time = (pg_time_t)time(NULL);
UpdateControlFile();
LWLockRelease(ControlFileLock);
END_CRIT_SECTION();
}
return;
} else if (SSFAILOVER_TRIGGER) {
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] do not do CreateCheckpoint during failover")));
@ -17158,7 +17169,12 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (readSource & XLOG_FROM_STREAM) {
readLen = ParallelXLogReadWorkBufRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI);
} else {
readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI);
if (SSFAILOVER_TRIGGER || SS_STANDBY_PROMOTING) {
readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr,
xlogreader->readBuf, readTLI, NULL);
} else {
readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI);
}
}
if (readLen > 0 || t_thrd.xlog_cxt.recoveryTriggered || !t_thrd.xlog_cxt.StandbyMode || DoEarlyExit()) {
@ -17885,7 +17901,7 @@ retry:
if (!ss_ret) {
ereport(emode_for_corrupt_record(emode, RecPtr),
(errcode_for_file_access(),
errmsg("[ss] could not read from log file %s to offset %u: %m",
errmsg("[SS] could not read from log file %s to offset %u: %m",
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo),
t_thrd.xlog_cxt.readOff)));
goto next_record_is_invalid;

View File

@ -11,16 +11,16 @@
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
* -------------------------------------------------------------------------
*
* dms_api.h
*
* IDENTIFICATION
* src/interface/dms_api.h
*
* ---------------------------------------------------------------------------------------
* IDENTIFICATION
* src/interface/dms_api.h
*
* -------------------------------------------------------------------------
*/
#ifndef __DMS_API_H__
#define __DMS_API_H__
@ -28,7 +28,6 @@
extern "C" {
#endif
#define DMS_SUCCESS 0
#define DMS_ERROR (-1)
#ifdef OPENGAUSS
@ -382,6 +381,12 @@ typedef enum en_dms_cm_stat {
DMS_CM_RES_STATE_COUNT = 3,
} dms_cm_stat_t;
typedef struct st_dw_recovery_info {
unsigned long long bitmap_old_join; // the old-join-inst bitmap in dw_recovery phase
unsigned long long bitmap_old_remove; // the old-remove-inst bitmap in dw_recovery phase
unsigned long long bitmap_new_join; // the new-join-inst bitmap in dw_recovery phase
} dw_recovery_info_t;
typedef struct st_inst_list {
unsigned char inst_id_list[DMS_MAX_INSTANCES];
unsigned char inst_id_count;
@ -473,17 +478,18 @@ typedef struct st_dcs_batch_buf {
typedef int(*dms_get_list_stable)(void *db_handle, unsigned long long *list_stable, unsigned char *reformer_id);
typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stable, unsigned char reformer_id,
unsigned int save_ctrl);
unsigned int save_ctrl);
typedef int(*dms_get_dms_status)(void *db_handle);
typedef void(*dms_set_dms_status)(void *db_handle, int status);
typedef int(*dms_confirm_converting)(void *db_handle, char *pageid, unsigned char smon_chk,
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn, unsigned int *ver);
typedef int(*dms_confirm_owner)(void *db_handle, char *pageid, unsigned char *lock_mode, unsigned char *is_edp,
unsigned long long *lsn);
unsigned long long *lsn);
typedef int(*dms_flush_copy)(void *db_handle, char *pageid);
typedef int(*dms_edp_lsn)(void *db_handle, char *pageid, unsigned long long *lsn);
typedef int(*dms_disk_lsn)(void *db_handle, char *pageid, unsigned long long *lsn);
typedef int(*dms_recovery)(void *db_handle, void *recovery_list, int is_reformer);
typedef int(*dms_dw_recovery)(void *db_handle, void *recovery_list, int is_reformer);
typedef int(*dms_opengauss_startup)(void *db_handle);
typedef int(*dms_opengauss_recovery_standby)(void *db_handle, int inst_id);
typedef int(*dms_opengauss_recovery_primary)(void *db_handle, int inst_id);
@ -506,9 +512,9 @@ typedef unsigned long long(*dms_get_global_scn)(void *db_handle);
typedef unsigned long long(*dms_get_global_lsn)(void *db_handle);
typedef unsigned long long(*dms_get_global_flushed_lfn)(void *db_handle);
typedef int(*dms_read_local_page4transfer)(void *db_handle, char pageid[DMS_PAGEID_SIZE],
dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl);
dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl);
typedef int(*dms_try_read_local_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE],
dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl);
dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl);
typedef unsigned char(*dms_page_is_dirty)(dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_leave_local_page)(void *db_handle, dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_get_pageid)(dms_buf_ctrl_t *buf_ctrl, char **pageid, unsigned int *size);
@ -518,20 +524,20 @@ typedef void *(*dms_get_db_handle)(unsigned int *db_handle_index);
typedef void *(*dms_stack_push_cr_cursor)(void *db_handle);
typedef void (*dms_stack_pop_cr_cursor)(void *db_handle);
typedef void(*dms_init_cr_cursor)(void *cr_cursor, char pageid[DMS_PAGEID_SIZE], char xid[DMS_XID_SIZE],
unsigned long long query_scn, unsigned int ssn);
unsigned long long query_scn, unsigned int ssn);
typedef void(*dms_init_index_cr_cursor)(void *cr_cursor, char pageid[DMS_PAGEID_SIZE], char xid[DMS_XID_SIZE],
unsigned long long query_scn, unsigned int ssn, char entry[DMS_PAGEID_SIZE], char *index_profile);
typedef void(*dms_init_check_cr_cursor)(void *cr_cursor, char rowid[DMS_ROWID_SIZE], char xid[DMS_XID_SIZE],
unsigned long long query_scn, unsigned int ssn);
unsigned long long query_scn, unsigned int ssn);
typedef char *(*dms_get_wxid_from_cr_cursor)(void *cr_cursor);
typedef unsigned char(*dms_get_instid_of_xid_from_cr_cursor)(void *db_handle, void *cr_cursor);
typedef int (*dms_get_page_invisible_txn_list)(void *db_handle, void *cr_cursor, void *cr_page,
typedef int(*dms_get_page_invisible_txn_list)(void *db_handle, void *cr_cursor, void *cr_page,
unsigned char *is_empty_txn_list, unsigned char *exist_waiting_txn);
typedef int (*dms_reorganize_heap_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page,
typedef int(*dms_reorganize_heap_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page,
unsigned char *fb_mark);
typedef int (*dms_reorganize_index_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page);
typedef int(*dms_reorganize_index_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page);
typedef int(*dms_check_heap_page_visible_with_undo_snapshot)(void *db_handle, void *cr_cursor, void *page,
unsigned char *is_found);
unsigned char *is_found);
typedef void(*dms_set_page_force_request)(void *db_handle, char pageid[DMS_PAGEID_SIZE]);
typedef void(*dms_get_entry_pageid_from_cr_cursor)(void *cr_cursor, char index_entry_pageid[DMS_PAGEID_SIZE]);
typedef void(*dms_get_index_profile_from_cr_cursor)(void *cr_cursor, char index_profile[DMS_INDEX_PROFILE_SIZE]);
@ -544,22 +550,22 @@ typedef void(*dms_mem_free)(void *context, void *ptr);
typedef void(*dms_mem_reset)(void *context);
// The maximum length of output_msg is 128 bytes.
typedef int (*dms_process_broadcast)(void *db_handle, char *data, unsigned int len, char *output_msg,
unsigned int *output_msg_len);
unsigned int *output_msg_len);
typedef int (*dms_process_broadcast_ack)(void *db_handle, char *data, unsigned int len);
typedef int(*dms_get_txn_info)(void *db_handle, unsigned long long xid,
unsigned char is_scan, dms_txn_info_t *txn_info);
unsigned char is_scan, dms_txn_info_t *txn_info);
typedef int(*dms_get_opengauss_xid_csn)(void *db_handle, dms_opengauss_xid_csn_t *csn_req,
dms_opengauss_csn_result_t *csn_ack);
dms_opengauss_csn_result_t *csn_ack);
typedef int(*dms_get_opengauss_update_xid)(void *db_handle, unsigned long long xid,
unsigned int t_infomask, unsigned int t_infomask2, unsigned long long *uxid);
unsigned int t_infomask, unsigned int t_infomask2, unsigned long long *uxid);
typedef int(*dms_get_opengauss_txn_status)(void *db_handle, unsigned long long xid, unsigned char type,
unsigned char* status);
unsigned char* status);
typedef int(*dms_opengauss_lock_buffer)(void *db_handle, int buffer, unsigned char lock_mode,
unsigned char* curr_mode);
unsigned char* curr_mode);
typedef int(*dms_get_txn_snapshot)(void *db_handle, unsigned int xmap, dms_txn_snapshot_t *txn_snapshot);
typedef int(*dms_get_opengauss_txn_snapshot)(void *db_handle, dms_opengauss_txn_snapshot_t *txn_snapshot);
typedef void (*dms_log_output)(dms_log_id_t log_type, dms_log_level_t log_level, const char *code_file_name,
unsigned int code_line_num, const char *module_name, const char *format, ...);
unsigned int code_line_num, const char *module_name, const char *format, ...);
typedef int (*dms_log_flush)(void *db_handle, unsigned long long *lsn);
typedef int(*dms_process_edp)(void *db_handle, dms_edp_info_t *pages, unsigned int count);
typedef void (*dms_clean_ctrl_edp)(void *db_handle, dms_buf_ctrl_t *dms_ctrl);
@ -567,6 +573,7 @@ typedef char *(*dms_display_pageid)(char *display_buf, unsigned int count, char
typedef char *(*dms_display_xid)(char *display_buf, unsigned int count, char *xid);
typedef char *(*dms_display_rowid)(char *display_buf, unsigned int count, char *rowid);
typedef int (*dms_drc_buf_res_rebuild)(void *db_handle);
typedef int (*dms_drc_buf_res_rebuild_parallel)(void *db_handle, unsigned char thread_index, unsigned char thread_num);
typedef unsigned char(*dms_ckpt_session)(void *db_handle);
typedef void (*dms_check_if_build_complete)(void *db_handle, unsigned int *build_complete);
typedef int (*dms_db_is_primary)(void *db_handle);
@ -590,11 +597,11 @@ typedef void (*dms_get_rowid_by_rmid)(void *db_handle, unsigned short rmid, char
typedef void (*dms_get_sql_from_session)(void *db_handle, unsigned short sid, char *sql_str, unsigned int sql_str_len);
typedef void (*dms_get_itl_lock_by_xid)(void *db_handle, char xid[DMS_XID_SIZE], char *ilock, unsigned int ilock_len);
typedef void (*dms_check_tlock_status)(void *db_handle, unsigned int type, unsigned short sid,
unsigned long long tableid, unsigned int *in_use);
unsigned long long tableid, unsigned int *in_use);
typedef void (*dms_get_tlock_msg_by_tid)(void *db_handle, unsigned long long table_id, unsigned int type, char *rsp,
unsigned int rsp_len, unsigned int *tlock_cnt);
unsigned int rsp_len, unsigned int *tlock_cnt);
typedef void (*dms_get_tlock_msg_by_rm)(void *db_handle, unsigned short sid, unsigned short rmid, int type, char *tlock,
unsigned int tlock_len);
unsigned int tlock_len);
typedef int (*dms_switchover_demote)(void *db_handle);
typedef int (*dms_switchover_promote)(void *db_handle);
@ -618,6 +625,7 @@ typedef struct st_dms_callback {
dms_edp_lsn edp_lsn;
dms_disk_lsn disk_lsn;
dms_recovery recovery;
dms_dw_recovery dw_recovery;
dms_db_is_primary db_is_primary;
dms_get_open_status get_open_status;
dms_undo_init undo_init;
@ -626,6 +634,7 @@ typedef struct st_dms_callback {
dms_tx_rollback_finish tx_rollback_finish;
dms_recovery_in_progress recovery_in_progress;
dms_drc_buf_res_rebuild dms_reform_rebuild_buf_res;
dms_drc_buf_res_rebuild_parallel dms_reform_rebuild_parallel;
dms_check_if_build_complete check_if_build_complete;
// used in reform for opengauss
@ -763,10 +772,10 @@ typedef struct st_dms_profile {
unsigned char rdma_rpc_bind_core_end;
char ock_log_path[DMS_OCK_LOG_PATH_LEN];
unsigned char enable_reform;
//ock scrlock configs
// ock scrlock configs
unsigned char enable_scrlock;
unsigned int primary_inst_id;
unsigned char enable_ssl;
unsigned char enable_ssl;
unsigned int scrlock_log_level;
unsigned char enable_scrlock_worker_bind_core;
unsigned int scrlock_worker_cnt;
@ -776,6 +785,7 @@ typedef struct st_dms_profile {
unsigned char enable_scrlock_server_sleep_mode;
unsigned char scrlock_server_bind_core_start;
unsigned char scrlock_server_bind_core_end;
unsigned char parallel_thread_num;
} dms_profile_t;
typedef struct st_logger_param {
@ -793,7 +803,7 @@ typedef struct st_logger_param {
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 51
#define DMS_LOCAL_VERSION 52
#ifdef __cplusplus
}

View File

@ -52,7 +52,7 @@ sub _new
$options->{wal_segsize} = 16
unless $options->{wal_segsize}; # undef or 0 means default
die "Bad wal_segsize $options->{wal_segsize}"
unless grep { $_ == $options->{wal_segsize} } (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024);
unless grep { $_ == $options->{wal_segsize} } (1, 2, 4, 8, 16, 32, 64, 128, 256, 512);
$self->DeterminePlatform();