!2654 IUD OPTIMIZE

Merge pull request !2654 from wuchenglin/dev_iud_1219
This commit is contained in:
opengauss-bot 2022-12-20 09:04:08 +00:00 committed by Gitee
commit 7f7ac666a6
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
35 changed files with 652 additions and 380 deletions

View File

@ -100,7 +100,7 @@ elseif($ENV{DEBUG_TYPE} STREQUAL "release")
#close something for release version.
set(ENABLE_LLT OFF)
set(ENABLE_UT OFF)
set(OPTIMIZE_LEVEL -O2 -g3)
set(OPTIMIZE_LEVEL -O2)
elseif($ENV{DEBUG_TYPE} STREQUAL "memcheck")
message("DEBUG_TYPE:$ENV{DEBUG_TYPE}")
set(ENABLE_MEMORY_CHECK ON)
@ -134,9 +134,9 @@ endif()
set(PROTECT_OPTIONS -fwrapv -std=c++14 -fnon-call-exceptions ${OPTIMIZE_LEVEL})
set(WARNING_OPTIONS -Wall -Wendif-labels -Werror -Wformat-security)
set(OPTIMIZE_OPTIONS -pipe -pthread -fno-aggressive-loop-optimizations -fno-expensive-optimizations -fno-omit-frame-pointer -fno-strict-aliasing -freg-struct-return)
set(OPTIMIZE_OPTIONS -pipe -pthread -fno-aggressive-loop-optimizations -fno-strict-aliasing -freg-struct-return)
set(CHECK_OPTIONS -Wmissing-format-attribute -Wno-attributes -Wno-unused-but-set-variable -Wno-write-strings -Wpointer-arith)
set(MACRO_OPTIONS -D_GLIBCXX_USE_CXX11_ABI=0 -DENABLE_GSTRACE -D_GNU_SOURCE -DPGXC -D_POSIX_PTHREAD_SEMANTICS -D_REENTRANT -DSTREAMPLAN -D_THREAD_SAFE ${DB_COMMON_DEFINE})
set(MACRO_OPTIONS -D_GLIBCXX_USE_CXX11_ABI=0 -D_GNU_SOURCE -DPGXC -D_POSIX_PTHREAD_SEMANTICS -DSTREAMPLAN ${DB_COMMON_DEFINE})
# libraries need secure options during compling
set(LIB_SECURE_OPTIONS -fPIC -fno-common -fstack-protector)

View File

@ -659,6 +659,7 @@ auto_explain_level|enum|off,log,notice|NULL|NULL|
cost_weight_index|real|1e-10,1e+10|NULL|NULL|
default_limit_rows|real|-100,1.79769e+308|NULL|NULL|
sql_beta_feature|enum|partition_fdw_on,partition_opfusion,index_cost_with_leaf_pages_only,canonical_pathkey,join_sel_with_cast_func,no_unique_index_first,sel_semi_poisson,sel_expr_instr,param_path_gen,rand_cost_opt,param_path_opt,page_est_opt,a_style_coerce,predpush_same_level,sublink_pullup_enhanced,none|NULL|NULL|
sql_fusion_engine|enum|iud_checksum_remove,iud_node_context_remove,iud_is_system_class_remove_package,iud_errorrel_remove,iud_block_chain_remove,iud_trigger_remove,iud_memory_context_track_remove,iud_instr_time_remove,iud_markdrop_remove,iud_code_optimize,iud_report_remove,iud_pending,none|NULL|NULL|
max_logical_replication_workers|int|0,262143|NULL|Maximum number of logical replication worker processes.|
walwriter_sleep_threshold|int64|1,50000|NULL|NULL|
walwriter_cpu_bind|int|-1,2147483647|NULL|NULL|

View File

@ -5087,6 +5087,7 @@ AclMode pg_class_aclmask(Oid table_oid, Oid roleid, AclMode mask, AclMaskHow how
errcause("System error."), erraction("Contact engineer to support.")));
classForm = (Form_pg_class)GETSTRUCT(tuple);
#ifdef ENABLE_MULTIPLE_NODES
/* Check current user has privilige to this group */
if (IS_PGXC_COORDINATOR && !IsInitdb && check_nodegroup && is_pgxc_class_table(table_oid) &&
roleid != classForm->relowner) {
@ -5103,6 +5104,7 @@ AclMode pg_class_aclmask(Oid table_oid, Oid roleid, AclMode mask, AclMaskHow how
}
}
}
#endif
/*
* Deny anyone permission to update a system catalog unless
@ -5126,13 +5128,14 @@ AclMode pg_class_aclmask(Oid table_oid, Oid roleid, AclMode mask, AclMaskHow how
* initial user and monitorsdmin bypass all permission-checking.
*/
Oid namespaceId = classForm->relnamespace;
if (IsMonitorSpace(namespaceId) && (roleid == INITIAL_USER_ID || isMonitoradmin(roleid))) {
bool isMonitorNs = IsMonitorSpace(namespaceId);
if (isMonitorNs && (roleid == INITIAL_USER_ID || isMonitoradmin(roleid))) {
ReleaseSysCache(tuple);
return mask;
}
/* Blockchain hist table cannot be modified */
if (table_oid == GsGlobalChainRelationId || classForm->relnamespace == PG_BLOCKCHAIN_NAMESPACE) {
if (table_oid == GsGlobalChainRelationId || namespaceId == PG_BLOCKCHAIN_NAMESPACE) {
if (isRelSuperuser() || isAuditadmin(roleid)) {
mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE | ACL_REFERENCES);
} else {
@ -5144,7 +5147,7 @@ AclMode pg_class_aclmask(Oid table_oid, Oid roleid, AclMode mask, AclMaskHow how
/* Otherwise, superusers bypass all permission-checking, except access independent role's objects. */
/* Database Security: Support separation of privilege. */
if (!is_ddl_privileges && !IsMonitorSpace(namespaceId) && (superuser_arg(roleid) || systemDBA_arg(roleid)) &&
if (!is_ddl_privileges && !isMonitorNs && (superuser_arg(roleid) || systemDBA_arg(roleid)) &&
((classForm->relowner == roleid) || !is_role_independent(classForm->relowner) ||
independent_priv_aclcheck(mask, classForm->relkind))) {
#ifdef ACLDEBUG

View File

@ -645,6 +645,10 @@ bool IsSystemClass(Form_pg_class reltuple)
{
Oid relnamespace = reltuple->relnamespace;
if (ENABLE_SQL_FUSION_ENGINE(IUD_IS_SYSTEM_CLASS_REMOVE_PACKAGE)) {
return IsSystemNamespace(relnamespace) || IsToastNamespace(relnamespace);
}
return IsSystemNamespace(relnamespace) || IsToastNamespace(relnamespace) || IsPackageSchemaOid(relnamespace);
}
@ -682,6 +686,13 @@ bool IsCatalogClass(Oid relid, Form_pg_class reltuple)
{
Oid relnamespace = reltuple->relnamespace;
/* Optimize if judgment */
if (ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE)) {
if ((relid < FirstNormalObjectId) && (IsSystemNamespace(relnamespace) || IsToastNamespace(relnamespace))) {
return true;
}
return false;
}
/*
* Never consider relations outside pg_catalog/pg_toast to be catalog
* relations.

View File

@ -793,7 +793,11 @@ char* RelnameGetRelidExtended(const char* relname, Oid* relOid, Oid* refSynOid,
recomputeNamespacePath();
tempActiveSearchPath = list_copy(u_sess->catalog_cxt.activeSearchPath);
if (ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE)) {
tempActiveSearchPath = u_sess->catalog_cxt.activeSearchPath;
} else {
tempActiveSearchPath = list_copy(u_sess->catalog_cxt.activeSearchPath);
}
foreach (l, tempActiveSearchPath) {
Oid namespaceId = lfirst_oid(l);
@ -812,7 +816,9 @@ char* RelnameGetRelidExtended(const char* relname, Oid* relOid, Oid* refSynOid,
if (relOid != NULL && !OidIsValid(*relOid) && module_logging_is_on(MOD_SCHEMA)) {
AddSchemaSearchPathInfo(tempActiveSearchPath, detailInfo);
}
list_free_ext(tempActiveSearchPath);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE)) {
list_free_ext(tempActiveSearchPath);
}
/* return checking details. */
return errDetail;

View File

@ -1120,7 +1120,9 @@ Relation parserOpenTable(ParseState *pstate, const RangeVar *relation, int lockm
cancel_parser_errposition_callback(&pcbstate);
/* Forbit DQL/DML on recyclebin object */
TrForbidAccessRbObject(RelationRelationId, RelationGetRelid(rel), relation->relname);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
TrForbidAccessRbObject(RelationRelationId, RelationGetRelid(rel), relation->relname);
}
/* check wlm session info whether is valid in this database */
if (!CheckWLMSessionInfoTableValid(relation->relname) && !u_sess->attr.attr_common.IsInplaceUpgrade) {

View File

@ -1850,9 +1850,13 @@ CachedPlan* GetCachedPlan(CachedPlanSource* plansource, ParamListInfo boundParam
plansource->num_custom_plans++;
}
ereport(DEBUG2, (errmodule(MOD_OPT), errmsg("Custom plan is used for \"%s\"", plansource->query_string)));
if (SHOW_DEBUG_MESSAGE()) {
ereport(DEBUG2, (errmodule(MOD_OPT), errmsg("Custom plan is used for \"%s\"", plansource->query_string)));
}
} else {
ereport(DEBUG2, (errmodule(MOD_OPT), errmsg("Generic plan is used for \"%s\"", plansource->query_string)));
if (SHOW_DEBUG_MESSAGE()) {
ereport(DEBUG2, (errmodule(MOD_OPT), errmsg("Generic plan is used for \"%s\"", plansource->query_string)));
}
}
/*
@ -1892,14 +1896,16 @@ CachedPlan* GetCachedPlan(CachedPlanSource* plansource, ParamListInfo boundParam
plan->mot_jit_context = plansource->mot_jit_context;
#endif
ListCell *lc;
foreach (lc, plan->stmt_list) {
PlannedStmt* plannedstmt = (PlannedStmt*)lfirst(lc);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
ListCell *lc;
foreach (lc, plan->stmt_list) {
PlannedStmt* plannedstmt = (PlannedStmt*)lfirst(lc);
if (!IsA(plannedstmt, PlannedStmt))
continue; /* Ignore utility statements */
if (!IsA(plannedstmt, PlannedStmt))
continue; /* Ignore utility statements */
check_gtm_free_plan(plannedstmt, u_sess->attr.attr_sql.explain_allow_multinode ? WARNING : ERROR);
check_gtm_free_plan(plannedstmt, u_sess->attr.attr_sql.explain_allow_multinode ? WARNING : ERROR);
}
}
return plan;

View File

@ -412,6 +412,7 @@ const char* sync_guc_variable_namelist[] = {"work_mem",
#ifndef ENABLE_MULTIPLE_NODES
"plsql_show_all_error",
"uppercase_attribute_name",
"sql_fusion_engine",
#endif
"track_stmt_session_slot",
"track_stmt_stat_level",
@ -4404,6 +4405,13 @@ struct config_generic** get_guc_variables(void)
return u_sess->guc_variables;
}
#ifdef ENABLE_MULTIPLE_NODES
static void InitMultipleNodeUnsupportGuc()
{
u_sess->attr.attr_sql.sql_fusion_engine = 0;
}
#endif
#ifndef ENABLE_MULTIPLE_NODES
static void InitSingleNodeUnsupportGuc()
{
@ -6646,7 +6654,8 @@ static const char* config_enum_map_lookup_by_value(struct config_enum* record, i
const char* config_enum_lookup_by_value(struct config_enum* record, int val)
{
if (pg_strncasecmp(record->gen.name, "rewrite_rule", sizeof("rewrite_rule")) == 0 ||
pg_strncasecmp(record->gen.name, "sql_beta_feature", sizeof("sql_beta_feature")) == 0) {
pg_strncasecmp(record->gen.name, "sql_beta_feature", sizeof("sql_beta_feature")) == 0 ||
pg_strncasecmp(record->gen.name, "sql_fusion_engine", sizeof("sql_fusion_engine")) == 0) {
return config_enum_map_lookup_by_value(record, val);
} else {
const struct config_enum_entry* entry = NULL;
@ -6716,7 +6725,8 @@ static bool config_enum_map_lookup_by_name(struct config_enum* record, const cha
bool config_enum_lookup_by_name(struct config_enum* record, const char* value, int* retval)
{
if (pg_strncasecmp(record->gen.name, "rewrite_rule", sizeof("rewrite_rule")) == 0 ||
pg_strncasecmp(record->gen.name, "sql_beta_feature", sizeof("sql_beta_feature")) == 0) {
pg_strncasecmp(record->gen.name, "sql_beta_feature", sizeof("sql_beta_feature")) == 0 ||
pg_strncasecmp(record->gen.name, "sql_fusion_engine", sizeof("sql_fusion_engine")) == 0) {
return config_enum_map_lookup_by_name(record, value, retval);
} else {
const struct config_enum_entry* entry = NULL;

View File

@ -295,6 +295,23 @@ static const struct config_enum_entry sql_beta_options[] = {
{NULL, 0, false}
};
static const struct config_enum_entry sql_fusion_engine_options[] = {
{"none", NO_SQL_FUSION_FEATURE, false},
{"iud_checksum_remove", IUD_CHECKSUM_REMOVE, false},
{"iud_node_context_remove", IUD_NODE_CONTEXT_REMOVE, false},
{"iud_is_system_class_remove_package", IUD_IS_SYSTEM_CLASS_REMOVE_PACKAGE, false},
{"iud_errorrel_remove", IUD_ERRORREL_REMOVE, false},
{"iud_block_chain_remove", IUD_BLOCK_CHAIN_REMOVE, false},
{"iud_trigger_remove", IUD_TRIGGER_REMOVE, false},
{"iud_memory_context_track_remove", IUD_MEMORY_CONTEXT_TRACK_REMOVE, false},
{"iud_instr_time_remove", IUD_INSTR_TIME_REMOVE, false},
{"iud_markdrop_remove", IUD_MARKDROP_REMOVE, false},
{"iud_code_optimize", IUD_CODE_OPTIMIZE, false},
{"iud_report_remove", IUD_REPORT_REMOVE, false},
{"iud_pending", IUD_PENDING, false},
{NULL, 0, false}
};
static const struct config_enum_entry vector_engine_strategy[] = {
{"off", OFF_VECTOR_ENGINE, false},
{"force", FORCE_VECTOR_ENGINE, false},
@ -2909,6 +2926,17 @@ static void InitSqlConfigureNamesEnum()
NULL,
NULL,
NULL},
{{"sql_fusion_engine",
PGC_USERSET,
NODE_SINGLENODE,
QUERY_TUNING,
gettext_noop("Sets the beta feature for SQL fusion engine."), NULL, GUC_LIST_INPUT},
&u_sess->attr.attr_sql.sql_fusion_engine,
NO_SQL_FUSION_FEATURE,
sql_fusion_engine_options,
NULL,
NULL,
NULL},
{{"try_vector_engine_strategy",
PGC_USERSET,
NODE_ALL,

View File

@ -363,6 +363,7 @@ enable_kill_query = off # optional: [on, off], default: off
#cpu_index_tuple_cost = 0.005 # same scale as above
#cpu_operator_cost = 0.0025 # same scale as above
#effective_cache_size = 128MB
#sql_fusion_engine = none
# - Genetic Query Optimizer -

View File

@ -216,7 +216,9 @@ void MemoryContextReset(MemoryContext context)
{
AssertArg(MemoryContextIsValid(context));
#ifdef MEMORY_CONTEXT_CHECKING
PreventActionOnSealedContext(context);
#endif
if (MemoryContextIsShared(context))
MemoryContextLock(context);
@ -1000,7 +1002,7 @@ void* MemoryAllocFromContext(MemoryContext context, Size size, const char* file,
context->isReset = false;
ret = (*context->methods->alloc)(context, 0, size, file, line);
if (ret == NULL)
if (unlikely(ret == NULL))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_LOGICAL_MEMORY),
errmsg("memory is temporarily unavailable"),
@ -1062,7 +1064,7 @@ void* MemoryContextAllocZeroDebug(MemoryContext context, Size size, const char*
context->isReset = false;
ret = (*context->methods->alloc)(context, 0, size, file, line);
if (ret == NULL)
if (unlikely(ret == NULL))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_LOGICAL_MEMORY),
errmsg("memory is temporarily unavailable"),
@ -1100,8 +1102,9 @@ void* MemoryContextAllocZeroAlignedDebug(MemoryContext context, Size size, const
void* ret = NULL;
AssertArg(MemoryContextIsValid(context));
#ifdef MEMORY_CONTEXT_CHECKING
PreventActionOnSealedContext(context);
#endif
if (!AllocSizeIsValid(size)) {
ereport(ERROR,
@ -1112,7 +1115,7 @@ void* MemoryContextAllocZeroAlignedDebug(MemoryContext context, Size size, const
context->isReset = false;
ret = (*context->methods->alloc)(context, 0, size, file, line);
if (ret == NULL)
if (unlikely(ret == NULL))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_LOGICAL_MEMORY),
errmsg("memory is temporarily unavailable"),
@ -1131,6 +1134,7 @@ void* MemoryContextAllocZeroAlignedDebug(MemoryContext context, Size size, const
if (unlikely(STATEMENT_MAX_MEM)) {
MemoryContextCheckSessionMemory(context, size, file, line);
}
MemSetLoop(ret, 0, size);
InsertMemoryAllocInfo(ret, context, file, line, size);
@ -1159,7 +1163,7 @@ void* palloc_extended(Size size, int flags)
CurrentMemoryContext->isReset = false;
ret = (*CurrentMemoryContext->methods->alloc)(CurrentMemoryContext, 0, size, __FILE__, __LINE__);
if (ret == NULL) {
if (unlikely(ret == NULL)) {
/*
* If flag has not MCXT_ALLOC_NO_OOM, we must ereport ERROR here
*/
@ -1264,9 +1268,10 @@ void* repalloc_noexcept_Debug(void* pointer, Size size, const char* file, int li
ret = (*context->methods->realloc)(context, pointer, 0, size, file, line);
if (ret == NULL) {
if (unlikely(ret == NULL)) {
return NULL;
}
/* check if the session used memory is beyond the limitation */
if (unlikely(STATEMENT_MAX_MEM)) {
MemoryContextCheckSessionMemory(context, size, file, line);
@ -1308,14 +1313,16 @@ void* repallocDebug(void* pointer, Size size, const char* file, int line)
context = &(block->aset->header);
#endif
AssertArg(MemoryContextIsValid(context));
#ifdef MEMORY_CONTEXT_CHECKING
PreventActionOnSealedContext(context);
#endif
/* isReset must be false already */
Assert(!context->isReset);
RemoveMemoryAllocInfo(pointer, context);
ret = (*context->methods->realloc)(context, pointer, 0, size, file, line);
if (ret == NULL)
if (unlikely(ret == NULL))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_LOGICAL_MEMORY),
errmsg("memory is temporarily unavailable"),
@ -1358,7 +1365,7 @@ void* MemoryContextMemalignAllocDebug(MemoryContext context, Size align, Size si
context->isReset = false;
ret = (*context->methods->alloc)(context, align, size, file, line);
if (ret == NULL)
if (unlikely(ret == NULL))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_LOGICAL_MEMORY),
errmsg("memory is temporarily unavailable"),
@ -1404,7 +1411,7 @@ void* MemoryContextAllocHugeDebug(MemoryContext context, Size size, const char*
context->isReset = false;
ret = (*context->methods->alloc)(context, 0, size, file, line);
if (ret == NULL)
if (unlikely(ret == NULL))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_LOGICAL_MEMORY),
errmsg("memory is temporarily unavailable"),
@ -1520,7 +1527,7 @@ void* repallocHugeDebug(void* pointer, Size size, const char* file, int line)
RemoveMemoryAllocInfo(pointer, context);
ret = (*context->methods->realloc)(context, pointer, 0, size, file, line);
if (ret == NULL)
if (unlikely(ret == NULL))
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_LOGICAL_MEMORY),
errmsg("memory is temporarily unavailable"),

View File

@ -3922,12 +3922,16 @@ static int exec_stmt_fori(PLpgSQL_execstate* estate, PLpgSQL_stmt_fori* stmt)
* execution of the loop body. Release if successed; save exception
* information and rollback if failed.
*/
SPI_savepoint_create(SE_SAVEPOINT_NAME);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
SPI_savepoint_create(SE_SAVEPOINT_NAME);
}
PG_TRY();
{
rc = exec_stmts(estate, stmt->body);
SPI_savepoint_release(SE_SAVEPOINT_NAME);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
SPI_savepoint_release(SE_SAVEPOINT_NAME);
}
plpgsql_create_econtext(estate);
stp_cleanup_subxact_resowner(stackId);
t_thrd.utils_cxt.CurrentResourceOwner = oldowner;
@ -3949,7 +3953,9 @@ static int exec_stmt_fori(PLpgSQL_execstate* estate, PLpgSQL_stmt_fori* stmt)
be_var->isnull = false;
FlushErrorState();
SPI_savepoint_rollbackAndRelease(SE_SAVEPOINT_NAME, InvalidTransactionId);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
SPI_savepoint_rollbackAndRelease(SE_SAVEPOINT_NAME, InvalidTransactionId);
}
stp_cleanup_subxact_resowner(stackId);
t_thrd.utils_cxt.CurrentResourceOwner = oldowner;
exception_saved = true;

View File

@ -252,7 +252,9 @@ void standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
t_thrd.utils_cxt.mctx_sequent_count = 0;
/* Initialize the memory tracking information */
MemoryTrackingInit();
if (!ENABLE_SQL_FUSION_ENGINE(IUD_MEMORY_CONTEXT_TRACK_REMOVE)) {
MemoryTrackingInit();
}
/*
* Build EState, switch into per-query memory context for startup.
@ -268,33 +270,38 @@ void standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
#endif
#ifndef ENABLE_MULTIPLE_NODES
(void)InitStreamObject(queryDesc->plannedstmt);
if (queryDesc->plannedstmt->num_streams > 0) {
(void)InitStreamObject(queryDesc->plannedstmt);
}
#endif
if (StreamTopConsumerAmI() && queryDesc->instrument_options != 0 && IS_PGXC_DATANODE) {
int dop = queryDesc->plannedstmt->query_dop;
if (queryDesc->plannedstmt->in_compute_pool) {
dop = 1;
if (StreamTopConsumerAmI() && queryDesc->instrument_options != 0) {
if (IS_PGXC_DATANODE) {
int dop = queryDesc->plannedstmt->query_dop;
if (queryDesc->plannedstmt->in_compute_pool) {
dop = 1;
}
AutoContextSwitch streamCxtGuard(u_sess->stream_cxt.stream_runtime_mem_cxt);
u_sess->instr_cxt.global_instr = StreamInstrumentation::InitOnDn(queryDesc, dop);
// u_sess->instr_cxt.thread_instr in DN
u_sess->instr_cxt.thread_instr =
u_sess->instr_cxt.global_instr->allocThreadInstrumentation(
queryDesc->plannedstmt->planTree->plan_node_id);
}
AutoContextSwitch streamCxtGuard(u_sess->stream_cxt.stream_runtime_mem_cxt);
u_sess->instr_cxt.global_instr = StreamInstrumentation::InitOnDn(queryDesc, dop);
// u_sess->instr_cxt.thread_instr in DN
u_sess->instr_cxt.thread_instr =
u_sess->instr_cxt.global_instr->allocThreadInstrumentation(queryDesc->plannedstmt->planTree->plan_node_id);
}
/* CN of the compute pool. */
if (IS_PGXC_COORDINATOR && queryDesc->plannedstmt->in_compute_pool) {
const int dop = 1;
/* CN of the compute pool. */
if (StreamTopConsumerAmI() && queryDesc->instrument_options != 0 && IS_PGXC_COORDINATOR &&
queryDesc->plannedstmt->in_compute_pool) {
const int dop = 1;
/* m_instrDataContext in CN of compute pool is under t_thrd.mem_cxt.stream_runtime_mem_cxt */
AutoContextSwitch streamCxtGuard(u_sess->stream_cxt.stream_runtime_mem_cxt);
u_sess->instr_cxt.global_instr = StreamInstrumentation::InitOnCP(queryDesc, dop);
/* m_instrDataContext in CN of compute pool is under t_thrd.mem_cxt.stream_runtime_mem_cxt */
AutoContextSwitch streamCxtGuard(u_sess->stream_cxt.stream_runtime_mem_cxt);
u_sess->instr_cxt.global_instr = StreamInstrumentation::InitOnCP(queryDesc, dop);
u_sess->instr_cxt.thread_instr =
u_sess->instr_cxt.global_instr->allocThreadInstrumentation(queryDesc->plannedstmt->planTree->plan_node_id);
u_sess->instr_cxt.thread_instr =
u_sess->instr_cxt.global_instr->allocThreadInstrumentation(
queryDesc->plannedstmt->planTree->plan_node_id);
}
}
old_context = MemoryContextSwitchTo(estate->es_query_cxt);
@ -369,7 +376,7 @@ void standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
if (IS_PGXC_COORDINATOR || IsConnFromApp()) {
#else
/* statement always start in non-stream thread */
if (!StreamThreadAmI()) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE) && !StreamThreadAmI()) {
#endif
SetCurrentStmtTimestamp();
} /* else stmtSystemTimestamp synchronize from CN */
@ -377,23 +384,26 @@ void standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
/*
* Initialize the plan state tree
*/
(void)INSTR_TIME_SET_CURRENT(starttime);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
(void)INSTR_TIME_SET_CURRENT(starttime);
IPC_PERFORMANCE_LOG_OUTPUT("standard_ExecutorStart InitPlan start.");
InitPlan(queryDesc, eflags);
IPC_PERFORMANCE_LOG_OUTPUT("standard_ExecutorStart InitPlan end.");
totaltime += elapsed_time(&starttime);
IPC_PERFORMANCE_LOG_OUTPUT("standard_ExecutorStart InitPlan start.");
InitPlan(queryDesc, eflags);
IPC_PERFORMANCE_LOG_OUTPUT("standard_ExecutorStart InitPlan end.");
totaltime += elapsed_time(&starttime);
/*
* if current plan is working for expression, no need to collect instrumentation.
*/
if (estate->es_instrument != INSTRUMENT_NONE && StreamTopConsumerAmI() && u_sess->instr_cxt.global_instr &&
u_sess->instr_cxt.thread_instr) {
int node_id = queryDesc->plannedstmt->planTree->plan_node_id - 1;
int *m_instrArrayMap = u_sess->instr_cxt.thread_instr->m_instrArrayMap;
/*
* if current plan is working for expression, no need to collect instrumentation.
*/
if (estate->es_instrument != INSTRUMENT_NONE && StreamTopConsumerAmI() && u_sess->instr_cxt.global_instr &&
u_sess->instr_cxt.thread_instr) {
int node_id = queryDesc->plannedstmt->planTree->plan_node_id - 1;
int *m_instrArrayMap = u_sess->instr_cxt.thread_instr->m_instrArrayMap;
u_sess->instr_cxt.thread_instr->m_instrArray[m_instrArrayMap[node_id]].instr.instruPlanData.init_time =
totaltime;
u_sess->instr_cxt.thread_instr->m_instrArray[m_instrArrayMap[node_id]].instr.instruPlanData.init_time =
totaltime;
}
} else {
InitPlan(queryDesc, eflags);
}
/*
@ -506,7 +516,9 @@ void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
}
}
print_duration(queryDesc);
instr_stmt_report_query_plan(queryDesc);
if (ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_query_plan(queryDesc);
}
/* sql active feature, opeartor history statistics */
if (can_operator_history_statistics) {
@ -596,7 +608,10 @@ void standard_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long co
u_sess->exec_cxt.global_bucket_cnt = 0;
}
(void)INSTR_TIME_SET_CURRENT(starttime);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
(void)INSTR_TIME_SET_CURRENT(starttime);
}
/*
* run plan
*/
@ -612,26 +627,27 @@ void standard_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long co
#endif
}
}
totaltime += elapsed_time(&starttime);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
totaltime += elapsed_time(&starttime);
/*
* if current plan is working for expression, no need to collect instrumentation.
*/
if (
#ifndef ENABLE_MULTIPLE_NODES
!u_sess->attr.attr_common.enable_seqscan_fusion &&
#endif
estate->es_instrument != INSTRUMENT_NONE
&& StreamTopConsumerAmI() && u_sess->instr_cxt.global_instr && u_sess->instr_cxt.thread_instr) {
int node_id = queryDesc->plannedstmt->planTree->plan_node_id - 1;
int* m_instrArrayMap = u_sess->instr_cxt.thread_instr->m_instrArrayMap;
u_sess->instr_cxt.thread_instr->m_instrArray[m_instrArrayMap[node_id]].instr.instruPlanData.run_time =
totaltime;
}
}
queryDesc->executed = true;
/*
* if current plan is working for expression, no need to collect instrumentation.
*/
if (
#ifndef ENABLE_MULTIPLE_NODES
!u_sess->attr.attr_common.enable_seqscan_fusion &&
#endif
estate->es_instrument != INSTRUMENT_NONE
&& StreamTopConsumerAmI() && u_sess->instr_cxt.global_instr && u_sess->instr_cxt.thread_instr) {
int node_id = queryDesc->plannedstmt->planTree->plan_node_id - 1;
int* m_instrArrayMap = u_sess->instr_cxt.thread_instr->m_instrArrayMap;
u_sess->instr_cxt.thread_instr->m_instrArray[m_instrArrayMap[node_id]].instr.instruPlanData.run_time =
totaltime;
}
/*
* shutdown tuple receiver, if we started it
*/
@ -745,7 +761,9 @@ void standard_ExecutorEnd(QueryDesc *queryDesc)
instr_time starttime;
double totaltime = 0;
(void)INSTR_TIME_SET_CURRENT(starttime);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
(void)INSTR_TIME_SET_CURRENT(starttime);
}
/* sanity checks */
Assert(queryDesc != NULL);
@ -805,18 +823,20 @@ void standard_ExecutorEnd(QueryDesc *queryDesc)
/* output the memory tracking information into file */
MemoryTrackingOutputFile();
totaltime += elapsed_time(&starttime);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
totaltime += elapsed_time(&starttime);
/*
* if current plan is working for expression, no need to collect instrumentation.
*/
if (queryDesc->instrument_options != 0 && StreamTopConsumerAmI() && u_sess->instr_cxt.global_instr &&
u_sess->instr_cxt.thread_instr) {
int node_id = queryDesc->plannedstmt->planTree->plan_node_id - 1;
int *m_instrArrayMap = u_sess->instr_cxt.thread_instr->m_instrArrayMap;
/*
* if current plan is working for expression, no need to collect instrumentation.
*/
if (queryDesc->instrument_options != 0 && StreamTopConsumerAmI() && u_sess->instr_cxt.global_instr &&
u_sess->instr_cxt.thread_instr) {
int node_id = queryDesc->plannedstmt->planTree->plan_node_id - 1;
int *m_instrArrayMap = u_sess->instr_cxt.thread_instr->m_instrArrayMap;
u_sess->instr_cxt.thread_instr->m_instrArray[m_instrArrayMap[node_id]].instr.instruPlanData.end_time =
totaltime;
u_sess->instr_cxt.thread_instr->m_instrArray[m_instrArrayMap[node_id]].instr.instruPlanData.end_time =
totaltime;
}
}
/* reset global values of perm space */
@ -952,7 +972,8 @@ static bool ExecCheckRTEPerms(RangeTblEntry *rte)
/*
* If relation is in ledger schema, avoid procedure or function on it.
*/
if (u_sess->SPI_cxt._connected > -1 && is_ledger_usertable(rte->relid)) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_BLOCK_CHAIN_REMOVE)
&& (u_sess->SPI_cxt._connected > -1) && is_ledger_usertable(rte->relid)) {
gstrace_exit(GS_TRC_ID_ExecCheckRTEPerms);
return false;
}
@ -973,10 +994,12 @@ static bool ExecCheckRTEPerms(RangeTblEntry *rte)
* so we should do special handling: for query involving pg_statistic/pg_statistic_ext from
* other CNs, ignore the authorization check.
*/
#ifdef ENABLE_MULTIPLE_NODES
if ((StatisticRelationId == rte->relid || StatisticExtRelationId == rte->relid) && IsConnFromCoord()) {
gstrace_exit(GS_TRC_ID_ExecCheckRTEPerms);
return true;
}
#endif
rel_oid = rte->relid;
@ -1159,29 +1182,15 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
TupleDesc tupType = NULL;
ListCell *l = NULL;
int i;
bool check = false;
gstrace_entry(GS_TRC_ID_InitPlan);
/*
* Do permissions checks
*/
if (!(IS_PGXC_DATANODE && (IsConnFromCoord() || IsConnFromDatanode()))) {
check = true;
}
if (u_sess->exec_cxt.is_exec_trigger_func) {
check = true;
}
if (plannedstmt->in_compute_pool) {
check = true;
}
if (u_sess->pgxc_cxt.is_gc_fdw && u_sess->pgxc_cxt.is_gc_fdw_analyze) {
check = true;
}
if (check) {
if (!(IS_PGXC_DATANODE && (IsConnFromCoord() || IsConnFromDatanode()))
|| u_sess->exec_cxt.is_exec_trigger_func
|| plannedstmt->in_compute_pool
|| (u_sess->pgxc_cxt.is_gc_fdw && u_sess->pgxc_cxt.is_gc_fdw_analyze)) {
(void)ExecCheckRTPerms(rangeTable, true);
}
@ -1320,11 +1329,13 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
ItemPointerSetInvalid(&(erm->curCtid));
estate->es_rowMarks = lappend(estate->es_rowMarks, erm);
}
uint64 plan_end_time = time(NULL);
if ((plan_end_time - plan_start_time) > THREAD_INTSERVAL_60S) {
ereport(WARNING,
(errmsg("InitPlan foreach plannedstmt->rowMarks takes %lus, plan_start_time:%lus, plan_end_time:%lus.",
plan_end_time - plan_start_time, plan_start_time, plan_end_time)));
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE) && !StreamThreadAmI()) {
uint64 plan_end_time = time(NULL);
if ((plan_end_time - plan_start_time) > THREAD_INTSERVAL_60S) {
ereport(WARNING,
(errmsg("InitPlan foreach plannedstmt->rowMarks takes %lus, plan_start_time:%lus, plan_end_time:%lus.",
plan_end_time - plan_start_time, plan_start_time, plan_end_time)));
}
}
/*
@ -2130,7 +2141,13 @@ static void ExecutePlan(EState *estate, PlanState *planstate, CmdType operation,
}
/* Change DestReceiver's tmpContext to PerTupleMemoryContext to avoid memory leak. */
dest->tmpContext = GetPerTupleMemoryContext(estate);
if (ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE)) {
if (sendTuples) {
dest->tmpContext = GetPerTupleMemoryContext(estate);
}
} else {
dest->tmpContext = GetPerTupleMemoryContext(estate);
}
// planstate->plan will be release if rollback excuted
bool is_saved_recursive_union_plan_nodeid = EXEC_IN_RECURSIVE_MODE(planstate->plan);

View File

@ -446,10 +446,9 @@ void ExecInitNodeSubPlan(Plan* node, EState* estate, PlanState* result)
PlanState* ExecInitNode(Plan* node, EState* estate, int e_flags)
{
PlanState* result = NULL;
MemoryContext old_context;
MemoryContext node_context;
MemoryContext query_context;
char context_name[NODENAMELEN];
MemoryContext old_context = estate->es_query_cxt;
MemoryContext node_context = estate->es_query_cxt;
MemoryContext query_context = estate->es_query_cxt;
int rc = 0;
/*
@ -468,24 +467,25 @@ PlanState* ExecInitNode(Plan* node, EState* estate, int e_flags)
gstrace_entry(GS_TRC_ID_ExecInitNode);
if (!StreamTopConsumerAmI())
rc = snprintf_s(context_name,
NODENAMELEN,
NODENAMELEN - 1,
"%s_%lu",
nodeTagToString(nodeTag(node)),
t_thrd.proc_cxt.MyProcPid);
else
rc = snprintf_s(context_name,
NODENAMELEN,
NODENAMELEN - 1,
"%s_%lu_%d",
nodeTagToString(nodeTag(node)),
estate->es_plannedstmt->queryId,
node->plan_node_id);
securec_check_ss(rc, "", "");
if (!ENABLE_SQL_FUSION_ENGINE(IUD_NODE_CONTEXT_REMOVE)) {
char context_name[NODENAMELEN];
if (!StreamTopConsumerAmI())
rc = snprintf_s(context_name,
NODENAMELEN,
NODENAMELEN - 1,
"%s_%lu",
nodeTagToString(nodeTag(node)),
t_thrd.proc_cxt.MyProcPid);
else
rc = snprintf_s(context_name,
NODENAMELEN,
NODENAMELEN - 1,
"%s_%lu_%d",
nodeTagToString(nodeTag(node)),
estate->es_plannedstmt->queryId,
node->plan_node_id);
securec_check_ss(rc, "", "");
if (g_instance.attr.attr_memory.enable_memory_limit) {
/*
* Create working memory for expression evaluation in this context.
*/
@ -494,18 +494,16 @@ PlanState* ExecInitNode(Plan* node, EState* estate, int e_flags)
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
} else {
node_context = estate->es_query_cxt;
query_context = estate->es_query_cxt;
// reassign the node context as we must run under this context.
estate->es_query_cxt = node_context;
/* Switch to Node Level Memory Context */
old_context = MemoryContextSwitchTo(node_context);
}
query_context = estate->es_query_cxt;
// reassign the node context as we must run under this context.
estate->es_query_cxt = node_context;
/* Switch to Node Level Memory Context */
old_context = MemoryContextSwitchTo(node_context);
/*
* Check whether this 'plan node' needs be processed in current DN exec_nodes,
* skip real initialization if it is not in exec-nodes
@ -611,18 +609,19 @@ PlanState* ExecInitNode(Plan* node, EState* estate, int e_flags)
}
}
/* Switch to OldContext */
MemoryContextSwitchTo(old_context);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_NODE_CONTEXT_REMOVE)) {
/* Switch to OldContext */
MemoryContextSwitchTo(old_context);
if (g_instance.attr.attr_memory.enable_memory_limit) {
/* Set the nodeContext */
result->nodeContext = node_context;
/* restore the per query context */
estate->es_query_cxt = query_context;
} else {
/* Set the nodeContext */
result->nodeContext = NULL;
}
/* restore the per query context */
estate->es_query_cxt = query_context;
result->ps_rownum = 0;
gstrace_exit(GS_TRC_ID_ExecInitNode);

View File

@ -829,7 +829,8 @@ static TupleDesc ExecTypeFromTLInternal(List* target_list, bool has_oid, bool sk
TupleDescInitEntryCollation(type_info, cur_resno, exprCollation((Node*)tle->expr));
/* mark dropped column, maybe we can find another way some day */
if (mark_dropped && strstr(tle->resname, "........pg.dropped.")) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_MARKDROP_REMOVE) && mark_dropped
&& strstr(tle->resname, "........pg.dropped.")) {
type_info->attrs[cur_resno - 1].attisdropped = true;
}

View File

@ -689,7 +689,11 @@ IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags)
/*
* get the scan type from the relation descriptor.
*/
ExecAssignScanType(&index_state->ss, CreateTupleDescCopy(RelationGetDescr(current_relation)));
if (ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE)) {
ExecAssignScanType(&index_state->ss, RelationGetDescr(current_relation));
} else {
ExecAssignScanType(&index_state->ss, CreateTupleDescCopy(RelationGetDescr(current_relation)));
}
index_state->ss.ss_ScanTupleSlot->tts_tupleDescriptor->td_tam_ops = current_relation->rd_tam_ops;
/*

View File

@ -1360,7 +1360,7 @@ TupleTableSlot* ExecDelete(ItemPointer tupleid, Oid deletePartitionOid, int2 buc
fake_relation = result_relation_desc;
ldelete:
#ifdef PGXC
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && result_remote_rel) {
/* for merge into we have to provide the slot */
slot = ExecProcNodeDMLInXC(estate, planSlot, NULL);
@ -1422,7 +1422,7 @@ ldelete:
(void)MemoryContextSwitchTo(old_context);
}
/* Record deleted tupleid when target table is under cluster resizing */
if (RelationInClusterResizing(result_relation_desc) &&
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING) && RelationInClusterResizing(result_relation_desc) &&
!RelationInClusterResizingReadOnly(result_relation_desc)) {
ItemPointerData start_ctid;
ItemPointerData end_ctid;
@ -1432,17 +1432,21 @@ ldelete:
}
}
Bitmapset *modifiedIdxAttrs = NULL;
ExecIndexTuplesState exec_index_tuples_state;
exec_index_tuples_state.estate = estate;
exec_index_tuples_state.targetPartRel =
isPartitionedRelation(result_relation_desc->rd_rel) ? part_relation : NULL;
exec_index_tuples_state.p = isPartitionedRelation(result_relation_desc->rd_rel) ? partition : NULL;
exec_index_tuples_state.conflict = NULL;
tableam_tops_exec_delete_index_tuples(oldslot, fake_relation, node,
tupleid, exec_index_tuples_state, modifiedIdxAttrs);
if (oldslot) {
ExecDropSingleTupleTableSlot(oldslot);
if (ENABLE_SQL_FUSION_ENGINE(IUD_PENDING) && !RelationIsUstoreFormat(result_relation_desc)) {
break;
} else {
Bitmapset *modifiedIdxAttrs = NULL;
ExecIndexTuplesState exec_index_tuples_state;
exec_index_tuples_state.estate = estate;
exec_index_tuples_state.targetPartRel =
isPartitionedRelation(result_relation_desc->rd_rel) ? part_relation : NULL;
exec_index_tuples_state.p = isPartitionedRelation(result_relation_desc->rd_rel) ? partition : NULL;
exec_index_tuples_state.conflict = NULL;
tableam_tops_exec_delete_index_tuples(oldslot, fake_relation, node,
tupleid, exec_index_tuples_state, modifiedIdxAttrs);
if (oldslot) {
ExecDropSingleTupleTableSlot(oldslot);
}
}
} break;
@ -1519,13 +1523,13 @@ ldelete:
* take care of it later. We can't delete index tuples immediately
* anyway, since the tuple is still visible to other transactions.
*/
#ifdef PGXC
#ifdef ENABLE_MULTIPLE_NODES
}
#endif
}
end:;
if (canSetTag) {
#ifdef PGXC
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && result_remote_rel) {
estate->es_processed += result_remote_rel->rqs_processed;
} else {
@ -1534,7 +1538,7 @@ end:;
estate->es_modifiedRowHash = lappend(estate->es_modifiedRowHash, (void *)UInt64GetDatum(res_hash));
}
(estate->es_processed)++;
#ifdef PGXC
#ifdef ENABLE_MULTIPLE_NODES
}
#endif
}
@ -1554,7 +1558,7 @@ end:;
}
/* Process RETURNING if present */
#ifdef PGXC
#ifdef ENABLE_MULTIPLE_NODES
if (IS_PGXC_COORDINATOR && result_remote_rel != NULL && result_rel_info->ri_projectReturning != NULL) {
if (TupIsNull(slot))
return NULL;
@ -2787,10 +2791,12 @@ static TupleTableSlot* ExecModifyTable(PlanState* state)
subPlanState = node->mt_plans[node->mt_whichplan];
#ifdef PGXC
/* Initialize remote plan state */
remote_rel_state = node->mt_remoterels[node->mt_whichplan];
insert_remote_rel_state = node->mt_insert_remoterels[node->mt_whichplan];
update_remote_rel_state = node->mt_update_remoterels[node->mt_whichplan];
delete_remote_rel_state = node->mt_delete_remoterels[node->mt_whichplan];
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
remote_rel_state = node->mt_remoterels[node->mt_whichplan];
insert_remote_rel_state = node->mt_insert_remoterels[node->mt_whichplan];
update_remote_rel_state = node->mt_update_remoterels[node->mt_whichplan];
delete_remote_rel_state = node->mt_delete_remoterels[node->mt_whichplan];
}
#endif
junk_filter = result_rel_info->ri_junkFilter;
@ -2882,11 +2888,13 @@ static TupleTableSlot* ExecModifyTable(PlanState* state)
subPlanState = node->mt_plans[node->mt_whichplan];
#ifdef PGXC
/* Move to next remote plan */
estate->es_result_remoterel = node->mt_remoterels[node->mt_whichplan];
remote_rel_state = node->mt_remoterels[node->mt_whichplan];
insert_remote_rel_state = node->mt_insert_remoterels[node->mt_whichplan];
update_remote_rel_state = node->mt_update_remoterels[node->mt_whichplan];
delete_remote_rel_state = node->mt_delete_remoterels[node->mt_whichplan];
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
estate->es_result_remoterel = node->mt_remoterels[node->mt_whichplan];
remote_rel_state = node->mt_remoterels[node->mt_whichplan];
insert_remote_rel_state = node->mt_insert_remoterels[node->mt_whichplan];
update_remote_rel_state = node->mt_update_remoterels[node->mt_whichplan];
delete_remote_rel_state = node->mt_delete_remoterels[node->mt_whichplan];
}
#endif
junk_filter = result_rel_info->ri_junkFilter;
estate->es_result_relation_info = result_rel_info;
@ -3142,7 +3150,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
estate->deleteLimitCount = 0;
if (node->cacheEnt != NULL) {
if (node->cacheEnt != NULL && !ENABLE_SQL_FUSION_ENGINE(IUD_ERRORREL_REMOVE)) {
ErrorCacheEntry* entry = node->cacheEnt;
/* fetch query dop from this way but not query_dop */
@ -3179,10 +3187,12 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
mt_state->mt_plans = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
#ifdef PGXC
mt_state->mt_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
mt_state->mt_insert_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
mt_state->mt_update_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
mt_state->mt_delete_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
mt_state->mt_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
mt_state->mt_insert_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
mt_state->mt_update_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
mt_state->mt_delete_remoterels = (PlanState**)palloc0(sizeof(PlanState*) * nplans);
}
#endif
mt_state->resultRelInfo = estate->es_result_relations + node->resultRelIndex;
mt_state->mt_arowmarks = (List**)palloc0(sizeof(List*) * nplans);
@ -3244,19 +3254,27 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
* query.
*/
if (result_rel_info->ri_RelationDesc->rd_rel->relhasindex &&
if ((operation != CMD_DELETE || RelationIsUstoreFormat(result_rel_info->ri_RelationDesc)) &&
result_rel_info->ri_RelationDesc->rd_rel->relhasindex &&
result_rel_info->ri_IndexRelationDescs == NULL) {
#ifdef ENABLE_MOT
if (result_rel_info->ri_FdwRoutine == NULL || result_rel_info->ri_FdwRoutine->GetFdwType == NULL ||
result_rel_info->ri_FdwRoutine->GetFdwType() != MOT_ORC) {
#endif
/*
* Two cases of executing ExecOpenIndices:
* non-delete operation
* delete operation in ustore format
*/
ExecOpenIndices(result_rel_info, node->upsertAction != UPSERT_NONE);
#ifdef ENABLE_MOT
}
#endif
}
init_gtt_storage(operation, result_rel_info);
if (RELATION_IS_GLOBAL_TEMP(result_rel_info->ri_RelationDesc)) {
init_gtt_storage(operation, result_rel_info);
}
/* Now init the plan for this result rel */
estate->es_result_relation_info = result_rel_info;
if (sub_plan->type == T_Limit && operation == CMD_DELETE && IsLimitDML((Limit*)sub_plan)) {
@ -3282,7 +3300,8 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
* For update/delete/upsert case, we need further check if it is in cluster resizing, then
* we need open delete_delta rel for this target relation.
*/
if (operation == CMD_UPDATE || operation == CMD_DELETE || node->upsertAction == UPSERT_UPDATE) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING) &&
(operation == CMD_UPDATE || operation == CMD_DELETE || node->upsertAction == UPSERT_UPDATE)) {
Relation target_rel = estate->es_result_relation_info->ri_RelationDesc;
Assert(target_rel != NULL && mt_state->delete_delta_rel == NULL);
if (RelationInClusterResizing(target_rel) && !RelationInClusterResizingReadOnly(target_rel)) {
@ -3308,30 +3327,32 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
}
#ifdef PGXC
i = 0;
foreach (l, node->plans) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
i = 0;
foreach (l, node->plans) {
Plan* remoteplan = NULL;
if (node->remote_plans) {
remoteplan = (Plan*)list_nth(node->remote_plans, i);
mt_state->mt_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
}
Plan* remoteplan = NULL;
if (node->remote_plans) {
remoteplan = (Plan*)list_nth(node->remote_plans, i);
mt_state->mt_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
}
if (node->remote_insert_plans) {
remoteplan = (Plan*)list_nth(node->remote_insert_plans, i);
mt_state->mt_insert_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
}
if (node->remote_insert_plans) {
remoteplan = (Plan*)list_nth(node->remote_insert_plans, i);
mt_state->mt_insert_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
}
if (node->remote_update_plans) {
remoteplan = (Plan*)list_nth(node->remote_update_plans, i);
mt_state->mt_update_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
}
if (node->remote_update_plans) {
remoteplan = (Plan*)list_nth(node->remote_update_plans, i);
mt_state->mt_update_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
}
if (node->remote_delete_plans) {
remoteplan = (Plan*)list_nth(node->remote_delete_plans, i);
mt_state->mt_delete_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
if (node->remote_delete_plans) {
remoteplan = (Plan*)list_nth(node->remote_delete_plans, i);
mt_state->mt_delete_remoterels[i] = ExecInitNode(remoteplan, estate, eflags);
}
i++;
}
i++;
}
#endif
@ -3381,7 +3402,11 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
*/
tup_desc = ExecTypeFromTL(NIL, false);
ExecInitResultTupleSlot(estate, &mt_state->ps);
ExecAssignResultType(&mt_state->ps, tup_desc);
if (ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE)) {
mt_state->ps.ps_ResultTupleSlot->tts_tupleDescriptor = tup_desc;
} else {
ExecAssignResultType(&mt_state->ps, tup_desc);
}
mt_state->ps.ps_ExprContext = NULL;
}
@ -3630,7 +3655,7 @@ ModifyTableState* ExecInitModifyTable(ModifyTable* node, EState* estate, int efl
* containing multiple ModifyTable nodes, all can share one such slot, so
* we keep it in the estate.
*/
if (estate->es_trig_tuple_slot == NULL) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_TRIGGER_REMOVE) && estate->es_trig_tuple_slot == NULL) {
result_rel_info = mt_state->resultRelInfo;
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, result_rel_info->ri_RelationDesc->rd_tam_ops);
}
@ -3735,7 +3760,9 @@ void ExecEndModifyTable(ModifyTableState* node)
for (i = 0; i < node->mt_nplans; i++) {
ExecEndNode(node->mt_plans[i]);
#ifdef PGXC
ExecEndNode(node->mt_remoterels[i]);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
ExecEndNode(node->mt_remoterels[i]);
}
#endif
}
}

View File

@ -179,7 +179,9 @@ void ExecVecMerge(VecModifyTableState* mtstate)
sub_plan_state = mtstate->mt_plans[mtstate->mt_whichplan];
/* Initialize remote plan state */
remote_rel_state = mtstate->mt_remoterels[mtstate->mt_whichplan];
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
remote_rel_state = mtstate->mt_remoterels[mtstate->mt_whichplan];
}
junkfilter = result_rel_info->ri_junkFilter;
result_relation_desc = result_rel_info->ri_RelationDesc;

View File

@ -533,7 +533,9 @@ VectorBatch* ExecVecModifyTable(VecModifyTableState* node)
sub_plan_stat = node->mt_plans[node->mt_whichplan];
#ifdef PGXC
/* Initialize remote plan state */
remote_rel_stat = node->mt_remoterels[node->mt_whichplan];
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING)) {
remote_rel_stat = node->mt_remoterels[node->mt_whichplan];
}
#endif
junk_filter = result_rel_info->ri_junkFilter;

View File

@ -4510,7 +4510,7 @@ TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid,
HeapTupleCopyBaseFromPage(&tp, page);
tmfd->xmin = HeapTupleHeaderGetXmin(page, tp.t_data);
if (RELATION_HAS_UIDS(relation) && HeapTupleHeaderHasUid(tp.t_data)) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_PENDING) && RELATION_HAS_UIDS(relation) && HeapTupleHeaderHasUid(tp.t_data)) {
uint64 tupleUid = HeapTupleGetUid(&tp);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
LockTupleUid(relation, tupleUid, ExclusiveLock,
@ -4686,7 +4686,9 @@ l1:
* Compute replica identity tuple before entering the critical section so
* we don't PANIC upon a memory allocation failure.
*/
old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied, &identity);
if (XLogLogicalInfoActive()) {
old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied, &identity);
}
/*
* If this is the first possibly-multixact-able operation in the

View File

@ -647,17 +647,19 @@ TM_Result HeapTupleSatisfiesUpdate(HeapTuple htup, CommandId curcid, Buffer buff
Page page = BufferGetPage(buffer);
/* do not need sync, because snapshot is not used */
ereport(DEBUG1,
(errmsg("HeapTupleSatisfiesUpdate self(%u,%u) ctid(%u,%u) cur_xid " XID_FMT " xmin"
XID_FMT " xmax " XID_FMT " infomask %u",
ItemPointerGetBlockNumber(&htup->t_self),
ItemPointerGetOffsetNumber(&htup->t_self),
ItemPointerGetBlockNumber(&tuple->t_ctid),
ItemPointerGetOffsetNumber(&tuple->t_ctid),
GetCurrentTransactionIdIfAny(),
HeapTupleHeaderGetXmin(page, tuple),
HeapTupleHeaderGetXmax(page, tuple),
tuple->t_infomask)));
if (SHOW_DEBUG_MESSAGE()) {
ereport(DEBUG1,
(errmsg("HeapTupleSatisfiesUpdate self(%u,%u) ctid(%u,%u) cur_xid " XID_FMT " xmin"
XID_FMT " xmax " XID_FMT " infomask %u",
ItemPointerGetBlockNumber(&htup->t_self),
ItemPointerGetOffsetNumber(&htup->t_self),
ItemPointerGetBlockNumber(&tuple->t_ctid),
ItemPointerGetOffsetNumber(&tuple->t_ctid),
GetCurrentTransactionIdIfAny(),
HeapTupleHeaderGetXmin(page, tuple),
HeapTupleHeaderGetXmax(page, tuple),
tuple->t_infomask)));
}
restart:
if (!HeapTupleHeaderXminCommitted(tuple)) {
@ -1019,17 +1021,19 @@ static bool HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, Buffer buf
TransactionIdStatus hintstatus;
Page page = BufferGetPage(buffer);
ereport(DEBUG1,
(errmsg("HeapTupleSatisfiesMVCC self(%d,%d) ctid(%d,%d) cur_xid %ld xmin %ld"
" xmax %ld csn %lu",
ItemPointerGetBlockNumber(&htup->t_self),
ItemPointerGetOffsetNumber(&htup->t_self),
ItemPointerGetBlockNumber(&tuple->t_ctid),
ItemPointerGetOffsetNumber(&tuple->t_ctid),
GetCurrentTransactionIdIfAny(),
HeapTupleHeaderGetXmin(page, tuple),
HeapTupleHeaderGetXmax(page, tuple),
snapshot->snapshotcsn)));
if (SHOW_DEBUG_MESSAGE()) {
ereport(DEBUG1,
(errmsg("HeapTupleSatisfiesMVCC self(%d,%d) ctid(%d,%d) cur_xid %ld xmin %ld"
" xmax %ld csn %lu",
ItemPointerGetBlockNumber(&htup->t_self),
ItemPointerGetOffsetNumber(&htup->t_self),
ItemPointerGetBlockNumber(&tuple->t_ctid),
ItemPointerGetOffsetNumber(&tuple->t_ctid),
GetCurrentTransactionIdIfAny(),
HeapTupleHeaderGetXmin(page, tuple),
HeapTupleHeaderGetXmax(page, tuple),
snapshot->snapshotcsn)));
}
/*
* Just valid for read-only transaction when u_sess->attr.attr_common.XactReadOnly is true.

View File

@ -392,7 +392,9 @@ int32 _bt_compare(Relation rel, int keysz, ScanKey scankey, Page page, OffsetNum
/*
* Check tuple has correct number of attributes.
*/
Assert(_bt_check_natts(rel, page, offnum));
if (!ENABLE_SQL_FUSION_ENGINE(IUD_CHECKSUM_REMOVE)) {
Assert(_bt_check_natts(rel, page, offnum));
}
/*
* Force result ">" if target item is first data item on an internal page

View File

@ -947,6 +947,23 @@ void ExtendCLOG(TransactionId newestXact, bool allowXlog)
{
int64 pageno = 0;
if (ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE)) {
/*
* No work except at first XID of a page.
*/
if (TransactionIdToPgIndex(newestXact) != 0 && !TransactionIdEquals(newestXact, FirstNormalTransactionId))
return;
pageno = (int64)TransactionIdToPage(newestXact);
(void)LWLockAcquire(ClogCtl(pageno)->shared->control_lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
(void)ZeroCLOGPage(pageno, !t_thrd.xlog_cxt.InRecovery);
LWLockRelease(ClogCtl(pageno)->shared->control_lock);
return;
}
#ifdef PGXC
int64 maxPageNoInSeg = 0;
/*

View File

@ -159,11 +159,13 @@ RETRY:
}
PG_END_TRY();
}
ereport(DEBUG1,
(errmsg("Get CSN xid %lu cur_xid %lu xid %lu result %lu iscommit %d, recentLocalXmin %lu, isMvcc :%d, "
"RecentXmin: %lu",
transactionId, GetCurrentTransactionIdIfAny(), xid, result, isCommit,
t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin, isMvcc, u_sess->utils_cxt.RecentXmin)));
if (SHOW_DEBUG_MESSAGE()) {
ereport(DEBUG1,
(errmsg("Get CSN xid %lu cur_xid %lu xid %lu result %lu iscommit %d, recentLocalXmin %lu, isMvcc :%d, "
"RecentXmin: %lu",
transactionId, GetCurrentTransactionIdIfAny(), xid, result, isCommit,
t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin, isMvcc, u_sess->utils_cxt.RecentXmin)));
}
/*
* Cache it, but DO NOT cache status for unfinished transactions!

View File

@ -892,9 +892,12 @@ static void AssignTransactionId(TransactionState s)
ProcXactHashTableAdd(s->transactionId, t_thrd.proc->pgprocno);
}
#ifdef ENABLE_MULTIPLE_NODES
/* send my top transaction id to exec CN */
if (!isSubXact && IsConnFromCoord() && u_sess->need_report_top_xid)
ReportTopXid(s->transactionId);
#endif
if (!isSubXact)
instr_stmt_report_txid(s->transactionId);
@ -1270,7 +1273,9 @@ bool TransactionIdIsCurrentTransactionId(TransactionId xid)
*/
if (!TransactionIdIsNormal(xid))
return false;
if (ENABLE_SQL_FUSION_ENGINE(IUD_CODE_OPTIMIZE) && TransactionIdEquals(xid, GetTopTransactionIdIfAny())) {
return true;
}
/*
* We will return true for the Xid of the current subtransaction, any of
* its subcommitted children, any of its parents, or any of their

View File

@ -2023,14 +2023,20 @@ int FilePRead(File file, char* buffer, int amount, off_t offset, uint32 wait_eve
retry:
PROFILING_MDIO_START();
pgstat_report_waitevent(wait_event_info);
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
returnCode = pread(vfdcache[file].fd, buffer, (size_t)amount, offset);
PGSTAT_END_TIME_RECORD(DATA_IO_TIME);
pgstat_report_waitevent(WAIT_EVENT_END);
PROFILING_MDIO_END_READ((uint32)amount, returnCode);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
PROFILING_MDIO_START();
pgstat_report_waitevent(wait_event_info);
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
returnCode = pread(vfdcache[file].fd, buffer, (size_t)amount, offset);
pgstat_report_waitevent(WAIT_EVENT_END);
PGSTAT_END_TIME_RECORD(DATA_IO_TIME);
PROFILING_MDIO_END_READ((uint32)amount, returnCode);
} else {
pgstat_report_waitevent(wait_event_info);
returnCode = pread(vfdcache[file].fd, buffer, (size_t)amount, offset);
pgstat_report_waitevent(WAIT_EVENT_END);
}
if (returnCode >= 0)
vfdcache[file].seekPos += returnCode;
@ -2117,12 +2123,16 @@ int FileWrite(File file, const char* buffer, int amount, off_t offset, int fastE
/* assign returnCode with buffer size */
returnCode = amount;
} else {
PROFILING_MDIO_START();
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
returnCode = pwrite(vfdcache[file].fd, buffer, (size_t)amount, offset);
PGSTAT_END_TIME_RECORD(DATA_IO_TIME);
PROFILING_MDIO_END_WRITE((uint32)amount, returnCode);
if (ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
returnCode = pwrite(vfdcache[file].fd, buffer, (size_t)amount, offset);
} else {
PROFILING_MDIO_START();
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
returnCode = pwrite(vfdcache[file].fd, buffer, (size_t)amount, offset);
PGSTAT_END_TIME_RECORD(DATA_IO_TIME);
PROFILING_MDIO_END_WRITE((uint32)amount, returnCode);
}
}
return returnCode;
}
@ -2533,12 +2543,18 @@ int FileSync(File file, uint32 wait_event_info)
if (returnCode < 0)
return returnCode;
pgstat_report_waitevent(wait_event_info);
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
returnCode = pg_fsync(vfdcache[file].fd);
PGSTAT_END_TIME_RECORD(DATA_IO_TIME);
pgstat_report_waitevent(WAIT_EVENT_END);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
pgstat_report_waitevent(wait_event_info);
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
returnCode = pg_fsync(vfdcache[file].fd);
PGSTAT_END_TIME_RECORD(DATA_IO_TIME);
pgstat_report_waitevent(WAIT_EVENT_END);
} else {
pgstat_report_waitevent(wait_event_info);
returnCode = pg_fsync(vfdcache[file].fd);
pgstat_report_waitevent(WAIT_EVENT_END);
}
return returnCode;
}

View File

@ -680,7 +680,9 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
ereport(LOG, (errmsg("LockAcquire: lock [%u,%u] %s", locktag->locktag_field1, locktag->locktag_field2,
lockMethodTable->lockModeNames[lockmode])));
#endif
instr_stmt_report_lock(LOCK_START, lockmode, locktag);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_START, lockmode, locktag);
}
/* Identify owner for lock */
if (!sessionLock) {
@ -734,11 +736,15 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
*/
if (locallock->nLocks > 0) {
GrantLockLocal(locallock, owner);
instr_stmt_report_lock(LOCK_END, lockmode);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, lockmode);
}
return LOCKACQUIRE_ALREADY_HELD;
#ifdef PGXC
} else if (only_increment) {
instr_stmt_report_lock(LOCK_END, NoLock);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, NoLock);
}
/* User does not want to create new lock if it does not already exist */
return LOCKACQUIRE_NOT_AVAIL;
#endif
@ -819,7 +825,9 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
locallock->lock = NULL;
locallock->proclock = NULL;
GrantLockLocal(locallock, owner);
instr_stmt_report_lock(LOCK_END, lockmode);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, lockmode);
}
return LOCKACQUIRE_OK;
}
}
@ -836,7 +844,9 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
BeginStrongLockAcquire(locallock, fasthashcode);
if (!FastPathTransferRelationLocks(lockMethodTable, locktag, hashcode)) {
AbortStrongLockAcquire();
instr_stmt_report_lock(LOCK_END, NoLock);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, NoLock);
}
if (reportMemoryError)
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction.")));
@ -867,7 +877,9 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
if (proclock == NULL) {
AbortStrongLockAcquire();
LWLockRelease(partitionLock);
instr_stmt_report_lock(LOCK_END, NoLock);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, NoLock);
}
if (reportMemoryError)
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction.")));
@ -978,7 +990,9 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
if (locallock->nLocks == 0) {
RemoveLocalLock(locallock);
}
instr_stmt_report_lock(LOCK_END, NoLock);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, NoLock);
}
return LOCKACQUIRE_NOT_AVAIL;
}
@ -1021,7 +1035,9 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
LWLockRelease(partitionLock);
instr_stmt_report_lock(LOCK_END, NoLock);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, NoLock);
}
ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("LockAcquire failed")));
}
PROCLOCK_PRINT("LockAcquire: granted", proclock);
@ -1049,7 +1065,9 @@ static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, LOCKMODE
LogAccessExclusiveLock(locktag->locktag_field1, locktag->locktag_field2);
}
instr_stmt_report_lock(LOCK_END, lockmode);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LOCK_END, lockmode);
}
return LOCKACQUIRE_OK;
}

View File

@ -1316,8 +1316,10 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid)
break; /* got the lock */
}
instr_stmt_report_lock(LWLOCK_WAIT_START, mode, NULL, lock->tranche);
pgstat_report_waitevent(PG_WAIT_LWLOCK | lock->tranche);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
instr_stmt_report_lock(LWLOCK_WAIT_START, mode, NULL, lock->tranche);
pgstat_report_waitevent(PG_WAIT_LWLOCK | lock->tranche);
}
/*
* Ok, at this point we couldn't grab the lock on the first try. We
* cannot simply queue ourselves to the end of the list and wait to be
@ -1337,8 +1339,10 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid)
LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
LWLockDequeueSelf(lock, mode);
pgstat_report_waitevent(WAIT_EVENT_END);
instr_stmt_report_lock(LWLOCK_WAIT_END);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
pgstat_report_waitevent(WAIT_EVENT_END);
instr_stmt_report_lock(LWLOCK_WAIT_END);
}
break;
}
@ -1363,7 +1367,9 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid)
#ifdef LWLOCK_STATS
lwstats->block_count++;
#endif
TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
}
for (;;) {
/* "false" means cannot accept cancel/die interrupt here. */
PGSemaphoreLock(&proc->sem, false);
@ -1388,9 +1394,11 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid)
Assert(nwaiters < MAX_BACKENDS);
}
#endif
TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
pgstat_report_waitevent(WAIT_EVENT_END);
instr_stmt_report_lock(LWLOCK_WAIT_END);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
pgstat_report_waitevent(WAIT_EVENT_END);
instr_stmt_report_lock(LWLOCK_WAIT_END);
}
LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
@ -1398,7 +1406,9 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid)
result = false;
}
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_REPORT_REMOVE)) {
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
}
forget_lwlock_acquire();

View File

@ -1839,7 +1839,9 @@ SMGR_READ_STATUS mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber block
}
}
(void)INSTR_TIME_SET_CURRENT(startTime);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
(void)INSTR_TIME_SET_CURRENT(startTime);
}
TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode, reln->smgr_rnode.backend);
@ -1856,53 +1858,55 @@ SMGR_READ_STATUS mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber block
TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum, reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode, reln->smgr_rnode.backend, nbytes, BLCKSZ);
(void)INSTR_TIME_SET_CURRENT(endTime);
INSTR_TIME_SUBTRACT(endTime, startTime);
timeDiff = INSTR_TIME_GET_MICROSEC(endTime);
if (msgCount == 0) {
lstFile = reln->smgr_rnode.node.relNode;
lstDb = reln->smgr_rnode.node.dbNode;
lstSpc = reln->smgr_rnode.node.spcNode;
CONTINUOUS_ASSIGN_2(msgCount, sumPage, 1);
CONTINUOUS_ASSIGN_3(sumTime, minTime, maxTime, timeDiff);
} else if (msgCount % STAT_MSG_BATCH == 0 || lstFile != reln->smgr_rnode.node.relNode) {
PgStat_MsgFile msg;
errno_t rc;
msg.dbid = lstDb;
msg.spcid = lstSpc;
msg.fn = lstFile;
msg.rw = 'r';
msg.cnt = msgCount;
msg.blks = sumPage;
msg.tim = sumTime;
msg.lsttim = lstTime;
msg.mintim = minTime;
msg.maxtim = maxTime;
reportFileStat(&msg);
rc = memset_s(&msg, sizeof(PgStat_MsgFile), 0, sizeof(PgStat_MsgFile));
securec_check(rc, "", "");
CONTINUOUS_ASSIGN_2(msgCount, sumPage, 1);
sumTime = timeDiff;
if (lstFile != reln->smgr_rnode.node.relNode) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
(void)INSTR_TIME_SET_CURRENT(endTime);
INSTR_TIME_SUBTRACT(endTime, startTime);
timeDiff = INSTR_TIME_GET_MICROSEC(endTime);
if (msgCount == 0) {
lstFile = reln->smgr_rnode.node.relNode;
lstDb = reln->smgr_rnode.node.dbNode;
lstSpc = reln->smgr_rnode.node.spcNode;
CONTINUOUS_ASSIGN_2(msgCount, sumPage, 1);
CONTINUOUS_ASSIGN_3(sumTime, minTime, maxTime, timeDiff);
} else if (msgCount % STAT_MSG_BATCH == 0 || lstFile != reln->smgr_rnode.node.relNode) {
PgStat_MsgFile msg;
errno_t rc;
msg.dbid = lstDb;
msg.spcid = lstSpc;
msg.fn = lstFile;
msg.rw = 'r';
msg.cnt = msgCount;
msg.blks = sumPage;
msg.tim = sumTime;
msg.lsttim = lstTime;
msg.mintim = minTime;
msg.maxtim = maxTime;
reportFileStat(&msg);
rc = memset_s(&msg, sizeof(PgStat_MsgFile), 0, sizeof(PgStat_MsgFile));
securec_check(rc, "", "");
CONTINUOUS_ASSIGN_2(msgCount, sumPage, 1);
sumTime = timeDiff;
if (lstFile != reln->smgr_rnode.node.relNode) {
lstFile = reln->smgr_rnode.node.relNode;
lstDb = reln->smgr_rnode.node.dbNode;
lstSpc = reln->smgr_rnode.node.spcNode;
CONTINUOUS_ASSIGN_3(sumTime, minTime, maxTime, timeDiff);
}
} else {
msgCount++;
sumPage++;
sumTime += timeDiff;
}
lstTime = timeDiff;
if (minTime > timeDiff) {
minTime = timeDiff;
}
if (maxTime < timeDiff) {
maxTime = timeDiff;
}
} else {
msgCount++;
sumPage++;
sumTime += timeDiff;
}
lstTime = timeDiff;
if (minTime > timeDiff) {
minTime = timeDiff;
}
if (maxTime < timeDiff) {
maxTime = timeDiff;
}
if (nbytes != BLCKSZ) {
@ -2093,7 +2097,9 @@ void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const
static THR_LOCAL Oid lst_db = InvalidOid;
static THR_LOCAL Oid lst_spc = InvalidOid;
(void)INSTR_TIME_SET_CURRENT(start_time);
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
(void)INSTR_TIME_SET_CURRENT(start_time);
}
/* This assert is too expensive to have on normally ... */
#ifdef CHECK_WRITE_VS_EXTEND
@ -2124,51 +2130,53 @@ void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const
reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode,
reln->smgr_rnode.backend, nbytes, BLCKSZ);
}
(void)INSTR_TIME_SET_CURRENT(end_time);
INSTR_TIME_SUBTRACT(end_time, start_time);
time_diff = (PgStat_Counter)INSTR_TIME_GET_MICROSEC(end_time);
if (msg_count == 0) {
lst_file = reln->smgr_rnode.node.relNode;
lst_db = reln->smgr_rnode.node.dbNode;
lst_spc = reln->smgr_rnode.node.spcNode;
CONTINUOUS_ASSIGN_2(msg_count, sum_page, 1);
CONTINUOUS_ASSIGN_3(sum_time, min_time, max_time, time_diff);
} else if (lst_file != reln->smgr_rnode.node.relNode || msg_count % STAT_MSG_BATCH) {
PgStat_MsgFile msg;
errno_t rc = memset_s(&msg, sizeof(msg), 0, sizeof(msg));
securec_check(rc, "", "");
msg.dbid = lst_db;
msg.spcid = lst_spc;
msg.fn = lst_file;
msg.rw = 'w';
msg.cnt = msg_count;
msg.blks = sum_page;
msg.tim = sum_time;
msg.lsttim = lst_time;
msg.mintim = min_time;
msg.maxtim = max_time;
reportFileStat(&msg);
CONTINUOUS_ASSIGN_2(msg_count, sum_page, 1);
sum_time = time_diff;
if (lst_file != reln->smgr_rnode.node.relNode) {
if (!ENABLE_SQL_FUSION_ENGINE(IUD_INSTR_TIME_REMOVE)) {
(void)INSTR_TIME_SET_CURRENT(end_time);
INSTR_TIME_SUBTRACT(end_time, start_time);
time_diff = (PgStat_Counter)INSTR_TIME_GET_MICROSEC(end_time);
if (msg_count == 0) {
lst_file = reln->smgr_rnode.node.relNode;
lst_db = reln->smgr_rnode.node.dbNode;
lst_spc = reln->smgr_rnode.node.spcNode;
CONTINUOUS_ASSIGN_2(msg_count, sum_page, 1);
CONTINUOUS_ASSIGN_3(sum_time, min_time, max_time, time_diff);
} else if (lst_file != reln->smgr_rnode.node.relNode || msg_count % STAT_MSG_BATCH) {
PgStat_MsgFile msg;
errno_t rc = memset_s(&msg, sizeof(msg), 0, sizeof(msg));
securec_check(rc, "", "");
msg.dbid = lst_db;
msg.spcid = lst_spc;
msg.fn = lst_file;
msg.rw = 'w';
msg.cnt = msg_count;
msg.blks = sum_page;
msg.tim = sum_time;
msg.lsttim = lst_time;
msg.mintim = min_time;
msg.maxtim = max_time;
reportFileStat(&msg);
CONTINUOUS_ASSIGN_2(msg_count, sum_page, 1);
sum_time = time_diff;
if (lst_file != reln->smgr_rnode.node.relNode) {
lst_file = reln->smgr_rnode.node.relNode;
lst_db = reln->smgr_rnode.node.dbNode;
lst_spc = reln->smgr_rnode.node.spcNode;
CONTINUOUS_ASSIGN_3(sum_time, min_time, max_time, time_diff);
}
} else {
msg_count++;
sum_page++;
sum_time += time_diff;
}
lst_time = time_diff;
if (min_time > time_diff) {
min_time = time_diff;
}
if (max_time < time_diff) {
max_time = time_diff;
}
} else {
msg_count++;
sum_page++;
sum_time += time_diff;
}
lst_time = time_diff;
if (min_time > time_diff) {
min_time = time_diff;
}
if (max_time < time_diff) {
max_time = time_diff;
}
if (compressed) {
return;

View File

@ -214,6 +214,7 @@ typedef struct knl_session_attr_sql {
double default_limit_rows;
int sql_beta_feature;
int sql_fusion_engine;
/* hypo index */
bool enable_hypo_index;
bool hypopg_is_explain;

View File

@ -401,6 +401,7 @@ typedef struct FormatCallStack {
* PG_exception_stack first.
* ----------
*/
#ifdef ENABLE_GSTRACE
#define PG_TRY() \
do { \
sigjmp_buf* save_exception_stack = t_thrd.log_cxt.PG_exception_stack; \
@ -426,6 +427,29 @@ typedef struct FormatCallStack {
gstrace_tryblock_exit(false, oldTryCounter); \
} \
while (0)
#else /* !ENABLE_GSTRACE */
#define PG_TRY() \
do { \
sigjmp_buf* save_exception_stack = t_thrd.log_cxt.PG_exception_stack; \
ErrorContextCallback* save_context_stack = t_thrd.log_cxt.error_context_stack; \
sigjmp_buf local_sigjmp_buf; \
if (sigsetjmp(local_sigjmp_buf, 0) == 0) { \
t_thrd.log_cxt.PG_exception_stack = &local_sigjmp_buf
#define PG_CATCH() \
} \
else \
{ \
t_thrd.log_cxt.PG_exception_stack = save_exception_stack; \
t_thrd.log_cxt.error_context_stack = save_context_stack
#define PG_END_TRY() \
} \
t_thrd.log_cxt.PG_exception_stack = save_exception_stack; \
t_thrd.log_cxt.error_context_stack = save_context_stack; \
} \
while (0)
#endif
// ADIO means async direct io
#ifndef ENABLE_LITE_MODE

View File

@ -380,6 +380,22 @@ typedef enum {
SUBLINK_PULLUP_ENHANCED = 131072
} sql_beta_param;
typedef enum {
NO_SQL_FUSION_FEATURE = 0,
IUD_CHECKSUM_REMOVE = 8, /* do not use pg_checksum_page related functions */
IUD_NODE_CONTEXT_REMOVE = 16, /* do not create context for each node individually */
IUD_IS_SYSTEM_CLASS_REMOVE_PACKAGE = 32, /* remove IsPackageSchemaOid judgment in IsSystemClass */
IUD_ERRORREL_REMOVE = 64, /* do not use errorRel related functions */
IUD_BLOCK_CHAIN_REMOVE = 128, /* do not use blockchain related functions */
IUD_TRIGGER_REMOVE = 256, /* do not use trigger related functions */
IUD_MEMORY_CONTEXT_TRACK_REMOVE = 512, /* MemoryContext do not track and protect */
IUD_INSTR_TIME_REMOVE = 1024, /* remove INSTR_TIME related functions */
IUD_MARKDROP_REMOVE = 2048, /* remove some judgments */
IUD_CODE_OPTIMIZE = 4096, /* code optimization */
IUD_REPORT_REMOVE = 8192, /* remove some report-related statements */
IUD_PENDING = 16384 /* remove pending items */
} sql_fusion_engine;
typedef enum {
OFF_VECTOR_ENGINE,
FORCE_VECTOR_ENGINE,
@ -401,6 +417,9 @@ typedef enum {
#define ENABLE_SQL_BETA_FEATURE(feature) \
((bool)((uint)u_sess->attr.attr_sql.sql_beta_feature & feature))
#define ENABLE_SQL_FUSION_ENGINE(feature) \
((bool)((uint)u_sess->attr.attr_sql.sql_fusion_engine & feature))
#define PARTITION_OPFUSION_MAX_NUMA_NODE 4
#define PARTITION_ENABLE_CACHE_OPFUSION \
(ENABLE_SQL_BETA_FEATURE(PARTITION_OPFUSION) && \

View File

@ -615,7 +615,7 @@ extern TransactionId PartGetRelFrozenxid64(Partition part);
*/
#define RelationOpenSmgr(relation) \
do { \
if ((relation)->rd_smgr == NULL) \
if (unlikely((relation)->rd_smgr == NULL)) \
smgrsetowner(&((relation)->rd_smgr), smgropen((relation)->rd_node, (relation)->rd_backend)); \
} while (0)

View File

@ -40,9 +40,6 @@
#include "gstrace/funcs.comps.h"
#include "gstrace/gstrace_infra_int.h"
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
/* judge whether a char is digital */
#define isDigital(_ch) (((_ch) >= '0') && ((_ch) <= '9'))
@ -59,6 +56,11 @@
#define FUNCTION_IDX(traceId) (((traceId)&GS_TRC_FUNC_MASK) >> GS_TRC_FUNC_SHIFT)
#ifdef ENABLE_GSTRACE
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#define isTraceEnbled(pTrcCxt) ((pTrcCxt)->pTrcCfg != NULL && (pTrcCxt)->pTrcCfg->bEnabled)
#define FILL_TRACE_HADER(slotAddress, seq, numSlot) \
@ -112,6 +114,7 @@
} while (0)
static __thread int* gtCurTryCounter = NULL;
#endif
static trace_context* getTraceContext()
{
@ -120,12 +123,14 @@ static trace_context* getTraceContext()
return &TRC_GLOBALS;
}
#ifdef ENABLE_GSTRACE
static pthread_mutex_t* getTraceFileMutex()
{
static pthread_mutex_t fileMutex;
return &fileMutex;
}
#endif
// Rounds given size down to the nearest power of 2 (<= 2^n)
static uint64_t roundToNearestPowerOfTwo(uint64_t initialSize)
@ -304,6 +309,7 @@ static trace_msg_code attachTraceSharedMemLow(void** pTrcMem, const char* sMemNa
return TRACE_OK;
}
#ifdef ENABLE_GSTRACE
static trace_msg_code detachTraceSharedMemLow(trace_infra* pTrcMem)
{
if (pTrcMem != NULL && munmap(pTrcMem, pTrcMem->total_size) == -1) {
@ -311,6 +317,7 @@ static trace_msg_code detachTraceSharedMemLow(trace_infra* pTrcMem)
}
return TRACE_OK;
}
#endif
static trace_msg_code attachTraceBufferSharedMem(int key)
{
@ -346,6 +353,7 @@ static trace_msg_code attachTraceCfgSharedMem(int key)
return rc;
}
#ifdef ENABLE_GSTRACE
/* Attach to buffer shared memory if tracing has been enabled */
static void attachTraceBufferIfEnabled()
{
@ -504,6 +512,7 @@ int gstrace_init(int key)
return TRACE_OK;
}
#endif
static trace_msg_code createAndAttachTraceBuffer(int key, uint64_t bufferSize)
{
@ -654,6 +663,7 @@ trace_msg_code gstrace_stop(int key)
return TRACE_OK;
}
#ifdef ENABLE_GSTRACE
static bool isTraceIdRequired(const uint32_t rec_id)
{
trace_context* pTrcCxt = getTraceContext();
@ -952,6 +962,7 @@ void gstrace_tryblock_exit(bool inCatch, int* oldTryCounter)
}
gtCurTryCounter = oldTryCounter;
}
#endif
static trace_msg_code dump_trace_context(int fd)
{

View File

@ -5238,7 +5238,7 @@ static void check_global_variables()
}
}
#define BASE_PGXC_LIKE_MACRO_NUM 1399
#define BASE_PGXC_LIKE_MACRO_NUM 1394
static void check_pgxc_like_macros()
{
#ifdef BUILD_BY_CMAKE