fix: we must commit to the number of log routers we are going to use when recruiting the primary, because that count determines the number of log router tags that will be attached to mutations

This commit is contained in:
Evan Tschannen 2018-03-06 16:31:21 -08:00
parent 1194e3a361
commit 8c88041608
4 changed files with 17 additions and 8 deletions

View File

@ -511,7 +511,7 @@ public:
result.remoteTLogs.push_back(remoteLogs[i].first);
}
auto logRouters = getWorkersForRoleInDatacenter( req.dcId, ProcessClass::LogRouter, req.configuration.getDesiredLogRouters(), req.configuration, id_used );
auto logRouters = getWorkersForRoleInDatacenter( req.dcId, ProcessClass::LogRouter, req.logRouterCount, req.configuration, id_used );
for(int i = 0; i < logRouters.size(); i++) {
result.logRouters.push_back(logRouters[i].first);
}
@ -594,6 +594,9 @@ public:
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
auto logRouters = getWorkersForRoleInDatacenter( remoteDcId, ProcessClass::LogRouter, req.configuration.getDesiredLogRouters(), req.configuration, id_used );
result.logRouterCount = logRouters.size() ? logRouters.size() : 1;
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( RoleFitness(tlogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(satelliteLogs, ProcessClass::TLog) > RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId)) ) ||
@ -642,6 +645,7 @@ public:
}
} else {
RecruitFromConfigurationReply result;
result.logRouterCount = 0;
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++;
id_used[clusterControllerProcessId]++;

View File

@ -85,26 +85,27 @@ struct RecruitFromConfigurationReply {
vector<WorkerInterface> proxies;
vector<WorkerInterface> resolvers;
vector<WorkerInterface> storageServers;
int logRouterCount;
Optional<Key> dcId;
template <class Ar>
void serialize( Ar& ar ) {
ar & tLogs & satelliteTLogs & proxies & resolvers & storageServers & dcId;
ar & tLogs & satelliteTLogs & proxies & resolvers & storageServers & dcId & logRouterCount;
}
};
struct RecruitRemoteFromConfigurationRequest {
DatabaseConfiguration configuration;
Optional<Key> dcId;
int logRouterCount;
ReplyPromise< struct RecruitRemoteFromConfigurationReply > reply;
RecruitRemoteFromConfigurationRequest() {}
explicit RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId)
: configuration(configuration), dcId(dcId) {}
RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId, int logRouterCount) : configuration(configuration), dcId(dcId), logRouterCount(logRouterCount) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & configuration & dcId & reply;
ar & configuration & dcId & logRouterCount & reply;
}
};

View File

@ -1007,6 +1007,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
{
TraceEvent("RemoteLogRecruitment_WaitingForWorkers");
state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );
if(remoteWorkers.logRouters.size() != self->minRouters) {
TraceEvent("RemoteLogRecruitment_MismatchedLogRouters").detail("minRouters", self->minRouters).detail("workers", remoteWorkers.logRouters.size());
throw master_recovery_failed();
}
state Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSet->tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
@ -1078,7 +1083,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
recoveryComplete.push_back( transformErrors( throwErrorOr( logSet->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
self->remoteRecoveryComplete = waitForAll(recoveryComplete);
logSet->logRouters.resize(remoteWorkers.remoteTLogs.size());
self->tLogs.push_back( logSet );
TraceEvent("RemoteLogRecruitment_CompletingRecovery");
return Void();
@ -1114,7 +1118,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(configuration.remoteTLogReplicationFactor > 0) {
logSystem->minRouters = configuration.getDesiredLogRouters();
logSystem->minRouters = recr.logRouterCount;
logSystem->expectedLogSets++;
} else {
logSystem->minRouters = 0;

View File

@ -309,7 +309,7 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
self->dcId_locality[remoteDcId] = loc;
}
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId ) ) );
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.logRouterCount ) ) );
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId] ) );
self->logSystem = newLogSystem;