Move the construction of ConnectionString in changeQuorumChecker() to coordinatorsCommitActor().

This commit is contained in:
Renxuan Wang 2022-02-24 16:14:06 -08:00
parent 3e761aef3b
commit 70d723f70c
4 changed files with 29 additions and 25 deletions

View File

@ -97,6 +97,7 @@ public:
void resetConnectionString();
void resetToUnresolved();
void parseKey(const std::string& key);
ConnectionStringStatus status = RESOLVED;
AsyncTrigger resolveFinish;
@ -106,7 +107,6 @@ public:
private:
void parseConnString();
void parseKey(const std::string& key);
Key key, keyDesc;
std::string connectionString;
};

View File

@ -772,7 +772,7 @@ ACTOR Future<std::vector<NetworkAddress>> getCoordinators(Database cx) {
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
Reference<IQuorumChange> change,
std::vector<NetworkAddress>* desiredCoordinators) {
ClusterConnectionString* conn) {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
@ -789,38 +789,39 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database??
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
if (!desiredCoordinators->size()) {
std::vector<NetworkAddress> _desiredCoordinators = wait(change->getDesiredCoordinators(
if (!conn->coords.size()) {
std::vector<NetworkAddress> desiredCoordinatorAddresses = wait(change->getDesiredCoordinators(
tr,
old.coordinators(),
Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(old)),
result));
*desiredCoordinators = _desiredCoordinators;
conn->coords = desiredCoordinatorAddresses;
}
if (result != CoordinatorsResult::SUCCESS)
return result;
if (!desiredCoordinators->size())
if (!conn->coordinators().size())
return CoordinatorsResult::INVALID_NETWORK_ADDRESSES;
std::sort(desiredCoordinators->begin(), desiredCoordinators->end());
std::sort(conn->coords.begin(), conn->coords.end());
std::string newName = change->getDesiredClusterKeyName();
if (newName.empty())
newName = old.clusterKeyName().toString();
if (old.coordinators() == *desiredCoordinators && old.clusterKeyName() == newName)
if (old.coordinators() == conn->coordinators() && old.clusterKeyName() == newName)
return CoordinatorsResult::SAME_NETWORK_ADDRESSES;
state ClusterConnectionString conn(*desiredCoordinators,
StringRef(newName + ':' + deterministicRandom()->randomAlphaNumeric(32)));
std::string key(newName + ':' + deterministicRandom()->randomAlphaNumeric(32));
conn->parseKey(key);
conn->resetConnectionString();
if (g_network->isSimulated()) {
int i = 0;
int protectedCount = 0;
while ((protectedCount < ((desiredCoordinators->size() / 2) + 1)) && (i < desiredCoordinators->size())) {
auto process = g_simulator.getProcessByAddress((*desiredCoordinators)[i]);
while ((protectedCount < ((conn->coordinators().size() / 2) + 1)) && (i < conn->coordinators().size())) {
auto process = g_simulator.getProcessByAddress(conn->coordinators()[i]);
auto addresses = process->addresses;
if (!process->isReliable()) {
@ -832,14 +833,14 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
if (addresses.secondaryAddress.present()) {
g_simulator.protectedAddresses.insert(process->addresses.secondaryAddress.get());
}
TraceEvent("ProtectCoordinator").detail("Address", (*desiredCoordinators)[i]).backtrace();
TraceEvent("ProtectCoordinator").detail("Address", conn->coordinators()[i]).backtrace();
protectedCount++;
i++;
}
}
std::vector<Future<Optional<LeaderInfo>>> leaderServers;
ClientCoordinators coord(Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(conn)));
ClientCoordinators coord(Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(*conn)));
leaderServers.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++)
@ -851,7 +852,7 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
when(wait(waitForAll(leaderServers))) {}
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
}
tr->set(coordinatorsKey, conn.toString());
tr->set(coordinatorsKey, conn->toString());
return Optional<CoordinatorsResult>();
}

View File

@ -56,7 +56,7 @@ struct IQuorumChange : ReferenceCounted<IQuorumChange> {
// Change to use the given set of coordination servers
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
Reference<IQuorumChange> change,
std::vector<NetworkAddress>* desiredCoordinators);
ClusterConnectionString* conn);
ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChange> change);
Reference<IQuorumChange> autoQuorumChange(int desired = -1);
Reference<IQuorumChange> noQuorumChange();

View File

@ -1628,7 +1628,8 @@ Future<RangeResult> CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw, K
ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
state Reference<IQuorumChange> change;
state std::vector<NetworkAddress> addressesVec;
state ClusterConnectionString
conn; // We don't care about the Key here, it will be overrode in changeQuorumChecker().
state std::vector<std::string> process_address_strs;
state Optional<std::string> msg;
state int index;
@ -1650,11 +1651,12 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
}
for (index = 0; index < process_address_strs.size(); index++) {
try {
auto a = NetworkAddress::parse(process_address_strs[index]);
if (!a.isValid())
NetworkAddress a = NetworkAddress::parse(process_address_strs[index]);
if (!a.isValid()) {
parse_error = true;
else
addressesVec.push_back(a);
} else {
conn.coords.push_back(a);
}
} catch (Error& e) {
TraceEvent(SevDebug, "SpecialKeysNetworkParseError").error(e);
parse_error = true;
@ -1670,8 +1672,9 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
}
}
if (addressesVec.size())
change = specifiedQuorumChange(addressesVec);
wait(conn.resolveHostnames());
if (conn.coordinators().size())
change = specifiedQuorumChange(conn.coordinators());
else
change = noQuorumChange();
@ -1693,10 +1696,10 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
ASSERT(change.isValid());
TraceEvent(SevDebug, "SKSChangeCoordinatorsStart")
.detail("NewAddresses", describe(addressesVec))
.detail("NewAddresses", describe(conn.coordinators()))
.detail("Description", entry.first ? entry.second.get().toString() : "");
Optional<CoordinatorsResult> r = wait(changeQuorumChecker(&ryw->getTransaction(), change, &addressesVec));
Optional<CoordinatorsResult> r = wait(changeQuorumChecker(&ryw->getTransaction(), change, &conn));
TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish")
.detail("Result", r.present() ? static_cast<int>(r.get()) : -1); // -1 means success