!1590 修复创建多个订阅端,后创建的订阅端生效耗时较长的问题

Merge pull request !1590 from pengjiong/fix_snap_too_long
This commit is contained in:
opengauss-bot 2022-03-16 02:06:11 +00:00 committed by Gitee
commit f3e27c51bc
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 90 additions and 64 deletions

View File

@ -1050,7 +1050,7 @@ static void knl_t_autovacuum_init(knl_t_autovacuum_context* autovacuum_cxt)
static void KnlTApplyLauncherInit(knl_t_apply_launcher_context* applyLauncherCxt) static void KnlTApplyLauncherInit(knl_t_apply_launcher_context* applyLauncherCxt)
{ {
applyLauncherCxt->got_SIGHUP = false; applyLauncherCxt->got_SIGHUP = false;
applyLauncherCxt->got_SIGUSR2 = false; applyLauncherCxt->newWorkerRequest = false;
applyLauncherCxt->got_SIGTERM = false; applyLauncherCxt->got_SIGTERM = false;
applyLauncherCxt->onCommitLauncherWakeup = false; applyLauncherCxt->onCommitLauncherWakeup = false;
applyLauncherCxt->applyLauncherShm = NULL; applyLauncherCxt->applyLauncherShm = NULL;

View File

@ -59,6 +59,8 @@ static const int DEFAULT_NAPTIME_PER_CYCLE = 180000L;
static const int wal_retrieve_retry_interval = 5000; static const int wal_retrieve_retry_interval = 5000;
static const int PG_STAT_GET_SUBSCRIPTION_COLS = 7; static const int PG_STAT_GET_SUBSCRIPTION_COLS = 7;
static const int WAIT_SUB_WORKER_ATTACH_CYCLE = 50000L; /* 50ms */
static const int WAIT_SUB_WORKER_ATTACH_TIMEOUT = 1000000L; /* 1s */
static void ApplyLauncherWakeup(void); static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_launcher_onexit(int code, Datum arg);
@ -175,6 +177,54 @@ static LogicalRepWorker *logicalrep_worker_find(Oid subid)
return res; return res;
} }
/*
 * Wait for the just-launched apply worker to attach, i.e. for it to clear
 * applyLauncherShm->startingWorker, so the launcher can safely start the
 * next worker in its list.
 *
 * We can't start another apply worker while one is still starting up (or
 * has failed while doing so), so poll until the slot is free, but only for
 * WAIT_SUB_WORKER_ATTACH_TIMEOUT in total.  Note that failure to connect
 * to a particular database is not a problem here, because the worker
 * removes itself from the startingWorker pointer before trying to connect.
 * The problems that can make the timeout fire are errors in the earlier
 * sections of ApplyWorkerMain, before the worker removes the
 * LogicalRepWorker from the startingWorker pointer.
 */
static void WaitForReplicationWorkerAttach()
{
    int timeout = WAIT_SUB_WORKER_ATTACH_TIMEOUT;

    while (timeout > 0) {
        CHECK_FOR_INTERRUPTS();

        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker == NULL) {
            /* worker has started, we are done */
            LWLockRelease(LogicalRepWorkerLock);
            return;
        }
        LWLockRelease(LogicalRepWorkerLock);

        pg_usleep(WAIT_SUB_WORKER_ATTACH_CYCLE);
        timeout -= WAIT_SUB_WORKER_ATTACH_CYCLE;
    }

    /* worker took too long to start: cancel it and free the starting slot */
    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    LogicalRepWorker *worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker;
    /*
     * Recheck under the exclusive lock: the worker may have attached (and
     * cleared startingWorker) between our last shared-lock check — or during
     * the final pg_usleep — and now.  Without this check we would dereference
     * a NULL pointer below.
     */
    if (worker != NULL) {
        ereport(WARNING, (errmsg("Apply worker with sub id:%u took too long time to start, so canceled it",
            worker->subid)));
        worker->dbid = InvalidOid;
        worker->userid = InvalidOid;
        worker->subid = InvalidOid;
        worker->proc = NULL;
        worker->workerLaunchTime = 0;
        t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = NULL;
    }
    LWLockRelease(LogicalRepWorkerLock);
}
/* /*
* Start new apply background worker. * Start new apply background worker.
*/ */
@ -226,6 +276,7 @@ static void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, O
LWLockRelease(LogicalRepWorkerLock); LWLockRelease(LogicalRepWorkerLock);
SendPostmasterSignal(PMSIGNAL_START_APPLY_WORKER); SendPostmasterSignal(PMSIGNAL_START_APPLY_WORKER);
WaitForReplicationWorkerAttach();
} }
/* /*
@ -380,7 +431,9 @@ static void logicalrep_worker_onexit(int code, Datum arg)
{ {
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
logicalrep_worker_detach(); logicalrep_worker_detach();
ApplyLauncherWakeup(); if (t_thrd.applylauncher_cxt.applyLauncherShm->applyLauncherPid != 0) {
gs_signal_send(t_thrd.applylauncher_cxt.applyLauncherShm->applyLauncherPid, SIGUSR1);
}
} }
/* SIGHUP: set flag to reload configuration at next convenient time */ /* SIGHUP: set flag to reload configuration at next convenient time */
@ -395,18 +448,35 @@ static void logicalrepLauncherSighub(SIGNAL_ARGS)
errno = saveErrno; errno = saveErrno;
} }
/* SIGUSR2: a worker is up and running, or just finished, or failed to fork */ /*
* SIGUSR2: request to start a new worker, for CREATE/ALTER subscription.
* we will loop pg_subscription and launch worker immediately.
*/
static void logicalrep_launcher_sigusr2(SIGNAL_ARGS) static void logicalrep_launcher_sigusr2(SIGNAL_ARGS)
{ {
int save_errno = errno; int save_errno = errno;
t_thrd.applylauncher_cxt.got_SIGUSR2 = true; t_thrd.applylauncher_cxt.newWorkerRequest = true;
if (t_thrd.proc) if (t_thrd.proc) {
SetLatch(&t_thrd.proc->procLatch); SetLatch(&t_thrd.proc->procLatch);
}
errno = save_errno; errno = save_errno;
} }
/*
 * SIGUSR1: a worker just finished, or failed to start.
 *
 * We only set the launcher's latch here; unlike the SIGUSR2 path we do NOT
 * flag an immediate relaunch, because an abnormal worker could otherwise be
 * restarted too quickly and flood the server log with repeated errors.
 */
static void LogicalrepLauncherSigusr1(SIGNAL_ARGS)
{
    /* preserve errno across the handler, as required of signal handlers */
    int save_errno = errno;
    /* wake the launcher's main loop if its PGPROC is set up */
    if (t_thrd.proc) {
        SetLatch(&t_thrd.proc->procLatch);
    }
    errno = save_errno;
}
/* /*
* ApplyLauncherShmemSize * ApplyLauncherShmemSize
@ -483,7 +553,6 @@ void ApplyLauncherMain()
sigjmp_buf localSigjmpBuf; sigjmp_buf localSigjmpBuf;
TimestampTz last_start_time = 0; TimestampTz last_start_time = 0;
char username[NAMEDATALEN]; char username[NAMEDATALEN];
int nextLaunchSub = 0;
/* we are a postmaster subprocess now */ /* we are a postmaster subprocess now */
IsUnderPostmaster = true; IsUnderPostmaster = true;
@ -525,7 +594,7 @@ void ApplyLauncherMain()
gspqsignal(SIGALRM, handle_sig_alarm); gspqsignal(SIGALRM, handle_sig_alarm);
gspqsignal(SIGPIPE, SIG_IGN); gspqsignal(SIGPIPE, SIG_IGN);
gspqsignal(SIGUSR1, procsignal_sigusr1_handler); gspqsignal(SIGUSR1, LogicalrepLauncherSigusr1);
gspqsignal(SIGUSR2, logicalrep_launcher_sigusr2); gspqsignal(SIGUSR2, logicalrep_launcher_sigusr2);
gspqsignal(SIGFPE, FloatExceptionHandler); gspqsignal(SIGFPE, FloatExceptionHandler);
gspqsignal(SIGCHLD, SIG_DFL); gspqsignal(SIGCHLD, SIG_DFL);
@ -588,60 +657,19 @@ void ApplyLauncherMain()
MemoryContext oldctx; MemoryContext oldctx;
TimestampTz now; TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE; long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
bool canLaunch = true;
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
now = GetCurrentTimestamp(); now = GetCurrentTimestamp();
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); /*
if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker != NULL) { * Limit the start retry to once a wal_retrieve_retry_interval, but if it's a request from
LogicalRepWorker *worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker; * CREATE/ALTER subscription, we will try to launch worker immediately.
int waitTime = 1 * 1000; */
if (t_thrd.applylauncher_cxt.newWorkerRequest ||
/* TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) {
* We can't start another apply worker when another one is still if (t_thrd.applylauncher_cxt.newWorkerRequest) {
* starting up (or failed while doing so), so just sleep for a bit t_thrd.applylauncher_cxt.newWorkerRequest = false;
* more; that worker will wake us up again as soon as it's ready.
* We will only wait 1 seconds (up to a maximum
* of 60 seconds) for this to happen however. Note that failure
* to connect to a particular database is not a problem here,
* because the worker removes itself from the startingWorker
* pointer before trying to connect. Problems detected by the
* postmaster (like fork() failure) are also reported and handled
* differently. The only problems that may cause this code to
* fire are errors in the earlier sections of ApplyWorkerMain,
* before the worker removes the LogicalRepWorker from the
* startingWorker pointer.
*/
if (TimestampDifferenceExceeds(worker->workerLaunchTime, now, waitTime)) {
LWLockRelease(LogicalRepWorkerLock);
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
/*
* No other process can put a worker in starting mode, so if
* startingWorker is still INVALID after exchanging our lock,
* we assume it's the same one we saw above (so we don't
* recheck the launch time).
*/
if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker != NULL) {
worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker;
ereport(WARNING, (errmsg("Apply worker with sub id:%u took too long "
"time to start, so canceled it",
worker->subid)));
worker->dbid = InvalidOid;
worker->userid = InvalidOid;
worker->subid = InvalidOid;
worker->proc = NULL;
worker->workerLaunchTime = 0;
t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = NULL;
}
} else {
canLaunch = false;
} }
}
LWLockRelease(LogicalRepWorkerLock); /* either shared or exclusive */
/* Limit the start retry to once a wal_retrieve_retry_interval */
if (canLaunch && TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) {
/* Use temporary context for the database list and worker info. */ /* Use temporary context for the database list and worker info. */
subctx = AllocSetContextCreate(TopMemoryContext, "Logical Replication Launcher sublist", subctx = AllocSetContextCreate(TopMemoryContext, "Logical Replication Launcher sublist",
ALLOCSET_DEFAULT_SIZES); ALLOCSET_DEFAULT_SIZES);
@ -671,14 +699,12 @@ void ApplyLauncherMain()
} }
/* /*
* Try to launch the subscription worker one by one, to avoid launch an abnormal * Try to launch the subscription worker one by one, we will wait at the end of
* subscription again and again and normal subscription become starve. * logicalrep_worker_launch, to make sure in the next loop, the previous worker
* has started or aborted definitely.
*/ */
if (pendingSubList != NIL) { foreach(lc, pendingSubList) {
nextLaunchSub = nextLaunchSub % list_length(pendingSubList); Subscription *readyToLaunchSub = (Subscription*)lfirst(lc);
Subscription *readyToLaunchSub = (Subscription*)list_nth(pendingSubList, nextLaunchSub);
nextLaunchSub++;
logicalrep_worker_launch(readyToLaunchSub->dbid, readyToLaunchSub->oid, logicalrep_worker_launch(readyToLaunchSub->dbid, readyToLaunchSub->oid,
readyToLaunchSub->name, readyToLaunchSub->owner); readyToLaunchSub->name, readyToLaunchSub->owner);
last_start_time = now; last_start_time = now;

View File

@ -3206,7 +3206,7 @@ typedef struct knl_t_lsc_context {
typedef struct knl_t_apply_launcher_context { typedef struct knl_t_apply_launcher_context {
/* Flags set by signal handlers */ /* Flags set by signal handlers */
volatile sig_atomic_t got_SIGHUP; volatile sig_atomic_t got_SIGHUP;
volatile sig_atomic_t got_SIGUSR2; volatile sig_atomic_t newWorkerRequest;
volatile sig_atomic_t got_SIGTERM; volatile sig_atomic_t got_SIGTERM;
bool onCommitLauncherWakeup; bool onCommitLauncherWakeup;
ApplyLauncherShmStruct *applyLauncherShm; ApplyLauncherShmStruct *applyLauncherShm;