during recovery, only send the full serverDBInfo to processes that are part of the new generation

Evan Tschannen 2019-12-09 13:17:49 -08:00
parent bcce5968a4
commit 5e5e618da0
1 changed file with 83 additions and 12 deletions

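The heart of the change: while a recovery is in flight, the cluster controller hands most workers a stripped-down, master-only ServerDBInfo and reserves the full copy for processes recruited into the new generation. A minimal sketch of the gating decision the diff adds to clusterGetServerInfo, using simplified stand-ins for the FDB types (ServerDBInfo, RecoveryState, and the address set are reduced to essentials; infoToSend is a hypothetical name):

// Illustrative sketch only, not fdbserver source.
#include <set>
#include <string>

// The real RecoveryState has more phases; only the ordering matters here.
enum class RecoveryState { READING_CSTATE, ACCEPTING_COMMITS, FULLY_RECOVERED };

struct ServerDBInfo {
	RecoveryState recoveryState = RecoveryState::READING_CSTATE;
	// ... master, clusterInterface, logSystemConfig, etc.
};

struct DBInfo {
	ServerDBInfo full;                        // stands in for db->serverInfo->get()
	ServerDBInfo masterOnly;                  // stands in for db->serverInfoMasterOnly
	std::set<std::string> requiredAddresses;  // processes recruited into the new generation
};

// Mid-recovery, only workers recruited into the new generation need the full
// ServerDBInfo; everyone else gets the master-only copy until commits resume.
const ServerDBInfo& infoToSend(const DBInfo& db, const std::string& workerAddress) {
	bool useMasterOnly = db.full.recoveryState < RecoveryState::ACCEPTING_COMMITS
	                  && db.requiredAddresses.count(workerAddress) == 0;
	return useMasterOnly ? db.masterOnly : db.full;
}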

@@ -90,6 +90,8 @@ public:
 	struct DBInfo {
 		Reference<AsyncVar<ClientDBInfo>> clientInfo;
 		Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> serverInfo;
+		CachedSerialization<ServerDBInfo> serverInfoMasterOnly;
+		std::set<NetworkAddress> requiredAddresses;
 		ProcessIssuesMap workersWithIssues;
 		std::map<NetworkAddress, double> incompatibleConnections;
 		AsyncTrigger forceMasterFailure;
@@ -1294,13 +1296,12 @@ public:
 		serversFailed("ServersFailed", clusterControllerMetrics),
 		serversUnfailed("ServersUnfailed", clusterControllerMetrics)
 	{
-		CachedSerialization<ServerDBInfo> newInfoCache = db.serverInfo->get();
-		auto& serverInfo = newInfoCache.mutate();
+		auto& serverInfo = db.serverInfoMasterOnly.mutate();
 		serverInfo.id = deterministicRandom()->randomUniqueID();
 		serverInfo.masterLifetime.ccID = id;
 		serverInfo.clusterInterface = ccInterface;
 		serverInfo.myLocality = locality;
-		db.serverInfo->set( newInfoCache );
+		db.serverInfo->set( db.serverInfoMasterOnly );
 		cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, true, true);
 	}
@@ -1355,8 +1356,8 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
 			db->masterRegistrationCount = 0;
 			db->recoveryStalled = false;
-			auto cachedInfo = CachedSerialization<ServerDBInfo>();
-			auto& dbInfo = cachedInfo.mutate();
+			db->serverInfoMasterOnly = CachedSerialization<ServerDBInfo>();
+			auto& dbInfo = db->serverInfoMasterOnly.mutate();
 			dbInfo.master = iMaster;
 			dbInfo.id = deterministicRandom()->randomUniqueID();
@@ -1368,7 +1369,8 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
 			dbInfo.latencyBandConfig = db->serverInfo->get().read().latencyBandConfig;
 			TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
-			db->serverInfo->set( cachedInfo );
+			db->requiredAddresses.clear();
+			db->serverInfo->set( db->serverInfoMasterOnly );
 			state Future<Void> spinDelay = delay(SERVER_KNOBS->MASTER_SPIN_DELAY); // Don't retry master recovery more than once per second, but don't delay the "first" recovery after more than a second of normal operation
@@ -1407,12 +1409,17 @@ ACTOR Future<Void> clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID k
                                         std::vector<NetworkAddress> incompatiblePeers,
                                         ReplyPromise<CachedSerialization<ServerDBInfo>> reply) {
 	state Optional<UID> issueID;
+	state bool useMasterOnly = false;
 	setIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID);
 	for(auto it : incompatiblePeers) {
 		db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
 	}
-	while (db->serverInfo->get().read().id == knownServerInfoID) {
+	loop {
+		useMasterOnly = db->serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS && !db->requiredAddresses.count(reply.getEndpoint().getPrimaryAddress());
+		if((useMasterOnly ? db->serverInfoMasterOnly.read().id : db->serverInfo->get().read().id) != knownServerInfoID) {
+			break;
+		}
 		choose {
 			when (wait( yieldedFuture(db->serverInfo->onChange()) )) {}
 			when (wait( delayJittered( 300 ) )) { break; } // The server might be long gone!
@@ -1421,7 +1428,7 @@ ACTOR Future<Void> clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID k
 	removeIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issueID);
-	reply.send( db->serverInfo->get() );
+	reply.send( useMasterOnly ? db->serverInfoMasterOnly : db->serverInfo->get() );
 	return Void();
 }
@@ -1446,7 +1453,29 @@ void checkOutstandingRecruitmentRequests( ClusterControllerData* self ) {
 	for( int i = 0; i < self->outstandingRecruitmentRequests.size(); i++ ) {
 		RecruitFromConfigurationRequest& req = self->outstandingRecruitmentRequests[i];
 		try {
-			req.reply.send( self->findWorkersForConfiguration( req ) );
+			RecruitFromConfigurationReply rep = self->findWorkersForConfiguration( req );
+			for(auto& it : rep.oldLogRouters) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.proxies) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.resolvers) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.satelliteTLogs) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.tLogs) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			self->db.serverInfo->trigger();
+			req.reply.send( rep );
 			swapAndPop( &self->outstandingRecruitmentRequests, i-- );
 		} catch (Error& e) {
 			if (e.code() == error_code_no_more_servers || e.code() == error_code_operation_failed) {
@@ -1463,7 +1492,17 @@ void checkOutstandingRemoteRecruitmentRequests( ClusterControllerData* self ) {
 	for( int i = 0; i < self->outstandingRemoteRecruitmentRequests.size(); i++ ) {
 		RecruitRemoteFromConfigurationRequest& req = self->outstandingRemoteRecruitmentRequests[i];
 		try {
-			req.reply.send( self->findRemoteWorkersForConfiguration( req ) );
+			RecruitRemoteFromConfigurationReply rep = self->findRemoteWorkersForConfiguration( req );
+			for(auto& it : rep.remoteTLogs) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.logRouters) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			self->db.serverInfo->trigger();
+			req.reply.send( rep );
 			swapAndPop( &self->outstandingRemoteRecruitmentRequests, i-- );
 		} catch (Error& e) {
 			if (e.code() == error_code_no_more_servers || e.code() == error_code_operation_failed) {
@@ -1849,7 +1888,29 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
 	TEST(true); //ClusterController RecruitTLogsRequest
 	loop {
 		try {
-			req.reply.send( self->findWorkersForConfiguration( req ) );
+			auto rep = self->findWorkersForConfiguration( req );
+			for(auto& it : rep.oldLogRouters) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.proxies) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.resolvers) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.satelliteTLogs) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.tLogs) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			self->db.serverInfo->trigger();
+			req.reply.send( rep );
 			return Void();
 		} catch (Error& e) {
 			if (e.code() == error_code_no_more_servers && now() - self->startTime >= SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
@@ -1873,7 +1934,17 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
 	TEST(true); //ClusterController RecruitTLogsRequest
 	loop {
 		try {
-			req.reply.send( self->findRemoteWorkersForConfiguration( req ) );
+			RecruitRemoteFromConfigurationReply rep = self->findRemoteWorkersForConfiguration( req );
+			for(auto& it : rep.remoteTLogs) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			for(auto& it : rep.logRouters) {
+				self->db.requiredAddresses.insert(it.address());
+				if( it.tLog.getEndpoint().addresses.secondaryAddress.present() ) self->db.requiredAddresses.insert(it.tLog.getEndpoint().addresses.secondaryAddress.get());
+			}
+			self->db.serverInfo->trigger();
+			req.reply.send( rep );
 			return Void();
 		} catch (Error& e) {
 			if (e.code() == error_code_no_more_servers && self->remoteStartTime.present() && now() - self->remoteStartTime.get() >= SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) {
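All four recruitment sites above repeat the same two insertions per recruited worker. For reference, a hypothetical helper (not part of this commit) that would express the pattern once; it assumes the WorkerInterface shape used in the diff, where address() yields the primary address and any RequestStream endpoint (tLog here) carries the optional secondary address:

// Hypothetical helper, not in the commit: record a recruited worker's primary
// address plus its secondary address when one is present. Left as a template
// so the sketch stands alone; NetworkAddress and the worker interfaces are the
// fdbserver types.
#include <set>

template <class NetworkAddress, class Interfaces>
void insertRequiredAddresses(std::set<NetworkAddress>& required, const Interfaces& workers) {
	for (const auto& it : workers) {
		required.insert(it.address());
		const auto& addrs = it.tLog.getEndpoint().addresses;
		if (addrs.secondaryAddress.present()) {
			required.insert(addrs.secondaryAddress.get());
		}
	}
}

Each site would then reduce to a handful of calls like insertRequiredAddresses(self->db.requiredAddresses, rep.tLogs), followed by self->db.serverInfo->trigger() to wake waiters in clusterGetServerInfo.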