Merge pull request #209 from cie/fix-double-recoveries

Fix double recoveries
This commit is contained in:
Evan Tschannen 2017-11-16 17:16:39 -08:00 committed by GitHub Enterprise
commit 7211a397b0
2 changed files with 77 additions and 69 deletions

View File

@ -126,17 +126,8 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) {
if(Optional<StringRef>(m.param2) != txnStateStore->readValue(m.param1).get().cast_to<StringRef>()) { // FIXME: Make this check more specific, here or by reading configuration whenever there is a change
auto t = txnStateStore->readValue(m.param1).get();
if (logSystem && m.param1.startsWith( excludedServersPrefix )) {
// If one of our existing tLogs is now excluded, we have to die and recover
auto addr = decodeExcludedServersKey(m.param1);
for( auto tl : logSystem->getLogSystemConfig().tLogs ) {
if(!tl.present() || addr.excludes(tl.interf().commit.getEndpoint().address)) {
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()).detail("PrevValue", t.present() ? printable(t.get()) : "(none)").detail("toCommit", toCommit!=NULL).detail("addr", addr.toString());
if(confChange) *confChange = true;
}
}
} else if(m.param1 != excludedServersVersionKey) {
if(!m.param1.startsWith( excludedServersPrefix ) && m.param1 != excludedServersVersionKey) {
auto t = txnStateStore->readValue(m.param1).get();
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()).detail("PrevValue", t.present() ? printable(t.get()) : "(none)").detail("toCommit", toCommit!=NULL);
if(confChange) *confChange = true;
}

View File

@ -262,7 +262,7 @@ public:
return results;
}
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDatacenters( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false )
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogs( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false )
{
std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::vector<std::pair<WorkerInterface, ProcessClass>> results;
@ -568,41 +568,42 @@ public:
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
};
// Summarizes how well a set of tlogs fits the TLog role using two values:
//   tlogFit   - the largest machineClassFitness(ProcessClass::TLog) value among the tlogs
//   tlogCount - how many tlogs were considered
// NOTE(review): this listing appears to interleave the pre-rename lines
// (AcrossDatacenterFitness) with their post-rename replacements (TLogFitness);
// confirm against the actual file before relying on either spelling.
struct AcrossDatacenterFitness {
struct TLogFitness {
ProcessClass::Fitness tlogFit;
int tlogCount;
// Builds a fitness value directly from an explicit fitness level and tlog count.
AcrossDatacenterFitness( ProcessClass::Fitness tlogFit, int tlogCount) : tlogFit(tlogFit), tlogCount(tlogCount) {}
TLogFitness( ProcessClass::Fitness tlogFit, int tlogCount) : tlogFit(tlogFit), tlogCount(tlogCount) {}
// Default state: NeverAssign fitness and zero tlogs.
AcrossDatacenterFitness() : tlogFit( ProcessClass::NeverAssign ), tlogCount(0) {}
TLogFitness() : tlogFit( ProcessClass::NeverAssign ), tlogCount(0) {}
// Derives the fitness from a list of (worker, process class) pairs: starts at BestFit
// and keeps the std::max of each pair's TLog machine-class fitness; tlogCount is the list size.
// In the AcrossDatacenterFitness variant the dcs set is filled but not read inside this
// constructor; the TLogFitness variant has those lines commented out.
AcrossDatacenterFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
std::set<Optional<Standalone<StringRef>>> dcs;
TLogFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
// std::set<Optional<Standalone<StringRef>>> dcs;
tlogFit = ProcessClass::BestFit;
for(auto it : tlogs) {
dcs.insert(it.first.locality.dcId());
// dcs.insert(it.first.locality.dcId());
tlogFit = std::max(tlogFit, it.second.machineClassFitness( ProcessClass::TLog ));
}
tlogCount = tlogs.size();
}
// Variant taking tlog interfaces plus a processClasses vector read at the same index i
// (so processClasses is expected to be at least as long as tlogs). Asserts that every
// interface is present. The TLogFitness copy of this constructor further below is commented out.
AcrossDatacenterFitness( vector<OptionalInterface<TLogInterface>> tlogs, std::vector<ProcessClass> processClasses ) {
std::set<Optional<Standalone<StringRef>>> dcs;
tlogFit = ProcessClass::BestFit;
for(int i = 0; i < tlogs.size(); i++) {
ASSERT(tlogs[i].present());
dcs.insert(tlogs[i].interf().locality.dcId());
tlogFit = std::max(tlogFit, processClasses[i].machineClassFitness( ProcessClass::TLog ));
}
tlogCount = tlogs.size();
}
// TLogFitness( vector<OptionalInterface<TLogInterface>> tlogs, std::vector<ProcessClass> processClasses ) {
// std::set<Optional<Standalone<StringRef>>> dcs;
// tlogFit = ProcessClass::BestFit;
// for(int i = 0; i < tlogs.size(); i++) {
// ASSERT(tlogs[i].present());
// dcs.insert(tlogs[i].interf().locality.dcId());
// tlogFit = std::max(tlogFit, processClasses[i].machineClassFitness( ProcessClass::TLog ));
// }
// tlogCount = tlogs.size();
// }
// Ordering: compares tlogFit first; when the fitness values are equal, the object with
// the LARGER tlogCount compares as less (note the reversed comparison on the count).
bool operator < (AcrossDatacenterFitness const& r) const {
if(tlogFit != r.tlogFit) return tlogFit < r.tlogFit;
bool operator < (TLogFitness const& r) const {
if (tlogFit != r.tlogFit) return tlogFit < r.tlogFit;
return tlogCount > r.tlogCount;
}
// Equality requires both the fitness level and the tlog count to match.
bool operator == (AcrossDatacenterFitness const& r) const { return tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
bool operator == (TLogFitness const& r) const { return tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
};
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
@ -625,7 +626,7 @@ public:
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
auto tlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used );
auto tlogs = getWorkersForTlogs( req.configuration, id_used );
for(int i = 0; i < tlogs.size(); i++)
result.tLogs.push_back(tlogs[i].first);
@ -673,7 +674,7 @@ public:
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
( TLogFitness(tlogs) > TLogFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
@ -688,15 +689,46 @@ public:
return false;
}
std::map< Optional<Standalone<StringRef>>, int> id_used;
// Get master process
auto masterWorker = id_worker.find(dbi.master.locality.processId());
if(masterWorker == id_worker.end())
if(masterWorker == id_worker.end()) {
return false;
}
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
// Get tlog processes
std::vector<std::pair<WorkerInterface, ProcessClass>> tlogs;
for( auto& it : dbi.logSystemConfig.tLogs ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
if ( tlogWorker->second.isExcluded )
return true;
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
// Get proxy classes
std::vector<ProcessClass> proxyClasses;
for(auto& it : dbi.client.proxies ) {
auto proxyWorker = id_worker.find(it.locality.processId());
if ( proxyWorker == id_worker.end() )
return false;
if ( proxyWorker->second.isExcluded )
return true;
proxyClasses.push_back(proxyWorker->second.processClass);
}
// Get resolver classes
std::vector<ProcessClass> resolverClasses;
for(auto& it : dbi.resolvers ) {
auto resolverWorker = id_worker.find(it.locality.processId());
if ( resolverWorker == id_worker.end() )
return false;
if ( resolverWorker->second.isExcluded )
return true;
resolverClasses.push_back(resolverWorker->second.processClass);
}
// Check master fitness. Don't return false if master is excluded in case all the processes are excluded, we still need master for recovery.
ProcessClass::Fitness oldMasterFit = masterWorker->second.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(dbi.master.address())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
@ -708,42 +740,26 @@ public:
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}
if(oldMasterFit < newMasterFit) return false;
if ( oldMasterFit < newMasterFit )
return false;
if ( oldMasterFit > newMasterFit && oldMasterFit == ProcessClass::ExcludeFit )
return true;
std::vector<ProcessClass> tlogProcessClasses;
for(auto& it : dbi.logSystemConfig.tLogs ) {
auto tlogWorker = id_worker.find(it.interf().locality.processId());
if ( tlogWorker == id_worker.end() )
return false;
tlogProcessClasses.push_back(tlogWorker->second.processClass);
}
AcrossDatacenterFitness oldAcrossFit(dbi.logSystemConfig.tLogs, tlogProcessClasses);
AcrossDatacenterFitness newAcrossFit(getWorkersForTlogsAcrossDatacenters(db.config, id_used, true));
// Check tLog fitness
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[clusterControllerProcessId]++;
id_used[masterProcessId]++;
if(oldAcrossFit < newAcrossFit) return false;
TLogFitness oldTLogFit(tlogs);
TLogFitness newTLotFit(getWorkersForTlogs(db.config, id_used, true));
if(oldTLogFit < newTLotFit) return false;
std::vector<ProcessClass> proxyClasses;
for(auto& it : dbi.client.proxies ) {
auto proxyWorker = id_worker.find(it.locality.processId());
if ( proxyWorker == id_worker.end() )
return false;
proxyClasses.push_back(proxyWorker->second.processClass);
}
std::vector<ProcessClass> resolverClasses;
for(auto& it : dbi.resolvers ) {
auto resolverWorker = id_worker.find(it.locality.processId());
if ( resolverWorker == id_worker.end() )
return false;
resolverClasses.push_back(resolverWorker->second.processClass);
}
// Check proxy/resolver fitness
InDatacenterFitness oldInFit(dbi.client.proxies, dbi.resolvers, proxyClasses, resolverClasses);
auto datacenters = getDatacenters( db.config, true );
InDatacenterFitness newInFit;
for(auto dcId : datacenters) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, db.config, used );
@ -761,16 +777,17 @@ public:
if(oldInFit.betterInDatacenterFitness(newInFit)) return false;
if(oldMasterFit > newMasterFit || oldAcrossFit > newAcrossFit || oldInFit > newInFit) {
if(oldMasterFit > newMasterFit || oldTLogFit > newTLotFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldAcrossFitC", oldAcrossFit.tlogCount).detail("newAcrossFitC", newAcrossFit.tlogCount)
.detail("oldAcrossFitT", oldAcrossFit.tlogFit).detail("newAcrossFitT", newAcrossFit.tlogFit)
.detail("oldTLogFitC", oldTLogFit.tlogCount).detail("newTLotFitC", newTLotFit.tlogCount)
.detail("oldTLogFitT", oldTLogFit.tlogFit).detail("newTLotFitT", newTLotFit.tlogFit)
.detail("oldInFitP", oldInFit.proxyFit).detail("newInFitP", newInFit.proxyFit)
.detail("oldInFitR", oldInFit.resolverFit).detail("newInFitR", newInFit.resolverFit)
.detail("oldInFitPC", oldInFit.proxyCount).detail("newInFitPC", newInFit.proxyCount)
.detail("oldInFitRC", oldInFit.resolverCount).detail("newInFitRC", newInFit.resolverCount);
return true;
}
return false;
}