The master will tell the cluster controller if it is going to take a long time to recruit new logs in its DC; the cluster controller can determine if the other DC would be better and recruit there.

The cluster controller will not switch to the other data center if remote logs are too far behind.
We will not recruit in DCs with negative priority.
This commit is contained in:
Evan Tschannen 2018-06-13 18:14:14 -07:00
parent cad435499e
commit 889889323e
10 changed files with 179 additions and 51 deletions

View File

@ -61,7 +61,7 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
std::string idStr;
dc.get("id", idStr);
info.dcId = idStr;
dc.get("priority", info.priority);
dc.tryGet("priority", info.priority);
dc.tryGet("satellite_logs", info.satelliteDesiredTLogCount);
std::string satelliteReplication;
if(dc.tryGet("satellite_redundancy_mode", satelliteReplication)) {

View File

@ -89,11 +89,12 @@ public:
std::map<NetworkAddress, std::string> traceLogGroupMap;
Promise<Void> forceMasterFailure;
int64_t masterRegistrationCount;
bool recoveryStalled;
DatabaseConfiguration config; // Asynchronously updated via master registration
DatabaseConfiguration fullyRecoveredConfig;
Database db;
DBInfo() : masterRegistrationCount(0),
DBInfo() : masterRegistrationCount(0), recoveryStalled(false),
clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ),
serverInfo( new AsyncVar<ServerDBInfo>( ServerDBInfo( LiteralStringRef("DB") ) ) ),
db( DatabaseContext::create( clientInfo, Future<Void>(), LocalityData(), true, TaskDefaultEndpoint, true ) ) // SOMEDAY: Locality!
@ -556,40 +557,50 @@ public:
RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) {
if(req.configuration.regions.size() > 1) {
std::vector<RegionInfo> regions = req.configuration.regions;
if(regions[0].priority == regions[1].priority && clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get()) {
std::swap(regions[0], regions[1]);
}
if(regions[0].priority < 0) {
throw no_more_servers();
}
bool setPrimaryDesired = false;
try {
auto reply = findWorkersForConfiguration(req, req.configuration.regions[0].dcId);
auto reply = findWorkersForConfiguration(req, regions[0].dcId);
setPrimaryDesired = true;
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.regions[0].dcId);
dcPriority.push_back(req.configuration.regions[1].dcId);
dcPriority.push_back(regions[0].dcId);
dcPriority.push_back(regions[1].dcId);
desiredDcIds.set(dcPriority);
if(reply.isError()) {
throw reply.getError();
} else if(clusterControllerDcId.present() && req.configuration.regions[0].dcId == clusterControllerDcId.get()) {
} else if(clusterControllerDcId.present() && regions[0].dcId == clusterControllerDcId.get()) {
return reply.get();
}
throw no_more_servers();
} catch( Error& e ) {
if (e.code() != error_code_no_more_servers) {
if (e.code() != error_code_no_more_servers || regions[1].priority < 0 || now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
throw;
}
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e);
auto reply = findWorkersForConfiguration(req, req.configuration.regions[1].dcId);
auto reply = findWorkersForConfiguration(req, regions[1].dcId);
if(!setPrimaryDesired) {
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.regions[1].dcId);
dcPriority.push_back(req.configuration.regions[0].dcId);
dcPriority.push_back(regions[1].dcId);
dcPriority.push_back(regions[0].dcId);
desiredDcIds.set(dcPriority);
}
if(reply.isError()) {
throw reply.getError();
} else if(clusterControllerDcId.present() && req.configuration.regions[1].dcId == clusterControllerDcId.get()) {
} else if(clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get()) {
return reply.get();
}
throw;
}
} else if(req.configuration.regions.size() == 1) {
if(req.configuration.regions[0].priority < 0) {
throw no_more_servers();
}
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.regions[0].dcId);
desiredDcIds.set(dcPriority);
@ -690,36 +701,51 @@ public:
}
}
void checkPrimaryDC() {
if(db.config.regions.size() > 1 && clusterControllerDcId.present() && db.config.regions[0].dcId != clusterControllerDcId.get()) {
try {
std::map< Optional<Standalone<StringRef>>, int> id_used;
getWorkerForRoleInDatacenter(db.config.regions[0].dcId, ProcessClass::ClusterController, ProcessClass::ExcludeFit, db.config, id_used, true);
getWorkerForRoleInDatacenter(db.config.regions[0].dcId, ProcessClass::Master, ProcessClass::ExcludeFit, db.config, id_used, true);
void checkRegions(const std::vector<RegionInfo>& regions) {
if(desiredDcIds.get().present() && desiredDcIds.get().get().size() == 2 && desiredDcIds.get().get()[0].get() == regions[0].dcId && desiredDcIds.get().get()[1].get() == regions[1].dcId) {
return;
}
try {
std::map< Optional<Standalone<StringRef>>, int> id_used;
getWorkerForRoleInDatacenter(regions[0].dcId, ProcessClass::ClusterController, ProcessClass::ExcludeFit, db.config, id_used, true);
getWorkerForRoleInDatacenter(regions[0].dcId, ProcessClass::Master, ProcessClass::ExcludeFit, db.config, id_used, true);
std::set<Optional<Key>> primaryDC;
primaryDC.insert(db.config.regions[0].dcId);
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true, primaryDC);
std::set<Optional<Key>> primaryDC;
primaryDC.insert(regions[0].dcId);
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true, primaryDC);
if(db.config.regions[0].satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
for(auto &s : db.config.regions[0].satellites) {
satelliteDCs.insert(s.dcId);
}
getWorkersForTlogs(db.config, db.config.regions[0].satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(db.config.regions[0].dcId), db.config.regions[0].satelliteTLogPolicy, id_used, true, satelliteDCs);
if(regions[0].satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
for(auto &s : regions[0].satellites) {
satelliteDCs.insert(s.dcId);
}
getWorkersForTlogs(db.config, regions[0].satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(regions[0].dcId), regions[0].satelliteTLogPolicy, id_used, true, satelliteDCs);
}
getWorkerForRoleInDatacenter( db.config.regions[0].dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
getWorkerForRoleInDatacenter( db.config.regions[0].dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
getWorkerForRoleInDatacenter( regions[0].dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
getWorkerForRoleInDatacenter( regions[0].dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
vector<Optional<Key>> dcPriority;
dcPriority.push_back(db.config.regions[0].dcId);
dcPriority.push_back(db.config.regions[1].dcId);
desiredDcIds.set(dcPriority);
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
throw;
vector<Optional<Key>> dcPriority;
dcPriority.push_back(regions[0].dcId);
dcPriority.push_back(regions[1].dcId);
desiredDcIds.set(dcPriority);
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
throw;
}
}
}
void checkRecoveryStalled() {
if(db.serverInfo->get().recoveryState < RecoveryState::RECOVERY_TRANSACTION && db.recoveryStalled ) {
if(db.config.regions.size() > 1 && clusterControllerDcId.present()) {
auto regions = db.config.regions;
if(clusterControllerDcId.get() == regions[0].dcId) {
std::swap(regions[0], regions[1]);
}
ASSERT(clusterControllerDcId.get() == regions[1].dcId);
checkRegions(regions);
}
}
}
@ -732,7 +758,10 @@ public:
return false;
}
checkPrimaryDC();
if(db.config.regions.size() > 1 && clusterControllerDcId.present() && db.config.regions[0].priority > db.config.regions[1].priority &&
db.config.regions[0].dcId != clusterControllerDcId.get() && versionDifferenceUpdated && datacenterVersionDifference < SERVER_KNOBS->MAX_VERSION_DIFFERENCE) {
checkRegions(db.config.regions);
}
// Get master process
auto masterWorker = id_worker.find(dbi.master.locality.processId());
@ -917,9 +946,11 @@ public:
DBInfo db;
Database cx;
double startTime;
Version datacenterVersionDifference;
bool versionDifferenceUpdated;
explicit ClusterControllerData( ClusterControllerFullInterface ccInterface )
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now())
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0), versionDifferenceUpdated(false)
{
auto serverInfo = db.serverInfo->get();
serverInfo.id = g_random->randomUniqueID();
@ -987,6 +1018,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
iMaster = newMaster.get();
db->masterRegistrationCount = 0;
db->recoveryStalled = false;
db->forceMasterFailure = Promise<Void>();
auto dbInfo = ServerDBInfo( LiteralStringRef("DB") );
@ -1182,6 +1214,8 @@ ACTOR Future<Void> doCheckOutstandingMasterRequests( ClusterControllerData* self
}
void checkOutstandingMasterRequests( ClusterControllerData* self ) {
self->checkRecoveryStalled();
if( !self->betterMasterExistsChecker.isReady() )
return;
@ -1465,6 +1499,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
}
db->masterRegistrationCount = req.registrationCount;
db->recoveryStalled = req.recoveryStalled;
if ( req.configuration.present() ) {
db->config = req.configuration.get();
@ -1893,6 +1928,65 @@ ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
}
}
ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *self ) {
loop {
self->versionDifferenceUpdated = false;
if(self->db.serverInfo->get().recoveryState >= RecoveryState::FULLY_RECOVERED && self->db.config.remoteTLogReplicationFactor == 0) {
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = 0;
Void _ = wait(self->db.serverInfo->onChange());
continue;
}
state Optional<TLogInterface> primaryLog;
state Optional<TLogInterface> remoteLog;
if(self->db.serverInfo->get().recoveryState == RecoveryState::REMOTE_RECOVERED) {
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.hasBestPolicy) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
primaryLog = tLog.interf();
break;
}
}
}
if(!logSet.isLocal) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
remoteLog = tLog.interf();
break;
}
}
}
}
}
if(!primaryLog.present() || !remoteLog.present()) {
Void _ = wait(self->db.serverInfo->onChange());
continue;
}
state Future<Void> onChange = self->db.serverInfo->onChange();
loop {
state Future<TLogQueuingMetricsReply> primaryMetrics = primaryLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() );
state Future<TLogQueuingMetricsReply> remoteMetrics = remoteLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() );
Void _ = wait( ( success(primaryMetrics) && success(remoteMetrics) ) || onChange );
if(onChange.isReady()) {
break;
}
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = primaryMetrics.get().v - remoteMetrics.get().v;
Void _ = wait( delay(SERVER_KNOBS->VERSION_LAG_METRIC_INTERVAL) || onChange );
if(onChange.isReady()) {
break;
}
}
}
}
ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf, Future<Void> leaderFail, ServerCoordinators coordinators ) {
state ClusterControllerData self( interf );
state Future<Void> coordinationPingDelay = delay( SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY );
@ -1910,6 +2004,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
addActor.send( monitorClientTxnInfoConfigs(&self.db) );
addActor.send( updatedChangingDatacenters(&self) );
addActor.send( updatedChangedDatacenters(&self) );
addActor.send( updateDatacenterVersionDifference(&self) );
//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
loop choose {

View File

@ -202,6 +202,7 @@ struct RegisterMasterRequest {
Optional<DatabaseConfiguration> configuration;
vector<UID> priorCommittedLogServers;
RecoveryState recoveryState;
bool recoveryStalled;
ReplyPromise<Void> reply;
@ -210,7 +211,7 @@ struct RegisterMasterRequest {
template <class Ar>
void serialize( Ar& ar ) {
ASSERT( ar.protocolVersion() >= 0x0FDB00A200040001LL );
ar & dbName & id & mi & logSystemConfig & proxies & resolvers & recoveryCount & registrationCount & configuration & priorCommittedLogServers & recoveryState & reply;
ar & dbName & id & mi & logSystemConfig & proxies & resolvers & recoveryCount & registrationCount & configuration & priorCommittedLogServers & recoveryState & recoveryStalled & reply;
}
};

View File

@ -55,8 +55,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
// Versions
init( MAX_VERSIONS_IN_FLIGHT, 100000000 );
init( VERSIONS_PER_SECOND, 1000000 );
init( VERSIONS_PER_SECOND, 1e6 );
init( MAX_VERSIONS_IN_FLIGHT, 100 * VERSIONS_PER_SECOND );
init( MAX_READ_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_READ_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.1 * VERSIONS_PER_SECOND); else if( randomize && BUGGIFY ) MAX_READ_TRANSACTION_LIFE_VERSIONS = 10 * VERSIONS_PER_SECOND;
init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 1 * VERSIONS_PER_SECOND);
init( MAX_COMMIT_BATCH_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) MAX_COMMIT_BATCH_INTERVAL = 2.0; // Each master proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
@ -252,6 +252,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MIN_BALANCE_DIFFERENCE, 10000 );
init( SECONDS_BEFORE_NO_FAILURE_DELAY, 8 * 3600 );
init( MAX_TXS_SEND_MEMORY, 1e7 ); if( randomize && BUGGIFY ) MAX_TXS_SEND_MEMORY = 1e5;
init( MAX_RECOVERY_VERSIONS, 200 * VERSIONS_PER_SECOND ); if( randomize && BUGGIFY ) MAX_RECOVERY_VERSIONS = VERSIONS_PER_SECOND;
init( MAX_RECOVERY_TIME, 20.0 ); if( randomize && BUGGIFY ) MAX_RECOVERY_TIME = 1.0;
// Resolver
init( SAMPLE_OFFSET_PER_KEY, 100 );
@ -272,6 +274,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001;
init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0;
init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::GoodFit );
init( EXPECTED_TLOG_FITNESS, ProcessClass::GoodFit );

View File

@ -60,11 +60,10 @@ public:
int64_t MAX_QUEUE_COMMIT_BYTES;
// Versions
int VERSIONS_PER_SECOND;
int MAX_VERSIONS_IN_FLIGHT;
int MAX_READ_TRANSACTION_LIFE_VERSIONS;
int MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
int VERSIONS_PER_SECOND;
double MAX_COMMIT_BATCH_INTERVAL; // Each master proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
// Data distribution queue
@ -195,6 +194,8 @@ public:
int64_t MIN_BALANCE_DIFFERENCE;
double SECONDS_BEFORE_NO_FAILURE_DELAY;
int64_t MAX_TXS_SEND_MEMORY;
int64_t MAX_RECOVERY_VERSIONS;
double MAX_RECOVERY_TIME;
// Resolver
int64_t SAMPLE_OFFSET_PER_KEY;
@ -215,6 +216,8 @@ public:
double WORKER_FAILURE_TIME;
double CHECK_BETTER_MASTER_INTERVAL;
double INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
double VERSION_LAG_METRIC_INTERVAL;
int64_t MAX_VERSION_DIFFERENCE;
// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)

View File

@ -490,7 +490,8 @@ struct ILogSystem {
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable)
virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags ) = 0;
virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config,
LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags, Reference<AsyncVar<bool>> const& recruitmentStalled ) = 0;
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns an ILogSystem representing a new epoch immediately following this one. The new epoch is only provisional until the caller updates the coordinated DBCoreState

View File

@ -762,7 +762,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) {
StatusObject primaryObj;
primaryObj["id"] = "0";
primaryObj["priority"] = 0;
primaryObj["priority"] = 2;
StatusObject remoteObj;
remoteObj["id"] = "1";

View File

@ -924,10 +924,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return waitForAll(lockResults);
}
virtual Future<Reference<ILogSystem>> newEpoch( RecruitFromConfigurationReply const& recr, Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags ) {
virtual Future<Reference<ILogSystem>> newEpoch( RecruitFromConfigurationReply const& recr, Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags, Reference<AsyncVar<bool>> const& recruitmentStalled ) {
// Call only after end_epoch() has successfully completed. Returns a new epoch immediately following this one. The new epoch
// is only provisional until the caller updates the coordinated DBCoreState
return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality, allTags );
return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality, allTags, recruitmentStalled );
}
virtual LogSystemConfig getLogSystemConfig() {
@ -1595,7 +1595,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return Void();
}
ACTOR static Future<Reference<ILogSystem>> newEpoch( Reference<TagPartitionedLogSystem> oldLogSystem, RecruitFromConfigurationReply recr, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> allTags ) {
ACTOR static Future<Reference<ILogSystem>> newEpoch( Reference<TagPartitionedLogSystem> oldLogSystem, RecruitFromConfigurationReply recr, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount,
int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> allTags, Reference<AsyncVar<bool>> recruitmentStalled ) {
state double startTime = now();
state Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) );
logSystem->logSystemType = 2;
@ -1651,6 +1652,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(oldLogSystem->lockResults[lockNum].isCurrent && oldLogSystem->lockResults[lockNum].logSet->isLocal) {
break;
}
state Future<Void> stalledAfter = setAfter(recruitmentStalled, SERVER_KNOBS->MAX_RECOVERY_TIME, true);
loop {
auto versions = TagPartitionedLogSystem::getDurableVersion(logSystem->dbgid, oldLogSystem->lockResults[lockNum]);
if(versions.present()) {
@ -1659,6 +1661,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Void _ = wait( TagPartitionedLogSystem::getDurableVersionChanged(oldLogSystem->lockResults[lockNum]) );
}
stalledAfter.cancel();
break;
}
lockNum++;
@ -1674,6 +1677,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TraceEvent("NewEpochStartVersion", oldLogSystem->getDebugID()).detail("StartVersion", logSystem->tLogs[0]->startVersion).detail("EpochEnd", oldLogSystem->knownCommittedVersion + 1).detail("Locality", primaryLocality).detail("OldLogRouterTags", oldLogSystem->logRouterTags);
if(oldLogSystem->logRouterTags > 0 || logSystem->tLogs[0]->startVersion < oldLogSystem->knownCommittedVersion + 1) {
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(oldLogSystem.getPtr(), recr.oldLogRouters, recoveryCount, primaryLocality, logSystem->tLogs[0]->startVersion, localities, logSystem->tLogs[0]->tLogPolicy, logSystem->tLogs[0]->hasBestPolicy, false);
if(oldLogSystem->knownCommittedVersion - logSystem->tLogs[0]->startVersion > SERVER_KNOBS->MAX_RECOVERY_VERSIONS) {
//make sure we can recover in the other DC.
for(auto& lockResult : oldLogSystem->lockResults) {
if(lockResult.logSet->locality == remoteLocality) {
if( TagPartitionedLogSystem::getDurableVersion(logSystem->dbgid, lockResult).present() ) {
recruitmentStalled->set(true);
}
}
}
}
} else {
oldLogSystem->logSystemConfigChanged.trigger();
}

View File

@ -219,6 +219,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
std::set<UID> resolverNeedingChanges;
PromiseStream<Future<Void>> addActor;
Reference<AsyncVar<bool>> recruitmentStalled;
MasterData(
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
@ -246,7 +247,8 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
txnStateStore(0),
memoryLimit(2e9),
addActor(addActor),
hasConfiguration(false)
hasConfiguration(false),
recruitmentStalled( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) )
{
}
~MasterData() { if(txnStateStore) txnStateStore->close(); }
@ -311,10 +313,10 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() ) ) );
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags ) );
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
} else {
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial, self->allTags ) );
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
}
return Void();
@ -442,6 +444,7 @@ Future<Void> sendMasterRegistration( MasterData* self, LogSystemConfig const& lo
masterReq.registrationCount = ++self->registrationCount;
masterReq.priorCommittedLogServers = priorCommittedLogServers;
masterReq.recoveryState = self->recoveryState;
masterReq.recoveryStalled = self->recruitmentStalled->get();
return brokenPromiseToNever( self->clusterController.registerMaster.getReply( masterReq ) );
}
@ -704,7 +707,7 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
ACTOR Future<Void> triggerUpdates( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
loop {
Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->cstate.fullyRecovered.getFuture() );
Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->cstate.fullyRecovered.getFuture() || self->recruitmentStalled->onChange() );
if(self->cstate.fullyRecovered.isSet())
return Void();

View File

@ -739,6 +739,13 @@ Future<Void> delayedAsyncVar( Reference<AsyncVar<T>> in, Reference<AsyncVar<T>>
}
}
ACTOR template <class T>
Future<Void> setAfter( Reference<AsyncVar<T>> var, double time, T val ) {
Void _ = wait( delay( time ) );
var->set( val );
return Void();
}
Future<bool> allTrue( const std::vector<Future<bool>>& all );
Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output );
Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );