Update coordinator list from cluster file (#7382)

* Log failed connection attempts in monitorProxies

* Update coordinator list from the cluster file after failing to connect to all coordinators

* Wiggle and upgrade test with legacy version monitoring; updating tests to use 7.1.9

* Update coordinator list from the cluster file: addressing review comments

* Update coordinator list from the cluster file: addressing review comments

* Wait on future for all setAndPersistConnectionString calls
This commit is contained in:
Vaidas Gasiunas 2022-06-23 09:22:09 +02:00 committed by GitHub
parent 75423a100c
commit e28a8401fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 33 deletions

View File

@ -330,7 +330,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.1.5" "7.2.0"
--upgrade-path "6.3.23" "7.0.0" "7.1.9" "7.2.0"
--process-number 1
)
@ -338,7 +338,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "7.0.0" "7.1.5" "7.2.0"
--upgrade-path "7.0.0" "7.1.9" "7.2.0"
--process-number 1
)
@ -346,7 +346,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.1.5" "7.2.0" "7.1.5"
--upgrade-path "6.3.23" "7.0.0" "7.1.9" "7.2.0" "7.1.9"
--process-number 3
)
@ -354,7 +354,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.0.0" "7.1.5" "7.2.0" "7.1.5"
--upgrade-path "7.0.0" "7.1.9" "7.2.0" "7.1.9"
--process-number 3
)
@ -362,7 +362,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.1.5" "7.2.0" "7.1.5"
--upgrade-path "7.1.9" "7.2.0" "7.1.9"
--process-number 3
)
@ -376,15 +376,25 @@ endif()
--redundancy double
)
add_test(NAME fdb_c_wiggle_and_upgrade
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.0.0" "wiggle" "7.2.0"
--disable-log-dump
--process-number 3
--redundancy double
)
add_test(NAME fdb_c_wiggle_and_upgrade_latest
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.1.9" "wiggle" "7.2.0"
--disable-log-dump
--process-number 3
--redundancy double
)
add_test(NAME fdb_c_wiggle_and_upgrade_63
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.24" "wiggle" "7.0.0"
--disable-log-dump
--process-number 3
--redundancy double
)
endif()

View File

@ -46,6 +46,8 @@ struct ClientLeaderRegInterface {
bool operator==(const ClientLeaderRegInterface& rhs) const {
return getLeader == rhs.getLeader && openDatabase == rhs.openDatabase;
}
std::string getAddressString() const;
};
// A string containing the information necessary to connect to a cluster.

View File

@ -485,6 +485,14 @@ ClientLeaderRegInterface::ClientLeaderRegInterface(INetwork* local) {
TaskPriority::Coordination);
}
std::string ClientLeaderRegInterface::getAddressString() const {
if (hostname.present()) {
return hostname.get().toString();
} else {
return getLeader.getEndpoint().getPrimaryAddress().toString();
}
}
// Nominee is the worker among all workers that are considered as leader by one coordinator
// This function contacts a coordinator coord to ask who is its nominee.
ACTOR Future<Void> monitorNominee(Key key,
@ -510,9 +518,7 @@ ACTOR Future<Void> monitorNominee(Key key,
TraceEvent("GetLeaderReply")
.suppressFor(1.0)
.detail("Coordinator",
coord.hostname.present() ? coord.hostname.get().toString()
: coord.getLeader.getEndpoint().getPrimaryAddress().toString())
.detail("Coordinator", coord.getAddressString())
.detail("Nominee", li.present() ? li.get().changeID : UID())
.detail("ClusterKey", key.printable());
@ -581,6 +587,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
state Future<Void> allActors;
state Optional<std::pair<LeaderInfo, bool>> leader;
nominees.resize(coordinators.clientLeaderServers.size());
@ -594,7 +601,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
allActors = waitForAll(actors);
loop {
Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
leader = getLeader(nominees);
TraceEvent("MonitorLeaderChange")
.detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1, 1));
if (leader.present()) {
@ -615,7 +622,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString());
wait(connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString()));
info.intermediateConnRecord = connRecord;
}
@ -871,6 +878,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
state std::vector<UID> lastGrvProxyUIDs;
state std::vector<GrvProxyInterface> lastGrvProxies;
state std::vector<ClientLeaderRegInterface> clientLeaderServers;
state bool allConnectionsFailed = false;
clientLeaderServers.reserve(coordinatorsSize);
for (const auto& h : cs.hostnames) {
@ -896,7 +904,22 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
state ClusterConnectionString storedConnectionString;
if (connRecord) {
bool upToDate = wait(connRecord->upToDate(storedConnectionString));
if (!upToDate) {
if (upToDate) {
incorrectTime = Optional<double>();
} else if (allConnectionsFailed) {
// Failed to connect to all coordinators from the current connection string,
// so it is not possible to get any new updates from the cluster. It can be that
// all the coordinators have changed, but the client missed that, because it had
// an incompatible protocol version. Since the cluster file is different,
// it may have been updated by other clients.
TraceEvent("UpdatingConnectionStringFromFile")
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", storedConnectionString.toString())
.detail("CurrentConnectionString", connRecord->getConnectionString().toString());
wait(connRecord->setAndPersistConnectionString(storedConnectionString));
info.intermediateConnRecord = connRecord;
return info;
} else {
req.issues.push_back_deep(req.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connRecord->getConnectionString().toString();
if (!incorrectTime.present()) {
@ -909,8 +932,6 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", storedConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
} else {
incorrectTime = Optional<double>();
}
} else {
incorrectTime = Optional<double>();
@ -953,7 +974,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString());
wait(connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString()));
info.intermediateConnRecord = connRecord;
}
@ -964,11 +985,16 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
clientInfo->setUnconditional(ni);
successIndex = index;
allConnectionsFailed = false;
} else {
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cant talk to cluster controller
TEST(rep.getError().code() == error_code_lookup_failed); // Coordinator hostname resolving failure
TraceEvent("MonitorProxiesConnectFailed")
.detail("Error", rep.getError().name())
.detail("Coordinator", clientLeaderServer.getAddressString());
index = (index + 1) % coordinatorsSize;
if (index == successIndex) {
allConnectionsFailed = true;
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
}
}

View File

@ -349,13 +349,7 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<I
int coordinatorsUnavailable = 0;
for (int i = 0; i < leaderServers.size(); i++) {
StatusObject coordStatus;
if (coord.clientLeaderServers[i].hostname.present()) {
coordStatus["address"] = coord.clientLeaderServers[i].hostname.get().toString();
} else {
coordStatus["address"] =
coord.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress().toString();
}
coordStatus["address"] = coord.clientLeaderServers[i].getAddressString();
if (leaderServers[i].isReady()) {
coordStatus["reachable"] = true;
} else {

View File

@ -166,8 +166,8 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
.detail("StoredConnectionString", coordinators.ccr->getConnectionString().toString())
.detail("CurrentConnectionString", leader.get().first.serializedInfo.toString());
}
coordinators.ccr->setAndPersistConnectionString(
ClusterConnectionString(leader.get().first.serializedInfo.toString()));
wait(coordinators.ccr->setAndPersistConnectionString(
ClusterConnectionString(leader.get().first.serializedInfo.toString())));
TraceEvent("LeaderForwarding")
.detail("ConnStr", coordinators.ccr->getConnectionString().toString())
.trackLatest("LeaderForwarding");

View File

@ -3026,7 +3026,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString());
wait(connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString()));
info.intermediateConnRecord = connRecord;
}

View File

@ -21,6 +21,10 @@ from local_cluster import LocalCluster, random_secret_string
SUPPORTED_PLATFORMS = ["x86_64"]
SUPPORTED_VERSIONS = [
"7.2.0",
"7.1.9",
"7.1.8",
"7.1.7",
"7.1.6",
"7.1.5",
"7.1.4",
"7.1.3",