fix: tlogFitness did not consider it better to have one tlog of a better fitness

fix: checkStable was not used in all places in better master exists
fix: we need to call checkOutstanding on worker registration in all cases
fix: in case persistentData is keyValueStoreMemory, we need to make sure it is fully recovered before writing to it
This commit is contained in:
Evan Tschannen 2018-01-04 11:33:02 -08:00
parent 96c479dc71
commit f2c4beed9f
2 changed files with 26 additions and 13 deletions

View File

@ -569,28 +569,33 @@ public:
};
struct TLogFitness {
ProcessClass::Fitness tlogFit;
ProcessClass::Fitness bestFit;
ProcessClass::Fitness worstFit;
int tlogCount;
TLogFitness( ProcessClass::Fitness tlogFit, int tlogCount) : tlogFit(tlogFit), tlogCount(tlogCount) {}
TLogFitness( ProcessClass::Fitness bestFit, ProcessClass::Fitness worstFit, int tlogCount) : bestFit(bestFit), worstFit(worstFit), tlogCount(tlogCount) {}
TLogFitness() : tlogFit( ProcessClass::NeverAssign ), tlogCount(0) {}
TLogFitness() : bestFit( ProcessClass::NeverAssign ), worstFit( ProcessClass::NeverAssign ), tlogCount(0) {}
TLogFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
tlogFit = ProcessClass::BestFit;
worstFit = ProcessClass::BestFit;
bestFit = ProcessClass::NeverAssign;
for(auto it : tlogs) {
tlogFit = std::max(tlogFit, it.second.machineClassFitness( ProcessClass::TLog ));
auto thisFit = it.second.machineClassFitness( ProcessClass::TLog );
worstFit = std::max(worstFit, thisFit);
bestFit = std::min(bestFit, thisFit);
}
tlogCount = tlogs.size();
}
bool operator < (TLogFitness const& r) const {
if (tlogFit != r.tlogFit) return tlogFit < r.tlogFit;
if (worstFit != r.worstFit) return worstFit < r.worstFit;
if (bestFit != r.bestFit) return bestFit < r.bestFit;
return tlogCount > r.tlogCount;
}
bool operator == (TLogFitness const& r) const { return tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
bool operator == (TLogFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && tlogCount == r.tlogCount; }
};
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
@ -661,7 +666,7 @@ public:
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( TLogFitness(tlogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
( TLogFitness(tlogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
@ -749,8 +754,8 @@ public:
InDatacenterFitness newInFit;
for(auto dcId : datacenters) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config, used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, db.config, used );
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config, used, true );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, db.config, used, true );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, db.config.getDesiredProxies()-1, db.config, used, first_proxy, true );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config.getDesiredResolvers()-1, db.config, used, first_resolver, true );
@ -767,7 +772,8 @@ public:
if(oldMasterFit > newMasterFit || oldTLogFit > newTLotFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldTLogFitC", oldTLogFit.tlogCount).detail("newTLotFitC", newTLotFit.tlogCount)
.detail("oldTLogFitT", oldTLogFit.tlogFit).detail("newTLotFitT", newTLotFit.tlogFit)
.detail("oldTLogWorstFitT", oldTLogFit.worstFit).detail("newTLotWorstFitT", newTLotFit.worstFit)
.detail("oldTLogBestFitT", oldTLogFit.bestFit).detail("newTLotBestFitT", newTLotFit.bestFit)
.detail("oldInFitP", oldInFit.proxyFit).detail("newInFitP", newInFit.proxyFit)
.detail("oldInFitR", oldInFit.resolverFit).detail("newInFitR", newInFit.resolverFit)
.detail("oldInFitPC", oldInFit.proxyCount).detail("newInFitPC", newInFit.proxyCount)
@ -1390,7 +1396,6 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, req.isExcluded );
checkOutstandingRequests( self );
return;
}
@ -1408,6 +1413,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
info->second.interf = w;
info->second.watcher = workerAvailabilityWatch( w, newProcessClass, self );
}
checkOutstandingRequests( self );
return;
}

View File

@ -1284,6 +1284,13 @@ ACTOR Future<Void> checkEmptyQueue(TLogData* self) {
}
}
ACTOR Future<Void> checkRecovered(TLogData* self) {
TraceEvent("TLogCheckRecoveredBegin", self->dbgid);
Optional<Value> v = wait( self->persistentData->readValue(StringRef()) );
TraceEvent("TLogCheckRecoveredEnd", self->dbgid);
return Void();
}
ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality, Promise<Void> oldLog, Promise<Void> recovered, PromiseStream<InitializeTLogRequest> tlogRequests ) {
state double startt = now();
state Reference<LogData> logData;
@ -1740,7 +1747,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
if(restoreFromDisk) {
Void _ = wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
} else {
Void _ = wait( checkEmptyQueue(&self) );
Void _ = wait( checkEmptyQueue(&self) && checkRecovered(&self) );
}
if(recovered.canBeSet()) recovered.send(Void());