!2314 根据当前可用线程数自动调整计划并行度,防止出现No free procs错误

Merge pull request !2314 from xiyanziran/dev
This commit is contained in:
opengauss-bot 2022-11-17 02:34:04 +00:00 committed by Gitee
commit e20304beae
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
11 changed files with 413 additions and 28 deletions

View File

@ -1538,6 +1538,28 @@ static void InitSqlConfigureNamesBool()
NULL,
NULL,
NULL},
{{"auto_dop",
PGC_USERSET,
NODE_ALL,
STREAMING,
gettext_noop("Sets the streaming engine enable flag."),
NULL},
&u_sess->opt_cxt.auto_dop,
false,
NULL,
NULL,
NULL},
{{"auto_dop_freeprocs",
PGC_USERSET,
NODE_ALL,
STREAMING,
gettext_noop("Automatically adjust query_dop according to the number of free procs."),
NULL},
&u_sess->opt_cxt.auto_dop_freeprocs,
false,
NULL,
NULL,
NULL},
#ifndef ENABLE_MULTIPLE_NODES
{{"plsql_show_all_error",
PGC_USERSET,
@ -1999,6 +2021,32 @@ static void InitSqlConfigureNamesInt()
parse_query_dop,
AssignQueryDop,
NULL},
{{"auto_dop_freeprocs_threshold",
PGC_USERSET,
NODE_ALL,
QUERY_TUNING,
gettext_noop("User-defined degree of parallelism."),
NULL},
&u_sess->opt_cxt.auto_dop_freeprocs_th,
10,
0,
INT_MAX,
NULL,
NULL,
NULL},
{{"auto_dop_join_threshold",
PGC_USERSET,
NODE_ALL,
QUERY_TUNING,
gettext_noop("The maximum join threshold for degrading the DOP to 2."),
NULL},
&u_sess->opt_cxt.auto_dop_join_th,
4,
1,
INT_MAX,
NULL,
NULL,
NULL},
{{"plan_mode_seed",
PGC_USERSET,
NODE_ALL,
@ -2981,6 +3029,7 @@ static void AssignQueryDop(int newval, void* extra)
u_sess->opt_cxt.query_dop = newval % 1000;
u_sess->opt_cxt.query_dop_store = u_sess->opt_cxt.query_dop;
u_sess->opt_cxt.max_query_dop = -1; /* turn off dynamic smp */
u_sess->opt_cxt.saved_dop = u_sess->opt_cxt.query_dop;
} else if (newval <= 0) {
if (newval == -1) {
/* -1 means not parallel */

View File

@ -24,23 +24,271 @@
#include "postgres.h"
#include "knl/knl_variable.h"
#include "optimizer/streamplan.h"
#include "nodes/print.h"
static int stream_dop_walker(Plan* plan);
static int dop_join_degree_walker(Query* parse);
void InitDynamicSmp()
{
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
u_sess->opt_cxt.query_dop = u_sess->opt_cxt.saved_dop;
return;
}
void ChooseStartQueryDop(int hashTableCount)
void ChooseStartQueryDop(Query* parse, int hashTableCount)
{
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
int n_sess, n_free_procs = 0, n_other_procs = 0;
int n_dop;
if (u_sess->opt_cxt.auto_dop_join_th > 0 &&
dop_join_degree_walker(parse) >= u_sess->opt_cxt.auto_dop_join_th)
u_sess->opt_cxt.query_dop = 2;
if (!u_sess->opt_cxt.auto_dop_freeprocs)
return;
/* Current number of available threads */
#ifdef USE_ASSERT_CHECKING
GetNFreeProcs(&n_free_procs, &n_other_procs);
#endif
n_free_procs = g_instance.proc_base->nFreeProcs;
n_other_procs = g_instance.proc_base->nFreeUtilProcs;
/* When the number of free procs is less than the threshold */
if (n_free_procs < u_sess->opt_cxt.auto_dop_freeprocs_th) {
u_sess->opt_cxt.query_dop = 1;
return;
}
/* Current sessions */
n_sess = MAX_BACKEND_SLOT - n_free_procs - n_other_procs;
if (n_sess <= 0)
return;
/* available DOP = free_procs / sessions */
n_dop = n_free_procs / n_sess;
/* available DOP >= query_dop */
if (n_dop >= u_sess->opt_cxt.query_dop) {
return;
}
if (n_dop < 1) {
n_dop = 1;
}
u_sess->opt_cxt.query_dop = n_dop;
return;
}
void OptimizePlanDop(PlannedStmt* plannedStmt)
{
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
int stream_level = 0;
elog(WARNING, "stream_level %d",stream_dop_walker(plannedStmt->planTree));
if (u_sess->opt_cxt.query_dop <= 2)
return;
if (plannedStmt->num_streams <= 0)
return;
stream_level = stream_dop_walker(plannedStmt->planTree);
if (stream_level <= 1)
return;
else {
int dop = (int)pow(u_sess->opt_cxt.query_dop, 1.0/stream_level);
if (dop < 2)
dop = 2;
plannedStmt->plan_dop = dop;
return;
}
}
bool IsDynamicSmpEnabled()
{
return IS_STREAM_PLAN && u_sess->opt_cxt.max_query_dop >= 0 && !u_sess->attr.attr_common.IsInplaceUpgrade &&
!IsInitdb;
if (IS_STREAM_PLAN && u_sess->opt_cxt.auto_dop && u_sess->opt_cxt.query_dop > 1) {
return true;
}
return false;
}
static int stream_dop_walker(Plan* plan)
{
if (plan == NULL)
return 0;
switch (nodeTag(plan)) {
/* Add Row Adapter */
case T_CStoreScan:
case T_DfsScan:
case T_DfsIndexScan:
case T_CStoreIndexScan:
case T_CStoreIndexHeapScan:
case T_CStoreIndexCtidScan:
#ifdef ENABLE_MULTIPLE_NODES
case T_TsStoreScan:
#endif /* ENABLE_MULTIPLE_NODES */
case T_ForeignScan:
return 0;
case T_ExtensiblePlan: {
int e_ret = 0, t_ret;
ListCell* lc = NULL;
ExtensiblePlan* ext_plans = (ExtensiblePlan*) plan;
foreach (lc, ext_plans->extensible_plans) {
Plan* plan = (Plan*)lfirst(lc);
t_ret = stream_dop_walker(plan);
if (t_ret > e_ret)
e_ret = t_ret;
}
return e_ret;
} break;
case T_RemoteQuery:
case T_Limit:
case T_PartIterator:
case T_SetOp:
case T_Group:
case T_Unique:
case T_BaseResult:
case T_ProjectSet:
case T_Sort:
case T_Material:
case T_StartWithOp:
case T_WindowAgg:
case T_Hash:
case T_Agg:
case T_RowToVec:
case T_VecRemoteQuery:
return stream_dop_walker(plan->lefttree);
case T_MergeJoin:
case T_NestLoop:
case T_HashJoin:
case T_RecursiveUnion: {
int l_ret, r_ret;
l_ret = stream_dop_walker(plan->lefttree);
r_ret = stream_dop_walker(plan->righttree);
return l_ret > r_ret ? l_ret : r_ret;
} break;
case T_Append: {
int a_ret = 0, t_ret;
Append* append = (Append*)plan;
ListCell* lc = NULL;
foreach (lc, append->appendplans) {
Plan* plan = (Plan*)lfirst(lc);
t_ret = stream_dop_walker(plan);
if (t_ret > a_ret)
a_ret = t_ret;
}
return a_ret;
} break;
case T_ModifyTable: {
int m_ret = 0, t_ret;
ModifyTable* mt = (ModifyTable*)plan;
ListCell* lc = NULL;
foreach (lc, mt->plans) {
Plan* plan = (Plan*)lfirst(lc);
t_ret = stream_dop_walker(plan);
if (t_ret > m_ret)
m_ret = t_ret;
}
return m_ret;
} break;
case T_SubqueryScan: {
SubqueryScan* ss = (SubqueryScan*)plan;
if (ss->subplan)
return stream_dop_walker(ss->subplan);
} break;
case T_MergeAppend: {
int m_ret = 0, t_ret;
MergeAppend* ma = (MergeAppend*)plan;
ListCell* lc = NULL;
foreach (lc, ma->mergeplans) {
Plan* plan = (Plan*)lfirst(lc);
t_ret = stream_dop_walker(plan);
if (t_ret > m_ret)
m_ret = t_ret;
}
return m_ret;
} break;
case T_Stream: {
return stream_dop_walker(plan->lefttree) + 1;
} break;
default:
break;
}
return 0;
}
static int
dop_range_table_walker(RangeTblEntry* rte)
{
switch (rte->rtekind) {
case RTE_RELATION:
case RTE_CTE:
case RTE_RESULT:
/* nothing to do */
return 0;
case RTE_SUBQUERY:
return dop_join_degree_walker(rte->subquery);
case RTE_JOIN:
return 0;
case RTE_FUNCTION:
return 0;
case RTE_VALUES:
return 0;
#ifdef PGXC
case RTE_REMOTE_DUMMY:
return 0;
#endif /* PGXC */
default:
break;
}
return 0;
}
static int dop_join_degree_walker(Query* parse)
{
int d_rt, m_rt = 0;
ListCell* lc = NULL;
foreach (lc, parse->rtable) {
d_rt = dop_range_table_walker((RangeTblEntry*)lfirst(lc));
if (d_rt > m_rt)
m_rt = d_rt;
}
d_rt = list_length(parse->jointree->fromlist);
return d_rt + m_rt;
}

View File

@ -524,7 +524,7 @@ PlannedStmt* standard_planner(Query* parse, int cursorOptions, ParamListInfo bou
GetHashTableCount(parse, parse->cteList, &hashTableCount);
}
ChooseStartQueryDop(hashTableCount);
ChooseStartQueryDop(parse, hashTableCount);
}
if (enable_check_implicit_cast())
@ -912,6 +912,7 @@ PlannedStmt* standard_planner(Query* parse, int cursorOptions, ParamListInfo bou
result->query_string = NULL;
result->MaxBloomFilterNum = root->glob->bloomfilter.bloomfilter_index + 1;
result->plan_dop = 0;
/* record which suplan belongs to which thread */
#ifdef ENABLE_MULTIPLE_NODES
if (IS_STREAM_PLAN) {

View File

@ -580,6 +580,14 @@ static void InitStream(StreamFlowCtl* ctl, StreamTransType transType)
key.queryId = pstmt->queryId;
key.planNodeId = plan->plan_node_id;
if (pstmt->plan_dop > 0) {
if (streamNode->smpDesc.consumerDop > 1)
streamNode->smpDesc.consumerDop = pstmt->plan_dop;
if (streamNode->smpDesc.producerDop > 1)
streamNode->smpDesc.producerDop = pstmt->plan_dop;
}
/*
* MPPDB with-recursive support
*/
@ -811,6 +819,12 @@ static void InitStreamFlow(StreamFlowCtl* ctl)
if (ctl->plan) {
Plan* oldPlan = ctl->plan;
StreamFlowCheckInfo oldCheckInfo = ctl->checkInfo;
if (ctl->pstmt->plan_dop > 0 && ctl->plan->dop > 1 &&
ctl->pstmt->plan_dop != ctl->plan->dop) {
ctl->plan->dop = ctl->pstmt->plan_dop;
}
switch (nodeTag(oldPlan)) {
case T_Append:
case T_VecAppend: {

View File

@ -269,10 +269,12 @@ void InitProcGlobal(void)
g_instance.proc_base->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
#endif
g_instance.proc_base->freeProcs = NULL;
g_instance.proc_base->nFreeProcs = 0;
g_instance.proc_base->externalFreeProcs = NULL;
g_instance.proc_base->autovacFreeProcs = NULL;
g_instance.proc_base->pgjobfreeProcs = NULL;
g_instance.proc_base->cmAgentFreeProcs = NULL;
g_instance.proc_base->nFreeUtilProcs = 0;
g_instance.proc_base->startupProc = NULL;
g_instance.proc_base->startupProcPid = 0;
g_instance.proc_base->startupBufferPinWaitBufId = -1;
@ -412,16 +414,19 @@ void InitProcGlobal(void)
/* PGPROC for normal backend and auxiliary backend, add to freeProcs list */
procs[i]->links.next = (SHM_QUEUE *)g_instance.proc_base->freeProcs;
g_instance.proc_base->freeProcs = procs[i];
g_instance.proc_base->nFreeProcs++;
} else if (i < g_instance.shmem_cxt.MaxConnections + thread_pool_stream_proc_num + AUXILIARY_BACKENDS +
g_instance.attr.attr_sql.job_queue_processes + 1) {
/* PGPROC for pg_job backend, add to pgjobfreeProcs list, 1 for Job Schedule Lancher */
procs[i]->links.next = (SHM_QUEUE *)g_instance.proc_base->pgjobfreeProcs;
g_instance.proc_base->pgjobfreeProcs = procs[i];
g_instance.proc_base->nFreeUtilProcs++;
} else if (i < g_instance.shmem_cxt.MaxConnections + thread_pool_stream_proc_num + AUXILIARY_BACKENDS +
g_instance.attr.attr_sql.job_queue_processes + 1 + NUM_DCF_CALLBACK_PROCS) {
/* PGPROC for external thread, add to externalFreeProcs list */
procs[i]->links.next = (SHM_QUEUE *)g_instance.proc_base->externalFreeProcs;
g_instance.proc_base->externalFreeProcs = procs[i];
g_instance.proc_base->nFreeUtilProcs++;
} else if (i < g_instance.shmem_cxt.MaxConnections + thread_pool_stream_proc_num + AUXILIARY_BACKENDS +
g_instance.attr.attr_sql.job_queue_processes + 1 + NUM_DCF_CALLBACK_PROCS + NUM_CMAGENT_PROCS) {
/*
@ -431,11 +436,13 @@ void InitProcGlobal(void)
*/
procs[i]->links.next = (SHM_QUEUE*)g_instance.proc_base->cmAgentFreeProcs;
g_instance.proc_base->cmAgentFreeProcs = procs[i];
g_instance.proc_base->nFreeUtilProcs++;
} else if (i < g_instance.shmem_cxt.MaxConnections + thread_pool_stream_proc_num + AUXILIARY_BACKENDS +
g_instance.attr.attr_sql.job_queue_processes + 1 +
NUM_CMAGENT_PROCS + g_max_worker_processes + NUM_DCF_CALLBACK_PROCS) {
procs[i]->links.next = (SHM_QUEUE*)g_instance.proc_base->bgworkerFreeProcs;
g_instance.proc_base->bgworkerFreeProcs = procs[i];
g_instance.proc_base->nFreeUtilProcs++;
} else if (i < g_instance.shmem_cxt.MaxBackends + NUM_CMAGENT_PROCS + NUM_DCF_CALLBACK_PROCS) {
/*
* PGPROC for AV launcher/worker, add to autovacFreeProcs list
@ -443,6 +450,7 @@ void InitProcGlobal(void)
*/
procs[i]->links.next = (SHM_QUEUE *)g_instance.proc_base->autovacFreeProcs;
g_instance.proc_base->autovacFreeProcs = procs[i];
g_instance.proc_base->nFreeUtilProcs++;
}
/* Initialize myProcLocks[] shared memory queues. */
@ -468,7 +476,7 @@ void InitProcGlobal(void)
NUM_CMAGENT_PROCS + NUM_AUXILIARY_PROCS + NUM_DCF_CALLBACK_PROCS];
/* Create &g_instance.proc_base_lock mutexlock, too */
pthread_mutex_init(&g_instance.proc_base_lock, NULL);
pthread_rwlock_init(&g_instance.proc_base_lock, NULL);
MemoryContextSwitchTo(oldContext);
}
@ -502,6 +510,7 @@ PGPROC *GetFreeProc()
}
if (current && current == g_instance.proc_base->freeProcs) {
g_instance.proc_base->freeProcs = (PGPROC *)current->links.next;
g_instance.proc_base->nFreeProcs--;
}
return current;
@ -668,7 +677,7 @@ void InitProcess(void)
* While we are holding the &g_instance.proc_base_lock, also copy the current shared
* estimate of spins_per_delay to local storage.
*/
pthread_mutex_lock(&g_instance.proc_base_lock);
pthread_rwlock_wrlock(&g_instance.proc_base_lock);
#ifndef ENABLE_THREAD_CHECK
set_spins_per_delay(g_instance.proc_base->spins_per_delay);
#endif
@ -680,29 +689,37 @@ void InitProcess(void)
if (IsAnyAutoVacuumProcess()) {
g_instance.proc_base->autovacFreeProcs = (PGPROC*)t_thrd.proc->links.next;
g_instance.proc_base->nFreeUtilProcs--;
} else if (IsJobSchedulerProcess() || IsJobWorkerProcess()) {
g_instance.proc_base->pgjobfreeProcs = (PGPROC*)t_thrd.proc->links.next;
g_instance.proc_base->nFreeUtilProcs--;
} else if (IsBgWorkerProcess()) {
g_instance.proc_base->bgworkerFreeProcs = (PGPROC*)t_thrd.proc->links.next;
g_instance.proc_base->nFreeUtilProcs--;
} else if (t_thrd.dcf_cxt.is_dcf_thread) {
g_instance.proc_base->externalFreeProcs = (PGPROC*)t_thrd.proc->links.next;
g_instance.proc_base->nFreeUtilProcs--;
} else if (u_sess->libpq_cxt.IsConnFromCmAgent) {
g_instance.proc_base->cmAgentFreeProcs = (PGPROC *)t_thrd.proc->links.next;
g_instance.proc_base->nFreeUtilProcs--;
} else {
#ifndef __USE_NUMA
g_instance.proc_base->freeProcs = (PGPROC*)t_thrd.proc->links.next;
g_instance.proc_base->nFreeProcs--;
#endif
}
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
} else {
g_instance.proc_base->nFreeProcs = 0;
/*
* If we reach here, all the PGPROCs are in use. This is one of the
* possible places to detect "too many backends", so give the standard
* error message. XXX do we need to give a different failure message
* in the autovacuum case?
*/
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
if (IsUnderPostmaster && StreamThreadAmI())
MarkPostmasterChildUnuseForStreamWorker();
@ -969,7 +986,7 @@ void InitAuxiliaryProcess(void)
* While we are holding the &g_instance.proc_base_lock, also copy the current shared
* estimate of spins_per_delay to local storage.
*/
pthread_mutex_lock(&g_instance.proc_base_lock);
pthread_rwlock_wrlock(&g_instance.proc_base_lock);
#ifndef ENABLE_THREAD_CHECK
set_spins_per_delay(g_instance.proc_base->spins_per_delay);
#endif
@ -983,7 +1000,7 @@ void InitAuxiliaryProcess(void)
break;
}
if (proctype >= NUM_AUXILIARY_PROCS) {
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
ereport(FATAL, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("all &g_instance.proc_aux_base are in use")));
}
@ -994,7 +1011,7 @@ void InitAuxiliaryProcess(void)
t_thrd.proc = auxproc;
t_thrd.pgxact = &g_instance.proc_base_all_xacts[auxproc->pgprocno];
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
/*
* Initialize all fields of t_thrd.proc, except for those previously
@ -1117,12 +1134,12 @@ int GetAuxProcEntryIndex(int baseIdx)
*/
void PublishStartupProcessInformation(void)
{
pthread_mutex_lock(&g_instance.proc_base_lock);
pthread_rwlock_wrlock(&g_instance.proc_base_lock);
g_instance.proc_base->startupProc = t_thrd.proc;
g_instance.proc_base->startupProcPid = t_thrd.proc_cxt.MyProcPid;
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
}
@ -1135,7 +1152,7 @@ bool HaveNFreeProcs(int n)
{
PGPROC* proc = NULL;
pthread_mutex_lock(&g_instance.proc_base_lock);
pthread_rwlock_rdlock(&g_instance.proc_base_lock);
if (u_sess->libpq_cxt.IsConnFromCmAgent) {
proc = g_instance.proc_base->cmAgentFreeProcs;
@ -1148,11 +1165,50 @@ bool HaveNFreeProcs(int n)
n--;
}
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
return (n <= 0);
}
/* Returns the number of elements in the proc list */
static inline int GetNumProcsList(PGPROC* proc)
{
int n = 0;
while (proc != NULL) {
proc = (PGPROC*)proc->links.next;
n++;
}
return n;
}
void GetNFreeProcs(int *n_free, int *n_other)
{
Assert(n_free && n_other);
pthread_rwlock_rdlock(&g_instance.proc_base_lock);
/* Returns the number of free procs available for connections */
*n_free = GetNumProcsList(g_instance.proc_base->freeProcs);
/* Returns the total number of free procs of other types */
*n_other = GetNumProcsList(g_instance.proc_base->externalFreeProcs);
*n_other += GetNumProcsList(g_instance.proc_base->autovacFreeProcs);
*n_other += GetNumProcsList(g_instance.proc_base->cmAgentFreeProcs);
*n_other += GetNumProcsList(g_instance.proc_base->pgjobfreeProcs);
*n_other += GetNumProcsList(g_instance.proc_base->bgworkerFreeProcs);
Assert((*n_free) == g_instance.proc_base->nFreeProcs);
Assert((*n_other) == g_instance.proc_base->nFreeUtilProcs);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
}
/*
* Check if the current process is awaiting a lock.
*/
@ -1263,15 +1319,19 @@ static void ProcPutBackToFreeList()
if (IsAnyAutoVacuumProcess()) {
t_thrd.proc->links.next = (SHM_QUEUE*)g_instance.proc_base->autovacFreeProcs;
g_instance.proc_base->autovacFreeProcs = t_thrd.proc;
g_instance.proc_base->nFreeUtilProcs++;
} else if (IsJobSchedulerProcess() || IsJobWorkerProcess()) {
t_thrd.proc->links.next = (SHM_QUEUE*)g_instance.proc_base->pgjobfreeProcs;
g_instance.proc_base->pgjobfreeProcs = t_thrd.proc;
g_instance.proc_base->nFreeUtilProcs++;
} else if (t_thrd.dcf_cxt.is_dcf_thread) {
t_thrd.proc->links.next = (SHM_QUEUE *)g_instance.proc_base->externalFreeProcs;
g_instance.proc_base->externalFreeProcs = t_thrd.proc;
g_instance.proc_base->nFreeUtilProcs++;
} else if (u_sess->libpq_cxt.IsConnFromCmAgent) {
t_thrd.proc->links.next = (SHM_QUEUE*)g_instance.proc_base->cmAgentFreeProcs;
g_instance.proc_base->cmAgentFreeProcs = t_thrd.proc;
g_instance.proc_base->nFreeUtilProcs++;
(void)pg_atomic_sub_fetch_u32(&g_instance.conn_cxt.CurCMAProcCount, 1);
ereport(DEBUG5, (errmsg("Proc exit, put cm_agent to free list, current cm_agent proc count is %d",
g_instance.conn_cxt.CurCMAProcCount)));
@ -1282,10 +1342,12 @@ static void ProcPutBackToFreeList()
}
} else if (IsBgWorkerProcess()) {
t_thrd.proc->links.next = (SHM_QUEUE*)g_instance.proc_base->bgworkerFreeProcs;
g_instance.proc_base->bgworkerFreeProcs = t_thrd.proc;
g_instance.proc_base->bgworkerFreeProcs = t_thrd.proc;
g_instance.proc_base->nFreeUtilProcs++;
} else {
t_thrd.proc->links.next = (SHM_QUEUE*)g_instance.proc_base->freeProcs;
g_instance.proc_base->freeProcs = t_thrd.proc;
g_instance.proc_base->nFreeProcs++;
if (t_thrd.role == WORKER && u_sess->proc_cxt.PassConnLimit) {
SpinLockAcquire(&g_instance.conn_cxt.ConnCountLock);
g_instance.conn_cxt.CurConnCount--;
@ -1379,7 +1441,7 @@ static void ProcKill(int code, Datum arg)
clean_proc_dw_buf();
/* Clean subxid cache if needed. */
ProcSubXidCacheClean();
pthread_mutex_lock(&g_instance.proc_base_lock);
pthread_rwlock_wrlock(&g_instance.proc_base_lock);
/*
* If we're still a member of a locking group, that means we're a leader
@ -1407,7 +1469,7 @@ static void ProcKill(int code, Datum arg)
g_instance.proc_base->spins_per_delay = update_spins_per_delay(g_instance.proc_base->spins_per_delay);
#endif
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
/*
* This process is no longer present in shared memory in any meaningful
@ -1468,7 +1530,7 @@ static void AuxiliaryProcKill(int code, Datum arg)
clean_proc_dw_buf();
pthread_mutex_lock(&g_instance.proc_base_lock);
pthread_rwlock_wrlock(&g_instance.proc_base_lock);
/* Mark auxiliary proc no longer in use */
t_thrd.proc->pid = 0;
@ -1484,7 +1546,7 @@ static void AuxiliaryProcKill(int code, Datum arg)
g_instance.proc_base->spins_per_delay = update_spins_per_delay(g_instance.proc_base->spins_per_delay);
#endif
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
}
/*
@ -2357,7 +2419,7 @@ void ProcSendSignal(ThreadId pid)
PGPROC* proc = NULL;
if (RecoveryInProgress()) {
pthread_mutex_lock(&g_instance.proc_base_lock);
pthread_rwlock_wrlock(&g_instance.proc_base_lock);
/*
* Check to see whether it is the Startup process we wish to signal.
@ -2369,7 +2431,7 @@ void ProcSendSignal(ThreadId pid)
*/
proc = MultiRedoThreadPidGetProc(pid);
pthread_mutex_unlock(&g_instance.proc_base_lock);
pthread_rwlock_unlock(&g_instance.proc_base_lock);
}
if (proc == NULL)

View File

@ -1127,7 +1127,7 @@ typedef struct knl_instance_context {
void* bgw_base;
pthread_mutex_t bgw_base_lock;
struct PROC_HDR* proc_base;
pthread_mutex_t proc_base_lock;
pthread_rwlock_t proc_base_lock;
struct PGPROC** proc_base_all_procs;
struct PGXACT* proc_base_all_xacts;
struct PGPROC** proc_aux_base;

View File

@ -335,6 +335,11 @@ typedef struct knl_u_optimizer_context {
int query_dop_store; /* Store the dop. */
int query_dop; /* Degree of parallel, 1 means not parallel. */
bool auto_dop; /* Automatically degrade the DOP */
bool auto_dop_freeprocs; /* Adjust the dop based on the number of free procs */
int auto_dop_freeprocs_th; /* min number of free procs to disable DOP */
int auto_dop_join_th; /* When joins of the QueryTree exceeds the threshold, the dop is reduced to 2 */
int saved_dop; /* original value of query_dop */
double smp_thread_cost;

View File

@ -187,6 +187,7 @@ typedef struct PlannedStmt {
bool multi_node_hint;
uint64 uniqueSQLId;
int plan_dop;
} PlannedStmt;
typedef struct NodeGroupInfoContext {

View File

@ -30,7 +30,7 @@
extern void OptimizePlanDop(PlannedStmt* plannedStmt);
extern void InitDynamicSmp();
extern bool IsDynamicSmpEnabled();
extern void ChooseStartQueryDop(int hashTableCount);
extern void ChooseStartQueryDop(Query* parse, int hashTableCount);
extern void CheckQueryDopValue();
typedef struct GMNSDConext {

View File

@ -348,6 +348,8 @@ typedef struct PROC_HDR {
uint32 allNonPreparedProcCount;
/* Head of list of free PGPROC structures */
PGPROC* freeProcs;
/* Number of freeProcs */
int nFreeProcs;
/* Head of list of external's free PGPROC structures */
PGPROC* externalFreeProcs;
/* Head of list of autovacuum's free PGPROC structures */
@ -358,6 +360,8 @@ typedef struct PROC_HDR {
PGPROC* pgjobfreeProcs;
/* Head of list of bgworker free PGPROC structures */
PGPROC* bgworkerFreeProcs;
/* Number of above free procs(from externalFreeProcs to bgworkerFreeProcs) */
int nFreeUtilProcs;
/* First pgproc waiting for group XID clear */
pg_atomic_uint32 procArrayGroupFirst;
/* First pgproc waiting for group transaction status update */
@ -457,6 +461,7 @@ extern int GetAuxProcEntryIndex(int baseIdx);
extern void PublishStartupProcessInformation(void);
extern bool HaveNFreeProcs(int n);
extern void GetNFreeProcs(int *n_free, int *n_other);
extern void ProcReleaseLocks(bool isCommit);
extern int GetUsedConnectionCount(void);
extern int GetUsedInnerToolConnCount(void);

View File

@ -5238,7 +5238,7 @@ static void check_global_variables()
}
}
#define BASE_PGXC_LIKE_MACRO_NUM 1398
#define BASE_PGXC_LIKE_MACRO_NUM 1399
static void check_pgxc_like_macros()
{
#ifdef BUILD_BY_CMAKE