From e930224d92fa05d97aa542fb7635bcfaabd1aedc Mon Sep 17 00:00:00 2001 From: gentle_hu Date: Sun, 20 Mar 2022 17:17:22 +0800 Subject: [PATCH] support any x group --- src/bin/gs_guc/pg_guc.cpp | 71 ++- src/common/backend/catalog/builtin_funcs.ini | 2 +- src/common/backend/utils/init/globals.cpp | 2 +- src/common/backend/utils/misc/guc.cpp | 124 +---- .../process/threadpool/knl_thread.cpp | 4 +- src/gausskernel/storage/replication/slot.cpp | 3 +- .../storage/replication/syncrep.cpp | 504 ++++++++++++++---- .../storage/replication/syncrep_gram.y | 19 +- .../storage/replication/walreceiverfuncs.cpp | 5 +- .../storage/replication/walsender.cpp | 44 +- .../rollback_catalog_maindb_92_606.sql | 37 ++ .../rollback_catalog_otherdb_92_606.sql | 37 ++ .../upgrade-post_catalog_maindb_92_606.sql | 38 ++ .../upgrade-post_catalog_otherdb_92_606.sql | 38 ++ src/include/knl/knl_thread.h | 7 +- src/include/replication/syncrep.h | 5 + src/include/replication/walsender_private.h | 7 +- .../regress/expected/sync_standy_names.out | 150 ++++++ src/test/regress/parallel_schedule0 | 1 + src/test/regress/sql/sync_standy_names.sql | 62 +++ 20 files changed, 914 insertions(+), 246 deletions(-) create mode 100644 src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback_catalog_maindb_92_606.sql create mode 100644 src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback_catalog_otherdb_92_606.sql create mode 100644 src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_606.sql create mode 100644 src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_606.sql create mode 100644 src/test/regress/expected/sync_standy_names.out create mode 100644 src/test/regress/sql/sync_standy_names.sql diff --git a/src/bin/gs_guc/pg_guc.cpp b/src/bin/gs_guc/pg_guc.cpp index 885c455da..116e260ca 100644 --- a/src/bin/gs_guc/pg_guc.cpp +++ b/src/bin/gs_guc/pg_guc.cpp @@ -1203,6 +1203,66 @@ static void CheckKeepSyncWindow(char** opt_lines, int idx) #endif +static void +parse_next_sync_groups(char **pgroup, char *result) +{ + if (**pgroup == '\0') { + result[0] = '\0'; + return; + } + + char *this_group = *pgroup; + char *p = *pgroup; + while (*p != '\0' && *p != ')') p++; + while (*p != '\0' && *p != ',') p++; + if (*p == ',') { + *p = '\0'; + p++; + } + + *pgroup = p; + errno_t rc = snprintf_s(result, MAX_VALUE_LEN, MAX_VALUE_LEN - 1, "'%s'", this_group); + securec_check_ss_c(rc, "\0", "\0"); + return; +} + +static int +transform_az_name(char *config_value, char *allAZString, int allAZStringBufLen, const char *data_dir) +{ + char *azString = NULL; + char *buf = allAZString; + int buflen = allAZStringBufLen; + + char *allgroup = xstrdup(config_value); + char *pgrp = allgroup + 1; // trim first "'" + char this_group[MAX_VALUE_LEN] = {0x00}; + + allgroup[strlen(allgroup) - 1] = '\0'; // trim last "'" + parse_next_sync_groups(&pgrp, this_group); + while (this_group[0] != '\0') { + azString = get_AZ_value(this_group, data_dir); + if (NULL == azString) { + GS_FREE(allgroup); + return FAILURE; + } + + int azStringLen = strlen(azString); + azString[0] = ' '; + azString[azStringLen - 1] = ','; + errno_t rc = strncpy_s(buf, buflen, azString, azStringLen); + securec_check_c(rc, "\0", "\0"); + buf += azStringLen; + buflen -= azStringLen; + GS_FREE(azString); + + parse_next_sync_groups(&pgrp, this_group); + } + GS_FREE(allgroup); + allAZString[0] = allAZString[strlen(allAZString) - 1] = '\''; + + return SUCCESS; +} + /* * @@GaussDB@@ * Brief : @@ -1282,17 +1342,16 @@ do_gucset(const char *action_type, const char *data_dir) // get AZ string if (NULL != config_value[i]) { - char *azString = NULL; - azString = get_AZ_value(config_value[i], data_dir); - if (NULL == azString) { - result_status = FAILURE; + char allAZString[MAX_VALUE_LEN] = {0x00}; + + result_status = transform_az_name(config_value[i], allAZString, MAX_VALUE_LEN, data_dir); + if (result_status == FAILURE) { continue; } tmpAZStr = xstrdup(config_value[i]); GS_FREE(config_value[i]); - config_value[i] = xstrdup(azString); - GS_FREE(azString); + config_value[i] = xstrdup(allAZString); } } diff --git a/src/common/backend/catalog/builtin_funcs.ini b/src/common/backend/catalog/builtin_funcs.ini index d489c8b2e..2599ae331 100755 --- a/src/common/backend/catalog/builtin_funcs.ini +++ b/src/common/backend/catalog/builtin_funcs.ini @@ -8641,7 +8641,7 @@ ), AddFuncGroup( "pg_stat_get_wal_senders", 1, - AddBuiltinFunc(_0(3099), _1("pg_stat_get_wal_senders"), _2(0), _3(false), _4(true), _5(pg_stat_get_wal_senders), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(0), _21(21, 20, 23, 25, 25, 25, 25, 1184, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 23, 25, 25), _22(21, 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(21, "pid", "sender_pid", "local_role", "peer_role", "peer_state", "state", "catchup_start", "catchup_end", "sender_sent_location", "sender_write_location", "sender_flush_location", "sender_replay_location", "receiver_received_location", "receiver_write_location", "receiver_flush_location", "receiver_replay_location", "sync_percent", "sync_state", "sync_priority", "sync_most_available", "channel"), _24(NULL), _25("pg_stat_get_wal_senders"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("statistics: information about currently active replication"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + AddBuiltinFunc(_0(3099), _1("pg_stat_get_wal_senders"), _2(0), _3(false), _4(true), _5(pg_stat_get_wal_senders), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(0), _21(22, 20, 23, 25, 25, 25, 25, 1184, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 23, 23, 25, 25), _22(22, 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(22, "pid", "sender_pid", "local_role", "peer_role", "peer_state", "state", "catchup_start", "catchup_end", "sender_sent_location", "sender_write_location", "sender_flush_location", "sender_replay_location", "receiver_received_location", "receiver_write_location", "receiver_flush_location", "receiver_replay_location", "sync_percent", "sync_state", "sync_group", "sync_priority", "sync_most_available", "channel"), _24(NULL), _25("pg_stat_get_wal_senders"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("statistics: information about currently active replication"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) ), AddFuncGroup( "pg_stat_get_wlm_ec_operator_info", 1, diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index a8329d9ea..8b4b9cf8c 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -59,7 +59,7 @@ bool open_join_children = true; bool will_shutdown = false; /* hard-wired binary version number */ -const uint32 GRAND_VERSION_NUM = 92605; +const uint32 GRAND_VERSION_NUM = 92606; const uint32 PREDPUSH_SAME_LEVEL_VERSION_NUM = 92522; const uint32 UPSERT_WHERE_VERSION_NUM = 92514; diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index 161556969..580260590 100755 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -164,7 +164,6 @@ #include "utils/guc_memory.h" #include "utils/guc_network.h" #include "utils/guc_resource.h" -#include "common/config/cm_config.h" #ifndef PG_KRB_SRVTAB #define PG_KRB_SRVTAB "" @@ -517,9 +516,6 @@ static bool validate_conf_enum(struct config_generic *record, const char *name, int elevel, bool freemem, void *newvalue, void **newextra); #ifndef ENABLE_MULTIPLE_NODES -int init_gauss_cluster_config(void); -static bool AlterSystemSetCheckSyncStandbyNames(struct config_generic *record, const char *name, const char *value, - void *newextra); static void CheckAndGetAlterSystemSetParam(AlterSystemStmt* altersysstmt, char** outer_name, char** outer_value, struct config_generic** outer_record); static void FinishAlterSystemSet(GucContext context); @@ -8252,121 +8248,6 @@ static void CheckAlterSystemSetPrivilege(const char* name) } } -/* - * Obtain cluster information from cluster_static_config. - */ -int init_gauss_cluster_config(void) -{ - char path[MAXPGPATH] = {0}; - int err_no = 0; - int nRet = 0; - int status = 0; - uint32 nodeidx = 0; - struct stat statbuf {}; - - char* gausshome = gs_getenv_r("GAUSSHOME"); - check_backend_env(gausshome); - - nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/bin/%s", gausshome, STATIC_CONFIG_FILE); - securec_check_ss_c(nRet, "\0", "\0"); - - if (lstat(path, &statbuf) != 0) { - return 1; - } - - status = read_config_file(path, &err_no); - - if (status != 0) { - return 1; - } - - if (g_nodeHeader.node <= 0) { - free(g_node); - g_node = nullptr; - return 1; - } - - for (nodeidx = 0; nodeidx < g_node_num; nodeidx++) { - if (g_node[nodeidx].node == g_nodeHeader.node) { - g_currentNode = &g_node[nodeidx]; - break; - } - } - - if (g_currentNode == NULL) { - free(g_node); - g_node = nullptr; - return 1; - } - - if (get_dynamic_dn_role() != 0) { - // failed to get dynamic dn role - free(g_node); - g_node = nullptr; - return 1; - } - - return 0; -} - -static bool AlterSystemSetCheckSyncStandbyNames(struct config_generic *record, const char *name, const char *value, - void *newextra) -{ - char* newvalue = NULL; - bool res = false; - SyncRepConfigData *pconf = NULL; - char* data_dir = t_thrd.proc_cxt.DataDir; - char* p = NULL; - - if (value != NULL) { - newvalue = guc_strdup(DEBUG5, value); - } - if (!validate_conf_option(record, name, value, PGC_S_FILE, ERROR, false, &newvalue, &newextra)) { - goto ret; - } - - pconf = (SyncRepConfigData *)newextra; - - if (pconf == NULL || strcmp(value, "*") == 0) { - res = true; - goto ret; - } - - if (pconf->num_sync > pconf->nmembers) { - // The sync number must less or equals to the number of standby node names. - goto ret; - } - /* get current cluster information from cluster_staic_config */ - if (has_static_config()) { - if (init_gauss_cluster_config() != 0) { - res = true; - goto ret; - } - } else { - res = true; - goto ret; - } - - p = pconf->member_names; - for (int i = 1; i <= pconf->nmembers; i++) { - if (!CheckDataNameValue(p, data_dir)) { - goto ret; - } - p += strlen(p) + 1; - } - -ret: - if (newvalue != NULL) { - pfree(newvalue); - newvalue = NULL; - } - if (newextra != NULL) { - pfree(newextra); - newextra = NULL; - } - return res; -} - /* * Persist the configuration parameter value. * @@ -8424,10 +8305,7 @@ static void CheckAndGetAlterSystemSetParam(AlterSystemStmt* altersysstmt, "ALTER SYSTEM SET only support POSTMASTER-level, SIGHUP-level and BACKEND-level guc variable,\n" "and it must be allowed to set in postgresql.conf.", name))); - if ((strcmp(name, "synchronous_standby_names") == 0 && - !AlterSystemSetCheckSyncStandbyNames(record, name, value, newextra)) || - (strcmp(name, "synchronous_standby_names") != 0 && - !validate_conf_option(record, name, value, PGC_S_FILE, ERROR, true, NULL, &newextra))) { + if (!validate_conf_option(record, name, value, PGC_S_FILE, ERROR, true, NULL, &newextra)) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid value for parameter \"%s\": \"%s\"", name, value))); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 64505a73e..068d74627 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -792,7 +792,7 @@ static void knl_t_replscanner_init(knl_t_replscanner_context* replscanner_cxt) static void knl_t_syncgram_init(knl_t_syncrepgram_context* syncrepgram_cxt) { - syncrepgram_cxt->syncrep_parse_result = NULL; + syncrepgram_cxt->syncrep_parse_result = NIL; } static void knl_t_syncrepscanner_init(knl_t_syncrepscanner_context* syncrepscanner_cxt) @@ -805,6 +805,8 @@ static void knl_t_syncrepscanner_init(knl_t_syncrepscanner_context* syncrepscann static void knl_t_syncrep_init(knl_t_syncrep_context* syncrep_cxt) { syncrep_cxt->SyncRepConfig = NULL; + syncrep_cxt->SyncRepConfigGroups = 0; + syncrep_cxt->SyncRepMaxPossib = 0; syncrep_cxt->announce_next_takeover = true; } diff --git a/src/gausskernel/storage/replication/slot.cpp b/src/gausskernel/storage/replication/slot.cpp index e90dcf216..49d31bb5f 100755 --- a/src/gausskernel/storage/replication/slot.cpp +++ b/src/gausskernel/storage/replication/slot.cpp @@ -983,7 +983,8 @@ void ReplicationSlotsComputeRequiredLSN(ReplicationSlotState *repl_slt_state) return; } if (t_thrd.syncrep_cxt.SyncRepConfig != NULL) { - for (i = t_thrd.syncrep_cxt.SyncRepConfig->num_sync - 1; i >= 0; i--) { + i = Min(t_thrd.syncrep_cxt.SyncRepMaxPossib, g_instance.attr.attr_storage.max_replication_slots) - 1; + for ( ; i >= 0; i--) { if (standby_slots_list[i] != InvalidXLogRecPtr) { repl_slt_state->quorum_min_required = standby_slots_list[i]; break; diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index 98e9c69f5..1e49eeb4d 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -74,12 +74,20 @@ volatile bool most_available_sync = false; const int MAX_SYNC_REP_RETRY_COUNT = 1000; const int SYNC_REP_SLEEP_DELAY = 1000; +typedef enum SyncStandbyNumState { + STANDBIES_EMPTY = 0, + STANDBIES_NOT_ENOUGH, + STANDBIES_ENOUGH +} SyncStandbyNumState; +static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys); +static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state); + static void SyncRepQueueInsert(int mode); static bool SyncRepCancelWait(void); static void SyncRepWaitCompletionQueue(); static void SyncRepNotifyComplete(); -static int SyncRepGetStandbyPriority(void); +static void SyncRepGetStandbyGroupAndPriority(int* gid, int* prio); #ifndef ENABLE_MULTIPLE_NODES static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTime); #endif @@ -88,7 +96,6 @@ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* write static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth); - static void SyncPaxosQueueInsert(void); static void SyncPaxosCancelWait(void); static int SyncPaxosWakeQueue(void); @@ -98,14 +105,57 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); static bool SyncPaxosQueueIsOrderedByLSN(void); #endif -static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standbys = NULL); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys = NULL); +static List *SyncRepGetSyncStandbysPriority(bool *am_sync, int groupid, List** catchup_standbys = NULL); +static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, int groupid, List** catchup_standbys = NULL); +static inline void free_sync_standbys_list(List* sync_standbys); +static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state); static int cmp_lsn(const void *a, const void *b); -static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum = 0); +static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state = STANDBIES_EMPTY); + +typedef struct TransContext { + /* for global */ + List* source; + bool has_star; + List *existers; + int SyncRepAllCount; + int SyncRepMinPossib; + + /* for each group*/ + SyncRepConfigData* conf; + bool is_star; + + /* results */ + bool success; + List *SyncRepConfig; + int SyncRepConfigGroups; + int SyncRepMaxPossib; +} TransContext; +static TransContext* create_transform_context(); +static void bind_transform_context(TransContext *tcxt, SyncRepConfigData *conf); +static void advance_transform_result(TransContext *tcxt); +static void finalize_transform_result(TransContext *tcxt); +static void clear_transform_context(TransContext *tcxt); +static void destroy_transform_context(TransContext *tcxt); +static bool analyze_star_and_num(TransContext *tcxt); +static bool analyze_duplicate_names(TransContext *tcxt); +static void transform_synchronous_standby_names(TransContext *tcxt); #define CATCHUP_XLOG_DIFF(ptr1, ptr2, amount) \ XLogRecPtrIsInvalid(ptr1) ? false : (XLByteDifference(ptr2, ptr1) < amount) +/* + * sync_standbys_list is a two-dimensional array means sync standby nodes in groups + * The structure is as follows: List[IntList[int, int,...], IntList[int, int, ...], ...] + */ +static inline void free_sync_standbys_list(List* sync_standbys) +{ + ListCell *lc = NULL; + foreach(lc, sync_standbys) { + list_free((List*)lfirst(lc)); + } + list_free(sync_standbys); +} + /* * Determine whether to wait for standby catching up, if requested by user. * @@ -481,15 +531,19 @@ void SyncRepCleanupAtProcExit(void) */ void SyncRepInitConfig(void) { + int group; int priority; /* * Determine if we are a potential sync standby and remember the result * for handling replies from standby. */ - priority = SyncRepGetStandbyPriority(); - if (t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority != priority) { + SyncRepGetStandbyGroupAndPriority(&group, &priority); + if (t_thrd.walsender_cxt.MyWalSnd->sync_standby_group != group || + t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority != priority) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + t_thrd.walsender_cxt.MyWalSnd->sync_standby_group = group; t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority = priority; /* @@ -499,8 +553,8 @@ void SyncRepInitConfig(void) SyncRepCheckSyncStandbyAlive(); LWLockRelease(SyncRepLock); - ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %d", - u_sess->attr.attr_common.application_name, priority))); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby group and priority: %d %d", + u_sess->attr.attr_common.application_name, group, priority))); } } @@ -558,13 +612,15 @@ void SyncRepReleaseWaiters(void) */ if (t_thrd.syncrep_cxt.announce_next_takeover && am_sync) { t_thrd.syncrep_cxt.announce_next_takeover = false; - if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { - ereport(LOG, (errmsg("standby \"%s\" is now a synchronous standby with priority %d", + if (GetWalsndSyncRepConfig(t_thrd.walsender_cxt.MyWalSnd)->syncrep_method == SYNC_REP_PRIORITY) { + ereport(LOG, (errmsg("standby \"%s\" is now a synchronous standby with group and priority: %d %d", u_sess->attr.attr_common.application_name, + t_thrd.walsender_cxt.MyWalSnd->sync_standby_group, t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority))); } else { - ereport(LOG, (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby", - u_sess->attr.attr_common.application_name))); + ereport(LOG, (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby in group: %d", + u_sess->attr.attr_common.application_name, + t_thrd.walsender_cxt.MyWalSnd->sync_standby_group))); } } @@ -609,6 +665,30 @@ void SyncRepReleaseWaiters(void) numapply, (uint32)(replayPtr >> 32), (uint32)replayPtr))); } +static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys) +{ + if (t_thrd.syncrep_cxt.SyncRepConfig == NULL) + return STANDBIES_ENOUGH; + + ListCell* lc = NULL; + List* per_group = NIL; + int gid = 0; + int alive = 0; + SyncStandbyNumState res = STANDBIES_ENOUGH; + + foreach(lc, sync_standbys) { + per_group = (List*)lfirst(lc); + if (list_length(per_group) < t_thrd.syncrep_cxt.SyncRepConfig[gid]->num_sync) + res = STANDBIES_NOT_ENOUGH; + + alive += list_length(per_group); + gid++; + } + + if (alive == 0) res = STANDBIES_EMPTY; + return res; +} + /* * Check whether to delay the time of entering most_available_sync mode. * @@ -617,7 +697,7 @@ void SyncRepReleaseWaiters(void) * when the number of sync standby is not qualified. * Otherwise it's set to true. */ -static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum) +static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state) { bool result = false; @@ -629,10 +709,11 @@ static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum) List *sync_standbys = NIL; bool am_sync = false; sync_standbys = SyncRepGetSyncStandbys(&am_sync); - curSyncNum = list_length(sync_standbys); + state = check_sync_standbys_num(sync_standbys); + free_sync_standbys_list(sync_standbys); } - if (curSyncNum == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) { + if (state == STANDBIES_ENOUGH) { if (t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start != 0) { t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start = 0; t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window = false; @@ -658,6 +739,25 @@ static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum) return result; } +/* + * In a particular scenario, when most_available_sync is true, primary only wait + * the alive sync standbys, even if the quantity does not meet the configuration + * requirements. + */ +static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state) +{ + *state = check_sync_standbys_num(sync_standbys); + + if (*state == STANDBIES_ENOUGH) { + DelayIntoMostAvaSync(false, STANDBIES_ENOUGH); // just for refresh keep_sync_window if needed + return true; + } + if (t_thrd.walsender_cxt.WalSndCtl->most_available_sync && !DelayIntoMostAvaSync(false, *state)) { + return true; + } + return false; +} + /* * Calculate the synced Receive, Write, Flush and Apply positions among sync standbys. * @@ -683,14 +783,13 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP /* * Quick exit if we are not managing a sync standby (or not check for check_am_sync is false) * or there are not enough synchronous standbys. - * but in a particular scenario, when most_available_sync is true, primary only wait the alive sync standbys - * if list_length(sync_standbys) doesn't satisfy t_thrd.syncrep_cxt.SyncRepConfig->num_sync. + * or most_available_sync is working and allow some standby nodes to be missing. */ - int cur_num_sync_standbys = list_length(sync_standbys); - if ((!(*am_sync) && check_am_sync) || t_thrd.syncrep_cxt.SyncRepConfig == NULL || - ((!t_thrd.walsender_cxt.WalSndCtl->most_available_sync || DelayIntoMostAvaSync(false, cur_num_sync_standbys)) && - cur_num_sync_standbys < t_thrd.syncrep_cxt.SyncRepConfig->num_sync)) { - list_free(sync_standbys); + SyncStandbyNumState state; + if ((!(*am_sync) && check_am_sync) || + t_thrd.syncrep_cxt.SyncRepConfig == NULL || + !judge_sync_standbys_num(sync_standbys, &state)) { + free_sync_standbys_list(sync_standbys); return false; } @@ -707,20 +806,29 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced * positions even in a quorum-based sync replication. */ - if (list_length(sync_standbys) == 0) { + if (state == STANDBIES_EMPTY) { /* deal with sync standbys list is empty when most available sync mode is on */ *writePtr = GetXLogWriteRecPtr(); *flushPtr = GetFlushRecPtr(); *receivePtr = *writePtr; *replayPtr = *flushPtr; - } else if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { - SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, sync_standbys); - } else if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_QUORUM) { - SyncRepGetNthLatestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, sync_standbys, - t_thrd.syncrep_cxt.SyncRepConfig->num_sync); + } else { + int i = 0; + ListCell *lc = NULL; + foreach(lc, sync_standbys) { + List *per_group = (List*)lfirst(lc); + if (per_group == NIL) { + /* do nothing, this group of sync standbys are all offline or no sync standbys. */ + } else if (t_thrd.syncrep_cxt.SyncRepConfig[i]->syncrep_method == SYNC_REP_PRIORITY) { + SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, per_group); + } else { + SyncRepGetNthLatestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, + per_group, t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync); + } + i++; + } } - - list_free(sync_standbys); + free_sync_standbys_list(sync_standbys); return true; } @@ -742,8 +850,8 @@ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTi /* Get standbys that are considered as synchronous at this moment. */ sync_standbys = SyncRepGetSyncStandbys(&am_sync, &catchup_standbys); /* Skip here if there is at lease one sync standby, or no standby in catchup. */ - if (list_length(sync_standbys) > 0 || list_length(catchup_standbys) == 0) { - list_free(sync_standbys); + if (check_sync_standbys_num(sync_standbys) != STANDBIES_EMPTY || list_length(catchup_standbys) == 0) { + free_sync_standbys_list(sync_standbys); list_free(catchup_standbys); return false; } @@ -766,7 +874,7 @@ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTi } } - list_free(sync_standbys); + free_sync_standbys_list(sync_standbys); list_free(catchup_standbys); return true; } @@ -860,10 +968,14 @@ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* wr } /* Get Nth latest Write, Flush, Apply positions */ - *receivePtr = receive_array[nth - 1]; - *writePtr = write_array[nth - 1]; - *flushPtr = flush_array[nth - 1]; - *replayPtr = apply_array[nth - 1]; + if (XLogRecPtrIsInvalid(*writePtr) || XLByteLE(write_array[nth - 1], *writePtr)) + *writePtr = write_array[nth - 1]; + if (XLogRecPtrIsInvalid(*flushPtr) || XLByteLE(flush_array[nth - 1], *flushPtr)) + *flushPtr = flush_array[nth - 1]; + if (XLogRecPtrIsInvalid(*receivePtr) || XLByteLE(receive_array[nth - 1], *receivePtr)) + *receivePtr = receive_array[nth - 1]; + if (XLogRecPtrIsInvalid(*replayPtr) || XLByteLE(apply_array[nth - 1], *replayPtr)) + *replayPtr = apply_array[nth - 1]; pfree(receive_array); receive_array = NULL; @@ -893,47 +1005,56 @@ static int cmp_lsn(const void *a, const void *b) /* * Check if we are in the list of sync standbys, and if so, determine - * priority sequence. Return priority if set, or zero to indicate that + * priority sequence. Return groupid and priority if set, or zero to indicate that * we are not a potential sync standby. * * Compare the parameter SyncRepStandbyNames against the application_name * for this WALSender, or allow any name if we find a wildcard "*". */ -static int SyncRepGetStandbyPriority(void) +static void SyncRepGetStandbyGroupAndPriority(int* gid, int* prio) { const char *standby_name = NULL; + int group; int priority; bool found = false; + *gid = 0; + *prio = 0; + /* * Since synchronous cascade replication is not allowed, we always set the * priority of cascading walsender to zero. */ if (AM_WAL_STANDBY_SENDER || AM_WAL_SHARE_STORE_SENDER || AM_WAL_HADR_DNCN_SENDER || AM_WAL_DB_SENDER) - return 0; + return; if (!SyncStandbysDefined() || t_thrd.syncrep_cxt.SyncRepConfig == NULL || !SyncRepRequested()) - return 0; + return; - standby_name = t_thrd.syncrep_cxt.SyncRepConfig->member_names; - for (priority = 1; priority <= t_thrd.syncrep_cxt.SyncRepConfig->nmembers; priority++) { - if (pg_strcasecmp(standby_name, u_sess->attr.attr_common.application_name) == 0 || - strcmp(standby_name, "*") == 0) { - found = true; - break; + for (group = 0; group < t_thrd.syncrep_cxt.SyncRepConfigGroups && !found; group++) { + standby_name = t_thrd.syncrep_cxt.SyncRepConfig[group]->member_names; + for (priority = 1; priority <= t_thrd.syncrep_cxt.SyncRepConfig[group]->nmembers; priority++) { + if (pg_strcasecmp(standby_name, u_sess->attr.attr_common.application_name) == 0 || + strcmp(standby_name, "*") == 0) { + Assert(!(group > 1 && strcmp(standby_name, "*") == 0)); + found = true; + break; + } + standby_name += strlen(standby_name) + 1; } - standby_name += strlen(standby_name) + 1; } if (!found) { - return 0; + return; } /* * In quorum-based sync replication, all the standbys in the list * have the same priority, one. */ - return (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; + *gid = group - 1; + *prio = (t_thrd.syncrep_cxt.SyncRepConfig[group - 1]->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; + return; } /* @@ -1154,9 +1275,16 @@ List *SyncRepGetSyncStandbys(bool *am_sync, List** catchup_standbys) if (t_thrd.syncrep_cxt.SyncRepConfig == NULL) return NIL; - return (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) - ? SyncRepGetSyncStandbysPriority(am_sync, catchup_standbys) - : SyncRepGetSyncStandbysQuorum(am_sync, catchup_standbys); + List* results = NIL; + for(int i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) { + if (t_thrd.syncrep_cxt.SyncRepConfig[i]->syncrep_method == SYNC_REP_PRIORITY) { + results = lappend(results, SyncRepGetSyncStandbysPriority(am_sync, i,catchup_standbys)); + } else { + results = lappend(results, SyncRepGetSyncStandbysQuorum(am_sync, i, catchup_standbys)); + } + } + + return results; } /* @@ -1169,13 +1297,13 @@ List *SyncRepGetSyncStandbys(bool *am_sync, List** catchup_standbys) * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys) +static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, int groupid, List** catchup_standbys) { List *result = NIL; int i; volatile WalSnd *walsnd = NULL; /* Use volatile pointer to prevent code rearrangement */ - Assert(t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + Assert(t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_QUORUM); for (i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) { walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[i]; @@ -1185,7 +1313,7 @@ static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0) + if (walsnd->sync_standby_priority == 0 || walsnd->sync_standby_group != groupid) continue; if ((walsnd->state == WALSNDSTATE_CATCHUP || walsnd->peer_state == CATCHUP_STATE) && @@ -1230,7 +1358,7 @@ static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ -static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standbys) +static List *SyncRepGetSyncStandbysPriority(bool *am_sync, int groupid, List** catchup_standbys) { List *result = NIL; List *pending = NIL; @@ -1242,9 +1370,9 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb bool am_in_pending = false; volatile WalSnd *walsnd = NULL; /* Use volatile pointer to prevent code rearrangement */ - Assert(t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); + Assert(t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_PRIORITY); - lowest_priority = t_thrd.syncrep_cxt.SyncRepConfig->nmembers; + lowest_priority = t_thrd.syncrep_cxt.SyncRepConfig[groupid]->nmembers; next_highest_priority = lowest_priority + 1; /* @@ -1261,7 +1389,7 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb /* Must be synchronous */ this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) + if (this_priority == 0 || walsnd->sync_standby_group != groupid) continue; if ((walsnd->state == WALSNDSTATE_CATCHUP || walsnd->peer_state == CATCHUP_STATE) && @@ -1286,7 +1414,7 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb result = lappend_int(result, i); if (am_sync != NULL && walsnd == t_thrd.walsender_cxt.MyWalSnd) *am_sync = true; - if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) { + if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) { list_free(pending); return result; /* Exit if got enough sync standbys */ } @@ -1311,7 +1439,7 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb * Consider all pending standbys as sync if the number of them plus * already-found sync ones is lower than the configuration requests. */ - if (list_length(result) + list_length(pending) <= t_thrd.syncrep_cxt.SyncRepConfig->num_sync) { + if (list_length(result) + list_length(pending) <= t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) { bool needfree = (result != NIL && pending != NIL); /* @@ -1355,9 +1483,9 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb /* * We should always exit here after the scan of pending list * starts because we know that the list has enough elements to - * reach SyncRepConfig->num_sync. + * reach SyncRepConfig[groupid]->num_sync. */ - if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) { + if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) { list_free(pending); return result; /* Exit if got enough sync standbys */ } @@ -1811,6 +1939,184 @@ static bool SyncPaxosQueueIsOrderedByLSN(void) } #endif +/* + * ================================================================= + * Analyze and transform GUC param synchronous_standby_names content + * Create result in session MEMORY_CONTEXT_STORAGE session. + * ================================================================= + */ +static TransContext* create_transform_context() +{ + MemoryContext old_context = MemoryContextSwitchTo(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE)); + TransContext* tcxt = (TransContext*)palloc(sizeof(TransContext)); + (void)MemoryContextSwitchTo(old_context); + + tcxt->source = t_thrd.syncrepgram_cxt.syncrep_parse_result; + tcxt->SyncRepConfigGroups = list_length(tcxt->source); + Assert(tcxt->SyncRepConfigGroups > 0); + tcxt->success = true; + tcxt->has_star = false; + tcxt->existers = NIL; + tcxt->conf = NULL; + tcxt->is_star = false; + + tcxt->SyncRepConfig = NIL; + tcxt->SyncRepAllCount = 0; + tcxt->SyncRepMaxPossib = MAX_INT32; + tcxt->SyncRepMinPossib = 0; + + return tcxt; +} + +static void bind_transform_context(TransContext *tcxt, SyncRepConfigData *conf) +{ + tcxt->conf = conf; + tcxt->is_star = false; +} + +static void advance_transform_result(TransContext *tcxt) +{ + tcxt->SyncRepMinPossib += tcxt->conf->num_sync; + if (tcxt->has_star) { + tcxt->SyncRepAllCount = -1; + tcxt->SyncRepMaxPossib = tcxt->SyncRepMinPossib; + } else { + tcxt->SyncRepAllCount += tcxt->conf->nmembers; + tcxt->SyncRepMaxPossib = Min(tcxt->SyncRepMaxPossib, tcxt->conf->nmembers - tcxt->conf->num_sync); + } +} + +static void finalize_transform_result(TransContext *tcxt) +{ + if (!tcxt->success) { + clear_transform_context(tcxt); + return; + } + + MemoryContext old_context = MemoryContextSwitchTo(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE)); + SyncRepConfigData* conf = NULL; + ListCell* lc = NULL; + SyncRepConfigData* result = NULL; + errno_t rc; + foreach(lc, tcxt->source) { + conf = (SyncRepConfigData*)lfirst(lc); + result = (SyncRepConfigData*)palloc(conf->config_size); + rc = memcpy_s(result, conf->config_size, conf, conf->config_size); + securec_check(rc, "", ""); + tcxt->SyncRepConfig = lappend(tcxt->SyncRepConfig, result); + } + (void)MemoryContextSwitchTo(old_context); + + if (!tcxt->has_star) { + tcxt->SyncRepMaxPossib = tcxt->SyncRepAllCount - tcxt->SyncRepMaxPossib; + } + + clear_transform_context(tcxt); +} + +/* + * clear the transform context, except results. + */ +static void clear_transform_context(TransContext *tcxt) +{ + tcxt->source = NIL; + tcxt->conf = NULL; + list_free(tcxt->existers); + tcxt->existers = NIL; +} + +static void destroy_transform_context(TransContext *tcxt) +{ + list_free(tcxt->existers); + list_free_deep(tcxt->SyncRepConfig); + pfree(tcxt); +} + +static bool analyze_star_and_num(TransContext *tcxt) +{ + SyncRepConfigData *conf = tcxt->conf; + + char *p = conf->member_names; + for (int i = 0; i < conf->nmembers; i++) { + if (strcmp(p, "*") != 0) { + p += strlen(p) + 1; + continue; + } + + if (tcxt->SyncRepConfigGroups > 1) { + GUC_check_errdetail("'*' is not support when sync standby strategy is combinated."); + return false; + } + if (conf->nmembers > 1) { + GUC_check_errdetail("Please use the '*' separately, otherwise, it is semantic ambiguity."); + return false; + } + tcxt->is_star = true; + tcxt->has_star = true; + } + + if (!tcxt->is_star && conf->num_sync > conf->nmembers) { + GUC_check_errdetail("The sync number must less or equals to the number of standby node names."); + return false; + } + + return true; +} + +static bool analyze_duplicate_names(TransContext *tcxt) +{ + if (tcxt->is_star) + return true; + + SyncRepConfigData *conf = tcxt->conf; + char *p = conf->member_names; + char *exister = NULL; + + ListCell *lc = NULL; + for (int i = 0; i < conf->nmembers; i++) { + foreach(lc, tcxt->existers) { + exister = (char*)lfirst(lc); + if (pg_strcasecmp(p, exister) == 0) { + GUC_check_errdetail("Duplicate standby node name: %s", p); + return false; + } + } + tcxt->existers = lappend(tcxt->existers, p); + p += strlen(p) + 1; + } + return true; +} + +static void transform_synchronous_standby_names(TransContext* tcxt) +{ + if (tcxt->SyncRepConfigGroups > SYNC_REP_MAX_GROUPS) { + GUC_check_errdetail("Too much groups. Please no more than %d.", SYNC_REP_MAX_GROUPS); + tcxt->success = false; + clear_transform_context(tcxt); + return; + } + + ListCell* lc = NULL; + foreach(lc, t_thrd.syncrepgram_cxt.syncrep_parse_result) { + bind_transform_context(tcxt, (SyncRepConfigData*)lfirst(lc)); + + if (tcxt->conf->syncrep_method == SYNC_REP_PRIORITY && tcxt->SyncRepConfigGroups > 1) { + GUC_check_errdetail("FIRST rule is not support when sync standby strategy is combinated."); + tcxt->success = false; + break; + } + if (!analyze_star_and_num(tcxt) || + !analyze_duplicate_names(tcxt)) { + tcxt->success = false; + break; + } + + advance_transform_result(tcxt); + } + + finalize_transform_result(tcxt); +} + /* * =========================================================== * Synchronous Replication functions executed by any process @@ -1818,41 +2124,39 @@ static bool SyncPaxosQueueIsOrderedByLSN(void) */ bool check_synchronous_standby_names(char **newval, void **extra, GucSource source) { - errno_t rc = EOK; - if (*newval != NULL && (*newval)[0] != '\0') { int parse_rc; - SyncRepConfigData *pconf = NULL; syncrep_scanner_yyscan_t yyscanner; /* Reset communication variables to ensure a fresh start */ - t_thrd.syncrepgram_cxt.syncrep_parse_result = NULL; + t_thrd.syncrepgram_cxt.syncrep_parse_result = NIL; /* Parse the synchronous_standby_names string */ yyscanner = syncrep_scanner_init(*newval); parse_rc = syncrep_yyparse(yyscanner); syncrep_scanner_finish(yyscanner); - if (parse_rc != 0 || t_thrd.syncrepgram_cxt.syncrep_parse_result == NULL) { + if (parse_rc != 0 || t_thrd.syncrepgram_cxt.syncrep_parse_result == NIL) { GUC_check_errcode(ERRCODE_SYNTAX_ERROR); GUC_check_errdetail("synchronous_standby_names parser failed"); return false; } - /* GUC extra value must be malloc'd, not palloc'd */ - pconf = (SyncRepConfigData *)MemoryContextAlloc(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE), - t_thrd.syncrepgram_cxt.syncrep_parse_result->config_size); - if (pconf == NULL) + /* + * analyze and transform synchronous_standby_names content, + * make result in session MEMORY_CONTEXT_STORAGE. + */ + TransContext* tcxt = create_transform_context(); + transform_synchronous_standby_names(tcxt); + if (!tcxt->success) { + destroy_transform_context(tcxt); return false; - rc = memcpy_s(pconf, t_thrd.syncrepgram_cxt.syncrep_parse_result->config_size, - t_thrd.syncrepgram_cxt.syncrep_parse_result, - t_thrd.syncrepgram_cxt.syncrep_parse_result->config_size); - securec_check(rc, "", ""); + } - *extra = (void *)pconf; + *extra = (void *)tcxt; if (t_thrd.syncrepgram_cxt.syncrep_parse_result) { - pfree(t_thrd.syncrepgram_cxt.syncrep_parse_result); - t_thrd.syncrepgram_cxt.syncrep_parse_result = NULL; + list_free_deep(t_thrd.syncrepgram_cxt.syncrep_parse_result); + t_thrd.syncrepgram_cxt.syncrep_parse_result = NIL; } /* @@ -1875,16 +2179,36 @@ void assign_synchronous_standby_names(const char *newval, void *extra) * it should be safe to know the latest rep config ASAP for all sessions. * If this assumption no longer holds, please move it to session level. */ - pfree_ext(t_thrd.syncrep_cxt.SyncRepConfig); - - if (extra != NULL) { - SyncRepConfigData *pconf = (SyncRepConfigData *)extra; - errno_t rc = EOK; - t_thrd.syncrep_cxt.SyncRepConfig = (SyncRepConfigData *)MemoryContextAlloc( - THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE), pconf->config_size); - rc = memcpy_s(t_thrd.syncrep_cxt.SyncRepConfig, pconf->config_size, pconf, pconf->config_size); - securec_check(rc, "", ""); + for (int i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) { + pfree_ext(t_thrd.syncrep_cxt.SyncRepConfig[i]); } + pfree_ext(t_thrd.syncrep_cxt.SyncRepConfig); + t_thrd.syncrep_cxt.SyncRepConfigGroups = 0; + t_thrd.syncrep_cxt.SyncRepMaxPossib = 0; + + if (extra == NULL) + return; + + TransContext* tcxt = (TransContext*)extra; + ListCell* lc = NULL; + int i = 0; + errno_t rc = EOK; + MemoryContext old_context = MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE)); + t_thrd.syncrep_cxt.SyncRepConfigGroups = tcxt->SyncRepConfigGroups; + t_thrd.syncrep_cxt.SyncRepMaxPossib = tcxt->SyncRepMaxPossib; + t_thrd.syncrep_cxt.SyncRepConfig = (SyncRepConfigData **)palloc( + tcxt->SyncRepConfigGroups * sizeof(SyncRepConfigData*)); + + foreach(lc, tcxt->SyncRepConfig) { + SyncRepConfigData* pconf = (SyncRepConfigData*)lfirst(lc); + t_thrd.syncrep_cxt.SyncRepConfig[i] = (SyncRepConfigData*)palloc(pconf->config_size); + rc = memcpy_s(t_thrd.syncrep_cxt.SyncRepConfig[i], pconf->config_size, pconf, pconf->config_size); + securec_check(rc, "", ""); + + i++; + } + + (void)MemoryContextSwitchTo(old_context); } void assign_synchronous_commit(int newval, void *extra) diff --git a/src/gausskernel/storage/replication/syncrep_gram.y b/src/gausskernel/storage/replication/syncrep_gram.y index bfcb29d32..90f8fc91b 100644 --- a/src/gausskernel/storage/replication/syncrep_gram.y +++ b/src/gausskernel/storage/replication/syncrep_gram.y @@ -66,8 +66,8 @@ static void syncrep_yyerror(YYLTYPE *yylloc, %token NAME NUM JUNK ANY FIRST -%type result standby_config -%type standby_list +%type standby_config_simplify standby_config_complete +%type result standby_config standby_config_combination standby_list %type standby_name %start result @@ -78,9 +78,22 @@ result: ; standby_config: + standby_config_simplify { $$ = list_make1($1); } + | standby_config_combination { $$ = $1; } + ; + +standby_config_combination: + standby_config_complete { $$ = list_make1($1); } + | standby_config_combination ',' standby_config_complete { $$ = lappend($1, $3); } + ; + +standby_config_simplify: standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); } | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); pfree($1); } - | ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); pfree($2); } + ; + +standby_config_complete: + ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); pfree($2); } | FIRST NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); pfree($2); } ; diff --git a/src/gausskernel/storage/replication/walreceiverfuncs.cpp b/src/gausskernel/storage/replication/walreceiverfuncs.cpp index a165ab446..42172bd34 100755 --- a/src/gausskernel/storage/replication/walreceiverfuncs.cpp +++ b/src/gausskernel/storage/replication/walreceiverfuncs.cpp @@ -1270,8 +1270,9 @@ void GetMinLsnRecordsFromHadrCascadeStandby(void) qsort(standbyApplyList, g_instance.attr.attr_storage.max_wal_senders, sizeof(XLogRecPtr), cmp_min_lsn); applyLoc = GetXLogReplayRecPtr(NULL, &ReplayReadPtr); t_thrd.walreceiver_cxt.reply_message->applyRead = ReplayReadPtr; - for (i = t_thrd.syncrep_cxt.SyncRepConfig->num_sync - 1; i >= 0; i--) { - if (i < t_thrd.syncrep_cxt.SyncRepConfig->num_sync - 1) { + int min_require = Min(g_instance.attr.attr_storage.max_wal_senders, t_thrd.syncrep_cxt.SyncRepMaxPossib); + for (i = min_require - 1 ; i >= 0; i--) { + if (i < min_require - 1) { needReport = true; } if (standbyReceiveList[i] != InvalidXLogRecPtr) { diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 87d685a58..c69704126 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -4234,6 +4234,7 @@ static void InitWalSnd(void) securec_check(rc, "", ""); rc = memset_s((void *)&walsnd->wal_sender_channel, sizeof(ReplConnInfo), 0, sizeof(ReplConnInfo)); securec_check(rc, "", ""); + walsnd->sync_standby_group = 0; walsnd->sync_standby_priority = 0; walsnd->index = i; walsnd->log_ctrl.sleep_time = 0; @@ -5329,8 +5330,7 @@ bool WalSndInProgress(int type) bool WalSndQuorumInProgress(int type) { int i; - int num = 0; - int num_sync = t_thrd.syncrep_cxt.SyncRepConfig->num_sync; + int* nums = (int*)palloc0(t_thrd.syncrep_cxt.SyncRepConfigGroups * sizeof(int)); for (i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ @@ -5338,15 +5338,19 @@ bool WalSndQuorumInProgress(int type) SpinLockAcquire(&walsnd->mutex); if (walsnd->pid != 0 && walsnd->pid != t_thrd.proc_cxt.MyProcPid && ((walsnd->sendRole & type) == walsnd->sendRole)) { - num++; + nums[walsnd->sync_standby_group]++; } SpinLockRelease(&walsnd->mutex); } - if (num_sync <= num) { - return true; - } else { - return false; + + for (i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) { + if (t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync > nums[i]) { + pfree(nums); + return false; + } } + pfree(nums); + return true; } bool WalSndAllInProgressForMainStandby(int type) @@ -5810,13 +5814,14 @@ Datum gs_paxos_stat_replication(PG_FUNCTION_ARGS) */ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 21 +#define PG_STAT_GET_WAL_SENDERS_COLS 22 TupleDesc tupdesc; int *sync_priority = NULL; int i = 0; volatile HaShmemData *hashmdata = t_thrd.postmaster_cxt.HaShmData; List *sync_standbys = NIL; + ListCell *lc = NULL; Tuplestorestate *tupstore = BuildTupleResult(fcinfo, &tupdesc); @@ -5875,6 +5880,7 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int j = 0; errno_t rc = 0; int ret = 0; + int group = 0; int priority = 0; SpinLockAcquire(&hashmdata->mutex); @@ -5906,8 +5912,10 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) syncStart = walsnd->syncPercentCountStart; catchup_time[0] = walsnd->catchupTime[0]; catchup_time[1] = walsnd->catchupTime[1]; - if (IS_DN_MULTI_STANDYS_MODE()) + if (IS_DN_MULTI_STANDYS_MODE()) { + group = walsnd->sync_standby_group; priority = walsnd->sync_standby_priority; + } SpinLockRelease(&walsnd->mutex); set_xlog_location(local_role, &sndWrite, &sndFlush, &sndReplay); @@ -6043,9 +6051,11 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) /* sync_state and sync_prority */ if (!SyncRepRequested()) { values[j++] = CStringGetTextDatum("Async"); + nulls[j++] = true; values[j++] = Int32GetDatum(0); } else { values[j++] = CStringGetTextDatum("Sync"); + nulls[j++] = true; values[j++] = Int32GetDatum(sync_priority[i]); } } else { @@ -6065,14 +6075,18 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * basically useless to report "sync" or "potential" as their sync * states. We report just "quorum" for them. */ - if (priority == 0) + if (priority == 0) { values[j++] = CStringGetTextDatum("Async"); - else if (list_member_int(sync_standbys, i)) { - values[j++] = t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY + nulls[j++] = true; + } else if (list_member_int((List*)list_nth(sync_standbys, group), i)) { + values[j++] = GetWalsndSyncRepConfig(walsnd)->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("Sync") : CStringGetTextDatum("Quorum"); - } else + values[j++] = Int32GetDatum(group); + } else { values[j++] = CStringGetTextDatum("Potential"); + values[j++] = Int32GetDatum(group); + } values[j++] = Int32GetDatum(priority); } @@ -6094,6 +6108,10 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + + foreach(lc, sync_standbys) { + list_free((List*)lfirst(lc)); + } list_free(sync_standbys); if (sync_priority != NULL) { pfree(sync_priority); diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback_catalog_maindb_92_606.sql b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback_catalog_maindb_92_606.sql new file mode 100644 index 000000000..ff5477f71 --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback_catalog_maindb_92_606.sql @@ -0,0 +1,37 @@ +-- for any x group +DO $$ +DECLARE +ans boolean; +BEGIN + select case when working_version_num()=92301 then true else false end as ans into ans; + if ans = false then + DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE; + SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099; + CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders( + OUT pid bigint, + OUT sender_pid integer, + OUT local_role text, + OUT peer_role text, + OUT peer_state text, + OUT state text, + OUT catchup_start timestamp with time zone, + OUT catchup_end timestamp with time zone, + OUT sender_sent_location text, + OUT sender_write_location text, + OUT sender_flush_location text, + OUT sender_replay_location text, + OUT receiver_received_location text, + OUT receiver_write_location text, + OUT receiver_flush_location text, + OUT receiver_replay_location text, + OUT sync_percent text, + OUT sync_state text, + OUT sync_priority integer, + OUT sync_most_available text, + OUT channel text + ) RETURNS SETOF record + STABLE NOT FENCED NOT SHIPPABLE ROWS 10 + LANGUAGE internal AS 'pg_stat_get_wal_senders'; + end if; +END$$; +SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0; diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback_catalog_otherdb_92_606.sql b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback_catalog_otherdb_92_606.sql new file mode 100644 index 000000000..ff5477f71 --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback_catalog_otherdb_92_606.sql @@ -0,0 +1,37 @@ +-- for any x group +DO $$ +DECLARE +ans boolean; +BEGIN + select case when working_version_num()=92301 then true else false end as ans into ans; + if ans = false then + DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE; + SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099; + CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders( + OUT pid bigint, + OUT sender_pid integer, + OUT local_role text, + OUT peer_role text, + OUT peer_state text, + OUT state text, + OUT catchup_start timestamp with time zone, + OUT catchup_end timestamp with time zone, + OUT sender_sent_location text, + OUT sender_write_location text, + OUT sender_flush_location text, + OUT sender_replay_location text, + OUT receiver_received_location text, + OUT receiver_write_location text, + OUT receiver_flush_location text, + OUT receiver_replay_location text, + OUT sync_percent text, + OUT sync_state text, + OUT sync_priority integer, + OUT sync_most_available text, + OUT channel text + ) RETURNS SETOF record + STABLE NOT FENCED NOT SHIPPABLE ROWS 10 + LANGUAGE internal AS 'pg_stat_get_wal_senders'; + end if; +END$$; +SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0; diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_606.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_606.sql new file mode 100644 index 000000000..eb9bae60a --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_606.sql @@ -0,0 +1,38 @@ +-- for any x group +DO $$ +DECLARE +ans boolean; +BEGIN + select case when working_version_num()=92301 then true else false end as ans into ans; + if ans = false then + DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE; + SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099; + CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders( + OUT pid bigint, + OUT sender_pid integer, + OUT local_role text, + OUT peer_role text, + OUT peer_state text, + OUT state text, + OUT catchup_start timestamp with time zone, + OUT catchup_end timestamp with time zone, + OUT sender_sent_location text, + OUT sender_write_location text, + OUT sender_flush_location text, + OUT sender_replay_location text, + OUT receiver_received_location text, + OUT receiver_write_location text, + OUT receiver_flush_location text, + OUT receiver_replay_location text, + OUT sync_percent text, + OUT sync_state text, + OUT sync_group integer, + OUT sync_priority integer, + OUT sync_most_available text, + OUT channel text + ) RETURNS SETOF record + STABLE NOT FENCED NOT SHIPPABLE ROWS 10 + LANGUAGE internal AS 'pg_stat_get_wal_senders'; + end if; +END$$; +SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_606.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_606.sql new file mode 100644 index 000000000..eb9bae60a --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_606.sql @@ -0,0 +1,38 @@ +-- for any x group +DO $$ +DECLARE +ans boolean; +BEGIN + select case when working_version_num()=92301 then true else false end as ans into ans; + if ans = false then + DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE; + SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099; + CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders( + OUT pid bigint, + OUT sender_pid integer, + OUT local_role text, + OUT peer_role text, + OUT peer_state text, + OUT state text, + OUT catchup_start timestamp with time zone, + OUT catchup_end timestamp with time zone, + OUT sender_sent_location text, + OUT sender_write_location text, + OUT sender_flush_location text, + OUT sender_replay_location text, + OUT receiver_received_location text, + OUT receiver_write_location text, + OUT receiver_flush_location text, + OUT receiver_replay_location text, + OUT sync_percent text, + OUT sync_state text, + OUT sync_group integer, + OUT sync_priority integer, + OUT sync_most_available text, + OUT channel text + ) RETURNS SETOF record + STABLE NOT FENCED NOT SHIPPABLE ROWS 10 + LANGUAGE internal AS 'pg_stat_get_wal_senders'; + end if; +END$$; +SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0; \ No newline at end of file diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 63a82c8ca..f071cca2f 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2421,7 +2421,7 @@ typedef struct knl_t_replscanner_context { typedef struct knl_t_syncrepgram_context { /* Result of parsing is returned in one of these two variables */ - struct SyncRepConfigData* syncrep_parse_result; + List* syncrep_parse_result; } knl_t_syncrepgram_context; typedef struct knl_t_syncrepscanner_context { @@ -2431,7 +2431,10 @@ typedef struct knl_t_syncrepscanner_context { } knl_t_syncrepscanner_context; typedef struct knl_t_syncrep_context { - struct SyncRepConfigData* SyncRepConfig; + struct SyncRepConfigData** SyncRepConfig; // array of SyncRepConfig + int SyncRepConfigGroups; // group of SyncRepConfig + int SyncRepMaxPossib; // max possible sync standby number + bool announce_next_takeover; } knl_t_syncrep_context; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 26c78ecf7..43bad2fed 100755 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -39,11 +39,16 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +#define SYNC_REP_MAX_GROUPS 256 + extern volatile bool most_available_sync; #define SyncStandbysDefined() \ (u_sess->attr.attr_storage.SyncRepStandbyNames != NULL && u_sess->attr.attr_storage.SyncRepStandbyNames[0] != '\0') +#define GetWalsndSyncRepConfig(walsnder) \ + (t_thrd.syncrep_cxt.SyncRepConfig[(walsnder)->sync_standby_group]) + /* * Struct for the configuration of synchronous replication. * diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 0941337ac..86bc69181 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -118,10 +118,11 @@ typedef struct WalSnd { Latch latch; /* - * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. + * The strategy group and priority order of the standby managed by this WALSender, + * as listed in synchronous_standby_names, or 0 if not-listed. + * Protected by SyncRepLock. */ + uint8 sync_standby_group; int sync_standby_priority; int index; LogCtrlData log_ctrl; diff --git a/src/test/regress/expected/sync_standy_names.out b/src/test/regress/expected/sync_standy_names.out new file mode 100644 index 000000000..69999272a --- /dev/null +++ b/src/test/regress/expected/sync_standy_names.out @@ -0,0 +1,150 @@ +show synchronous_standby_names; + synchronous_standby_names +--------------------------- + * +(1 row) + +-- single gram +alter system set synchronous_standby_names = '*'; -- suc +alter system set synchronous_standby_names = 'd1, *'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "d1, *" +DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity. +alter system set synchronous_standby_names = '*, *'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "*, *" +DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity. +alter system set synchronous_standby_names = 'd1'; -- suc +alter system set synchronous_standby_names = 'd1, d2, d3, d4'; -- suc +alter system set synchronous_standby_names = 'd1, d2, d1, d4'; -- err, duplicate name +ERROR: invalid value for parameter "synchronous_standby_names": "d1, d2, d1, d4" +DETAIL: Duplicate standby node name: d1 +alter system set synchronous_standby_names = '2 (*)'; -- suc +alter system set synchronous_standby_names = '2 (d1, *)'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, *)" +DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity. +alter system set synchronous_standby_names = '2 (*, *)'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "2 (*, *)" +DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity. +alter system set synchronous_standby_names = '2 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = '2 (d1, d2, d1, d4)'; -- err, duplicate name +ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, d2, d1, d4)" +DETAIL: Duplicate standby node name: d1 +alter system set synchronous_standby_names = '5 (d1, d2, d3, d4)'; -- err, requriement more than have +ERROR: invalid value for parameter "synchronous_standby_names": "5 (d1, d2, d3, d4)" +DETAIL: The sync number must less or equals to the number of standby node names. +alter system set synchronous_standby_names = '0 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = '-1 (d1, d2, d3, d4)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "-1 (d1, d2, d3, d4)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'first 2 (*)'; -- suc +alter system set synchronous_standby_names = 'first 2 (d1, *)'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (d1, *)" +DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity. +alter system set synchronous_standby_names = 'first 2 (*, *)'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (*, *)" +DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity. +alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'first 2 (d1, d2, d1, d4)'; -- err, duplicate name +ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (d1, d2, d1, d4)" +DETAIL: Duplicate standby node name: d1 +alter system set synchronous_standby_names = 'first 5 (d1, d2, d3, d4)'; -- err, requriement more than have +ERROR: invalid value for parameter "synchronous_standby_names": "first 5 (d1, d2, d3, d4)" +DETAIL: The sync number must less or equals to the number of standby node names. +alter system set synchronous_standby_names = 'first 0 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'first -1 (d1, d2, d3, d4)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "first -1 (d1, d2, d3, d4)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'any 2 (*)'; -- suc +alter system set synchronous_standby_names = 'any 2 (d1, *'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, *" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'any 2 (*, *)'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (*, *)" +DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity. +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'any 2 (d1, d2, d1, d4)'; -- err, duplicate name +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d1, d4)" +DETAIL: Duplicate standby node name: d1 +alter system set synchronous_standby_names = 'any 5 (d1, d2, d3, d4)'; -- err, requriement more than have +ERROR: invalid value for parameter "synchronous_standby_names": "any 5 (d1, d2, d3, d4)" +DETAIL: The sync number must less or equals to the number of standby node names. +alter system set synchronous_standby_names = 'any 0 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'any -1 (d1, d2, d3, d4)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "any -1 (d1, d2, d3, d4)" +DETAIL: synchronous_standby_names parser failed +-- combinate gram +alter system set synchronous_standby_names = 'd1, d2, d3, 1 (d5, d6)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "d1, d2, d3, 1 (d5, d6)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (d5, d6)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "1 (d1, d2, d3), 1 (d5, d6)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = '*, 1 (d5, d6)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "*, 1 (d5, d6)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "1 (d1, d2, d3), 1 (*)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "1 (d1, d2, d3), 1 (*)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'any 2 (*), any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (*), any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)" +DETAIL: '*' is not support when sync standby strategy is combinated. +alter system set synchronous_standby_names = 'any 2 (d, dd), any 2 (d1, *, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d, dd), any 2 (d1, *, d3, d4), any 2 (d5, d6, d7)" +DETAIL: '*' is not support when sync standby strategy is combinated. +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), d5, d6, d7'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), d5, d6, d7" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), 2 (d5, d1, d7)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), 2 (d5, d1, d7)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), *'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), *" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, d2, d3, d4), first 2 (d5, d6, d7)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, d2, d3, d4), any 4 (d5, d6, d7)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = '*, any 0 (d5, d6, d7)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "*, any 0 (d5, d6, d7)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'd1, d2, d3, d4, any 2 (d5, d6, d7)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "d1, d2, d3, d4, any 2 (d5, d6, d7)" +DETAIL: synchronous_standby_names parser failed +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- suc +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d1, d7)'; -- err, duplicate name +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), any 2 (d5, d1, d7)" +DETAIL: Duplicate standby node name: d1 +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d6)'; -- err, duplicate name +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), any 2 (d5, d6, d6)" +DETAIL: Duplicate standby node name: d6 +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)" +DETAIL: FIRST rule is not support when sync standby strategy is combinated. +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, requriement more than have +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), any 4 (d5, d6, d7)" +DETAIL: The sync number must less or equals to the number of standby node names. +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 0 (d5, d6, d7)'; -- suc +alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first +ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)" +DETAIL: FIRST rule is not support when sync standby strategy is combinated. +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), , any 0 (d5, d6, d7)'; -- err, parser err +ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), , any 0 (d5, d6, d7)" +DETAIL: synchronous_standby_names parser failed +-- recover +alter system set synchronous_standby_names = '*'; +select pg_sleep(3); + pg_sleep +---------- + +(1 row) + +show synchronous_standby_names; + synchronous_standby_names +--------------------------- + * +(1 row) + diff --git a/src/test/regress/parallel_schedule0 b/src/test/regress/parallel_schedule0 index 742884654..c65d648a9 100644 --- a/src/test/regress/parallel_schedule0 +++ b/src/test/regress/parallel_schedule0 @@ -37,6 +37,7 @@ test: array_funcs first_last_agg test: hw_pwd_encryption_sm3 +test: sync_standy_names # test subpartition test: hw_subpartition_createtable hw_subpartition_scan hw_subpartition_select hw_subpartition_split hw_subpartition_truncate hw_subpartition_update hw_subpartition_gpi hw_subpartition_analyze_vacuum hw_subpartition_alter_table hw_subpartition_index hw_subpartition_add_drop_partition hw_subpartition_tablespace hw_subpartition_ddl_index diff --git a/src/test/regress/sql/sync_standy_names.sql b/src/test/regress/sql/sync_standy_names.sql new file mode 100644 index 000000000..5d58b134e --- /dev/null +++ b/src/test/regress/sql/sync_standy_names.sql @@ -0,0 +1,62 @@ +show synchronous_standby_names; + +-- single gram +alter system set synchronous_standby_names = '*'; -- suc +alter system set synchronous_standby_names = 'd1, *'; -- err, semantic ambiguity +alter system set synchronous_standby_names = '*, *'; -- err, semantic ambiguity +alter system set synchronous_standby_names = 'd1'; -- suc +alter system set synchronous_standby_names = 'd1, d2, d3, d4'; -- suc +alter system set synchronous_standby_names = 'd1, d2, d1, d4'; -- err, duplicate name +alter system set synchronous_standby_names = '2 (*)'; -- suc +alter system set synchronous_standby_names = '2 (d1, *)'; -- err, semantic ambiguity +alter system set synchronous_standby_names = '2 (*, *)'; -- err, semantic ambiguity +alter system set synchronous_standby_names = '2 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = '2 (d1, d2, d1, d4)'; -- err, duplicate name +alter system set synchronous_standby_names = '5 (d1, d2, d3, d4)'; -- err, requriement more than have +alter system set synchronous_standby_names = '0 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = '-1 (d1, d2, d3, d4)'; -- err, parser err +alter system set synchronous_standby_names = 'first 2 (*)'; -- suc +alter system set synchronous_standby_names = 'first 2 (d1, *)'; -- err, semantic ambiguity +alter system set synchronous_standby_names = 'first 2 (*, *)'; -- err, semantic ambiguity +alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'first 2 (d1, d2, d1, d4)'; -- err, duplicate name +alter system set synchronous_standby_names = 'first 5 (d1, d2, d3, d4)'; -- err, requriement more than have +alter system set synchronous_standby_names = 'first 0 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'first -1 (d1, d2, d3, d4)'; -- err, parser err +alter system set synchronous_standby_names = 'any 2 (*)'; -- suc +alter system set synchronous_standby_names = 'any 2 (d1, *'; -- err, semantic ambiguity +alter system set synchronous_standby_names = 'any 2 (*, *)'; -- err, semantic ambiguity +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'any 2 (d1, d2, d1, d4)'; -- err, duplicate name +alter system set synchronous_standby_names = 'any 5 (d1, d2, d3, d4)'; -- err, requriement more than have +alter system set synchronous_standby_names = 'any 0 (d1, d2, d3, d4)'; -- suc +alter system set synchronous_standby_names = 'any -1 (d1, d2, d3, d4)'; -- err, parser err + +-- combinate gram +alter system set synchronous_standby_names = 'd1, d2, d3, 1 (d5, d6)'; -- err, parser err +alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (d5, d6)'; -- err, parser err +alter system set synchronous_standby_names = '*, 1 (d5, d6)'; -- err, parser err +alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err +alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err +alter system set synchronous_standby_names = 'any 2 (*), any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity +alter system set synchronous_standby_names = 'any 2 (d, dd), any 2 (d1, *, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), d5, d6, d7'; -- err, parser err +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), 2 (d5, d1, d7)'; -- err, parser err +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), *'; -- err, parser err +alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, parser err +alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, parser err +alter system set synchronous_standby_names = '*, any 0 (d5, d6, d7)'; -- err, parser err +alter system set synchronous_standby_names = 'd1, d2, d3, d4, any 2 (d5, d6, d7)'; -- err, parser err +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- suc +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d1, d7)'; -- err, duplicate name +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d6)'; -- err, duplicate name +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, requriement more than have +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 0 (d5, d6, d7)'; -- suc +alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first +alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), , any 0 (d5, d6, d7)'; -- err, parser err + +-- recover +alter system set synchronous_standby_names = '*'; +select pg_sleep(3); +show synchronous_standby_names; \ No newline at end of file