support any x group

This commit is contained in:
gentle_hu 2022-03-20 17:17:22 +08:00
parent 3a552c955d
commit e930224d92
20 changed files with 914 additions and 246 deletions

View File

@ -1203,6 +1203,66 @@ static void CheckKeepSyncWindow(char** opt_lines, int idx)
#endif
static void
parse_next_sync_groups(char **pgroup, char *result)
{
if (**pgroup == '\0') {
result[0] = '\0';
return;
}
char *this_group = *pgroup;
char *p = *pgroup;
while (*p != '\0' && *p != ')') p++;
while (*p != '\0' && *p != ',') p++;
if (*p == ',') {
*p = '\0';
p++;
}
*pgroup = p;
errno_t rc = snprintf_s(result, MAX_VALUE_LEN, MAX_VALUE_LEN - 1, "'%s'", this_group);
securec_check_ss_c(rc, "\0", "\0");
return;
}
static int
transform_az_name(char *config_value, char *allAZString, int allAZStringBufLen, const char *data_dir)
{
char *azString = NULL;
char *buf = allAZString;
int buflen = allAZStringBufLen;
char *allgroup = xstrdup(config_value);
char *pgrp = allgroup + 1; // trim first "'"
char this_group[MAX_VALUE_LEN] = {0x00};
allgroup[strlen(allgroup) - 1] = '\0'; // trim last "'"
parse_next_sync_groups(&pgrp, this_group);
while (this_group[0] != '\0') {
azString = get_AZ_value(this_group, data_dir);
if (NULL == azString) {
GS_FREE(allgroup);
return FAILURE;
}
int azStringLen = strlen(azString);
azString[0] = ' ';
azString[azStringLen - 1] = ',';
errno_t rc = strncpy_s(buf, buflen, azString, azStringLen);
securec_check_c(rc, "\0", "\0");
buf += azStringLen;
buflen -= azStringLen;
GS_FREE(azString);
parse_next_sync_groups(&pgrp, this_group);
}
GS_FREE(allgroup);
allAZString[0] = allAZString[strlen(allAZString) - 1] = '\'';
return SUCCESS;
}
/*
* @@GaussDB@@
* Brief :
@ -1282,17 +1342,16 @@ do_gucset(const char *action_type, const char *data_dir)
// get AZ string
if (NULL != config_value[i]) {
char *azString = NULL;
azString = get_AZ_value(config_value[i], data_dir);
if (NULL == azString) {
result_status = FAILURE;
char allAZString[MAX_VALUE_LEN] = {0x00};
result_status = transform_az_name(config_value[i], allAZString, MAX_VALUE_LEN, data_dir);
if (result_status == FAILURE) {
continue;
}
tmpAZStr = xstrdup(config_value[i]);
GS_FREE(config_value[i]);
config_value[i] = xstrdup(azString);
GS_FREE(azString);
config_value[i] = xstrdup(allAZString);
}
}

View File

@ -8641,7 +8641,7 @@
),
AddFuncGroup(
"pg_stat_get_wal_senders", 1,
AddBuiltinFunc(_0(3099), _1("pg_stat_get_wal_senders"), _2(0), _3(false), _4(true), _5(pg_stat_get_wal_senders), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(0), _21(21, 20, 23, 25, 25, 25, 25, 1184, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 23, 25, 25), _22(21, 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(21, "pid", "sender_pid", "local_role", "peer_role", "peer_state", "state", "catchup_start", "catchup_end", "sender_sent_location", "sender_write_location", "sender_flush_location", "sender_replay_location", "receiver_received_location", "receiver_write_location", "receiver_flush_location", "receiver_replay_location", "sync_percent", "sync_state", "sync_priority", "sync_most_available", "channel"), _24(NULL), _25("pg_stat_get_wal_senders"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("statistics: information about currently active replication"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
AddBuiltinFunc(_0(3099), _1("pg_stat_get_wal_senders"), _2(0), _3(false), _4(true), _5(pg_stat_get_wal_senders), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(10), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(0), _21(22, 20, 23, 25, 25, 25, 25, 1184, 1184, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 23, 23, 25, 25), _22(22, 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(22, "pid", "sender_pid", "local_role", "peer_role", "peer_state", "state", "catchup_start", "catchup_end", "sender_sent_location", "sender_write_location", "sender_flush_location", "sender_replay_location", "receiver_received_location", "receiver_write_location", "receiver_flush_location", "receiver_replay_location", "sync_percent", "sync_state", "sync_group", "sync_priority", "sync_most_available", "channel"), _24(NULL), _25("pg_stat_get_wal_senders"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("statistics: information about currently active replication"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0))
),
AddFuncGroup(
"pg_stat_get_wlm_ec_operator_info", 1,

View File

@ -59,7 +59,7 @@ bool open_join_children = true;
bool will_shutdown = false;
/* hard-wired binary version number */
const uint32 GRAND_VERSION_NUM = 92605;
const uint32 GRAND_VERSION_NUM = 92606;
const uint32 PREDPUSH_SAME_LEVEL_VERSION_NUM = 92522;
const uint32 UPSERT_WHERE_VERSION_NUM = 92514;

View File

@ -164,7 +164,6 @@
#include "utils/guc_memory.h"
#include "utils/guc_network.h"
#include "utils/guc_resource.h"
#include "common/config/cm_config.h"
#ifndef PG_KRB_SRVTAB
#define PG_KRB_SRVTAB ""
@ -517,9 +516,6 @@ static bool validate_conf_enum(struct config_generic *record, const char *name,
int elevel, bool freemem, void *newvalue, void **newextra);
#ifndef ENABLE_MULTIPLE_NODES
int init_gauss_cluster_config(void);
static bool AlterSystemSetCheckSyncStandbyNames(struct config_generic *record, const char *name, const char *value,
void *newextra);
static void CheckAndGetAlterSystemSetParam(AlterSystemStmt* altersysstmt,
char** outer_name, char** outer_value, struct config_generic** outer_record);
static void FinishAlterSystemSet(GucContext context);
@ -8252,121 +8248,6 @@ static void CheckAlterSystemSetPrivilege(const char* name)
}
}
/*
* Obtain cluster information from cluster_static_config.
*/
int init_gauss_cluster_config(void)
{
char path[MAXPGPATH] = {0};
int err_no = 0;
int nRet = 0;
int status = 0;
uint32 nodeidx = 0;
struct stat statbuf {};
char* gausshome = gs_getenv_r("GAUSSHOME");
check_backend_env(gausshome);
nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/bin/%s", gausshome, STATIC_CONFIG_FILE);
securec_check_ss_c(nRet, "\0", "\0");
if (lstat(path, &statbuf) != 0) {
return 1;
}
status = read_config_file(path, &err_no);
if (status != 0) {
return 1;
}
if (g_nodeHeader.node <= 0) {
free(g_node);
g_node = nullptr;
return 1;
}
for (nodeidx = 0; nodeidx < g_node_num; nodeidx++) {
if (g_node[nodeidx].node == g_nodeHeader.node) {
g_currentNode = &g_node[nodeidx];
break;
}
}
if (g_currentNode == NULL) {
free(g_node);
g_node = nullptr;
return 1;
}
if (get_dynamic_dn_role() != 0) {
// failed to get dynamic dn role
free(g_node);
g_node = nullptr;
return 1;
}
return 0;
}
static bool AlterSystemSetCheckSyncStandbyNames(struct config_generic *record, const char *name, const char *value,
void *newextra)
{
char* newvalue = NULL;
bool res = false;
SyncRepConfigData *pconf = NULL;
char* data_dir = t_thrd.proc_cxt.DataDir;
char* p = NULL;
if (value != NULL) {
newvalue = guc_strdup(DEBUG5, value);
}
if (!validate_conf_option(record, name, value, PGC_S_FILE, ERROR, false, &newvalue, &newextra)) {
goto ret;
}
pconf = (SyncRepConfigData *)newextra;
if (pconf == NULL || strcmp(value, "*") == 0) {
res = true;
goto ret;
}
if (pconf->num_sync > pconf->nmembers) {
// The sync number must less or equals to the number of standby node names.
goto ret;
}
/* get current cluster information from cluster_staic_config */
if (has_static_config()) {
if (init_gauss_cluster_config() != 0) {
res = true;
goto ret;
}
} else {
res = true;
goto ret;
}
p = pconf->member_names;
for (int i = 1; i <= pconf->nmembers; i++) {
if (!CheckDataNameValue(p, data_dir)) {
goto ret;
}
p += strlen(p) + 1;
}
ret:
if (newvalue != NULL) {
pfree(newvalue);
newvalue = NULL;
}
if (newextra != NULL) {
pfree(newextra);
newextra = NULL;
}
return res;
}
/*
* Persist the configuration parameter value.
*
@ -8424,10 +8305,7 @@ static void CheckAndGetAlterSystemSetParam(AlterSystemStmt* altersysstmt,
"ALTER SYSTEM SET only support POSTMASTER-level, SIGHUP-level and BACKEND-level guc variable,\n"
"and it must be allowed to set in postgresql.conf.", name)));
if ((strcmp(name, "synchronous_standby_names") == 0 &&
!AlterSystemSetCheckSyncStandbyNames(record, name, value, newextra)) ||
(strcmp(name, "synchronous_standby_names") != 0 &&
!validate_conf_option(record, name, value, PGC_S_FILE, ERROR, true, NULL, &newextra))) {
if (!validate_conf_option(record, name, value, PGC_S_FILE, ERROR, true, NULL, &newextra)) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for parameter \"%s\": \"%s\"", name, value)));

View File

@ -792,7 +792,7 @@ static void knl_t_replscanner_init(knl_t_replscanner_context* replscanner_cxt)
static void knl_t_syncgram_init(knl_t_syncrepgram_context* syncrepgram_cxt)
{
syncrepgram_cxt->syncrep_parse_result = NULL;
syncrepgram_cxt->syncrep_parse_result = NIL;
}
static void knl_t_syncrepscanner_init(knl_t_syncrepscanner_context* syncrepscanner_cxt)
@ -805,6 +805,8 @@ static void knl_t_syncrepscanner_init(knl_t_syncrepscanner_context* syncrepscann
static void knl_t_syncrep_init(knl_t_syncrep_context* syncrep_cxt)
{
syncrep_cxt->SyncRepConfig = NULL;
syncrep_cxt->SyncRepConfigGroups = 0;
syncrep_cxt->SyncRepMaxPossib = 0;
syncrep_cxt->announce_next_takeover = true;
}

View File

@ -983,7 +983,8 @@ void ReplicationSlotsComputeRequiredLSN(ReplicationSlotState *repl_slt_state)
return;
}
if (t_thrd.syncrep_cxt.SyncRepConfig != NULL) {
for (i = t_thrd.syncrep_cxt.SyncRepConfig->num_sync - 1; i >= 0; i--) {
i = Min(t_thrd.syncrep_cxt.SyncRepMaxPossib, g_instance.attr.attr_storage.max_replication_slots) - 1;
for ( ; i >= 0; i--) {
if (standby_slots_list[i] != InvalidXLogRecPtr) {
repl_slt_state->quorum_min_required = standby_slots_list[i];
break;

View File

@ -74,12 +74,20 @@ volatile bool most_available_sync = false;
const int MAX_SYNC_REP_RETRY_COUNT = 1000;
const int SYNC_REP_SLEEP_DELAY = 1000;
typedef enum SyncStandbyNumState {
STANDBIES_EMPTY = 0,
STANDBIES_NOT_ENOUGH,
STANDBIES_ENOUGH
} SyncStandbyNumState;
static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys);
static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state);
static void SyncRepQueueInsert(int mode);
static bool SyncRepCancelWait(void);
static void SyncRepWaitCompletionQueue();
static void SyncRepNotifyComplete();
static int SyncRepGetStandbyPriority(void);
static void SyncRepGetStandbyGroupAndPriority(int* gid, int* prio);
#ifndef ENABLE_MULTIPLE_NODES
static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTime);
#endif
@ -88,7 +96,6 @@ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* write
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr,
XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth);
static void SyncPaxosQueueInsert(void);
static void SyncPaxosCancelWait(void);
static int SyncPaxosWakeQueue(void);
@ -98,14 +105,57 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
static bool SyncPaxosQueueIsOrderedByLSN(void);
#endif
static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standbys = NULL);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys = NULL);
static List *SyncRepGetSyncStandbysPriority(bool *am_sync, int groupid, List** catchup_standbys = NULL);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, int groupid, List** catchup_standbys = NULL);
static inline void free_sync_standbys_list(List* sync_standbys);
static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state);
static int cmp_lsn(const void *a, const void *b);
static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum = 0);
static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state = STANDBIES_EMPTY);
typedef struct TransContext {
/* for global */
List* source;
bool has_star;
List *existers;
int SyncRepAllCount;
int SyncRepMinPossib;
/* for each group*/
SyncRepConfigData* conf;
bool is_star;
/* results */
bool success;
List *SyncRepConfig;
int SyncRepConfigGroups;
int SyncRepMaxPossib;
} TransContext;
static TransContext* create_transform_context();
static void bind_transform_context(TransContext *tcxt, SyncRepConfigData *conf);
static void advance_transform_result(TransContext *tcxt);
static void finalize_transform_result(TransContext *tcxt);
static void clear_transform_context(TransContext *tcxt);
static void destroy_transform_context(TransContext *tcxt);
static bool analyze_star_and_num(TransContext *tcxt);
static bool analyze_duplicate_names(TransContext *tcxt);
static void transform_synchronous_standby_names(TransContext *tcxt);
#define CATCHUP_XLOG_DIFF(ptr1, ptr2, amount) \
XLogRecPtrIsInvalid(ptr1) ? false : (XLByteDifference(ptr2, ptr1) < amount)
/*
* sync_standbys_list is a two-dimensional array means sync standby nodes in groups
* The structure is as follows: List[IntList[int, int,...], IntList[int, int, ...], ...]
*/
static inline void free_sync_standbys_list(List* sync_standbys)
{
ListCell *lc = NULL;
foreach(lc, sync_standbys) {
list_free((List*)lfirst(lc));
}
list_free(sync_standbys);
}
/*
* Determine whether to wait for standby catching up, if requested by user.
*
@ -481,15 +531,19 @@ void SyncRepCleanupAtProcExit(void)
*/
void SyncRepInitConfig(void)
{
int group;
int priority;
/*
* Determine if we are a potential sync standby and remember the result
* for handling replies from standby.
*/
priority = SyncRepGetStandbyPriority();
if (t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority != priority) {
SyncRepGetStandbyGroupAndPriority(&group, &priority);
if (t_thrd.walsender_cxt.MyWalSnd->sync_standby_group != group ||
t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority != priority) {
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
t_thrd.walsender_cxt.MyWalSnd->sync_standby_group = group;
t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority = priority;
/*
@ -499,8 +553,8 @@ void SyncRepInitConfig(void)
SyncRepCheckSyncStandbyAlive();
LWLockRelease(SyncRepLock);
ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %d",
u_sess->attr.attr_common.application_name, priority)));
ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby group and priority: %d %d",
u_sess->attr.attr_common.application_name, group, priority)));
}
}
@ -558,13 +612,15 @@ void SyncRepReleaseWaiters(void)
*/
if (t_thrd.syncrep_cxt.announce_next_takeover && am_sync) {
t_thrd.syncrep_cxt.announce_next_takeover = false;
if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) {
ereport(LOG, (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
if (GetWalsndSyncRepConfig(t_thrd.walsender_cxt.MyWalSnd)->syncrep_method == SYNC_REP_PRIORITY) {
ereport(LOG, (errmsg("standby \"%s\" is now a synchronous standby with group and priority: %d %d",
u_sess->attr.attr_common.application_name,
t_thrd.walsender_cxt.MyWalSnd->sync_standby_group,
t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority)));
} else {
ereport(LOG, (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
u_sess->attr.attr_common.application_name)));
ereport(LOG, (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby in group: %d",
u_sess->attr.attr_common.application_name,
t_thrd.walsender_cxt.MyWalSnd->sync_standby_group)));
}
}
@ -609,6 +665,30 @@ void SyncRepReleaseWaiters(void)
numapply, (uint32)(replayPtr >> 32), (uint32)replayPtr)));
}
static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys)
{
if (t_thrd.syncrep_cxt.SyncRepConfig == NULL)
return STANDBIES_ENOUGH;
ListCell* lc = NULL;
List* per_group = NIL;
int gid = 0;
int alive = 0;
SyncStandbyNumState res = STANDBIES_ENOUGH;
foreach(lc, sync_standbys) {
per_group = (List*)lfirst(lc);
if (list_length(per_group) < t_thrd.syncrep_cxt.SyncRepConfig[gid]->num_sync)
res = STANDBIES_NOT_ENOUGH;
alive += list_length(per_group);
gid++;
}
if (alive == 0) res = STANDBIES_EMPTY;
return res;
}
/*
* Check whether to delay the time of entering most_available_sync mode.
*
@ -617,7 +697,7 @@ void SyncRepReleaseWaiters(void)
* when the number of sync standby is not qualified.
* Otherwise it's set to true.
*/
static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum)
static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state)
{
bool result = false;
@ -629,10 +709,11 @@ static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum)
List *sync_standbys = NIL;
bool am_sync = false;
sync_standbys = SyncRepGetSyncStandbys(&am_sync);
curSyncNum = list_length(sync_standbys);
state = check_sync_standbys_num(sync_standbys);
free_sync_standbys_list(sync_standbys);
}
if (curSyncNum == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) {
if (state == STANDBIES_ENOUGH) {
if (t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start != 0) {
t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start = 0;
t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window = false;
@ -658,6 +739,25 @@ static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum)
return result;
}
/*
* In a particular scenario, when most_available_sync is true, primary only wait
* the alive sync standbys, even if the quantity does not meet the configuration
* requirements.
*/
static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state)
{
*state = check_sync_standbys_num(sync_standbys);
if (*state == STANDBIES_ENOUGH) {
DelayIntoMostAvaSync(false, STANDBIES_ENOUGH); // just for refresh keep_sync_window if needed
return true;
}
if (t_thrd.walsender_cxt.WalSndCtl->most_available_sync && !DelayIntoMostAvaSync(false, *state)) {
return true;
}
return false;
}
/*
* Calculate the synced Receive, Write, Flush and Apply positions among sync standbys.
*
@ -683,14 +783,13 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP
/*
* Quick exit if we are not managing a sync standby (or not check for check_am_sync is false)
* or there are not enough synchronous standbys.
* but in a particular scenario, when most_available_sync is true, primary only wait the alive sync standbys
* if list_length(sync_standbys) doesn't satisfy t_thrd.syncrep_cxt.SyncRepConfig->num_sync.
* or most_available_sync is working and allow some standby nodes to be missing.
*/
int cur_num_sync_standbys = list_length(sync_standbys);
if ((!(*am_sync) && check_am_sync) || t_thrd.syncrep_cxt.SyncRepConfig == NULL ||
((!t_thrd.walsender_cxt.WalSndCtl->most_available_sync || DelayIntoMostAvaSync(false, cur_num_sync_standbys)) &&
cur_num_sync_standbys < t_thrd.syncrep_cxt.SyncRepConfig->num_sync)) {
list_free(sync_standbys);
SyncStandbyNumState state;
if ((!(*am_sync) && check_am_sync) ||
t_thrd.syncrep_cxt.SyncRepConfig == NULL ||
!judge_sync_standbys_num(sync_standbys, &state)) {
free_sync_standbys_list(sync_standbys);
return false;
}
@ -707,20 +806,29 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP
* we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
* positions even in a quorum-based sync replication.
*/
if (list_length(sync_standbys) == 0) {
if (state == STANDBIES_EMPTY) {
/* deal with sync standbys list is empty when most available sync mode is on */
*writePtr = GetXLogWriteRecPtr();
*flushPtr = GetFlushRecPtr();
*receivePtr = *writePtr;
*replayPtr = *flushPtr;
} else if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) {
SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, sync_standbys);
} else if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_QUORUM) {
SyncRepGetNthLatestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, sync_standbys,
t_thrd.syncrep_cxt.SyncRepConfig->num_sync);
} else {
int i = 0;
ListCell *lc = NULL;
foreach(lc, sync_standbys) {
List *per_group = (List*)lfirst(lc);
if (per_group == NIL) {
/* do nothing, this group of sync standbys are all offline or no sync standbys. */
} else if (t_thrd.syncrep_cxt.SyncRepConfig[i]->syncrep_method == SYNC_REP_PRIORITY) {
SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, per_group);
} else {
SyncRepGetNthLatestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr,
per_group, t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync);
}
i++;
}
}
list_free(sync_standbys);
free_sync_standbys_list(sync_standbys);
return true;
}
@ -742,8 +850,8 @@ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTi
/* Get standbys that are considered as synchronous at this moment. */
sync_standbys = SyncRepGetSyncStandbys(&am_sync, &catchup_standbys);
/* Skip here if there is at lease one sync standby, or no standby in catchup. */
if (list_length(sync_standbys) > 0 || list_length(catchup_standbys) == 0) {
list_free(sync_standbys);
if (check_sync_standbys_num(sync_standbys) != STANDBIES_EMPTY || list_length(catchup_standbys) == 0) {
free_sync_standbys_list(sync_standbys);
list_free(catchup_standbys);
return false;
}
@ -766,7 +874,7 @@ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTi
}
}
list_free(sync_standbys);
free_sync_standbys_list(sync_standbys);
list_free(catchup_standbys);
return true;
}
@ -860,10 +968,14 @@ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* wr
}
/* Get Nth latest Write, Flush, Apply positions */
*receivePtr = receive_array[nth - 1];
*writePtr = write_array[nth - 1];
*flushPtr = flush_array[nth - 1];
*replayPtr = apply_array[nth - 1];
if (XLogRecPtrIsInvalid(*writePtr) || XLByteLE(write_array[nth - 1], *writePtr))
*writePtr = write_array[nth - 1];
if (XLogRecPtrIsInvalid(*flushPtr) || XLByteLE(flush_array[nth - 1], *flushPtr))
*flushPtr = flush_array[nth - 1];
if (XLogRecPtrIsInvalid(*receivePtr) || XLByteLE(receive_array[nth - 1], *receivePtr))
*receivePtr = receive_array[nth - 1];
if (XLogRecPtrIsInvalid(*replayPtr) || XLByteLE(apply_array[nth - 1], *replayPtr))
*replayPtr = apply_array[nth - 1];
pfree(receive_array);
receive_array = NULL;
@ -893,47 +1005,56 @@ static int cmp_lsn(const void *a, const void *b)
/*
* Check if we are in the list of sync standbys, and if so, determine
* priority sequence. Return priority if set, or zero to indicate that
* priority sequence. Return groupid and priority if set, or zero to indicate that
* we are not a potential sync standby.
*
* Compare the parameter SyncRepStandbyNames against the application_name
* for this WALSender, or allow any name if we find a wildcard "*".
*/
static int SyncRepGetStandbyPriority(void)
static void SyncRepGetStandbyGroupAndPriority(int* gid, int* prio)
{
const char *standby_name = NULL;
int group;
int priority;
bool found = false;
*gid = 0;
*prio = 0;
/*
* Since synchronous cascade replication is not allowed, we always set the
* priority of cascading walsender to zero.
*/
if (AM_WAL_STANDBY_SENDER || AM_WAL_SHARE_STORE_SENDER || AM_WAL_HADR_DNCN_SENDER || AM_WAL_DB_SENDER)
return 0;
return;
if (!SyncStandbysDefined() || t_thrd.syncrep_cxt.SyncRepConfig == NULL || !SyncRepRequested())
return 0;
return;
standby_name = t_thrd.syncrep_cxt.SyncRepConfig->member_names;
for (priority = 1; priority <= t_thrd.syncrep_cxt.SyncRepConfig->nmembers; priority++) {
if (pg_strcasecmp(standby_name, u_sess->attr.attr_common.application_name) == 0 ||
strcmp(standby_name, "*") == 0) {
found = true;
break;
for (group = 0; group < t_thrd.syncrep_cxt.SyncRepConfigGroups && !found; group++) {
standby_name = t_thrd.syncrep_cxt.SyncRepConfig[group]->member_names;
for (priority = 1; priority <= t_thrd.syncrep_cxt.SyncRepConfig[group]->nmembers; priority++) {
if (pg_strcasecmp(standby_name, u_sess->attr.attr_common.application_name) == 0 ||
strcmp(standby_name, "*") == 0) {
Assert(!(group > 1 && strcmp(standby_name, "*") == 0));
found = true;
break;
}
standby_name += strlen(standby_name) + 1;
}
standby_name += strlen(standby_name) + 1;
}
if (!found) {
return 0;
return;
}
/*
* In quorum-based sync replication, all the standbys in the list
* have the same priority, one.
*/
return (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
*gid = group - 1;
*prio = (t_thrd.syncrep_cxt.SyncRepConfig[group - 1]->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
return;
}
/*
@ -1154,9 +1275,16 @@ List *SyncRepGetSyncStandbys(bool *am_sync, List** catchup_standbys)
if (t_thrd.syncrep_cxt.SyncRepConfig == NULL)
return NIL;
return (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
? SyncRepGetSyncStandbysPriority(am_sync, catchup_standbys)
: SyncRepGetSyncStandbysQuorum(am_sync, catchup_standbys);
List* results = NIL;
for(int i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) {
if (t_thrd.syncrep_cxt.SyncRepConfig[i]->syncrep_method == SYNC_REP_PRIORITY) {
results = lappend(results, SyncRepGetSyncStandbysPriority(am_sync, i,catchup_standbys));
} else {
results = lappend(results, SyncRepGetSyncStandbysQuorum(am_sync, i, catchup_standbys));
}
}
return results;
}
/*
@ -1169,13 +1297,13 @@ List *SyncRepGetSyncStandbys(bool *am_sync, List** catchup_standbys)
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys)
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, int groupid, List** catchup_standbys)
{
List *result = NIL;
int i;
volatile WalSnd *walsnd = NULL; /* Use volatile pointer to prevent code rearrangement */
Assert(t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
Assert(t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_QUORUM);
for (i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) {
walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[i];
@ -1185,7 +1313,7 @@ static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys
continue;
/* Must be synchronous */
if (walsnd->sync_standby_priority == 0)
if (walsnd->sync_standby_priority == 0 || walsnd->sync_standby_group != groupid)
continue;
if ((walsnd->state == WALSNDSTATE_CATCHUP || walsnd->peer_state == CATCHUP_STATE) &&
@ -1230,7 +1358,7 @@ static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standbys)
static List *SyncRepGetSyncStandbysPriority(bool *am_sync, int groupid, List** catchup_standbys)
{
List *result = NIL;
List *pending = NIL;
@ -1242,9 +1370,9 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb
bool am_in_pending = false;
volatile WalSnd *walsnd = NULL; /* Use volatile pointer to prevent code rearrangement */
Assert(t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
Assert(t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_PRIORITY);
lowest_priority = t_thrd.syncrep_cxt.SyncRepConfig->nmembers;
lowest_priority = t_thrd.syncrep_cxt.SyncRepConfig[groupid]->nmembers;
next_highest_priority = lowest_priority + 1;
/*
@ -1261,7 +1389,7 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb
/* Must be synchronous */
this_priority = walsnd->sync_standby_priority;
if (this_priority == 0)
if (this_priority == 0 || walsnd->sync_standby_group != groupid)
continue;
if ((walsnd->state == WALSNDSTATE_CATCHUP || walsnd->peer_state == CATCHUP_STATE) &&
@ -1286,7 +1414,7 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb
result = lappend_int(result, i);
if (am_sync != NULL && walsnd == t_thrd.walsender_cxt.MyWalSnd)
*am_sync = true;
if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) {
if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) {
list_free(pending);
return result; /* Exit if got enough sync standbys */
}
@ -1311,7 +1439,7 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb
* Consider all pending standbys as sync if the number of them plus
* already-found sync ones is lower than the configuration requests.
*/
if (list_length(result) + list_length(pending) <= t_thrd.syncrep_cxt.SyncRepConfig->num_sync) {
if (list_length(result) + list_length(pending) <= t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) {
bool needfree = (result != NIL && pending != NIL);
/*
@ -1355,9 +1483,9 @@ static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standb
/*
* We should always exit here after the scan of pending list
* starts because we know that the list has enough elements to
* reach SyncRepConfig->num_sync.
* reach SyncRepConfig[groupid]->num_sync.
*/
if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) {
if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) {
list_free(pending);
return result; /* Exit if got enough sync standbys */
}
@ -1811,6 +1939,184 @@ static bool SyncPaxosQueueIsOrderedByLSN(void)
}
#endif
/*
* =================================================================
* Analyze and transform GUC param synchronous_standby_names content
* Create result in session MEMORY_CONTEXT_STORAGE session.
* =================================================================
*/
static TransContext* create_transform_context()
{
MemoryContext old_context = MemoryContextSwitchTo(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
TransContext* tcxt = (TransContext*)palloc(sizeof(TransContext));
(void)MemoryContextSwitchTo(old_context);
tcxt->source = t_thrd.syncrepgram_cxt.syncrep_parse_result;
tcxt->SyncRepConfigGroups = list_length(tcxt->source);
Assert(tcxt->SyncRepConfigGroups > 0);
tcxt->success = true;
tcxt->has_star = false;
tcxt->existers = NIL;
tcxt->conf = NULL;
tcxt->is_star = false;
tcxt->SyncRepConfig = NIL;
tcxt->SyncRepAllCount = 0;
tcxt->SyncRepMaxPossib = MAX_INT32;
tcxt->SyncRepMinPossib = 0;
return tcxt;
}
static void bind_transform_context(TransContext *tcxt, SyncRepConfigData *conf)
{
tcxt->conf = conf;
tcxt->is_star = false;
}
static void advance_transform_result(TransContext *tcxt)
{
tcxt->SyncRepMinPossib += tcxt->conf->num_sync;
if (tcxt->has_star) {
tcxt->SyncRepAllCount = -1;
tcxt->SyncRepMaxPossib = tcxt->SyncRepMinPossib;
} else {
tcxt->SyncRepAllCount += tcxt->conf->nmembers;
tcxt->SyncRepMaxPossib = Min(tcxt->SyncRepMaxPossib, tcxt->conf->nmembers - tcxt->conf->num_sync);
}
}
static void finalize_transform_result(TransContext *tcxt)
{
if (!tcxt->success) {
clear_transform_context(tcxt);
return;
}
MemoryContext old_context = MemoryContextSwitchTo(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
SyncRepConfigData* conf = NULL;
ListCell* lc = NULL;
SyncRepConfigData* result = NULL;
errno_t rc;
foreach(lc, tcxt->source) {
conf = (SyncRepConfigData*)lfirst(lc);
result = (SyncRepConfigData*)palloc(conf->config_size);
rc = memcpy_s(result, conf->config_size, conf, conf->config_size);
securec_check(rc, "", "");
tcxt->SyncRepConfig = lappend(tcxt->SyncRepConfig, result);
}
(void)MemoryContextSwitchTo(old_context);
if (!tcxt->has_star) {
tcxt->SyncRepMaxPossib = tcxt->SyncRepAllCount - tcxt->SyncRepMaxPossib;
}
clear_transform_context(tcxt);
}
/*
* clear the transform context, except results.
*/
static void clear_transform_context(TransContext *tcxt)
{
tcxt->source = NIL;
tcxt->conf = NULL;
list_free(tcxt->existers);
tcxt->existers = NIL;
}
static void destroy_transform_context(TransContext *tcxt)
{
list_free(tcxt->existers);
list_free_deep(tcxt->SyncRepConfig);
pfree(tcxt);
}
static bool analyze_star_and_num(TransContext *tcxt)
{
SyncRepConfigData *conf = tcxt->conf;
char *p = conf->member_names;
for (int i = 0; i < conf->nmembers; i++) {
if (strcmp(p, "*") != 0) {
p += strlen(p) + 1;
continue;
}
if (tcxt->SyncRepConfigGroups > 1) {
GUC_check_errdetail("'*' is not support when sync standby strategy is combinated.");
return false;
}
if (conf->nmembers > 1) {
GUC_check_errdetail("Please use the '*' separately, otherwise, it is semantic ambiguity.");
return false;
}
tcxt->is_star = true;
tcxt->has_star = true;
}
if (!tcxt->is_star && conf->num_sync > conf->nmembers) {
GUC_check_errdetail("The sync number must less or equals to the number of standby node names.");
return false;
}
return true;
}
static bool analyze_duplicate_names(TransContext *tcxt)
{
if (tcxt->is_star)
return true;
SyncRepConfigData *conf = tcxt->conf;
char *p = conf->member_names;
char *exister = NULL;
ListCell *lc = NULL;
for (int i = 0; i < conf->nmembers; i++) {
foreach(lc, tcxt->existers) {
exister = (char*)lfirst(lc);
if (pg_strcasecmp(p, exister) == 0) {
GUC_check_errdetail("Duplicate standby node name: %s", p);
return false;
}
}
tcxt->existers = lappend(tcxt->existers, p);
p += strlen(p) + 1;
}
return true;
}
static void transform_synchronous_standby_names(TransContext* tcxt)
{
if (tcxt->SyncRepConfigGroups > SYNC_REP_MAX_GROUPS) {
GUC_check_errdetail("Too much groups. Please no more than %d.", SYNC_REP_MAX_GROUPS);
tcxt->success = false;
clear_transform_context(tcxt);
return;
}
ListCell* lc = NULL;
foreach(lc, t_thrd.syncrepgram_cxt.syncrep_parse_result) {
bind_transform_context(tcxt, (SyncRepConfigData*)lfirst(lc));
if (tcxt->conf->syncrep_method == SYNC_REP_PRIORITY && tcxt->SyncRepConfigGroups > 1) {
GUC_check_errdetail("FIRST rule is not support when sync standby strategy is combinated.");
tcxt->success = false;
break;
}
if (!analyze_star_and_num(tcxt) ||
!analyze_duplicate_names(tcxt)) {
tcxt->success = false;
break;
}
advance_transform_result(tcxt);
}
finalize_transform_result(tcxt);
}
/*
* ===========================================================
* Synchronous Replication functions executed by any process
@ -1818,41 +2124,39 @@ static bool SyncPaxosQueueIsOrderedByLSN(void)
*/
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
{
errno_t rc = EOK;
if (*newval != NULL && (*newval)[0] != '\0') {
int parse_rc;
SyncRepConfigData *pconf = NULL;
syncrep_scanner_yyscan_t yyscanner;
/* Reset communication variables to ensure a fresh start */
t_thrd.syncrepgram_cxt.syncrep_parse_result = NULL;
t_thrd.syncrepgram_cxt.syncrep_parse_result = NIL;
/* Parse the synchronous_standby_names string */
yyscanner = syncrep_scanner_init(*newval);
parse_rc = syncrep_yyparse(yyscanner);
syncrep_scanner_finish(yyscanner);
if (parse_rc != 0 || t_thrd.syncrepgram_cxt.syncrep_parse_result == NULL) {
if (parse_rc != 0 || t_thrd.syncrepgram_cxt.syncrep_parse_result == NIL) {
GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
GUC_check_errdetail("synchronous_standby_names parser failed");
return false;
}
/* GUC extra value must be malloc'd, not palloc'd */
pconf = (SyncRepConfigData *)MemoryContextAlloc(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE),
t_thrd.syncrepgram_cxt.syncrep_parse_result->config_size);
if (pconf == NULL)
/*
* analyze and transform synchronous_standby_names content,
* make result in session MEMORY_CONTEXT_STORAGE.
*/
TransContext* tcxt = create_transform_context();
transform_synchronous_standby_names(tcxt);
if (!tcxt->success) {
destroy_transform_context(tcxt);
return false;
rc = memcpy_s(pconf, t_thrd.syncrepgram_cxt.syncrep_parse_result->config_size,
t_thrd.syncrepgram_cxt.syncrep_parse_result,
t_thrd.syncrepgram_cxt.syncrep_parse_result->config_size);
securec_check(rc, "", "");
}
*extra = (void *)pconf;
*extra = (void *)tcxt;
if (t_thrd.syncrepgram_cxt.syncrep_parse_result) {
pfree(t_thrd.syncrepgram_cxt.syncrep_parse_result);
t_thrd.syncrepgram_cxt.syncrep_parse_result = NULL;
list_free_deep(t_thrd.syncrepgram_cxt.syncrep_parse_result);
t_thrd.syncrepgram_cxt.syncrep_parse_result = NIL;
}
/*
@ -1875,16 +2179,36 @@ void assign_synchronous_standby_names(const char *newval, void *extra)
* it should be safe to know the latest rep config ASAP for all sessions.
* If this assumption no longer holds, please move it to session level.
*/
pfree_ext(t_thrd.syncrep_cxt.SyncRepConfig);
if (extra != NULL) {
SyncRepConfigData *pconf = (SyncRepConfigData *)extra;
errno_t rc = EOK;
t_thrd.syncrep_cxt.SyncRepConfig = (SyncRepConfigData *)MemoryContextAlloc(
THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE), pconf->config_size);
rc = memcpy_s(t_thrd.syncrep_cxt.SyncRepConfig, pconf->config_size, pconf, pconf->config_size);
securec_check(rc, "", "");
for (int i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) {
pfree_ext(t_thrd.syncrep_cxt.SyncRepConfig[i]);
}
pfree_ext(t_thrd.syncrep_cxt.SyncRepConfig);
t_thrd.syncrep_cxt.SyncRepConfigGroups = 0;
t_thrd.syncrep_cxt.SyncRepMaxPossib = 0;
if (extra == NULL)
return;
TransContext* tcxt = (TransContext*)extra;
ListCell* lc = NULL;
int i = 0;
errno_t rc = EOK;
MemoryContext old_context = MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
t_thrd.syncrep_cxt.SyncRepConfigGroups = tcxt->SyncRepConfigGroups;
t_thrd.syncrep_cxt.SyncRepMaxPossib = tcxt->SyncRepMaxPossib;
t_thrd.syncrep_cxt.SyncRepConfig = (SyncRepConfigData **)palloc(
tcxt->SyncRepConfigGroups * sizeof(SyncRepConfigData*));
foreach(lc, tcxt->SyncRepConfig) {
SyncRepConfigData* pconf = (SyncRepConfigData*)lfirst(lc);
t_thrd.syncrep_cxt.SyncRepConfig[i] = (SyncRepConfigData*)palloc(pconf->config_size);
rc = memcpy_s(t_thrd.syncrep_cxt.SyncRepConfig[i], pconf->config_size, pconf, pconf->config_size);
securec_check(rc, "", "");
i++;
}
(void)MemoryContextSwitchTo(old_context);
}
void assign_synchronous_commit(int newval, void *extra)

View File

@ -66,8 +66,8 @@ static void syncrep_yyerror(YYLTYPE *yylloc,
%token <str> NAME NUM JUNK ANY FIRST
%type <config> result standby_config
%type <list> standby_list
%type <config> standby_config_simplify standby_config_complete
%type <list> result standby_config standby_config_combination standby_list
%type <str> standby_name
%start result
@ -78,9 +78,22 @@ result:
;
standby_config:
standby_config_simplify { $$ = list_make1($1); }
| standby_config_combination { $$ = $1; }
;
standby_config_combination:
standby_config_complete { $$ = list_make1($1); }
| standby_config_combination ',' standby_config_complete { $$ = lappend($1, $3); }
;
standby_config_simplify:
standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
| NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); pfree($1); }
| ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); pfree($2); }
;
standby_config_complete:
ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); pfree($2); }
| FIRST NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); pfree($2); }
;

View File

@ -1270,8 +1270,9 @@ void GetMinLsnRecordsFromHadrCascadeStandby(void)
qsort(standbyApplyList, g_instance.attr.attr_storage.max_wal_senders, sizeof(XLogRecPtr), cmp_min_lsn);
applyLoc = GetXLogReplayRecPtr(NULL, &ReplayReadPtr);
t_thrd.walreceiver_cxt.reply_message->applyRead = ReplayReadPtr;
for (i = t_thrd.syncrep_cxt.SyncRepConfig->num_sync - 1; i >= 0; i--) {
if (i < t_thrd.syncrep_cxt.SyncRepConfig->num_sync - 1) {
int min_require = Min(g_instance.attr.attr_storage.max_wal_senders, t_thrd.syncrep_cxt.SyncRepMaxPossib);
for (i = min_require - 1 ; i >= 0; i--) {
if (i < min_require - 1) {
needReport = true;
}
if (standbyReceiveList[i] != InvalidXLogRecPtr) {

View File

@ -4234,6 +4234,7 @@ static void InitWalSnd(void)
securec_check(rc, "", "");
rc = memset_s((void *)&walsnd->wal_sender_channel, sizeof(ReplConnInfo), 0, sizeof(ReplConnInfo));
securec_check(rc, "", "");
walsnd->sync_standby_group = 0;
walsnd->sync_standby_priority = 0;
walsnd->index = i;
walsnd->log_ctrl.sleep_time = 0;
@ -5329,8 +5330,7 @@ bool WalSndInProgress(int type)
bool WalSndQuorumInProgress(int type)
{
int i;
int num = 0;
int num_sync = t_thrd.syncrep_cxt.SyncRepConfig->num_sync;
int* nums = (int*)palloc0(t_thrd.syncrep_cxt.SyncRepConfigGroups * sizeof(int));
for (i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) {
/* use volatile pointer to prevent code rearrangement */
@ -5338,15 +5338,19 @@ bool WalSndQuorumInProgress(int type)
SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid != 0 && walsnd->pid != t_thrd.proc_cxt.MyProcPid &&
((walsnd->sendRole & type) == walsnd->sendRole)) {
num++;
nums[walsnd->sync_standby_group]++;
}
SpinLockRelease(&walsnd->mutex);
}
if (num_sync <= num) {
return true;
} else {
return false;
for (i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) {
if (t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync > nums[i]) {
pfree(nums);
return false;
}
}
pfree(nums);
return true;
}
bool WalSndAllInProgressForMainStandby(int type)
@ -5810,13 +5814,14 @@ Datum gs_paxos_stat_replication(PG_FUNCTION_ARGS)
*/
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_WAL_SENDERS_COLS 21
#define PG_STAT_GET_WAL_SENDERS_COLS 22
TupleDesc tupdesc;
int *sync_priority = NULL;
int i = 0;
volatile HaShmemData *hashmdata = t_thrd.postmaster_cxt.HaShmData;
List *sync_standbys = NIL;
ListCell *lc = NULL;
Tuplestorestate *tupstore = BuildTupleResult(fcinfo, &tupdesc);
@ -5875,6 +5880,7 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int j = 0;
errno_t rc = 0;
int ret = 0;
int group = 0;
int priority = 0;
SpinLockAcquire(&hashmdata->mutex);
@ -5906,8 +5912,10 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
syncStart = walsnd->syncPercentCountStart;
catchup_time[0] = walsnd->catchupTime[0];
catchup_time[1] = walsnd->catchupTime[1];
if (IS_DN_MULTI_STANDYS_MODE())
if (IS_DN_MULTI_STANDYS_MODE()) {
group = walsnd->sync_standby_group;
priority = walsnd->sync_standby_priority;
}
SpinLockRelease(&walsnd->mutex);
set_xlog_location(local_role, &sndWrite, &sndFlush, &sndReplay);
@ -6043,9 +6051,11 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
/* sync_state and sync_prority */
if (!SyncRepRequested()) {
values[j++] = CStringGetTextDatum("Async");
nulls[j++] = true;
values[j++] = Int32GetDatum(0);
} else {
values[j++] = CStringGetTextDatum("Sync");
nulls[j++] = true;
values[j++] = Int32GetDatum(sync_priority[i]);
}
} else {
@ -6065,14 +6075,18 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* basically useless to report "sync" or "potential" as their sync
* states. We report just "quorum" for them.
*/
if (priority == 0)
if (priority == 0) {
values[j++] = CStringGetTextDatum("Async");
else if (list_member_int(sync_standbys, i)) {
values[j++] = t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY
nulls[j++] = true;
} else if (list_member_int((List*)list_nth(sync_standbys, group), i)) {
values[j++] = GetWalsndSyncRepConfig(walsnd)->syncrep_method == SYNC_REP_PRIORITY
? CStringGetTextDatum("Sync")
: CStringGetTextDatum("Quorum");
} else
values[j++] = Int32GetDatum(group);
} else {
values[j++] = CStringGetTextDatum("Potential");
values[j++] = Int32GetDatum(group);
}
values[j++] = Int32GetDatum(priority);
}
@ -6094,6 +6108,10 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
foreach(lc, sync_standbys) {
list_free((List*)lfirst(lc));
}
list_free(sync_standbys);
if (sync_priority != NULL) {
pfree(sync_priority);

View File

@ -0,0 +1,37 @@
-- for any x group
DO $$
DECLARE
ans boolean;
BEGIN
select case when working_version_num()=92301 then true else false end as ans into ans;
if ans = false then
DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099;
CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders(
OUT pid bigint,
OUT sender_pid integer,
OUT local_role text,
OUT peer_role text,
OUT peer_state text,
OUT state text,
OUT catchup_start timestamp with time zone,
OUT catchup_end timestamp with time zone,
OUT sender_sent_location text,
OUT sender_write_location text,
OUT sender_flush_location text,
OUT sender_replay_location text,
OUT receiver_received_location text,
OUT receiver_write_location text,
OUT receiver_flush_location text,
OUT receiver_replay_location text,
OUT sync_percent text,
OUT sync_state text,
OUT sync_priority integer,
OUT sync_most_available text,
OUT channel text
) RETURNS SETOF record
STABLE NOT FENCED NOT SHIPPABLE ROWS 10
LANGUAGE internal AS 'pg_stat_get_wal_senders';
end if;
END$$;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0;

View File

@ -0,0 +1,37 @@
-- for any x group
DO $$
DECLARE
ans boolean;
BEGIN
select case when working_version_num()=92301 then true else false end as ans into ans;
if ans = false then
DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099;
CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders(
OUT pid bigint,
OUT sender_pid integer,
OUT local_role text,
OUT peer_role text,
OUT peer_state text,
OUT state text,
OUT catchup_start timestamp with time zone,
OUT catchup_end timestamp with time zone,
OUT sender_sent_location text,
OUT sender_write_location text,
OUT sender_flush_location text,
OUT sender_replay_location text,
OUT receiver_received_location text,
OUT receiver_write_location text,
OUT receiver_flush_location text,
OUT receiver_replay_location text,
OUT sync_percent text,
OUT sync_state text,
OUT sync_priority integer,
OUT sync_most_available text,
OUT channel text
) RETURNS SETOF record
STABLE NOT FENCED NOT SHIPPABLE ROWS 10
LANGUAGE internal AS 'pg_stat_get_wal_senders';
end if;
END$$;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0;

View File

@ -0,0 +1,38 @@
-- for any x group
DO $$
DECLARE
ans boolean;
BEGIN
select case when working_version_num()=92301 then true else false end as ans into ans;
if ans = false then
DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099;
CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders(
OUT pid bigint,
OUT sender_pid integer,
OUT local_role text,
OUT peer_role text,
OUT peer_state text,
OUT state text,
OUT catchup_start timestamp with time zone,
OUT catchup_end timestamp with time zone,
OUT sender_sent_location text,
OUT sender_write_location text,
OUT sender_flush_location text,
OUT sender_replay_location text,
OUT receiver_received_location text,
OUT receiver_write_location text,
OUT receiver_flush_location text,
OUT receiver_replay_location text,
OUT sync_percent text,
OUT sync_state text,
OUT sync_group integer,
OUT sync_priority integer,
OUT sync_most_available text,
OUT channel text
) RETURNS SETOF record
STABLE NOT FENCED NOT SHIPPABLE ROWS 10
LANGUAGE internal AS 'pg_stat_get_wal_senders';
end if;
END$$;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0;

View File

@ -0,0 +1,38 @@
-- for any x group
DO $$
DECLARE
ans boolean;
BEGIN
select case when working_version_num()=92301 then true else false end as ans into ans;
if ans = false then
DROP FUNCTION IF EXISTS pg_catalog.pg_stat_get_wal_senders() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 3099;
CREATE FUNCTION pg_catalog.pg_stat_get_wal_senders(
OUT pid bigint,
OUT sender_pid integer,
OUT local_role text,
OUT peer_role text,
OUT peer_state text,
OUT state text,
OUT catchup_start timestamp with time zone,
OUT catchup_end timestamp with time zone,
OUT sender_sent_location text,
OUT sender_write_location text,
OUT sender_flush_location text,
OUT sender_replay_location text,
OUT receiver_received_location text,
OUT receiver_write_location text,
OUT receiver_flush_location text,
OUT receiver_replay_location text,
OUT sync_percent text,
OUT sync_state text,
OUT sync_group integer,
OUT sync_priority integer,
OUT sync_most_available text,
OUT channel text
) RETURNS SETOF record
STABLE NOT FENCED NOT SHIPPABLE ROWS 10
LANGUAGE internal AS 'pg_stat_get_wal_senders';
end if;
END$$;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 0;

View File

@ -2421,7 +2421,7 @@ typedef struct knl_t_replscanner_context {
typedef struct knl_t_syncrepgram_context {
/* Result of parsing is returned in one of these two variables */
struct SyncRepConfigData* syncrep_parse_result;
List* syncrep_parse_result;
} knl_t_syncrepgram_context;
typedef struct knl_t_syncrepscanner_context {
@ -2431,7 +2431,10 @@ typedef struct knl_t_syncrepscanner_context {
} knl_t_syncrepscanner_context;
typedef struct knl_t_syncrep_context {
struct SyncRepConfigData* SyncRepConfig;
struct SyncRepConfigData** SyncRepConfig; // array of SyncRepConfig
int SyncRepConfigGroups; // group of SyncRepConfig
int SyncRepMaxPossib; // max possible sync standby number
bool announce_next_takeover;
} knl_t_syncrep_context;

View File

@ -39,11 +39,16 @@
#define SYNC_REP_PRIORITY 0
#define SYNC_REP_QUORUM 1
#define SYNC_REP_MAX_GROUPS 256
extern volatile bool most_available_sync;
#define SyncStandbysDefined() \
(u_sess->attr.attr_storage.SyncRepStandbyNames != NULL && u_sess->attr.attr_storage.SyncRepStandbyNames[0] != '\0')
#define GetWalsndSyncRepConfig(walsnder) \
(t_thrd.syncrep_cxt.SyncRepConfig[(walsnder)->sync_standby_group])
/*
* Struct for the configuration of synchronous replication.
*

View File

@ -118,10 +118,11 @@ typedef struct WalSnd {
Latch latch;
/*
* The priority order of the standby managed by this WALSender, as listed
* in synchronous_standby_names, or 0 if not-listed. Protected by
* SyncRepLock.
* The strategy group and priority order of the standby managed by this WALSender,
* as listed in synchronous_standby_names, or 0 if not-listed.
* Protected by SyncRepLock.
*/
uint8 sync_standby_group;
int sync_standby_priority;
int index;
LogCtrlData log_ctrl;

View File

@ -0,0 +1,150 @@
show synchronous_standby_names;
synchronous_standby_names
---------------------------
*
(1 row)
-- single gram
alter system set synchronous_standby_names = '*'; -- suc
alter system set synchronous_standby_names = 'd1, *'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "d1, *"
DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity.
alter system set synchronous_standby_names = '*, *'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "*, *"
DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity.
alter system set synchronous_standby_names = 'd1'; -- suc
alter system set synchronous_standby_names = 'd1, d2, d3, d4'; -- suc
alter system set synchronous_standby_names = 'd1, d2, d1, d4'; -- err, duplicate name
ERROR: invalid value for parameter "synchronous_standby_names": "d1, d2, d1, d4"
DETAIL: Duplicate standby node name: d1
alter system set synchronous_standby_names = '2 (*)'; -- suc
alter system set synchronous_standby_names = '2 (d1, *)'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, *)"
DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity.
alter system set synchronous_standby_names = '2 (*, *)'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "2 (*, *)"
DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity.
alter system set synchronous_standby_names = '2 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = '2 (d1, d2, d1, d4)'; -- err, duplicate name
ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, d2, d1, d4)"
DETAIL: Duplicate standby node name: d1
alter system set synchronous_standby_names = '5 (d1, d2, d3, d4)'; -- err, requriement more than have
ERROR: invalid value for parameter "synchronous_standby_names": "5 (d1, d2, d3, d4)"
DETAIL: The sync number must less or equals to the number of standby node names.
alter system set synchronous_standby_names = '0 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = '-1 (d1, d2, d3, d4)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "-1 (d1, d2, d3, d4)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'first 2 (*)'; -- suc
alter system set synchronous_standby_names = 'first 2 (d1, *)'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (d1, *)"
DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity.
alter system set synchronous_standby_names = 'first 2 (*, *)'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (*, *)"
DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity.
alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'first 2 (d1, d2, d1, d4)'; -- err, duplicate name
ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (d1, d2, d1, d4)"
DETAIL: Duplicate standby node name: d1
alter system set synchronous_standby_names = 'first 5 (d1, d2, d3, d4)'; -- err, requriement more than have
ERROR: invalid value for parameter "synchronous_standby_names": "first 5 (d1, d2, d3, d4)"
DETAIL: The sync number must less or equals to the number of standby node names.
alter system set synchronous_standby_names = 'first 0 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'first -1 (d1, d2, d3, d4)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "first -1 (d1, d2, d3, d4)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'any 2 (*)'; -- suc
alter system set synchronous_standby_names = 'any 2 (d1, *'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, *"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'any 2 (*, *)'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (*, *)"
DETAIL: Please use the '*' separately, otherwise, it is semantic ambiguity.
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'any 2 (d1, d2, d1, d4)'; -- err, duplicate name
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d1, d4)"
DETAIL: Duplicate standby node name: d1
alter system set synchronous_standby_names = 'any 5 (d1, d2, d3, d4)'; -- err, requriement more than have
ERROR: invalid value for parameter "synchronous_standby_names": "any 5 (d1, d2, d3, d4)"
DETAIL: The sync number must less or equals to the number of standby node names.
alter system set synchronous_standby_names = 'any 0 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'any -1 (d1, d2, d3, d4)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "any -1 (d1, d2, d3, d4)"
DETAIL: synchronous_standby_names parser failed
-- combinate gram
alter system set synchronous_standby_names = 'd1, d2, d3, 1 (d5, d6)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "d1, d2, d3, 1 (d5, d6)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (d5, d6)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "1 (d1, d2, d3), 1 (d5, d6)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = '*, 1 (d5, d6)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "*, 1 (d5, d6)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "1 (d1, d2, d3), 1 (*)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "1 (d1, d2, d3), 1 (*)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'any 2 (*), any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (*), any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)"
DETAIL: '*' is not support when sync standby strategy is combinated.
alter system set synchronous_standby_names = 'any 2 (d, dd), any 2 (d1, *, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d, dd), any 2 (d1, *, d3, d4), any 2 (d5, d6, d7)"
DETAIL: '*' is not support when sync standby strategy is combinated.
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), d5, d6, d7'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), d5, d6, d7"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), 2 (d5, d1, d7)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), 2 (d5, d1, d7)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), *'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), *"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, d2, d3, d4), first 2 (d5, d6, d7)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "2 (d1, d2, d3, d4), any 4 (d5, d6, d7)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = '*, any 0 (d5, d6, d7)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "*, any 0 (d5, d6, d7)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'd1, d2, d3, d4, any 2 (d5, d6, d7)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "d1, d2, d3, d4, any 2 (d5, d6, d7)"
DETAIL: synchronous_standby_names parser failed
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- suc
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d1, d7)'; -- err, duplicate name
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), any 2 (d5, d1, d7)"
DETAIL: Duplicate standby node name: d1
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d6)'; -- err, duplicate name
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), any 2 (d5, d6, d6)"
DETAIL: Duplicate standby node name: d6
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)"
DETAIL: FIRST rule is not support when sync standby strategy is combinated.
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, requriement more than have
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), any 4 (d5, d6, d7)"
DETAIL: The sync number must less or equals to the number of standby node names.
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 0 (d5, d6, d7)'; -- suc
alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first
ERROR: invalid value for parameter "synchronous_standby_names": "first 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)"
DETAIL: FIRST rule is not support when sync standby strategy is combinated.
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), , any 0 (d5, d6, d7)'; -- err, parser err
ERROR: invalid value for parameter "synchronous_standby_names": "any 2 (d1, d2, d3, d4), , any 0 (d5, d6, d7)"
DETAIL: synchronous_standby_names parser failed
-- recover
alter system set synchronous_standby_names = '*';
select pg_sleep(3);
pg_sleep
----------
(1 row)
show synchronous_standby_names;
synchronous_standby_names
---------------------------
*
(1 row)

View File

@ -37,6 +37,7 @@ test: array_funcs first_last_agg
test: hw_pwd_encryption_sm3
test: sync_standy_names
# test subpartition
test: hw_subpartition_createtable hw_subpartition_scan hw_subpartition_select hw_subpartition_split hw_subpartition_truncate hw_subpartition_update hw_subpartition_gpi hw_subpartition_analyze_vacuum hw_subpartition_alter_table hw_subpartition_index hw_subpartition_add_drop_partition hw_subpartition_tablespace hw_subpartition_ddl_index

View File

@ -0,0 +1,62 @@
show synchronous_standby_names;
-- single gram
alter system set synchronous_standby_names = '*'; -- suc
alter system set synchronous_standby_names = 'd1, *'; -- err, semantic ambiguity
alter system set synchronous_standby_names = '*, *'; -- err, semantic ambiguity
alter system set synchronous_standby_names = 'd1'; -- suc
alter system set synchronous_standby_names = 'd1, d2, d3, d4'; -- suc
alter system set synchronous_standby_names = 'd1, d2, d1, d4'; -- err, duplicate name
alter system set synchronous_standby_names = '2 (*)'; -- suc
alter system set synchronous_standby_names = '2 (d1, *)'; -- err, semantic ambiguity
alter system set synchronous_standby_names = '2 (*, *)'; -- err, semantic ambiguity
alter system set synchronous_standby_names = '2 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = '2 (d1, d2, d1, d4)'; -- err, duplicate name
alter system set synchronous_standby_names = '5 (d1, d2, d3, d4)'; -- err, requriement more than have
alter system set synchronous_standby_names = '0 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = '-1 (d1, d2, d3, d4)'; -- err, parser err
alter system set synchronous_standby_names = 'first 2 (*)'; -- suc
alter system set synchronous_standby_names = 'first 2 (d1, *)'; -- err, semantic ambiguity
alter system set synchronous_standby_names = 'first 2 (*, *)'; -- err, semantic ambiguity
alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'first 2 (d1, d2, d1, d4)'; -- err, duplicate name
alter system set synchronous_standby_names = 'first 5 (d1, d2, d3, d4)'; -- err, requriement more than have
alter system set synchronous_standby_names = 'first 0 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'first -1 (d1, d2, d3, d4)'; -- err, parser err
alter system set synchronous_standby_names = 'any 2 (*)'; -- suc
alter system set synchronous_standby_names = 'any 2 (d1, *'; -- err, semantic ambiguity
alter system set synchronous_standby_names = 'any 2 (*, *)'; -- err, semantic ambiguity
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'any 2 (d1, d2, d1, d4)'; -- err, duplicate name
alter system set synchronous_standby_names = 'any 5 (d1, d2, d3, d4)'; -- err, requriement more than have
alter system set synchronous_standby_names = 'any 0 (d1, d2, d3, d4)'; -- suc
alter system set synchronous_standby_names = 'any -1 (d1, d2, d3, d4)'; -- err, parser err
-- combinate gram
alter system set synchronous_standby_names = 'd1, d2, d3, 1 (d5, d6)'; -- err, parser err
alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (d5, d6)'; -- err, parser err
alter system set synchronous_standby_names = '*, 1 (d5, d6)'; -- err, parser err
alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err
alter system set synchronous_standby_names = '1 (d1, d2, d3), 1 (*)'; -- err, parser err
alter system set synchronous_standby_names = 'any 2 (*), any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity
alter system set synchronous_standby_names = 'any 2 (d, dd), any 2 (d1, *, d3, d4), any 2 (d5, d6, d7)'; -- err, semantic ambiguity
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), d5, d6, d7'; -- err, parser err
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), 2 (d5, d1, d7)'; -- err, parser err
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), *'; -- err, parser err
alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, parser err
alter system set synchronous_standby_names = '2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, parser err
alter system set synchronous_standby_names = '*, any 0 (d5, d6, d7)'; -- err, parser err
alter system set synchronous_standby_names = 'd1, d2, d3, d4, any 2 (d5, d6, d7)'; -- err, parser err
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d7)'; -- suc
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d1, d7)'; -- err, duplicate name
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 2 (d5, d6, d6)'; -- err, duplicate name
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 4 (d5, d6, d7)'; -- err, requriement more than have
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), any 0 (d5, d6, d7)'; -- suc
alter system set synchronous_standby_names = 'first 2 (d1, d2, d3, d4), first 2 (d5, d6, d7)'; -- err, not support first
alter system set synchronous_standby_names = 'any 2 (d1, d2, d3, d4), , any 0 (d5, d6, d7)'; -- err, parser err
-- recover
alter system set synchronous_standby_names = '*';
select pg_sleep(3);
show synchronous_standby_names;