forked from openGauss-Ecosystem/openGauss-server
!3038 【共享存储】在页面淘汰过程中,遇到DMS特定的待刷盘的页面,选择刷下去,避免无buffer可用
Merge pull request !3038 from 董宁/dn_fix9_up
This commit is contained in:
commit
93cf8543c1
|
@ -30,6 +30,7 @@
|
|||
#include "ddes/dms/ss_dms_bufmgr.h"
|
||||
#include "securec_check.h"
|
||||
#include "miscadmin.h"
|
||||
#include "access/double_write.h"
|
||||
|
||||
void InitDmsBufCtrl(void)
|
||||
{
|
||||
|
@ -765,3 +766,98 @@ bool DmsCheckBufAccessible()
|
|||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
 * Try to write a single buffer out from the current thread. The sequence
 * follows the flush path used in BufferAlloc.
 *
 * Returns false, without flushing, when:
 *   - backend_can_flush_dirty_page() says no;
 *   - the content lock cannot be taken immediately in shared mode;
 *   - double write is enabled with page writers running, but
 *     free_space_enough() reports no room for this buffer.
 * Returns true once FlushBuffer() has run, the content lock has been
 * released and the tag has been passed to ScheduleBufferTagForWriteback().
 */
bool SSTryFlushBuffer(BufferDesc *buf)
{
    if (!backend_can_flush_dirty_page()) {
        return false;
    }

    /* Conditional acquire only: give up instead of waiting for the lock. */
    if (!LWLockConditionalAcquire(buf->content_lock, LW_SHARED)) {
        return false;
    }

    bool via_single_dw =
        dw_enabled() && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0;

    if (!via_single_dw) {
        FlushBuffer(buf, NULL);
    } else if (!free_space_enough(buf->buf_id)) {
        /* Nothing was written; drop the lock before reporting failure. */
        LWLockRelease(buf->content_lock);
        return false;
    } else {
        /*
         * Publish the double-write position on this thread's proc entry for
         * the duration of the flush, mark the slot done, then reset it.
         */
        uint32 dw_slot = first_version_dw_single_flush(buf);
        t_thrd.proc->dw_pos = dw_slot;
        FlushBuffer(buf, NULL);
        g_instance.dw_single_cxt.single_flush_state[dw_slot] = true;
        t_thrd.proc->dw_pos = -1;
    }

    LWLockRelease(buf->content_lock);
    ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag);
    return true;
}
|
||||
|
||||
/*
 * Segment-buffer counterpart of the flush attempt; the sequence follows the
 * one used in SegBufferAlloc.
 *
 * Returns true after FlushOneSegmentBuffer() has been called for this buffer,
 * the content lock has been released and the tag has been scheduled for
 * writeback. Returns false when backend_can_flush_dirty_page() refuses or the
 * content lock cannot be taken immediately in shared mode.
 */
bool SSTrySegFlushBuffer(BufferDesc* buf)
{
    bool flushed = false;

    /* Short-circuit keeps the original order: permission check first, then the lock attempt. */
    if (backend_can_flush_dirty_page() && LWLockConditionalAcquire(buf->content_lock, LW_SHARED)) {
        /* buf_id + 1: presumably the 1-based Buffer number expected by the callee -- TODO confirm. */
        FlushOneSegmentBuffer(buf->buf_id + 1);
        LWLockRelease(buf->content_lock);
        ScheduleBufferTagForWriteback(t_thrd.storage_cxt.BackendWritebackContext, &buf->tag);
        flushed = true;
    }

    return flushed;
}
|
||||
|
||||
/** true :1)this buffer dms think need flush, and flush success
|
||||
* 2) no need flush
|
||||
* false: this flush dms think need flush, but cannot flush
|
||||
*/
|
||||
bool SSHelpFlushBufferIfNeed(BufferDesc* buf_desc)
|
||||
{
|
||||
if (!ENABLE_DMS) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (IsInitdb) {
|
||||
return true;
|
||||
}
|
||||
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
if (buf_ctrl->state & BUF_DIRTY_NEED_FLUSH) {
|
||||
// wait dw_init finish
|
||||
while (!g_instance.dms_cxt.dw_init) {
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
|
||||
XLogRecPtr pagelsn = BufferGetLSN(buf_desc);
|
||||
if (!SS_IN_REFORM) {
|
||||
ereport(PANIC,
|
||||
(errmsg("[SS] this buffer should not exist with BUF_DIRTY_NEED_FLUSH but not in reform, "
|
||||
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u",
|
||||
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
|
||||
(unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno,
|
||||
buf_desc->extra->seg_blockno)));
|
||||
}
|
||||
bool in_flush_copy = SS_IN_FLUSHCOPY;
|
||||
bool in_recovery = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
|
||||
ereport(LOG,
|
||||
(errmsg("[SS flush copy] ready to flush buffer with need flush, "
|
||||
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u, reform phase "
|
||||
"is in flush_copy:%d, in recovery:%d",
|
||||
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
|
||||
(unsigned long long)pagelsn, (unsigned int)buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno,
|
||||
in_flush_copy, in_recovery)));
|
||||
if (IsSegmentBufferID(buf_desc->buf_id)) {
|
||||
return SSTrySegFlushBuffer(buf_desc);
|
||||
} else {
|
||||
return SSTryFlushBuffer(buf_desc);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
|
@ -1283,6 +1283,11 @@ static int CBFlushCopy(void *db_handle, char *pageid)
|
|||
smgrcloseall();
|
||||
}
|
||||
|
||||
// only 1) primary restart 2) failover need flush_copy
|
||||
if (SS_REFORM_REFORMER && g_instance.dms_cxt.dms_status == DMS_STATUS_IN && !SS_STANDBY_FAILOVER) {
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
|
||||
BufferTag* tag = (BufferTag*)pageid;
|
||||
Buffer buffer;
|
||||
SegSpace *spc = NULL;
|
||||
|
@ -1304,7 +1309,7 @@ static int CBFlushCopy(void *db_handle, char *pageid)
|
|||
ErrorData* edata = CopyErrorData();
|
||||
FlushErrorState();
|
||||
FreeErrorData(edata);
|
||||
ereport(PANIC, (errmsg("[SS Flush Copy] Error happend, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u",
|
||||
ereport(PANIC, (errmsg("[SS flush copy] Error happend, spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
}
|
||||
|
@ -1417,6 +1422,7 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char
|
|||
if (ss_reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS) {
|
||||
g_instance.dms_cxt.SSRecoveryInfo.in_failover = true;
|
||||
if (role == DMS_ROLE_REFORMER) {
|
||||
g_instance.dms_cxt.dw_init = false;
|
||||
// variable set order: SharedRecoveryInProgress -> failover_triggered -> dms_role
|
||||
volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
|
||||
SpinLockAcquire(&xlogctl->info_lck);
|
||||
|
|
|
@ -327,7 +327,6 @@ void ss_failover_dw_init()
|
|||
}
|
||||
}
|
||||
ckpt_shutdown_pagewriter();
|
||||
g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = false;
|
||||
ss_failover_dw_init_internal();
|
||||
g_instance.dms_cxt.dw_init = true;
|
||||
}
|
||||
|
@ -346,5 +345,6 @@ void ss_switchover_promoting_dw_init()
|
|||
dw_exit(false);
|
||||
dw_ext_init();
|
||||
dw_init();
|
||||
g_instance.dms_cxt.dw_init = true;
|
||||
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover] dw init finished")));
|
||||
}
|
|
@ -2746,7 +2746,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
|
|||
/* Pin the buffer and then release the buffer spinlock */
|
||||
PinBuffer_Locked(buf);
|
||||
|
||||
if (!SSPageCheckIfCanEliminate(buf)) {
|
||||
if (!SSHelpFlushBufferIfNeed(buf)) {
|
||||
// for dms this page cannot eliminate, get another one
|
||||
UnpinBuffer(buf, true);
|
||||
continue;
|
||||
|
@ -6350,6 +6350,7 @@ void CheckIOState(volatile void *buf_desc)
|
|||
bool StartBufferIO(BufferDesc *buf, bool for_input)
|
||||
{
|
||||
uint32 buf_state;
|
||||
bool dms_need_flush = false; // used in dms
|
||||
|
||||
Assert(!t_thrd.storage_cxt.InProgressBuf);
|
||||
|
||||
|
@ -6390,8 +6391,15 @@ bool StartBufferIO(BufferDesc *buf, bool for_input)
|
|||
WaitIO(buf);
|
||||
}
|
||||
|
||||
if (ENABLE_DMS) {
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||
if (buf_ctrl->state & BUF_DIRTY_NEED_FLUSH) {
|
||||
dms_need_flush = true;
|
||||
}
|
||||
}
|
||||
|
||||
/* Once we get here, there is definitely no I/O active on this buffer */
|
||||
if (for_input ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) {
|
||||
if (for_input ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY) && !dms_need_flush) {
|
||||
/* someone else already did the I/O */
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
LWLockRelease(buf->io_in_progress_lock);
|
||||
|
@ -6483,6 +6491,23 @@ static void TerminateBufferIO_common(BufferDesc *buf, bool clear_dirty, uint32 s
|
|||
if (ENABLE_INCRE_CKPT) {
|
||||
if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf->extra->rec_lsn))) {
|
||||
remove_dirty_page_from_queue(buf);
|
||||
} else if (ENABLE_DMS) {
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||
if (!(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
|
||||
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
||||
}
|
||||
buf_ctrl->state &= ~BUF_DIRTY_NEED_FLUSH;
|
||||
XLogRecPtr pagelsn = BufferGetLSN(buf);
|
||||
bool in_flush_copy = SS_IN_FLUSHCOPY;
|
||||
bool in_recovery = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
|
||||
ereport(LOG,
|
||||
(errmsg("[SS flush copy] finish flush buffer with need flush, "
|
||||
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u, reform phase "
|
||||
"is in flush_copy:%d, in recovery:%d",
|
||||
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, buf->tag.rnode.bucketNode,
|
||||
buf->tag.forkNum, buf->tag.blockNum, (unsigned long long)pagelsn,
|
||||
(unsigned int)buf->extra->seg_fileno, buf->extra->seg_blockno, in_flush_copy, in_recovery)));
|
||||
} else {
|
||||
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
||||
|
|
|
@ -75,6 +75,7 @@ void AbortSegBufferIO(void)
|
|||
static bool SegStartBufferIO(BufferDesc *buf, bool forInput)
|
||||
{
|
||||
uint32 buf_state;
|
||||
bool dms_need_flush = false; // used in dms
|
||||
|
||||
SegmentCheck(!InProgressBuf);
|
||||
|
||||
|
@ -98,7 +99,14 @@ static bool SegStartBufferIO(BufferDesc *buf, bool forInput)
|
|||
WaitIO(buf);
|
||||
}
|
||||
|
||||
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) {
|
||||
if (ENABLE_DMS) {
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||
if (buf_ctrl->state & BUF_DIRTY_NEED_FLUSH) {
|
||||
dms_need_flush = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY) && !dms_need_flush) {
|
||||
/* IO finished */
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
LWLockRelease(buf->io_in_progress_lock);
|
||||
|
@ -128,6 +136,23 @@ void SegTerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bit
|
|||
if (ENABLE_INCRE_CKPT) {
|
||||
if (!XLogRecPtrIsInvalid(pg_atomic_read_u64(&buf->extra->rec_lsn))) {
|
||||
remove_dirty_page_from_queue(buf);
|
||||
} else if (ENABLE_DMS) {
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf->buf_id);
|
||||
if (!(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
|
||||
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||
(errmsg("buffer is dirty but not in dirty page queue in SegTerminateBufferIO"))));
|
||||
}
|
||||
buf_ctrl->state &= ~BUF_DIRTY_NEED_FLUSH;
|
||||
XLogRecPtr pagelsn = BufferGetLSN(buf);
|
||||
bool in_flush_copy = SS_IN_FLUSHCOPY;
|
||||
bool in_recovery = !g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag;
|
||||
ereport(LOG,
|
||||
(errmsg("[SS flush copy] finish seg flush buffer with need flush, "
|
||||
"spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u, page lsn (0x%llx), seg info:%u-%u, reform phase "
|
||||
"is in flush_copy:%d, in recovery:%d",
|
||||
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, buf->tag.rnode.bucketNode,
|
||||
buf->tag.forkNum, buf->tag.blockNum, (unsigned long long)pagelsn,
|
||||
(unsigned int)buf->extra->seg_fileno, buf->extra->seg_blockno, in_flush_copy, in_recovery)));
|
||||
} else {
|
||||
ereport(PANIC, (errmodule(MOD_INCRE_CKPT), errcode(ERRCODE_INVALID_BUFFER),
|
||||
(errmsg("buffer is dirty but not in dirty page queue in TerminateBufferIO_common"))));
|
||||
|
@ -704,7 +729,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
|
|||
|
||||
SegPinBufferLocked(buf, &new_tag);
|
||||
|
||||
if (!SSPageCheckIfCanEliminate(buf)) {
|
||||
if (!SSHelpFlushBufferIfNeed(buf)) {
|
||||
SegUnpinBuffer(buf);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -77,5 +77,6 @@ void BufValidateDrc(BufferDesc *buf_desc);
|
|||
bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc);
|
||||
bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer);
|
||||
bool DmsCheckBufAccessible();
|
||||
bool SSHelpFlushBufferIfNeed(BufferDesc* buf_desc);
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue