Merge pull request #2698 from etschannen/feature-recruit-delay

The CC waits until no new workers have registered for a while before starting a bad recruitment
Commit 8129f74a10 by Evan Tschannen, 2020-02-20 14:42:37 -08:00 (committed by GitHub)
9 changed files with 65 additions and 50 deletions
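
A rough, self-contained C++ sketch of the mechanism (not the actual flow actor code — the struct, clock, and method names are invented for illustration): every time a previously unknown worker registers, the cluster controller pushes a "good recruitment" deadline out again, and recruitment paths that would have to settle for lower-quality workers keep deferring until that deadline has passed. In the real diff the deadline is the future goodRecruitmentTime, reset to lowPriorityDelay(WAIT_FOR_GOOD_RECRUITMENT_DELAY) in registerWorker.

#include <chrono>
#include <iostream>

using Clock = std::chrono::steady_clock;

struct ClusterControllerSketch {
    // Stand-in for the flow future goodRecruitmentTime: starts out "never ready",
    // like goodRecruitmentTime(Never()), and is reset whenever a new worker registers.
    Clock::time_point goodRecruitmentDeadline = Clock::time_point::max();
    static constexpr std::chrono::seconds WAIT_FOR_GOOD_RECRUITMENT_DELAY{10};

    void onNewWorkerRegistered() {
        // Mirrors registerWorker(): every previously unknown worker restarts the wait,
        // i.e. goodRecruitmentTime = lowPriorityDelay(WAIT_FOR_GOOD_RECRUITMENT_DELAY).
        goodRecruitmentDeadline = Clock::now() + WAIT_FOR_GOOD_RECRUITMENT_DELAY;
    }

    bool goodRecruitmentTimeIsReady() const {
        return Clock::now() >= goodRecruitmentDeadline;
    }

    // Mirrors the recruitment checks that used to compare now() - startTime against
    // WAIT_FOR_GOOD_RECRUITMENT_DELAY and now test goodRecruitmentTime.isReady().
    bool tryRecruit(bool onlySuboptimalCandidatesAvailable) {
        if (onlySuboptimalCandidatesAvailable && !goodRecruitmentTimeIsReady()) {
            std::cout << "deferring bad recruitment; workers may still be registering\n";
            return false; // equivalent of throw operation_failed()
        }
        std::cout << "recruiting\n";
        return true;
    }
};

int main() {
    ClusterControllerSketch cc;
    cc.tryRecruit(true);          // deferred: the deadline starts out "never"
    cc.onNewWorkerRegistered();   // a new worker pushes the deadline out again
    cc.tryRecruit(false);         // good candidates available, so recruit immediately
}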


@@ -51,25 +51,23 @@ struct WorkerInfo : NonCopyable {
ReplyPromise<RegisterWorkerReply> reply;
Generation gen;
int reboots;
double lastAvailableTime;
ProcessClass initialClass;
ClusterControllerPriorityInfo priorityInfo;
WorkerDetails details;
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
WorkerInfo() : gen(-1), reboots(0), lastAvailableTime(now()), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo() : gen(-1), reboots(0), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), lastAvailableTime(now()), initialClass(initialClass), priorityInfo(priorityInfo), details(interf, processClass, degraded) {}
watcher(watcher), reply(reply), gen(gen), reboots(0), initialClass(initialClass), priorityInfo(priorityInfo), details(interf, processClass, degraded) {}
WorkerInfo( WorkerInfo&& r ) BOOST_NOEXCEPT : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), lastAvailableTime(r.lastAvailableTime), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)) {}
reboots(r.reboots), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)) {}
void operator=( WorkerInfo&& r ) BOOST_NOEXCEPT {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
gen = r.gen;
reboots = r.reboots;
lastAvailableTime = r.lastAvailableTime;
initialClass = r.initialClass;
priorityInfo = r.priorityInfo;
details = std::move(r.details);
@@ -395,7 +393,7 @@ public:
if(satelliteFallback || region.satelliteTLogUsableDcsFallback == 0) {
throw no_more_servers();
} else {
if(now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
if(!goodRecruitmentTime.isReady()) {
throw operation_failed();
}
satelliteFallback = true;
@@ -641,18 +639,8 @@ public:
result.logRouters.push_back(logRouters[i].interf);
}
if(!remoteStartTime.present()) {
double maxAvailableTime = 0;
for(auto& it : result.remoteTLogs) {
maxAvailableTime = std::max(maxAvailableTime, id_worker[it.locality.processId()].lastAvailableTime);
}
for(auto& it : result.logRouters) {
maxAvailableTime = std::max(maxAvailableTime, id_worker[it.locality.processId()].lastAvailableTime);
}
remoteStartTime = maxAvailableTime;
}
if( now() - remoteStartTime.get() < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY &&
if( !goodRemoteRecruitmentTime.isReady() &&
( ( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredRemoteLogs(), ProcessClass::TLog).betterCount(RoleFitness(remoteLogs, ProcessClass::TLog)) ) ||
( RoleFitness(SERVER_KNOBS->EXPECTED_LOG_ROUTER_FITNESS, req.logRouterCount, ProcessClass::LogRouter).betterCount(RoleFitness(logRouters, ProcessClass::LogRouter)) ) ) ) {
throw operation_failed();
@@ -729,7 +717,7 @@ public:
}
}
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
if( !goodRecruitmentTime.isReady() &&
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId), ProcessClass::TLog).betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog)) ) ||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies(), ProcessClass::Proxy).betterCount(RoleFitness(proxies, ProcessClass::Proxy)) ||
@@ -766,7 +754,7 @@ public:
}
throw no_more_servers();
} catch( Error& e ) {
if (now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY && regions[1].dcId != clusterControllerDcId.get()) {
if (!goodRemoteRecruitmentTime.isReady() && regions[1].dcId != clusterControllerDcId.get()) {
throw operation_failed();
}
@@ -884,7 +872,7 @@ public:
.detail("DesiredProxies", req.configuration.getDesiredProxies()).detail("ActualProxies", result.proxies.size())
.detail("DesiredResolvers", req.configuration.getDesiredResolvers()).detail("ActualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
if( !goodRecruitmentTime.isReady() &&
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies(), ProcessClass::Proxy).betterCount(bestFitness.proxy) ||
RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers(), ProcessClass::Resolver).betterCount(bestFitness.resolver) ) ) {
@@ -1243,11 +1231,13 @@ public:
ActorCollection ac;
UpdateWorkerList updateWorkerList;
Future<Void> outstandingRequestChecker;
Future<Void> outstandingRemoteRequestChecker;
DBInfo db;
Database cx;
double startTime;
Optional<double> remoteStartTime;
Future<Void> goodRecruitmentTime;
Future<Void> goodRemoteRecruitmentTime;
Version datacenterVersionDifference;
bool versionDifferenceUpdated;
PromiseStream<Future<Void>> addActor;
@@ -1271,8 +1261,9 @@ public:
ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
: clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()),
id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), gotProcessClasses(false),
gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0),
id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), gotProcessClasses(false),
gotFullyRecoveredConfig(false), startTime(now()), goodRecruitmentTime(Never()),
goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0),
versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false),
clusterControllerMetrics("ClusterController", id.toString()),
openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
@@ -1320,7 +1311,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
id_used[cluster->clusterControllerProcessId]++;
state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used);
if( ( masterWorker.worker.processClass.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.interf.locality.processId() == cluster->clusterControllerProcessId )
&& now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
&& !cluster->goodRecruitmentTime.isReady() ) {
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.worker.processClass.machineClassFitness( ProcessClass::Master ));
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
continue;
@@ -1594,9 +1585,11 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
try {
wait( delay(SERVER_KNOBS->CHECK_OUTSTANDING_INTERVAL) );
while( !self->goodRecruitmentTime.isReady() ) {
wait(self->goodRecruitmentTime);
}
checkOutstandingRecruitmentRequests( self );
checkOutstandingRemoteRecruitmentRequests( self );
checkOutstandingStorageRequests( self );
checkBetterDDOrRK(self);
@@ -1606,7 +1599,23 @@ ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().read().master.id());
}
} catch( Error &e ) {
if(e.code() != error_code_operation_failed && e.code() != error_code_no_more_servers) {
if(e.code() != error_code_no_more_servers) {
TraceEvent(SevError, "CheckOutstandingError").error(e);
}
}
return Void();
}
ACTOR Future<Void> doCheckOutstandingRemoteRequests( ClusterControllerData* self ) {
try {
wait( delay(SERVER_KNOBS->CHECK_OUTSTANDING_INTERVAL) );
while( !self->goodRemoteRecruitmentTime.isReady() ) {
wait(self->goodRemoteRecruitmentTime);
}
checkOutstandingRemoteRecruitmentRequests( self );
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
TraceEvent(SevError, "CheckOutstandingError").error(e);
}
}
@@ -1614,10 +1623,13 @@ ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
}
void checkOutstandingRequests( ClusterControllerData* self ) {
if( !self->outstandingRequestChecker.isReady() )
return;
if( self->outstandingRemoteRequestChecker.isReady() ) {
self->outstandingRemoteRequestChecker = doCheckOutstandingRemoteRequests(self);
}
self->outstandingRequestChecker = doCheckOutstandingRequests(self);
if( self->outstandingRequestChecker.isReady() ) {
self->outstandingRequestChecker = doCheckOutstandingRequests(self);
}
}
ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Standalone<StringRef>> processID ) {
@@ -1625,7 +1637,6 @@ ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Stan
auto watcher = cluster->id_worker.find(processID);
ASSERT(watcher != cluster->id_worker.end());
watcher->second.lastAvailableTime = now();
watcher->second.reboots++;
wait( delay( g_network->isSimulated() ? SERVER_KNOBS->SIM_SHUTDOWN_TIMEOUT : SERVER_KNOBS->SHUTDOWN_TIMEOUT ) );
}
@@ -1867,7 +1878,7 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
req.reply.send( rep );
return Void();
} catch (Error& e) {
if (e.code() == error_code_no_more_servers && now() - self->startTime >= SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
if (e.code() == error_code_no_more_servers && self->goodRecruitmentTime.isReady()) {
self->outstandingRecruitmentRequests.push_back( req );
TraceEvent(SevWarn, "RecruitFromConfigurationNotAvailable", self->id).error(e);
return Void();
@@ -1879,7 +1890,7 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
throw; // goodbye, cluster controller
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
@@ -1895,7 +1906,7 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
req.reply.send( rep );
return Void();
} catch (Error& e) {
if (e.code() == error_code_no_more_servers && self->remoteStartTime.present() && now() - self->remoteStartTime.get() >= SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) {
if (e.code() == error_code_no_more_servers && self->goodRemoteRecruitmentTime.isReady()) {
self->outstandingRemoteRecruitmentRequests.push_back( req );
TraceEvent(SevWarn, "RecruitRemoteFromConfigurationNotAvailable", self->id).error(e);
return Void();
@@ -1907,7 +1918,7 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
throw; // goodbye, cluster controller
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
@@ -2010,6 +2021,8 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
if(info == self->id_worker.end()) {
TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerId",w.id()).detail("ProcessId", w.locality.processId()).detail("ZoneId", w.locality.zoneId()).detail("DataHall", w.locality.dataHallId()).detail("PClass", req.processClass.toString()).detail("Workers", self->id_worker.size());
self->goodRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY);
self->goodRemoteRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY);
} else {
TraceEvent("ClusterControllerWorkerAlreadyRegistered", self->id).suppressFor(1.0).detail("WorkerId",w.id()).detail("ProcessId", w.locality.processId()).detail("ZoneId", w.locality.zoneId()).detail("DataHall", w.locality.dataHallId()).detail("PClass", req.processClass.toString()).detail("Workers", self->id_worker.size());
}
@@ -2674,7 +2687,7 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
throw;
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
@@ -2748,7 +2761,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
throw;
}
}
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
wait( lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}


@@ -79,7 +79,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_MAX_TRUNCATE_BYTES, 2<<30 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_BYTES = 0;
init( TLOG_DEGRADED_DELAY_COUNT, 5 );
init( TLOG_DEGRADED_DURATION, 5.0 );
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
init( TXS_POPPED_MAX_DELAY, 1.0 ); if ( randomize && BUGGIFY ) TXS_POPPED_MAX_DELAY = deterministicRandom()->random01();


@@ -82,7 +82,6 @@ public:
int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
int DISK_QUEUE_MAX_TRUNCATE_BYTES; // A truncate larger than this will cause the file to be replaced instead.
int TLOG_DEGRADED_DELAY_COUNT;
double TLOG_DEGRADED_DURATION;
double TXS_POPPED_MAX_DELAY;


@@ -1222,12 +1222,8 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
return Void();
}
//This delay is divided into multiple delays to avoid marking the tlog as degraded because of a single SlowTask
state int loopCount = 0;
while(loopCount < SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT) {
wait(delay(SERVER_KNOBS->TLOG_DEGRADED_DURATION/SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT, TaskPriority::Low));
loopCount++;
}
wait(lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION));
TraceEvent(SevWarnAlways, "TLogDegraded", self->dbgid);
TEST(true); //6.0 TLog degraded
self->degraded->set(true);


@@ -1613,12 +1613,8 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
return Void();
}
//This delay is divided into multiple delays to avoid marking the tlog as degraded because of a single SlowTask
state int loopCount = 0;
while(loopCount < SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT) {
wait(delay(SERVER_KNOBS->TLOG_DEGRADED_DURATION/SERVER_KNOBS->TLOG_DEGRADED_DELAY_COUNT, TaskPriority::Low));
loopCount++;
}
wait(lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION));
TraceEvent(SevWarnAlways, "TLogDegraded", self->dbgid);
TEST(true); //TLog degraded
self->degraded->set(true);


@@ -105,6 +105,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
//GenericActors
init( BUGGIFY_FLOW_LOCK_RELEASE_DELAY, 1.0 );
init( LOW_PRIORITY_DELAY_COUNT, 5 );
//IAsyncFile
init( INCREMENTAL_DELETE_TRUNCATE_AMOUNT, 5e8 ); //500MB


@@ -125,6 +125,7 @@ public:
//GenericActors
double BUGGIFY_FLOW_LOCK_RELEASE_DELAY;
int LOW_PRIORITY_DELAY_COUNT;
//IAsyncFile
int64_t INCREMENTAL_DELETE_TRUNCATE_AMOUNT;


@@ -83,3 +83,12 @@ ACTOR Future<bool> quorumEqualsTrue( std::vector<Future<bool>> futures, int requ
}
}
}
ACTOR Future<Void> lowPriorityDelay( double waitTime ) {
state int loopCount = 0;
while(loopCount < FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT) {
wait(delay(waitTime/FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT, TaskPriority::Low));
loopCount++;
}
return Void();
}
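
As a usage note, the call sites changed earlier in this diff (the watchDegraded actors) simply swap the old hand-rolled loop for one call; the fragment below is illustrative only, since it needs a surrounding flow ACTOR to compile:

// One wait, internally split into FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT chunks at
// TaskPriority::Low, so a single SlowTask cannot account for the whole duration
// and spuriously mark the TLog as degraded.
wait( lowPriorityDelay(SERVER_KNOBS->TLOG_DEGRADED_DURATION) );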


@@ -823,6 +823,7 @@ Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Refer
Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );
Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double const& logDelay, const char* const& context, UID const& id );
Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
Future<Void> lowPriorityDelay( double const& waitTime );
ACTOR template <class T>
Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {