diff --git a/cmake/src/build_options.cmake b/cmake/src/build_options.cmake index 536ccac81..3b709d49b 100755 --- a/cmake/src/build_options.cmake +++ b/cmake/src/build_options.cmake @@ -154,14 +154,14 @@ set(CHECK_OPTIONS -Wmissing-format-attribute -Wno-attributes -Wno-unused-but-set set(MACRO_OPTIONS -D_GLIBCXX_USE_CXX11_ABI=0 -DENABLE_GSTRACE -D_GNU_SOURCE -DPGXC -D_POSIX_PTHREAD_SEMANTICS -D_REENTRANT -DSTREAMPLAN -D_THREAD_SAFE ${DB_COMMON_DEFINE}) # Set MAX_ALLOC_SEGNUM size in extreme_rto -if(${WAL_SEGSIZE} LESS 512) +if(${WAL_SEGSIZE} LESS 256) set(MAX_ALLOC_SEGNUM 4) -elseif(${WAL_SEGSIZE} GREATER_EQUAL 512 AND ${WAL_SEGSIZE} LESS 1024) +elseif(${WAL_SEGSIZE} GREATER_EQUAL 256 AND ${WAL_SEGSIZE} LESS 512) set(MAX_ALLOC_SEGNUM 2) -elseif(${WAL_SEGSIZE} GREATER_EQUAL 1024) +elseif(${WAL_SEGSIZE} GREATER_EQUAL 512) set(MAX_ALLOC_SEGNUM 1) else() - message(FATAL_ERROR "error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024.") + message(FATAL_ERROR "error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512.") endif() # libraries need secure options during compling diff --git a/configure b/configure index b1d953c91..e5645dd75 100755 --- a/configure +++ b/configure @@ -3718,9 +3718,8 @@ case ${wal_segsize} in 128) ;; 256) ;; 512) ;; - 1024) ;; - *) { { $as_echo "$as_me:$LINENO: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024." >&5 -$as_echo "$as_me: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024." >&2;} + *) { { $as_echo "$as_me:$LINENO: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512." >&5 +$as_echo "$as_me: error: Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512." >&2;} { (exit 1); exit 1; }; } esac { $as_echo "$as_me:$LINENO: result: ${wal_segsize}MB" >&5 @@ -3732,9 +3731,9 @@ cat >>confdefs.h <<_ACEOF _ACEOF # Set number of MAX_ALLOC_SEGNUM in extreme_rto -if test "${wal_segsize}" -lt 512; then +if test "${wal_segsize}" -lt 256; then MAX_ALLOC_SEGNUM=4 -elif test "${wal_segsize}" -ge 512 -a "${wal_segsize}" -lt 1024; then +elif test "${wal_segsize}" -ge 256 -a "${wal_segsize}" -lt 512; then MAX_ALLOC_SEGNUM=2 else MAX_ALLOC_SEGNUM=1 diff --git a/configure.in b/configure.in index 249d16d65..c0dff8fe3 100644 --- a/configure.in +++ b/configure.in @@ -384,8 +384,7 @@ case ${wal_segsize} in 128) ;; 256) ;; 512) ;; - 1024) ;; - *) AC_MSG_ERROR([Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512,1024.]) + *) AC_MSG_ERROR([Invalid WAL segment size. Allowed values are 1,2,4,8,16,32,64,128,256,512.]) esac AC_MSG_RESULT([${wal_segsize}MB]) @@ -398,9 +397,9 @@ AC_DEFINE_UNQUOTED([XLOG_SEG_SIZE], [(${wal_segsize} * 1024 * 1024)], [ ]) # Set number of MAX_ALLOC_SEGNUM in extreme_rto -if test "${wal_segsize}" -lt 512; then +if test "${wal_segsize}" -lt 256; then MAX_ALLOC_SEGNUM=4 -elif test "${wal_segsize}" -ge 512 -a "${wal_segsize}" -lt 1024; then +elif test "${wal_segsize}" -ge 256 -a "${wal_segsize}" -lt 512; then MAX_ALLOC_SEGNUM=2 else MAX_ALLOC_SEGNUM=1 diff --git a/doc/src/sgml/installation.sgmlin b/doc/src/sgml/installation.sgmlin index cd20ea98b..7615fdf86 100644 --- a/doc/src/sgml/installation.sgmlin +++ b/doc/src/sgml/installation.sgmlin @@ -1186,7 +1186,7 @@ su - gaussdb the size of each individual file in the WAL log. It may be useful to adjust this size to control the granularity of WAL log shipping. The default size is 16 megabytes. - The value must be a power of 2 between 1 and 1024 (megabytes). + The value must be a power of 2 between 1 and 512 (megabytes). Note that changing this value requires an gs_initdb. diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index c0b76f146..49a0e07e0 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -25,6 +25,7 @@ #include "postgres.h" #include "access/xlog.h" +#include "access/multi_redo_api.h" #include "postmaster/postmaster.h" #include "storage/smgr/fd.h" #include "storage/dss/fio_dss.h" @@ -114,8 +115,9 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int { /* Load reader private data */ XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data; - int emode = readprivate->emode; - bool randAccess = readprivate->randAccess; + int emode = IsExtremeRedo() ? LOG : readprivate->emode; + bool randAccess = IsExtremeRedo() ? false : readprivate->randAccess; + XLogRecPtr RecPtr = targetPagePtr; uint32 targetPageOff; #ifdef USE_ASSERT_CHECKING @@ -136,6 +138,7 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int } XLByteToSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo); + XLByteAdvance(RecPtr, expectReadLen); /* In archive or crash recovery. */ if (t_thrd.xlog_cxt.readFile < 0) { @@ -176,7 +179,7 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int static_cast(targetPagePtr >> BIT_NUM_INT32), static_cast(targetPagePtr), targetPageOff, expectReadLen))); - ereport(emode_for_corrupt_record(emode, targetPagePtr), + ereport(emode_for_corrupt_record(emode, RecPtr), (errcode_for_file_access(), errmsg("could not read from log file %s to offset %u: %m", XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index 8a0faca11..e8bcdaaf7 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,2 +1,2 @@ -dms_commit_id=739b732fdefe65f110849353773b4b54a33731a9 +dms_commit_id=662fc4c3f44761009a572e3900b38da5b2886d4a dss_commit_id=1b532395350e34db8cd9584597dcb022bed9dd35 diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 0c1269d0f..10939c65e 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -2246,12 +2246,6 @@ int PostmasterMain(int argc, char* argv[]) ExitPostmaster(1); } if (ENABLE_DSS) { - if (g_instance.attr.attr_storage.recovery_parse_workers > 1 || - g_instance.attr.attr_storage.recovery_redo_workers_per_paser_worker > 1) { - write_stderr("Not support extreme RTO while DMS and DSS enabled, please cancel rto parameter\n"); - ExitPostmaster(1); - } - if (u_sess->attr.attr_common.XLogArchiveMode || strlen(u_sess->attr.attr_storage.XLogArchiveCommand) != 0) { write_stderr("Not support archive function while DMS and DSS enabled\n"); ExitPostmaster(1); @@ -6833,7 +6827,7 @@ static void reaper(SIGNAL_ARGS) g_instance.pid_cxt.AutoVacPID = initialize_util_thread(AUTOVACUUM_LAUNCHER); if (SS_REFORM_PARTNER) { - write_stderr("%s LOG: I'm still a reform partner waiting for refom finished\n", + write_stderr("%s LOG: I'm still a reform partner waiting for reform finished\n", GetReaperLogPrefix(logBuf, ReaperLogBufSize)); continue; } diff --git a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp index 3aba4214f..41bc46b01 100644 --- a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp +++ b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp @@ -53,6 +53,7 @@ #include "access/twophase.h" #include "access/redo_common.h" #include "access/extreme_rto/page_redo.h" +#include "ddes/dms/ss_dms_bufmgr.h" THR_LOCAL RedoParseManager *g_parseManager = NULL; THR_LOCAL RedoBufferManager *g_bufferManager = NULL; @@ -1689,25 +1690,42 @@ void ExtremeRtoFlushBuffer(RedoBufferInfo *bufferinfo, bool updateFsm) if (bufferinfo->pageinfo.page != NULL) { BufferDesc *bufDesc = GetBufferDescriptor(bufferinfo->buf - 1); if (bufferinfo->dirtyflag || XLByteLT(bufDesc->extra->lsn_on_disk, PageGetLSN(bufferinfo->pageinfo.page))) { - MarkBufferDirty(bufferinfo->buf); - if (!bufferinfo->dirtyflag && bufferinfo->blockinfo.forknum == MAIN_FORKNUM) { - int mode = WARNING; + if (ENABLE_DMS) { + if ((GetDmsBufCtrl(bufDesc->buf_id)->state & BUF_DIRTY_NEED_FLUSH) && + !bufferinfo->dirtyflag && bufferinfo->blockinfo.forknum == MAIN_FORKNUM) { + const uint32 shiftSz = 32; + ereport(PANIC, (errmsg("[SS] extreme_rto not mark dirty:lsn %X/%X, lsn_disk %X/%X, \ + lsn_page %X/%X, page %u/%u/%u %u", + (uint32)(bufferinfo->lsn >> shiftSz), (uint32)(bufferinfo->lsn), + (uint32)(bufDesc->extra->lsn_on_disk >> shiftSz), + (uint32)(bufDesc->extra->lsn_on_disk), + (uint32)(PageGetLSN(bufferinfo->pageinfo.page) >> shiftSz), + (uint32)(PageGetLSN(bufferinfo->pageinfo.page)), + bufferinfo->blockinfo.rnode.spcNode, bufferinfo->blockinfo.rnode.dbNode, + bufferinfo->blockinfo.rnode.relNode, bufferinfo->blockinfo.blkno))); + } + } else { + MarkBufferDirty(bufferinfo->buf); + if (!bufferinfo->dirtyflag && bufferinfo->blockinfo.forknum == MAIN_FORKNUM) { + int mode = WARNING; #ifdef USE_ASSERT_CHECKING - mode = PANIC; + mode = PANIC; +#endif + const uint32 shiftSz = 32; + ereport(mode, (errmsg("extreme_rto not mark dirty:lsn %X/%X, lsn_disk %X/%X, \ + lsn_page %X/%X, page %u/%u/%u %u", + (uint32)(bufferinfo->lsn >> shiftSz), (uint32)(bufferinfo->lsn), + (uint32)(bufDesc->extra->lsn_on_disk >> shiftSz), + (uint32)(bufDesc->extra->lsn_on_disk), + (uint32)(PageGetLSN(bufferinfo->pageinfo.page) >> shiftSz), + (uint32)(PageGetLSN(bufferinfo->pageinfo.page)), + bufferinfo->blockinfo.rnode.spcNode, bufferinfo->blockinfo.rnode.dbNode, + bufferinfo->blockinfo.rnode.relNode, bufferinfo->blockinfo.blkno))); + } +#ifdef USE_ASSERT_CHECKING + bufDesc->lsn_dirty = PageGetLSN(bufferinfo->pageinfo.page); #endif - const uint32 shiftSz = 32; - ereport(mode, (errmsg("extreme_rto not mark dirty:lsn %X/%X, lsn_disk %X/%X, \ - lsn_page %X/%X, page %u/%u/%u %u", - (uint32)(bufferinfo->lsn >> shiftSz), (uint32)(bufferinfo->lsn), - (uint32)(bufDesc->extra->lsn_on_disk >> shiftSz), (uint32)(bufDesc->extra->lsn_on_disk), - (uint32)(PageGetLSN(bufferinfo->pageinfo.page) >> shiftSz), - (uint32)(PageGetLSN(bufferinfo->pageinfo.page)), - bufferinfo->blockinfo.rnode.spcNode, bufferinfo->blockinfo.rnode.dbNode, - bufferinfo->blockinfo.rnode.relNode, bufferinfo->blockinfo.blkno))); } -#ifdef USE_ASSERT_CHECKING - bufDesc->lsn_dirty = PageGetLSN(bufferinfo->pageinfo.page); -#endif } UnlockReleaseBuffer(bufferinfo->buf); /* release buffer */ diff --git a/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp index 79049a699..2839b2ce7 100755 --- a/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp @@ -115,6 +115,7 @@ static void GetSlotIds(XLogReaderState *record); static void GetUndoSlotIds(XLogReaderState *record); STATIC LogDispatcher *CreateDispatcher(); static void DestroyRecoveryWorkers(); +static void SSDestroyRecoveryWorkers(); static void DispatchRecordWithPages(XLogReaderState *, List *); static void DispatchRecordWithoutPage(XLogReaderState *, List *); @@ -353,6 +354,45 @@ void AllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) #endif } +void SSAllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) +{ + XLogReaderState *initreader; + errno_t errorno = EOK; + + initreader = GetXlogReader(xlogreader); + initreader->isPRProcess = true; + g_dispatcher->rtoXlogBufState.readWorkerState = WORKER_STATE_STOP; + g_dispatcher->rtoXlogBufState.readPageWorkerState = WORKER_STATE_STOP; + g_dispatcher->rtoXlogBufState.readSource = 0; + g_dispatcher->rtoXlogBufState.failSource = 0; + g_dispatcher->rtoXlogBufState.xlogReadManagerState = READ_MANAGER_RUN; + g_dispatcher->rtoXlogBufState.targetRecPtr = InvalidXLogRecPtr; + g_dispatcher->rtoXlogBufState.expectLsn = InvalidXLogRecPtr; + g_dispatcher->rtoXlogBufState.waitRedoDone = 0; + g_dispatcher->rtoXlogBufState.readBuf = (char *)palloc0(XLOG_BLCKSZ); + g_dispatcher->rtoXlogBufState.readprivate = (void *)palloc0(MAXALIGN(privateLen)); + errorno = memset_s(g_dispatcher->rtoXlogBufState.readprivate, MAXALIGN(privateLen), 0, MAXALIGN(privateLen)); + securec_check(errorno, "", ""); + + g_dispatcher->rtoXlogBufState.errormsg_buf = (char *)palloc0(MAX_ERRORMSG_LEN + 1); + g_dispatcher->rtoXlogBufState.errormsg_buf[0] = '\0'; + + initreader->readBuf = g_dispatcher->rtoXlogBufState.readBuf; + errorno = memcpy_s(initreader->readBuf, XLOG_BLCKSZ, xlogreader->readBuf, xlogreader->readLen); + securec_check(errorno, "", ""); + initreader->errormsg_buf = g_dispatcher->rtoXlogBufState.errormsg_buf; + initreader->private_data = g_dispatcher->rtoXlogBufState.readprivate; + CopyDataFromOldReader(initreader, xlogreader); + g_dispatcher->rtoXlogBufState.initreader = initreader; + + g_recordbuffer = &g_dispatcher->rtoXlogBufState; + g_startupTriggerState = TRIGGER_NORMAL; + g_readManagerTriggerFlag = TRIGGER_NORMAL; +#ifdef USE_ASSERT_CHECKING + InitLsnCheckCtl(xlogreader->ReadRecPtr); +#endif +} + void HandleStartupInterruptsForExtremeRto() { Assert(AmStartupProcess()); @@ -400,7 +440,11 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen) g_dispatcher->maxItemNum = (get_batch_redo_num() + 4) * PAGE_WORK_QUEUE_SIZE * ITEM_QUQUE_SIZE_RATIO; // 4: a startup, readmanager, txnmanager, txnworker /* alloc for record readbuf */ - AllocRecordReadBuffer(xlogreader, privateLen); + if (ENABLE_DMS && ENABLE_DSS) { + SSAllocRecordReadBuffer(xlogreader, privateLen); + } else { + AllocRecordReadBuffer(xlogreader, privateLen); + } StartPageRedoWorkers(get_real_recovery_parallelism()); ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), @@ -620,7 +664,11 @@ static void StopRecoveryWorkers(int code, Datum arg) pg_atomic_write_u32(&g_dispatcher->rtoXlogBufState.readWorkerState, WORKER_STATE_EXIT); ShutdownWalRcv(); FreeAllocatedRedoItem(); - DestroyRecoveryWorkers(); + if (ENABLE_DSS && ENABLE_DMS) { + SSDestroyRecoveryWorkers(); + } else { + DestroyRecoveryWorkers(); + } g_startupTriggerState = TRIGGER_NORMAL; g_readManagerTriggerFlag = TRIGGER_NORMAL; ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), errmsg("parallel redo(startup) thread exit"))); @@ -667,6 +715,45 @@ static void DestroyRecoveryWorkers() } } +static void SSDestroyRecoveryWorkers() +{ + if (g_dispatcher != NULL) { + SpinLockAcquire(&(g_instance.comm_cxt.predo_cxt.destroy_lock)); + for (uint32 i = 0; i < g_dispatcher->pageLineNum; i++) { + DestroyPageRedoWorker(g_dispatcher->pageLines[i].batchThd); + DestroyPageRedoWorker(g_dispatcher->pageLines[i].managerThd); + for (uint32 j = 0; j < g_dispatcher->pageLines[i].redoThdNum; j++) { + DestroyPageRedoWorker(g_dispatcher->pageLines[i].redoThd[j]); + } + if (g_dispatcher->pageLines[i].chosedRTIds != NULL) { + pfree(g_dispatcher->pageLines[i].chosedRTIds); + } + } + DestroyPageRedoWorker(g_dispatcher->trxnLine.managerThd); + DestroyPageRedoWorker(g_dispatcher->trxnLine.redoThd); + + DestroyPageRedoWorker(g_dispatcher->readLine.managerThd); + DestroyPageRedoWorker(g_dispatcher->readLine.readThd); + pfree(g_dispatcher->rtoXlogBufState.readBuf); + pfree(g_dispatcher->rtoXlogBufState.errormsg_buf); + pfree(g_dispatcher->rtoXlogBufState.readprivate); +#ifdef USE_ASSERT_CHECKING + if (g_dispatcher->originLsnCheckAddr != NULL) { + pfree(g_dispatcher->originLsnCheckAddr); + g_dispatcher->originLsnCheckAddr = NULL; + g_dispatcher->lsnCheckCtl = NULL; + } +#endif + if (get_real_recovery_parallelism() > 1) { + (void)MemoryContextSwitchTo(g_dispatcher->oldCtx); + MemoryContextDelete(g_instance.comm_cxt.predo_cxt.parallelRedoCtx); + g_instance.comm_cxt.predo_cxt.parallelRedoCtx = NULL; + } + g_dispatcher = NULL; + SpinLockRelease(&(g_instance.comm_cxt.predo_cxt.destroy_lock)); + } +} + static bool RmgrRecordInfoValid(XLogReaderState *record, uint8 minInfo, uint8 maxInfo) { uint8 info = (XLogRecGetInfo(record) & (~XLR_INFO_MASK)); @@ -1680,6 +1767,8 @@ void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState, newState->currRecPtr = oldState->currRecPtr; newState->readBuf = oldState->readBuf; newState->readLen = oldState->readLen; + newState->preReadStartPtr = oldState->preReadStartPtr; + newState->preReadBuf = oldState->preReadBuf; newState->decoded_record = NULL; newState->main_data = NULL; diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 01bf8d383..59d90e428 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -9912,6 +9912,12 @@ void StartupXLOG(void) } } + if (SSSKIP_REDO_REPLAY && t_thrd.xlog_cxt.InRecovery == true) { + /* do not need replay anything in SS standby mode */ + ereport(LOG, (errmsg("[SS] Skip redo replay in standby mode"))); + t_thrd.xlog_cxt.InRecovery = false; + } + ReadRemainSegsFile(); /* Determine whether it is currently in the switchover of streaming disaster recovery */ checkHadrInSwitchover(); @@ -10324,10 +10330,6 @@ void StartupXLOG(void) } } - if (ENABLE_DMS && SSSKIP_REDO_REPLAY) { - break; - } - /* * ShmemVariableCache->nextXid must be beyond record's xid. * @@ -11880,6 +11882,15 @@ void CreateCheckPoint(int flags) /* allow standby do checkpoint only after it has promoted AND has finished recovery. */ if (ENABLE_DMS && SS_STANDBY_MODE) { + if (shutdown) { + START_CRIT_SECTION(); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + t_thrd.shemem_ptr_cxt.ControlFile->state = DB_SHUTDOWNED; + t_thrd.shemem_ptr_cxt.ControlFile->time = (pg_time_t)time(NULL); + UpdateControlFile(); + LWLockRelease(ControlFileLock); + END_CRIT_SECTION(); + } return; } else if (SSFAILOVER_TRIGGER) { ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS failover] do not do CreateCheckpoint during failover"))); @@ -17158,7 +17169,12 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (readSource & XLOG_FROM_STREAM) { readLen = ParallelXLogReadWorkBufRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI); } else { - readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI); + if (SSFAILOVER_TRIGGER || SS_STANDBY_PROMOTING) { + readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, + xlogreader->readBuf, readTLI, NULL); + } else { + readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI); + } } if (readLen > 0 || t_thrd.xlog_cxt.recoveryTriggered || !t_thrd.xlog_cxt.StandbyMode || DoEarlyExit()) { @@ -17885,7 +17901,7 @@ retry: if (!ss_ret) { ereport(emode_for_corrupt_record(emode, RecPtr), (errcode_for_file_access(), - errmsg("[ss] could not read from log file %s to offset %u: %m", + errmsg("[SS] could not read from log file %s to offset %u: %m", XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), t_thrd.xlog_cxt.readOff))); goto next_record_is_invalid; diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index 9b512f69e..1a9f0a38b 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -11,16 +11,16 @@ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PSL v2 for more details. - * --------------------------------------------------------------------------------------- + * ------------------------------------------------------------------------- * * dms_api.h * - * IDENTIFICATION - * src/interface/dms_api.h * - * --------------------------------------------------------------------------------------- + * IDENTIFICATION + * src/interface/dms_api.h + * + * ------------------------------------------------------------------------- */ - #ifndef __DMS_API_H__ #define __DMS_API_H__ @@ -28,7 +28,6 @@ extern "C" { #endif - #define DMS_SUCCESS 0 #define DMS_ERROR (-1) #ifdef OPENGAUSS @@ -382,6 +381,12 @@ typedef enum en_dms_cm_stat { DMS_CM_RES_STATE_COUNT = 3, } dms_cm_stat_t; +typedef struct st_dw_recovery_info { + unsigned long long bitmap_old_join; // the old-join-inst bitmap in dw_recovery phase + unsigned long long bitmap_old_remove; // the old-remove-inst bitmap in dw_recovery phase + unsigned long long bitmap_new_join; // the new-join-inst bitmap in dw_recovery phase +} dw_recovery_info_t; + typedef struct st_inst_list { unsigned char inst_id_list[DMS_MAX_INSTANCES]; unsigned char inst_id_count; @@ -473,17 +478,18 @@ typedef struct st_dcs_batch_buf { typedef int(*dms_get_list_stable)(void *db_handle, unsigned long long *list_stable, unsigned char *reformer_id); typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stable, unsigned char reformer_id, - unsigned int save_ctrl); + unsigned int save_ctrl); typedef int(*dms_get_dms_status)(void *db_handle); typedef void(*dms_set_dms_status)(void *db_handle, int status); typedef int(*dms_confirm_converting)(void *db_handle, char *pageid, unsigned char smon_chk, unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn, unsigned int *ver); typedef int(*dms_confirm_owner)(void *db_handle, char *pageid, unsigned char *lock_mode, unsigned char *is_edp, - unsigned long long *lsn); + unsigned long long *lsn); typedef int(*dms_flush_copy)(void *db_handle, char *pageid); typedef int(*dms_edp_lsn)(void *db_handle, char *pageid, unsigned long long *lsn); typedef int(*dms_disk_lsn)(void *db_handle, char *pageid, unsigned long long *lsn); typedef int(*dms_recovery)(void *db_handle, void *recovery_list, int is_reformer); +typedef int(*dms_dw_recovery)(void *db_handle, void *recovery_list, int is_reformer); typedef int(*dms_opengauss_startup)(void *db_handle); typedef int(*dms_opengauss_recovery_standby)(void *db_handle, int inst_id); typedef int(*dms_opengauss_recovery_primary)(void *db_handle, int inst_id); @@ -506,9 +512,9 @@ typedef unsigned long long(*dms_get_global_scn)(void *db_handle); typedef unsigned long long(*dms_get_global_lsn)(void *db_handle); typedef unsigned long long(*dms_get_global_flushed_lfn)(void *db_handle); typedef int(*dms_read_local_page4transfer)(void *db_handle, char pageid[DMS_PAGEID_SIZE], - dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl); + dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl); typedef int(*dms_try_read_local_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], - dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl); + dms_lock_mode_t mode, dms_buf_ctrl_t **buf_ctrl); typedef unsigned char(*dms_page_is_dirty)(dms_buf_ctrl_t *buf_ctrl); typedef void(*dms_leave_local_page)(void *db_handle, dms_buf_ctrl_t *buf_ctrl); typedef void(*dms_get_pageid)(dms_buf_ctrl_t *buf_ctrl, char **pageid, unsigned int *size); @@ -518,20 +524,20 @@ typedef void *(*dms_get_db_handle)(unsigned int *db_handle_index); typedef void *(*dms_stack_push_cr_cursor)(void *db_handle); typedef void (*dms_stack_pop_cr_cursor)(void *db_handle); typedef void(*dms_init_cr_cursor)(void *cr_cursor, char pageid[DMS_PAGEID_SIZE], char xid[DMS_XID_SIZE], - unsigned long long query_scn, unsigned int ssn); + unsigned long long query_scn, unsigned int ssn); typedef void(*dms_init_index_cr_cursor)(void *cr_cursor, char pageid[DMS_PAGEID_SIZE], char xid[DMS_XID_SIZE], unsigned long long query_scn, unsigned int ssn, char entry[DMS_PAGEID_SIZE], char *index_profile); typedef void(*dms_init_check_cr_cursor)(void *cr_cursor, char rowid[DMS_ROWID_SIZE], char xid[DMS_XID_SIZE], - unsigned long long query_scn, unsigned int ssn); + unsigned long long query_scn, unsigned int ssn); typedef char *(*dms_get_wxid_from_cr_cursor)(void *cr_cursor); typedef unsigned char(*dms_get_instid_of_xid_from_cr_cursor)(void *db_handle, void *cr_cursor); -typedef int (*dms_get_page_invisible_txn_list)(void *db_handle, void *cr_cursor, void *cr_page, +typedef int(*dms_get_page_invisible_txn_list)(void *db_handle, void *cr_cursor, void *cr_page, unsigned char *is_empty_txn_list, unsigned char *exist_waiting_txn); -typedef int (*dms_reorganize_heap_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page, +typedef int(*dms_reorganize_heap_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page, unsigned char *fb_mark); -typedef int (*dms_reorganize_index_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page); +typedef int(*dms_reorganize_index_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page); typedef int(*dms_check_heap_page_visible_with_undo_snapshot)(void *db_handle, void *cr_cursor, void *page, - unsigned char *is_found); + unsigned char *is_found); typedef void(*dms_set_page_force_request)(void *db_handle, char pageid[DMS_PAGEID_SIZE]); typedef void(*dms_get_entry_pageid_from_cr_cursor)(void *cr_cursor, char index_entry_pageid[DMS_PAGEID_SIZE]); typedef void(*dms_get_index_profile_from_cr_cursor)(void *cr_cursor, char index_profile[DMS_INDEX_PROFILE_SIZE]); @@ -544,22 +550,22 @@ typedef void(*dms_mem_free)(void *context, void *ptr); typedef void(*dms_mem_reset)(void *context); // The maximum length of output_msg is 128 bytes. typedef int (*dms_process_broadcast)(void *db_handle, char *data, unsigned int len, char *output_msg, - unsigned int *output_msg_len); + unsigned int *output_msg_len); typedef int (*dms_process_broadcast_ack)(void *db_handle, char *data, unsigned int len); typedef int(*dms_get_txn_info)(void *db_handle, unsigned long long xid, - unsigned char is_scan, dms_txn_info_t *txn_info); + unsigned char is_scan, dms_txn_info_t *txn_info); typedef int(*dms_get_opengauss_xid_csn)(void *db_handle, dms_opengauss_xid_csn_t *csn_req, - dms_opengauss_csn_result_t *csn_ack); + dms_opengauss_csn_result_t *csn_ack); typedef int(*dms_get_opengauss_update_xid)(void *db_handle, unsigned long long xid, - unsigned int t_infomask, unsigned int t_infomask2, unsigned long long *uxid); + unsigned int t_infomask, unsigned int t_infomask2, unsigned long long *uxid); typedef int(*dms_get_opengauss_txn_status)(void *db_handle, unsigned long long xid, unsigned char type, - unsigned char* status); + unsigned char* status); typedef int(*dms_opengauss_lock_buffer)(void *db_handle, int buffer, unsigned char lock_mode, - unsigned char* curr_mode); + unsigned char* curr_mode); typedef int(*dms_get_txn_snapshot)(void *db_handle, unsigned int xmap, dms_txn_snapshot_t *txn_snapshot); typedef int(*dms_get_opengauss_txn_snapshot)(void *db_handle, dms_opengauss_txn_snapshot_t *txn_snapshot); typedef void (*dms_log_output)(dms_log_id_t log_type, dms_log_level_t log_level, const char *code_file_name, - unsigned int code_line_num, const char *module_name, const char *format, ...); + unsigned int code_line_num, const char *module_name, const char *format, ...); typedef int (*dms_log_flush)(void *db_handle, unsigned long long *lsn); typedef int(*dms_process_edp)(void *db_handle, dms_edp_info_t *pages, unsigned int count); typedef void (*dms_clean_ctrl_edp)(void *db_handle, dms_buf_ctrl_t *dms_ctrl); @@ -567,6 +573,7 @@ typedef char *(*dms_display_pageid)(char *display_buf, unsigned int count, char typedef char *(*dms_display_xid)(char *display_buf, unsigned int count, char *xid); typedef char *(*dms_display_rowid)(char *display_buf, unsigned int count, char *rowid); typedef int (*dms_drc_buf_res_rebuild)(void *db_handle); +typedef int (*dms_drc_buf_res_rebuild_parallel)(void *db_handle, unsigned char thread_index, unsigned char thread_num); typedef unsigned char(*dms_ckpt_session)(void *db_handle); typedef void (*dms_check_if_build_complete)(void *db_handle, unsigned int *build_complete); typedef int (*dms_db_is_primary)(void *db_handle); @@ -590,11 +597,11 @@ typedef void (*dms_get_rowid_by_rmid)(void *db_handle, unsigned short rmid, char typedef void (*dms_get_sql_from_session)(void *db_handle, unsigned short sid, char *sql_str, unsigned int sql_str_len); typedef void (*dms_get_itl_lock_by_xid)(void *db_handle, char xid[DMS_XID_SIZE], char *ilock, unsigned int ilock_len); typedef void (*dms_check_tlock_status)(void *db_handle, unsigned int type, unsigned short sid, - unsigned long long tableid, unsigned int *in_use); + unsigned long long tableid, unsigned int *in_use); typedef void (*dms_get_tlock_msg_by_tid)(void *db_handle, unsigned long long table_id, unsigned int type, char *rsp, - unsigned int rsp_len, unsigned int *tlock_cnt); + unsigned int rsp_len, unsigned int *tlock_cnt); typedef void (*dms_get_tlock_msg_by_rm)(void *db_handle, unsigned short sid, unsigned short rmid, int type, char *tlock, - unsigned int tlock_len); + unsigned int tlock_len); typedef int (*dms_switchover_demote)(void *db_handle); typedef int (*dms_switchover_promote)(void *db_handle); @@ -618,6 +625,7 @@ typedef struct st_dms_callback { dms_edp_lsn edp_lsn; dms_disk_lsn disk_lsn; dms_recovery recovery; + dms_dw_recovery dw_recovery; dms_db_is_primary db_is_primary; dms_get_open_status get_open_status; dms_undo_init undo_init; @@ -626,6 +634,7 @@ typedef struct st_dms_callback { dms_tx_rollback_finish tx_rollback_finish; dms_recovery_in_progress recovery_in_progress; dms_drc_buf_res_rebuild dms_reform_rebuild_buf_res; + dms_drc_buf_res_rebuild_parallel dms_reform_rebuild_parallel; dms_check_if_build_complete check_if_build_complete; // used in reform for opengauss @@ -763,10 +772,10 @@ typedef struct st_dms_profile { unsigned char rdma_rpc_bind_core_end; char ock_log_path[DMS_OCK_LOG_PATH_LEN]; unsigned char enable_reform; - //ock scrlock configs + // ock scrlock configs unsigned char enable_scrlock; unsigned int primary_inst_id; - unsigned char enable_ssl; + unsigned char enable_ssl; unsigned int scrlock_log_level; unsigned char enable_scrlock_worker_bind_core; unsigned int scrlock_worker_cnt; @@ -776,6 +785,7 @@ typedef struct st_dms_profile { unsigned char enable_scrlock_server_sleep_mode; unsigned char scrlock_server_bind_core_start; unsigned char scrlock_server_bind_core_end; + unsigned char parallel_thread_num; } dms_profile_t; typedef struct st_logger_param { @@ -793,7 +803,7 @@ typedef struct st_logger_param { #define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0 -#define DMS_LOCAL_VERSION 51 +#define DMS_LOCAL_VERSION 52 #ifdef __cplusplus } diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm index 084754ae6..ded34711f 100644 --- a/src/tools/msvc/Solution.pm +++ b/src/tools/msvc/Solution.pm @@ -52,7 +52,7 @@ sub _new $options->{wal_segsize} = 16 unless $options->{wal_segsize}; # undef or 0 means default die "Bad wal_segsize $options->{wal_segsize}" - unless grep { $_ == $options->{wal_segsize} } (1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024); + unless grep { $_ == $options->{wal_segsize} } (1, 2, 4, 8, 16, 32, 64, 128, 256, 512); $self->DeterminePlatform();