Change ClusterConnectionString's status flag from boolean to enum.

So that when multiple threads try to resolve hostnames concurrently, we should have only one resolving and others wait on the result. Otherwise, they would add duplicated resolved results to coordinators.
This commit is contained in:
Renxuan Wang 2022-02-18 14:30:53 -08:00
parent 6d101732e1
commit c29c522d3e
2 changed files with 79 additions and 44 deletions

View File

@ -58,13 +58,28 @@ struct ClientLeaderRegInterface {
// - There is no address present more than once
class ClusterConnectionString {
public:
enum ConnectionStringStatus { RESOLVED, RESOLVING, UNRESOLVED };
ClusterConnectionString() {}
ClusterConnectionString(const std::string& connStr);
ClusterConnectionString(const std::vector<NetworkAddress>& coordinators, Key key);
ClusterConnectionString(const std::vector<Hostname>& hosts, Key key);
ClusterConnectionString(const ClusterConnectionString& rhs) { operator=(rhs); }
ClusterConnectionString& operator=(const ClusterConnectionString& rhs) {
// Copy everything except AsyncTrigger resolveFinish.
status = rhs.status;
coords = rhs.coords;
hostnames = rhs.hostnames;
networkAddressToHostname = rhs.networkAddressToHostname;
key = rhs.key;
keyDesc = rhs.keyDesc;
connectionString = rhs.connectionString;
return *this;
}
std::vector<NetworkAddress> const& coordinators() const { return coords; }
void addResolved(Hostname hostname, NetworkAddress address) {
void addResolved(const Hostname& hostname, const NetworkAddress& address) {
coords.push_back(address);
networkAddressToHostname.emplace(address, hostname);
}
@ -80,7 +95,8 @@ public:
void resolveHostnamesBlocking();
void resetToUnresolved();
bool hasUnresolvedHostnames = false;
ConnectionStringStatus status = RESOLVED;
AsyncTrigger resolveFinish;
std::vector<NetworkAddress> coords;
std::vector<Hostname> hostnames;
@ -139,7 +155,7 @@ public:
// Signals to the connection record that it was successfully used to connect to a cluster.
void notifyConnected();
bool hasUnresolvedHostnames() const;
ClusterConnectionString::ConnectionStringStatus connectionStringStatus() const;
Future<Void> resolveHostnames();
// This one should only be used when resolving asynchronously is impossible. For all other cases, resolveHostnames()
// should be preferred.

View File

@ -77,8 +77,8 @@ void IClusterConnectionRecord::setPersisted() {
connectionStringNeedsPersisted = false;
}
bool IClusterConnectionRecord::hasUnresolvedHostnames() const {
return cs.hasUnresolvedHostnames;
ClusterConnectionString::ConnectionStringStatus IClusterConnectionRecord::connectionStringStatus() const {
return cs.status;
}
Future<Void> IClusterConnectionRecord::resolveHostnames() {
@ -98,39 +98,56 @@ std::string ClusterConnectionString::getErrorString(std::string const& source, E
}
ACTOR Future<Void> resolveHostnamesImpl(ClusterConnectionString* self) {
std::vector<Future<Void>> fs;
for (auto const& hostName : self->hostnames) {
fs.push_back(map(INetworkConnections::net()->resolveTCPEndpoint(hostName.host, hostName.service),
[=](std::vector<NetworkAddress> const& addresses) -> Void {
NetworkAddress addr = addresses[deterministicRandom()->randomInt(0, addresses.size())];
addr.flags = 0; // Reset the parsed address to public
addr.fromHostname = NetworkAddressFromHostname::True;
if (hostName.isTLS) {
addr.flags |= NetworkAddress::FLAG_TLS;
}
self->addResolved(hostName, addr);
return Void();
}));
loop {
if (self->status == ClusterConnectionString::UNRESOLVED) {
self->status = ClusterConnectionString::RESOLVING;
std::vector<Future<Void>> fs;
for (auto const& hostname : self->hostnames) {
fs.push_back(map(INetworkConnections::net()->resolveTCPEndpoint(hostname.host, hostname.service),
[=](std::vector<NetworkAddress> const& addresses) -> Void {
NetworkAddress address =
addresses[deterministicRandom()->randomInt(0, addresses.size())];
address.flags = 0; // Reset the parsed address to public
address.fromHostname = NetworkAddressFromHostname::True;
if (hostname.isTLS) {
address.flags |= NetworkAddress::FLAG_TLS;
}
self->addResolved(hostname, address);
return Void();
}));
}
wait(waitForAll(fs));
std::sort(self->coords.begin(), self->coords.end());
if (std::unique(self->coords.begin(), self->coords.end()) != self->coords.end()) {
self->status = ClusterConnectionString::UNRESOLVED;
self->resolveFinish.trigger();
throw connection_string_invalid();
}
self->status = ClusterConnectionString::RESOLVED;
self->resolveFinish.trigger();
break;
} else if (self->status == ClusterConnectionString::RESOLVING) {
wait(self->resolveFinish.onTrigger());
if (self->status == ClusterConnectionString::RESOLVED) {
break;
}
// Otherwise, this means other threads failed on resolve, so here we go back to the loop and try to resolve
// again.
} else {
// status is RESOLVED, nothing to do.
break;
}
}
wait(waitForAll(fs));
std::sort(self->coords.begin(), self->coords.end());
if (std::unique(self->coords.begin(), self->coords.end()) != self->coords.end()) {
throw connection_string_invalid();
}
self->hasUnresolvedHostnames = false;
return Void();
}
Future<Void> ClusterConnectionString::resolveHostnames() {
if (!hasUnresolvedHostnames) {
return Void();
} else {
return resolveHostnamesImpl(this);
}
return resolveHostnamesImpl(this);
}
void ClusterConnectionString::resolveHostnamesBlocking() {
if (hasUnresolvedHostnames) {
if (status != RESOLVED) {
status = RESOLVING;
for (auto const& hostname : hostnames) {
std::vector<NetworkAddress> addresses =
INetworkConnections::net()->resolveTCPEndpointBlocking(hostname.host, hostname.service);
@ -140,14 +157,14 @@ void ClusterConnectionString::resolveHostnamesBlocking() {
if (hostname.isTLS) {
address.flags |= NetworkAddress::FLAG_TLS;
}
coords.push_back(address);
networkAddressToHostname.emplace(address, hostname);
addResolved(hostname, address);
}
std::sort(coords.begin(), coords.end());
if (std::unique(coords.begin(), coords.end()) != coords.end()) {
status = UNRESOLVED;
throw connection_string_invalid();
}
hasUnresolvedHostnames = false;
status = RESOLVED;
}
}
@ -156,7 +173,7 @@ void ClusterConnectionString::resetToUnresolved() {
coords.clear();
hostnames.clear();
networkAddressToHostname.clear();
hasUnresolvedHostnames = true;
status = UNRESOLVED;
parseConnString();
}
}
@ -184,7 +201,9 @@ void ClusterConnectionString::parseConnString() {
}
p = pComma + 1;
}
hasUnresolvedHostnames = hostnames.size() > 0;
if (hostnames.size() > 0) {
status = UNRESOLVED;
}
ASSERT((coords.size() + hostnames.size()) > 0);
std::sort(coords.begin(), coords.end());
@ -256,7 +275,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
{
input = "asdf:2345@localhost:1234";
ClusterConnectionString cs(input);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 1);
ASSERT(input == cs.toString());
}
@ -264,7 +283,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
{
input = "0xxdeadbeef:100100100@localhost:34534,host-name:23443";
ClusterConnectionString cs(input);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
@ -277,7 +296,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
commented += "# asdfasdf ##";
ClusterConnectionString cs(commented);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
@ -290,7 +309,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
commented += "# asdfasdf ##";
ClusterConnectionString cs(commented);
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(input == cs.toString());
}
@ -314,16 +333,16 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") {
INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address2 });
state ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0"));
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
wait(cs.resolveHostnames());
ASSERT(!cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::RESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 2);
ASSERT(cs.toString() == connectionString);
cs.resetToUnresolved();
ASSERT(cs.hasUnresolvedHostnames);
ASSERT(cs.status == ClusterConnectionString::UNRESOLVED);
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
ASSERT(cs.toString() == connectionString);
@ -422,7 +441,7 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/fuzz") {
}
ClusterConnectionString::ClusterConnectionString(const std::vector<NetworkAddress>& servers, Key key)
: coords(servers) {
: status(RESOLVED), coords(servers) {
std::string keyString = key.toString();
parseKey(keyString);
connectionString = keyString + "@";
@ -435,7 +454,7 @@ ClusterConnectionString::ClusterConnectionString(const std::vector<NetworkAddres
}
ClusterConnectionString::ClusterConnectionString(const std::vector<Hostname>& hosts, Key key)
: hasUnresolvedHostnames(true), hostnames(hosts) {
: status(UNRESOLVED), hostnames(hosts) {
std::string keyString = key.toString();
parseKey(keyString);
connectionString = keyString + "@";