fix: servers which opened the database would use the full list of proxies

This commit is contained in:
Evan Tschannen 2020-01-10 12:20:30 -08:00
parent 032797ca5c
commit da1be272cb
3 changed files with 33 additions and 20 deletions

View File

@ -670,6 +670,25 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
}
}
void shrinkProxyList( ClientDBInfo& ni, std::vector<UID>& lastProxyUIDs, std::vector<MasterProxyInterface>& lastProxies ) {
if(ni.proxies.size() > CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS) {
std::vector<UID> proxyUIDs;
for(auto& proxy : ni.proxies) {
proxyUIDs.push_back(proxy.id());
}
if(proxyUIDs != lastProxyUIDs) {
lastProxyUIDs = proxyUIDs;
lastProxies = ni.proxies;
deterministicRandom()->randomShuffle(lastProxies);
lastProxies.resize(CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS);
for(int i = 0; i < lastProxies.size(); i++) {
TraceEvent("ServerConnectedProxy").detail("Proxy", lastProxies[i].id());
}
}
ni.proxies = lastProxies;
}
}
// Leader is the process that will be elected by coordinators as the cluster controller
ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, MonitorLeaderInfo info, Standalone<VectorRef<ClientVersionRef>> supportedVersions, Key traceLogGroup) {
state ClusterConnectionString cs = info.intermediateConnFile->getConnectionString();
@ -730,24 +749,8 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
connFile->notifyConnected();
auto& ni = rep.get().mutate();
if(ni.proxies.size() > CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS) {
std::vector<UID> proxyUIDs;
for(auto& proxy : ni.proxies) {
proxyUIDs.push_back(proxy.id());
}
if(proxyUIDs != lastProxyUIDs) {
lastProxyUIDs = proxyUIDs;
lastProxies = ni.proxies;
deterministicRandom()->randomShuffle(lastProxies);
lastProxies.resize(CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS);
for(int i = 0; i < lastProxies.size(); i++) {
TraceEvent("ClientConnectedProxy").detail("Proxy", lastProxies[i].id());
}
}
ni.proxies = lastProxies;
}
clientInfo->set( rep.get().read() );
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
clientInfo->set( ni );
successIdx = idx;
} else if(idx == successIdx) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));

View File

@ -59,6 +59,8 @@ Future<Void> monitorLeaderForProxies( Value const& key, vector<NetworkAddress> c
Future<Void> monitorProxies( Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile, Reference<AsyncVar<ClientDBInfo>> const& clientInfo, Standalone<VectorRef<ClientVersionRef>> const& supportedVersions, Key const& traceLogGroup );
void shrinkProxyList( ClientDBInfo& ni, std::vector<UID>& lastProxyUIDs, std::vector<MasterProxyInterface>& lastProxies );
#pragma region Implementation
Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Value>> const& outSerializedLeaderInfo );

View File

@ -69,15 +69,23 @@ extern IKeyValueStore* keyValueStoreCompressTestData(IKeyValueStore* store);
ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ClientDBInfo>> info ) {
state std::vector<UID> lastProxyUIDs;
state std::vector<MasterProxyInterface> lastProxies;
loop {
info->set( db->get().client );
ClientDBInfo ni = db->get().client;
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
info->set( ni );
wait( db->onChange() );
}
}
ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, Reference<AsyncVar<ClientDBInfo>> info ) {
state std::vector<UID> lastProxyUIDs;
state std::vector<MasterProxyInterface> lastProxies;
loop {
info->set( db->get().read().client );
ClientDBInfo ni = db->get().read().client;
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
info->set( ni );
wait( db->onChange() );
}
}