Add the support of using hostname in ConnectionString.
This PR comes without simulation tests except some unit tests. The simulation tests will be in the PR that uses hostname in code logic.
This commit is contained in:
parent
6e410f7712
commit
a6c482ee91
|
@ -59,22 +59,39 @@ struct ClientLeaderRegInterface {
|
|||
class ClusterConnectionString {
|
||||
public:
|
||||
ClusterConnectionString() {}
|
||||
ClusterConnectionString(std::string const& connectionString);
|
||||
ClusterConnectionString(std::string const& connStr);
|
||||
ClusterConnectionString(std::vector<NetworkAddress>, Key);
|
||||
ClusterConnectionString(std::vector<Hostname> hostnames, Key key);
|
||||
|
||||
std::vector<NetworkAddress> const& coordinators() const { return coord; }
|
||||
std::vector<NetworkAddress>* mutableCoordinators() { return &coord; }
|
||||
std::vector<Hostname> const& hostnames() const { return hosts; }
|
||||
std::vector<Hostname>* mutableHostnames() { return &hosts; }
|
||||
std::unordered_map<NetworkAddress, Hostname>* mutableNetworkAddressToHostname() {
|
||||
return &_networkAddressToHostname;
|
||||
}
|
||||
std::unordered_map<NetworkAddress, Hostname> const& networkAddressToHostname() const {
|
||||
return _networkAddressToHostname;
|
||||
};
|
||||
Key clusterKey() const { return key; }
|
||||
Key clusterKeyName() const {
|
||||
return keyDesc;
|
||||
} // Returns the "name" or "description" part of the clusterKey (the part before the ':')
|
||||
std::string toString() const;
|
||||
static std::string getErrorString(std::string const& source, Error const& e);
|
||||
Future<Void> resolveHostnames();
|
||||
void resetToUnresolved();
|
||||
|
||||
bool hasUnresolvedHostnames = false;
|
||||
|
||||
private:
|
||||
void parseConnString();
|
||||
void parseKey(std::string const& key);
|
||||
|
||||
std::vector<Hostname> hosts;
|
||||
std::unordered_map<NetworkAddress, Hostname> _networkAddressToHostname;
|
||||
std::vector<NetworkAddress> coord;
|
||||
Key key, keyDesc;
|
||||
std::string connectionString;
|
||||
};
|
||||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(ConnectionStringNeedsPersisted);
|
||||
|
@ -95,6 +112,8 @@ public:
|
|||
// been persisted or if the persistent storage for the record has been modified externally.
|
||||
ClusterConnectionString const& getConnectionString() const;
|
||||
|
||||
ClusterConnectionString* getMutableConnectionString();
|
||||
|
||||
// Sets the connections string held by this object and persists it.
|
||||
virtual Future<Void> setConnectionString(ClusterConnectionString const&) = 0;
|
||||
|
||||
|
@ -124,6 +143,9 @@ public:
|
|||
// Signals to the connection record that it was successfully used to connect to a cluster.
|
||||
void notifyConnected();
|
||||
|
||||
bool hasUnresolvedHostnames() const;
|
||||
Future<Void> resolveHostnames();
|
||||
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
|
||||
|
@ -151,7 +173,10 @@ struct LeaderInfo {
|
|||
UID changeID;
|
||||
static const uint64_t changeIDMask = ~(uint64_t(0b1111111) << 57);
|
||||
Value serializedInfo;
|
||||
bool forward; // If true, serializedInfo is a connection string instead!
|
||||
// If true, serializedInfo is a connection string instead!
|
||||
// If true, it also means the receipient need to update their local cluster file
|
||||
// with the latest list of coordinators
|
||||
bool forward;
|
||||
|
||||
LeaderInfo() : forward(false) {}
|
||||
LeaderInfo(UID changeID) : changeID(changeID), forward(false) {}
|
||||
|
|
|
@ -58,6 +58,10 @@ ClusterConnectionString const& IClusterConnectionRecord::getConnectionString() c
|
|||
return cs;
|
||||
}
|
||||
|
||||
ClusterConnectionString* IClusterConnectionRecord::getMutableConnectionString() {
|
||||
return &cs;
|
||||
}
|
||||
|
||||
Future<bool> IClusterConnectionRecord::upToDate() {
|
||||
ClusterConnectionString temp;
|
||||
return upToDate(temp);
|
||||
|
@ -77,6 +81,14 @@ void IClusterConnectionRecord::setPersisted() {
|
|||
connectionStringNeedsPersisted = false;
|
||||
}
|
||||
|
||||
bool IClusterConnectionRecord::hasUnresolvedHostnames() const {
|
||||
return cs.hasUnresolvedHostnames;
|
||||
}
|
||||
|
||||
Future<Void> IClusterConnectionRecord::resolveHostnames() {
|
||||
return cs.resolveHostnames();
|
||||
}
|
||||
|
||||
std::string ClusterConnectionString::getErrorString(std::string const& source, Error const& e) {
|
||||
if (e.code() == error_code_connection_string_invalid) {
|
||||
return format("Invalid connection string `%s: %d %s", source.c_str(), e.code(), e.what());
|
||||
|
@ -85,28 +97,89 @@ std::string ClusterConnectionString::getErrorString(std::string const& source, E
|
|||
}
|
||||
}
|
||||
|
||||
ClusterConnectionString::ClusterConnectionString(std::string const& connectionString) {
|
||||
auto trimmed = trim(connectionString);
|
||||
|
||||
// Split on '@' into key@addrs
|
||||
int pAt = trimmed.find_first_of('@');
|
||||
if (pAt == trimmed.npos)
|
||||
ACTOR Future<Void> _resolveHostnames(ClusterConnectionString* self) {
|
||||
std::vector<Future<Void>> fs;
|
||||
for (auto const& hostName : self->hostnames()) {
|
||||
fs.push_back(map(INetworkConnections::net()->resolveTCPEndpoint(hostName.host, hostName.service),
|
||||
[=](std::vector<NetworkAddress> const& addresses) -> Void {
|
||||
NetworkAddress addr = addresses[deterministicRandom()->randomInt(0, addresses.size())];
|
||||
addr.flags = 0; // Reset the parsed address to public
|
||||
addr.fromHostname = NetworkAddressFromHostname::True;
|
||||
if (hostName.isTLS) {
|
||||
addr.flags |= NetworkAddress::FLAG_TLS;
|
||||
}
|
||||
self->mutableCoordinators()->push_back(addr);
|
||||
self->mutableNetworkAddressToHostname()->emplace(addr, hostName);
|
||||
return Void();
|
||||
}));
|
||||
}
|
||||
wait(waitForAll(fs));
|
||||
std::sort(self->mutableCoordinators()->begin(), self->mutableCoordinators()->end());
|
||||
if (std::unique(self->mutableCoordinators()->begin(), self->mutableCoordinators()->end()) !=
|
||||
self->mutableCoordinators()->end()) {
|
||||
throw connection_string_invalid();
|
||||
std::string key = trimmed.substr(0, pAt);
|
||||
std::string addrs = trimmed.substr(pAt + 1);
|
||||
}
|
||||
self->hasUnresolvedHostnames = false;
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> ClusterConnectionString::resolveHostnames() {
|
||||
if (!hasUnresolvedHostnames) {
|
||||
return Void();
|
||||
} else {
|
||||
return _resolveHostnames(this);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterConnectionString::resetToUnresolved() {
|
||||
if (hosts.size() > 0) {
|
||||
coord.clear();
|
||||
hosts.clear();
|
||||
_networkAddressToHostname.clear();
|
||||
hasUnresolvedHostnames = true;
|
||||
parseConnString();
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterConnectionString::parseConnString() {
|
||||
// Split on '@' into key@addrs
|
||||
int pAt = connectionString.find_first_of('@');
|
||||
if (pAt == connectionString.npos) {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
std::string key = connectionString.substr(0, pAt);
|
||||
std::string addrs = connectionString.substr(pAt + 1);
|
||||
|
||||
parseKey(key);
|
||||
|
||||
coord = NetworkAddress::parseList(addrs);
|
||||
ASSERT(coord.size() > 0); // parseList() always returns at least one address if it doesn't throw
|
||||
std::string curAddr;
|
||||
for (int p = 0; p <= addrs.size();) {
|
||||
int pComma = addrs.find_first_of(',', p);
|
||||
if (pComma == addrs.npos)
|
||||
pComma = addrs.size();
|
||||
curAddr = addrs.substr(p, pComma - p);
|
||||
if (Hostname::isHostname(curAddr)) {
|
||||
hosts.push_back(Hostname::parse(curAddr));
|
||||
} else {
|
||||
coord.push_back(NetworkAddress::parse(curAddr));
|
||||
}
|
||||
p = pComma + 1;
|
||||
}
|
||||
hasUnresolvedHostnames = hosts.size() > 0;
|
||||
ASSERT((coord.size() + hosts.size()) > 0);
|
||||
|
||||
std::sort(coord.begin(), coord.end());
|
||||
// Check that there are no duplicate addresses
|
||||
if (std::unique(coord.begin(), coord.end()) != coord.end())
|
||||
if (std::unique(coord.begin(), coord.end()) != coord.end()) {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") {
|
||||
ClusterConnectionString::ClusterConnectionString(std::string const& connStr) {
|
||||
connectionString = trim(connStr);
|
||||
parseConnString();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/addresses") {
|
||||
std::string input;
|
||||
|
||||
{
|
||||
|
@ -157,6 +230,97 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") {
|
|||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/hostnames") {
|
||||
std::string input;
|
||||
|
||||
{
|
||||
input = "asdf:2345@localhost:1234";
|
||||
ClusterConnectionString cs(input);
|
||||
ASSERT(cs.hasUnresolvedHostnames);
|
||||
ASSERT(cs.hostnames().size() == 1);
|
||||
ASSERT(input == cs.toString());
|
||||
}
|
||||
|
||||
{
|
||||
input = "0xxdeadbeef:100100100@localhost:34534,host-name:23443";
|
||||
ClusterConnectionString cs(input);
|
||||
ASSERT(cs.hasUnresolvedHostnames);
|
||||
ASSERT(cs.hostnames().size() == 2);
|
||||
ASSERT(input == cs.toString());
|
||||
}
|
||||
|
||||
{
|
||||
input = "0xxdeadbeef:100100100@localhost:34534,host-name:23443";
|
||||
std::string commented("#start of comment\n");
|
||||
commented += input;
|
||||
commented += "\n";
|
||||
commented += "# asdfasdf ##";
|
||||
|
||||
ClusterConnectionString cs(commented);
|
||||
ASSERT(cs.hasUnresolvedHostnames);
|
||||
ASSERT(cs.hostnames().size() == 2);
|
||||
ASSERT(input == cs.toString());
|
||||
}
|
||||
|
||||
{
|
||||
input = "0xxdeadbeef:100100100@localhost:34534,host-name_part1.host-name_part2:1234:tls";
|
||||
std::string commented("#start of comment\n");
|
||||
commented += input;
|
||||
commented += "\n";
|
||||
commented += "# asdfasdf ##";
|
||||
|
||||
ClusterConnectionString cs(commented);
|
||||
ASSERT(cs.hasUnresolvedHostnames);
|
||||
ASSERT(cs.hostnames().size() == 2);
|
||||
ASSERT(input == cs.toString());
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") {
|
||||
state std::string connectionString = "TestCluster:0@localhost:1234,host-name:5678";
|
||||
std::string hn1 = "localhost", port1 = "1234";
|
||||
state std::string hn2 = "host-name";
|
||||
state std::string port2 = "5678";
|
||||
state std::vector<Hostname> hostnames;
|
||||
hostnames.push_back(Hostname::parse(hn1 + ":" + port1));
|
||||
hostnames.push_back(Hostname::parse(hn2 + ":" + port2));
|
||||
|
||||
NetworkAddress address1 = NetworkAddress::parse("127.0.0.0:1234");
|
||||
NetworkAddress address2 = NetworkAddress::parse("127.0.0.1:5678");
|
||||
|
||||
INetworkConnections::net()->addMockTCPEndpoint(hn1, port1, { address1 });
|
||||
INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address2 });
|
||||
|
||||
state ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0"));
|
||||
ASSERT(cs.hasUnresolvedHostnames);
|
||||
ASSERT(cs.hostnames().size() == 2);
|
||||
ASSERT(cs.coordinators().size() == 0);
|
||||
wait(cs.resolveHostnames());
|
||||
ASSERT(!cs.hasUnresolvedHostnames);
|
||||
ASSERT(cs.hostnames().size() == 2);
|
||||
ASSERT(cs.coordinators().size() == 2);
|
||||
ASSERT(cs.toString() == connectionString);
|
||||
cs.resetToUnresolved();
|
||||
ASSERT(cs.hasUnresolvedHostnames);
|
||||
ASSERT(cs.hostnames().size() == 2);
|
||||
ASSERT(cs.coordinators().size() == 0);
|
||||
ASSERT(cs.toString() == connectionString);
|
||||
|
||||
INetworkConnections::net()->removeMockTCPEndpoint(hn2, port2);
|
||||
NetworkAddress address3 = NetworkAddress::parse("127.0.0.0:5678");
|
||||
INetworkConnections::net()->addMockTCPEndpoint(hn2, port2, { address3 });
|
||||
|
||||
try {
|
||||
wait(cs.resolveHostnames());
|
||||
} catch (Error& e) {
|
||||
ASSERT(e.code() == error_code_connection_string_invalid);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/flow/FlatBuffers/LeaderInfo") {
|
||||
{
|
||||
LeaderInfo in;
|
||||
|
@ -241,25 +405,43 @@ ClusterConnectionString::ClusterConnectionString(std::vector<NetworkAddress> ser
|
|||
parseKey(key.toString());
|
||||
}
|
||||
|
||||
ClusterConnectionString::ClusterConnectionString(std::vector<Hostname> hostnames, Key key)
|
||||
: hasUnresolvedHostnames(true), hosts(hostnames) {
|
||||
std::string keyString = key.toString();
|
||||
parseKey(keyString);
|
||||
connectionString = keyString + "@";
|
||||
for (int i = 0; i < hostnames.size(); i++) {
|
||||
if (i) {
|
||||
connectionString += ',';
|
||||
}
|
||||
connectionString += hostnames[i].toString();
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterConnectionString::parseKey(std::string const& key) {
|
||||
// Check the structure of the given key, and fill in this->key and this->keyDesc
|
||||
|
||||
// The key must contain one (and only one) : character
|
||||
int colon = key.find_first_of(':');
|
||||
if (colon == key.npos)
|
||||
if (colon == key.npos) {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
std::string desc = key.substr(0, colon);
|
||||
std::string id = key.substr(colon + 1);
|
||||
|
||||
// Check that description contains only allowed characters (a-z, A-Z, 0-9, _)
|
||||
for (auto c = desc.begin(); c != desc.end(); ++c)
|
||||
if (!(isalnum(*c) || *c == '_'))
|
||||
for (auto c = desc.begin(); c != desc.end(); ++c) {
|
||||
if (!(isalnum(*c) || *c == '_')) {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
}
|
||||
|
||||
// Check that ID contains only allowed characters (a-z, A-Z, 0-9)
|
||||
for (auto c = id.begin(); c != id.end(); ++c)
|
||||
if (!isalnum(*c))
|
||||
for (auto c = id.begin(); c != id.end(); ++c) {
|
||||
if (!isalnum(*c)) {
|
||||
throw connection_string_invalid();
|
||||
}
|
||||
}
|
||||
|
||||
this->key = StringRef(key);
|
||||
this->keyDesc = StringRef(desc);
|
||||
|
@ -269,10 +451,18 @@ std::string ClusterConnectionString::toString() const {
|
|||
std::string s = key.toString();
|
||||
s += '@';
|
||||
for (int i = 0; i < coord.size(); i++) {
|
||||
if (i) {
|
||||
if (_networkAddressToHostname.find(coord[i]) == _networkAddressToHostname.end()) {
|
||||
if (i) {
|
||||
s += ',';
|
||||
}
|
||||
s += coord[i].toString();
|
||||
}
|
||||
}
|
||||
for (auto const& host : hosts) {
|
||||
if (s.find('@') != s.length() - 1) {
|
||||
s += ',';
|
||||
}
|
||||
s += coord[i].toString();
|
||||
s += host.toString();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -669,7 +859,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
|
|||
Key traceLogGroup) {
|
||||
state ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString();
|
||||
state std::vector<NetworkAddress> addrs = cs.coordinators();
|
||||
state int idx = 0;
|
||||
state int index = 0;
|
||||
state int successIndex = 0;
|
||||
state Optional<double> incorrectTime;
|
||||
state std::vector<UID> lastCommitProxyUIDs;
|
||||
|
@ -679,7 +869,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
|
|||
|
||||
deterministicRandom()->randomShuffle(addrs);
|
||||
loop {
|
||||
state ClientLeaderRegInterface clientLeaderServer(addrs[idx]);
|
||||
state ClientLeaderRegInterface clientLeaderServer(addrs[index]);
|
||||
state OpenDatabaseCoordRequest req;
|
||||
|
||||
coordinator->set(clientLeaderServer);
|
||||
|
@ -742,11 +932,11 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
|
|||
auto& ni = rep.get().mutate();
|
||||
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
|
||||
clientInfo->set(ni);
|
||||
successIndex = idx;
|
||||
successIndex = index;
|
||||
} else {
|
||||
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator cant talk to cluster controller
|
||||
idx = (idx + 1) % addrs.size();
|
||||
if (idx == successIndex) {
|
||||
index = (index + 1) % addrs.size();
|
||||
if (index == successIndex) {
|
||||
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -180,13 +180,13 @@ std::string formatIpPort(const IPAddress& ip, uint16_t port) {
|
|||
|
||||
Future<Reference<IConnection>> INetworkConnections::connect(const std::string& host,
|
||||
const std::string& service,
|
||||
bool useTLS) {
|
||||
bool isTLS) {
|
||||
// Use map to create an actor that returns an endpoint or throws
|
||||
Future<NetworkAddress> pickEndpoint =
|
||||
map(resolveTCPEndpoint(host, service), [=](std::vector<NetworkAddress> const& addresses) -> NetworkAddress {
|
||||
NetworkAddress addr = addresses[deterministicRandom()->randomInt(0, addresses.size())];
|
||||
addr.fromHostname = true;
|
||||
if (useTLS) {
|
||||
if (isTLS) {
|
||||
addr.flags = NetworkAddress::FLAG_TLS;
|
||||
}
|
||||
return addr;
|
||||
|
@ -272,25 +272,25 @@ TEST_CASE("/flow/network/hostname") {
|
|||
ASSERT(hn1.toString() == hn1s);
|
||||
ASSERT(hn1.host == "localhost");
|
||||
ASSERT(hn1.service == "1234");
|
||||
ASSERT(!hn1.useTLS);
|
||||
ASSERT(!hn1.isTLS);
|
||||
|
||||
auto hn2 = Hostname::parse(hn2s);
|
||||
ASSERT(hn2.toString() == hn2s);
|
||||
ASSERT(hn2.host == "host-name");
|
||||
ASSERT(hn2.service == "1234");
|
||||
ASSERT(!hn2.useTLS);
|
||||
ASSERT(!hn2.isTLS);
|
||||
|
||||
auto hn3 = Hostname::parse(hn3s);
|
||||
ASSERT(hn3.toString() == hn3s);
|
||||
ASSERT(hn3.host == "host.name");
|
||||
ASSERT(hn3.service == "1234");
|
||||
ASSERT(!hn3.useTLS);
|
||||
ASSERT(!hn3.isTLS);
|
||||
|
||||
auto hn4 = Hostname::parse(hn4s);
|
||||
ASSERT(hn4.toString() == hn4s);
|
||||
ASSERT(hn4.host == "host-name_part1.host-name_part2");
|
||||
ASSERT(hn4.service == "1234");
|
||||
ASSERT(hn4.useTLS);
|
||||
ASSERT(hn4.isTLS);
|
||||
|
||||
ASSERT(Hostname::isHostname(hn1s));
|
||||
ASSERT(Hostname::isHostname(hn2s));
|
||||
|
|
|
@ -138,9 +138,9 @@ class Void;
|
|||
struct Hostname {
|
||||
std::string host;
|
||||
std::string service; // decimal port number
|
||||
bool useTLS;
|
||||
bool isTLS;
|
||||
|
||||
Hostname(std::string host, std::string service, bool useTLS) : host(host), service(service), useTLS(useTLS) {}
|
||||
Hostname(std::string host, std::string service, bool isTLS) : host(host), service(service), isTLS(isTLS) {}
|
||||
|
||||
// Allow hostnames in forms like following:
|
||||
// hostname:1234
|
||||
|
@ -155,7 +155,7 @@ struct Hostname {
|
|||
|
||||
static Hostname parse(std::string const& str);
|
||||
|
||||
std::string toString() const { return host + ":" + service + (useTLS ? ":tls" : ""); }
|
||||
std::string toString() const { return host + ":" + service + (isTLS ? ":tls" : ""); }
|
||||
};
|
||||
|
||||
struct IPAddress {
|
||||
|
@ -704,10 +704,10 @@ public:
|
|||
const std::string& service) = 0;
|
||||
|
||||
// Convenience function to resolve host/service and connect to one of its NetworkAddresses randomly
|
||||
// useTLS has to be a parameter here because it is passed to connect() as part of the toAddr object.
|
||||
// isTLS has to be a parameter here because it is passed to connect() as part of the toAddr object.
|
||||
virtual Future<Reference<IConnection>> connect(const std::string& host,
|
||||
const std::string& service,
|
||||
bool useTLS = false);
|
||||
bool isTLS = false);
|
||||
|
||||
// Listen for connections on the given local address
|
||||
virtual Reference<IListener> listen(NetworkAddress localAddr) = 0;
|
||||
|
|
Loading…
Reference in New Issue