Added a fallback satellite configuration so that we can use two satellites when both are available, but do not have to fail over to the remote datacenter if one satellite is down
This commit is contained in:
parent
8caa6eaecf
commit
a288d5b9a9
|
@ -102,11 +102,19 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
|
|||
info.satelliteTLogUsableDcs = 2;
|
||||
info.satelliteTLogWriteAntiQuorum = 0;
|
||||
info.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid", IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))));
|
||||
info.satelliteTLogReplicationFactorFallback = 2;
|
||||
info.satelliteTLogUsableDcsFallback = 1;
|
||||
info.satelliteTLogWriteAntiQuorumFallback = 0;
|
||||
info.satelliteTLogPolicyFallback = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
|
||||
} else if(satelliteReplication == "two_satellite_fast") {
|
||||
info.satelliteTLogReplicationFactor = 4;
|
||||
info.satelliteTLogUsableDcs = 2;
|
||||
info.satelliteTLogWriteAntiQuorum = 2;
|
||||
info.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid", IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))));
|
||||
info.satelliteTLogReplicationFactorFallback = 2;
|
||||
info.satelliteTLogUsableDcsFallback = 1;
|
||||
info.satelliteTLogWriteAntiQuorumFallback = 0;
|
||||
info.satelliteTLogPolicyFallback = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
|
||||
} else {
|
||||
throw invalid_option();
|
||||
}
|
||||
|
@ -114,6 +122,9 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
|
|||
dc.tryGet("satellite_log_replicas", info.satelliteTLogReplicationFactor);
|
||||
dc.tryGet("satellite_usable_dcs", info.satelliteTLogUsableDcs);
|
||||
dc.tryGet("satellite_anti_quorum", info.satelliteTLogWriteAntiQuorum);
|
||||
dc.tryGet("satellite_log_replicas_fallback", info.satelliteTLogReplicationFactorFallback);
|
||||
dc.tryGet("satellite_usable_dcs_fallback", info.satelliteTLogUsableDcsFallback);
|
||||
dc.tryGet("satellite_anti_quorum_fallback", info.satelliteTLogWriteAntiQuorumFallback);
|
||||
regions->push_back(info);
|
||||
}
|
||||
std::sort(regions->begin(), regions->end(), RegionInfo::sort_by_priority() );
|
||||
|
@ -137,6 +148,9 @@ void DatabaseConfiguration::setDefaultReplicationPolicy() {
|
|||
if(r.satelliteTLogReplicationFactor > 0 && !r.satelliteTLogPolicy) {
|
||||
r.satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(r.satelliteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
|
||||
}
|
||||
if(r.satelliteTLogReplicationFactorFallback > 0 && !r.satelliteTLogPolicyFallback) {
|
||||
r.satelliteTLogPolicyFallback = IRepPolicyRef(new PolicyAcross(r.satelliteTLogReplicationFactorFallback, "zoneid", IRepPolicyRef(new PolicyOne())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -172,7 +186,8 @@ bool DatabaseConfiguration::isValid() const {
|
|||
r.satelliteTLogReplicationFactor >= 0 &&
|
||||
r.satelliteTLogWriteAntiQuorum >= 0 &&
|
||||
r.satelliteTLogUsableDcs >= 1 &&
|
||||
( r.satelliteTLogReplicationFactor == 0 || ( r.satelliteTLogPolicy && r.satellites.size() ) ) ) ) {
|
||||
( r.satelliteTLogReplicationFactor == 0 || ( r.satelliteTLogPolicy && r.satellites.size() ) ) &&
|
||||
( r.satelliteTLogUsableDcsFallback == 0 || ( r.satelliteTLogReplicationFactor > 0 && r.satelliteTLogReplicationFactorFallback > 0 ) ) ) ) {
|
||||
return false;
|
||||
}
|
||||
dcIds.insert(r.dcId);
|
||||
|
@ -252,21 +267,25 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
dcObj["priority"] = r.priority;
|
||||
dcArr.push_back(dcObj);
|
||||
|
||||
if(r.satelliteTLogReplicationFactor == 1 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0) {
|
||||
if(r.satelliteTLogReplicationFactor == 1 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
|
||||
regionObj["satellite_redundancy_mode"] = "one_satellite_single";
|
||||
} else if(r.satelliteTLogReplicationFactor == 2 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0) {
|
||||
} else if(r.satelliteTLogReplicationFactor == 2 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
|
||||
regionObj["satellite_redundancy_mode"] = "one_satellite_double";
|
||||
} else if(r.satelliteTLogReplicationFactor == 3 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0) {
|
||||
} else if(r.satelliteTLogReplicationFactor == 3 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 0) {
|
||||
regionObj["satellite_redundancy_mode"] = "one_satellite_triple";
|
||||
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 0) {
|
||||
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 0 && r.satelliteTLogUsableDcsFallback == 1 && r.satelliteTLogReplicationFactorFallback == 2 && r.satelliteTLogWriteAntiQuorumFallback == 0) {
|
||||
regionObj["satellite_redundancy_mode"] = "two_satellite_safe";
|
||||
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 2) {
|
||||
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 2 && r.satelliteTLogUsableDcsFallback == 1 && r.satelliteTLogReplicationFactorFallback == 2 && r.satelliteTLogWriteAntiQuorumFallback == 0) {
|
||||
regionObj["satellite_redundancy_mode"] = "two_satellite_fast";
|
||||
} else if(r.satelliteTLogReplicationFactor != 0) {
|
||||
regionObj["satellite_log_replicas"] = r.satelliteTLogReplicationFactor;
|
||||
regionObj["satellite_usable_dcs"] = r.satelliteTLogUsableDcs;
|
||||
regionObj["satellite_anti_quorum"] = r.satelliteTLogWriteAntiQuorum;
|
||||
if(r.satelliteTLogPolicy) regionObj["satellite_log_policy"] = r.satelliteTLogPolicy->info();
|
||||
regionObj["satellite_log_replicas_fallback"] = r.satelliteTLogReplicationFactorFallback;
|
||||
regionObj["satellite_usable_dcs_fallback"] = r.satelliteTLogUsableDcsFallback;
|
||||
regionObj["satellite_anti_quorum_fallback"] = r.satelliteTLogWriteAntiQuorumFallback;
|
||||
if(r.satelliteTLogPolicyFallback) regionObj["satellite_log_policy_fallback"] = r.satelliteTLogPolicyFallback->info();
|
||||
}
|
||||
|
||||
if( r.satelliteDesiredTLogCount != -1 ) {
|
||||
|
|
|
@ -55,9 +55,15 @@ struct RegionInfo {
|
|||
int32_t satelliteTLogWriteAntiQuorum;
|
||||
int32_t satelliteTLogUsableDcs;
|
||||
|
||||
IRepPolicyRef satelliteTLogPolicyFallback;
|
||||
int32_t satelliteTLogReplicationFactorFallback;
|
||||
int32_t satelliteTLogWriteAntiQuorumFallback;
|
||||
int32_t satelliteTLogUsableDcsFallback;
|
||||
|
||||
std::vector<SatelliteInfo> satellites;
|
||||
|
||||
RegionInfo() : priority(0), satelliteDesiredTLogCount(-1), satelliteTLogReplicationFactor(0), satelliteTLogWriteAntiQuorum(0), satelliteTLogUsableDcs(1) {}
|
||||
RegionInfo() : priority(0), satelliteDesiredTLogCount(-1), satelliteTLogReplicationFactor(0), satelliteTLogWriteAntiQuorum(0), satelliteTLogUsableDcs(1),
|
||||
satelliteTLogReplicationFactorFallback(0), satelliteTLogWriteAntiQuorumFallback(0), satelliteTLogUsableDcsFallback(0) {}
|
||||
|
||||
struct sort_by_priority {
|
||||
bool operator ()(RegionInfo const&a, RegionInfo const& b) const { return a.priority > b.priority; }
|
||||
|
@ -65,7 +71,8 @@ struct RegionInfo {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
ar & dcId & priority & satelliteTLogPolicy & satelliteDesiredTLogCount & satelliteTLogReplicationFactor & satelliteTLogWriteAntiQuorum & satelliteTLogUsableDcs & satellites;
|
||||
ar & dcId & priority & satelliteTLogPolicy & satelliteDesiredTLogCount & satelliteTLogReplicationFactor & satelliteTLogWriteAntiQuorum & satelliteTLogUsableDcs &
|
||||
satelliteTLogPolicyFallback & satelliteTLogReplicationFactorFallback & satelliteTLogWriteAntiQuorumFallback & satelliteTLogUsableDcsFallback & satellites;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -115,12 +122,12 @@ struct DatabaseConfiguration {
|
|||
}
|
||||
return minRequired;
|
||||
}
|
||||
int32_t minMachinesRequiredPerDatacenter() const {
|
||||
int32_t minMachinesRequiredPerDatacenter() const {
|
||||
int minRequired = std::max( remoteTLogReplicationFactor, std::max(tLogReplicationFactor, storageTeamSize) );
|
||||
for(auto& r : regions) {
|
||||
minRequired = std::max( minRequired, r.satelliteTLogReplicationFactor/std::max(1, r.satelliteTLogUsableDcs) );
|
||||
}
|
||||
return minRequired;
|
||||
return minRequired;
|
||||
}
|
||||
|
||||
//Killing an entire datacenter counts as killing one machine in modes that support it
|
||||
|
@ -128,6 +135,9 @@ struct DatabaseConfiguration {
|
|||
int worstSatellite = regions.size() ? std::numeric_limits<int>::max() : 0;
|
||||
for(auto& r : regions) {
|
||||
worstSatellite = std::min(worstSatellite, r.satelliteTLogReplicationFactor - r.satelliteTLogWriteAntiQuorum);
|
||||
if(r.satelliteTLogUsableDcsFallback > 0) {
|
||||
worstSatellite = std::min(worstSatellite, r.satelliteTLogReplicationFactorFallback - r.satelliteTLogWriteAntiQuorumFallback);
|
||||
}
|
||||
}
|
||||
if(usableRegions > 1 && worstSatellite > 0) {
|
||||
return 1 + std::min(std::max(tLogReplicationFactor - 1 - tLogWriteAntiQuorum, worstSatellite - 1), storageTeamSize - 1);
|
||||
|
|
|
@ -339,6 +339,43 @@ public:
|
|||
return results;
|
||||
}
|
||||
|
||||
//FIXME: This logic will fallback unnecessarily when usable dcs > 1 because it does not check all combinations of potential satellite locations
|
||||
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForSatelliteLogs( const DatabaseConfiguration& conf, const RegionInfo& region, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool& satelliteFallback, bool checkStable = false ) {
|
||||
int startDC = 0;
|
||||
loop {
|
||||
if(startDC > 0 && startDC >= region.satellites.size() + 1 - (satelliteFallback ? region.satelliteTLogUsableDcsFallback : region.satelliteTLogUsableDcs)) {
|
||||
if(satelliteFallback || region.satelliteTLogUsableDcsFallback == 0) {
|
||||
throw no_more_servers();
|
||||
} else {
|
||||
if(now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) {
|
||||
throw operation_failed();
|
||||
}
|
||||
satelliteFallback = true;
|
||||
startDC = 0;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
std::set<Optional<Key>> satelliteDCs;
|
||||
for(int s = startDC; s < std::min<int>(startDC + (satelliteFallback ? region.satelliteTLogUsableDcsFallback : region.satelliteTLogUsableDcs), region.satellites.size()); s++) {
|
||||
satelliteDCs.insert(region.satellites[s].dcId);
|
||||
}
|
||||
|
||||
if(satelliteFallback) {
|
||||
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactorFallback, conf.getDesiredSatelliteLogs(region.dcId)*region.satelliteTLogUsableDcsFallback/region.satelliteTLogUsableDcs, region.satelliteTLogPolicyFallback, id_used, checkStable, satelliteDCs );
|
||||
} else {
|
||||
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactor, conf.getDesiredSatelliteLogs(region.dcId), region.satelliteTLogPolicy, id_used, checkStable, satelliteDCs );
|
||||
}
|
||||
} catch (Error &e) {
|
||||
if(e.code() != error_code_no_more_servers) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
startDC++;
|
||||
}
|
||||
}
|
||||
|
||||
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
|
||||
std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
|
||||
|
||||
|
@ -445,6 +482,8 @@ public:
|
|||
}
|
||||
|
||||
// Two RoleFitness values compare equal when their worst fitness, best fitness and count all match.
bool operator == (RoleFitness const& r) const {
	if(worstFit == r.worstFit && bestFit == r.bestFit) {
		return count == r.count;
	}
	return false;
}
|
||||
|
||||
// Renders this fitness as "<bestFit> <worstFit> <count>".
// All three fields need a %d conversion; a stray "&d" would print literally and leave count out of the output.
std::string toString() const { return format("%d %d %d", bestFit, worstFit, count); }
|
||||
};
|
||||
|
||||
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
|
||||
|
@ -495,11 +534,11 @@ public:
|
|||
id_used[clusterControllerProcessId]++;
|
||||
|
||||
ASSERT(dcId.present());
|
||||
|
||||
|
||||
std::set<Optional<Key>> primaryDC;
|
||||
primaryDC.insert(dcId);
|
||||
result.dcId = dcId;
|
||||
|
||||
|
||||
RegionInfo region;
|
||||
for(auto& r : req.configuration.regions) {
|
||||
if(r.dcId == dcId.get()) {
|
||||
|
@ -507,14 +546,14 @@ public:
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if(req.recruitSeedServers) {
|
||||
auto primaryStorageServers = getWorkersForSeedServers( req.configuration, req.configuration.storagePolicy, dcId );
|
||||
for(int i = 0; i < primaryStorageServers.size(); i++) {
|
||||
result.storageServers.push_back(primaryStorageServers[i].first);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
auto tlogs = getWorkersForTlogs( req.configuration, req.configuration.tLogReplicationFactor, req.configuration.getDesiredLogs(), req.configuration.tLogPolicy, id_used, false, primaryDC );
|
||||
for(int i = 0; i < tlogs.size(); i++) {
|
||||
result.tLogs.push_back(tlogs[i].first);
|
||||
|
@ -522,31 +561,9 @@ public:
|
|||
|
||||
std::vector<std::pair<WorkerInterface, ProcessClass>> satelliteLogs;
|
||||
if(region.satelliteTLogReplicationFactor > 0) {
|
||||
int startDC = 0;
|
||||
loop {
|
||||
if(startDC > 0 && startDC >= region.satellites.size() + 1 - region.satelliteTLogUsableDcs) {
|
||||
throw no_more_servers();
|
||||
}
|
||||
|
||||
try {
|
||||
std::set<Optional<Key>> satelliteDCs;
|
||||
for(int s = startDC; s < std::min<int>(startDC + region.satelliteTLogUsableDcs, region.satellites.size()); s++) {
|
||||
satelliteDCs.insert(region.satellites[s].dcId);
|
||||
}
|
||||
|
||||
satelliteLogs = getWorkersForTlogs( req.configuration, region.satelliteTLogReplicationFactor, req.configuration.getDesiredSatelliteLogs(dcId), region.satelliteTLogPolicy, id_used, false, satelliteDCs );
|
||||
|
||||
for(int i = 0; i < satelliteLogs.size(); i++) {
|
||||
result.satelliteTLogs.push_back(satelliteLogs[i].first);
|
||||
}
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
if(e.code() != error_code_no_more_servers) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
startDC++;
|
||||
satelliteLogs = getWorkersForSatelliteLogs( req.configuration, region, id_used, result.satelliteFallback );
|
||||
for(int i = 0; i < satelliteLogs.size(); i++) {
|
||||
result.satelliteTLogs.push_back(satelliteLogs[i].first);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -728,22 +745,19 @@ public:
|
|||
if(desiredDcIds.get().present() && desiredDcIds.get().get().size() == 2 && desiredDcIds.get().get()[0].get() == regions[0].dcId && desiredDcIds.get().get()[1].get() == regions[1].dcId) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
std::map< Optional<Standalone<StringRef>>, int> id_used;
|
||||
getWorkerForRoleInDatacenter(regions[0].dcId, ProcessClass::ClusterController, ProcessClass::ExcludeFit, db.config, id_used, true);
|
||||
getWorkerForRoleInDatacenter(regions[0].dcId, ProcessClass::Master, ProcessClass::ExcludeFit, db.config, id_used, true);
|
||||
|
||||
|
||||
std::set<Optional<Key>> primaryDC;
|
||||
primaryDC.insert(regions[0].dcId);
|
||||
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true, primaryDC);
|
||||
|
||||
|
||||
if(regions[0].satelliteTLogReplicationFactor > 0) {
|
||||
std::set<Optional<Key>> satelliteDCs;
|
||||
for(auto &s : regions[0].satellites) {
|
||||
satelliteDCs.insert(s.dcId);
|
||||
}
|
||||
getWorkersForTlogs(db.config, regions[0].satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(regions[0].dcId), regions[0].satelliteTLogPolicy, id_used, true, satelliteDCs);
|
||||
bool satelliteFallback = false;
|
||||
getWorkersForSatelliteLogs(db.config, regions[0], id_used, satelliteFallback, true);
|
||||
}
|
||||
|
||||
getWorkerForRoleInDatacenter( regions[0].dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
|
||||
|
@ -868,7 +882,6 @@ public:
|
|||
return true;
|
||||
|
||||
std::set<Optional<Key>> primaryDC;
|
||||
std::set<Optional<Key>> satelliteDCs;
|
||||
std::set<Optional<Key>> remoteDC;
|
||||
|
||||
RegionInfo region;
|
||||
|
@ -883,12 +896,6 @@ public:
|
|||
region = r;
|
||||
}
|
||||
}
|
||||
|
||||
if(region.satelliteTLogReplicationFactor > 0) {
|
||||
for(auto &s : region.satellites) {
|
||||
satelliteDCs.insert(s.dcId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check tLog fitness
|
||||
|
@ -897,10 +904,23 @@ public:
|
|||
|
||||
if(oldTLogFit < newTLogFit) return false;
|
||||
|
||||
RoleFitness oldSatelliteTLogFit(satellite_tlogs, ProcessClass::TLog);
|
||||
RoleFitness newSatelliteTLogFit(region.satelliteTLogReplicationFactor > 0 ? getWorkersForTlogs(db.config, region.satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(clusterControllerDcId), region.satelliteTLogPolicy, id_used, true, satelliteDCs) : satellite_tlogs, ProcessClass::TLog);
|
||||
bool oldSatelliteFallback = false;
|
||||
for(auto& logSet : dbi.logSystemConfig.tLogs) {
|
||||
if(logSet.isLocal && logSet.locality == tagLocalitySatellite) {
|
||||
oldSatelliteFallback = logSet.tLogPolicy->info() != region.satelliteTLogPolicy->info();
|
||||
ASSERT(!oldSatelliteFallback || logSet.tLogPolicy->info() == region.satelliteTLogPolicyFallback->info());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(oldSatelliteTLogFit < newSatelliteTLogFit) return false;
|
||||
RoleFitness oldSatelliteTLogFit(satellite_tlogs, ProcessClass::TLog);
|
||||
bool newSatelliteFallback = false;
|
||||
RoleFitness newSatelliteTLogFit(region.satelliteTLogReplicationFactor > 0 ? getWorkersForSatelliteLogs(db.config, region, id_used, newSatelliteFallback, true) : satellite_tlogs, ProcessClass::TLog);
|
||||
|
||||
if(oldSatelliteTLogFit < newSatelliteTLogFit)
|
||||
return false;
|
||||
if(!oldSatelliteFallback && newSatelliteFallback)
|
||||
return false;
|
||||
|
||||
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog);
|
||||
RoleFitness newRemoteTLogFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::REMOTE_RECOVERED) ? getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC) : remote_tlogs, ProcessClass::TLog);
|
||||
|
@ -936,14 +956,14 @@ public:
|
|||
|
||||
if(oldInFit.betterFitness(newInFit)) return false;
|
||||
|
||||
if(oldTLogFit > newTLogFit || oldInFit > newInFit || oldSatelliteTLogFit > newSatelliteTLogFit || oldRemoteTLogFit > newRemoteTLogFit || oldLogRoutersFit > newLogRoutersFit) {
|
||||
if(oldTLogFit > newTLogFit || oldInFit > newInFit || (oldSatelliteFallback && !newSatelliteFallback) || oldSatelliteTLogFit > newSatelliteTLogFit || oldRemoteTLogFit > newRemoteTLogFit || oldLogRoutersFit > newLogRoutersFit) {
|
||||
TraceEvent("BetterMasterExists", id).detail("OldMasterFit", oldMasterFit).detail("NewMasterFit", mworker.fitness)
|
||||
.detail("OldTLogFitC", oldTLogFit.count).detail("NewTLogFitC", newTLogFit.count)
|
||||
.detail("OldTLogWorstFitT", oldTLogFit.worstFit).detail("NewTLogWorstFitT", newTLogFit.worstFit)
|
||||
.detail("OldTLogBestFitT", oldTLogFit.bestFit).detail("NewTLogBestFitT", newTLogFit.bestFit)
|
||||
.detail("OldInFitW", oldInFit.worstFit).detail("NewInFitW", newInFit.worstFit)
|
||||
.detail("OldInFitB", oldInFit.bestFit).detail("NewInFitB", newInFit.bestFit)
|
||||
.detail("OldInFitC", oldInFit.count).detail("NewInFitC", newInFit.count);
|
||||
.detail("OldTLogFit", oldTLogFit.toString()).detail("NewTLogFit", newTLogFit.toString())
|
||||
.detail("OldInFit", oldInFit.toString()).detail("NewInFit", newInFit.toString())
|
||||
.detail("OldSatelliteFit", oldSatelliteTLogFit.toString()).detail("NewSatelliteFit", newSatelliteTLogFit.toString())
|
||||
.detail("OldRemoteFit", oldRemoteTLogFit.toString()).detail("NewRemoteFit", newRemoteTLogFit.toString())
|
||||
.detail("OldRouterFit", oldLogRoutersFit.toString()).detail("NewRouterFit", newLogRoutersFit.toString())
|
||||
.detail("OldSatelliteFallback", oldSatelliteFallback).detail("NewSatelliteFallback", newSatelliteFallback);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1230,17 +1250,23 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
|
||||
Void _ = wait( delay(SERVER_KNOBS->CHECK_OUTSTANDING_INTERVAL) );
|
||||
try {
|
||||
Void _ = wait( delay(SERVER_KNOBS->CHECK_OUTSTANDING_INTERVAL) );
|
||||
|
||||
checkOutstandingRecruitmentRequests( self );
|
||||
checkOutstandingRemoteRecruitmentRequests( self );
|
||||
checkOutstandingStorageRequests( self );
|
||||
checkOutstandingRecruitmentRequests( self );
|
||||
checkOutstandingRemoteRecruitmentRequests( self );
|
||||
checkOutstandingStorageRequests( self );
|
||||
|
||||
self->checkRecoveryStalled();
|
||||
if (self->betterMasterExists()) {
|
||||
if (!self->db.forceMasterFailure.isSet()) {
|
||||
self->db.forceMasterFailure.send( Void() );
|
||||
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
|
||||
self->checkRecoveryStalled();
|
||||
if (self->betterMasterExists()) {
|
||||
if (!self->db.forceMasterFailure.isSet()) {
|
||||
self->db.forceMasterFailure.send( Void() );
|
||||
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
|
||||
}
|
||||
}
|
||||
} catch( Error &e ) {
|
||||
if(e.code() != error_code_operation_failed && e.code() != error_code_no_more_servers) {
|
||||
TraceEvent(SevError, "CheckOutstandingError").error(e);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
|
|
|
@ -89,10 +89,13 @@ struct RecruitFromConfigurationReply {
|
|||
vector<WorkerInterface> storageServers;
|
||||
vector<WorkerInterface> oldLogRouters;
|
||||
Optional<Key> dcId;
|
||||
bool satelliteFallback;
|
||||
|
||||
// A default-constructed reply reports that the fallback satellite configuration was not used.
RecruitFromConfigurationReply()
	: satelliteFallback{false}
{
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
ar & tLogs & satelliteTLogs & proxies & resolvers & storageServers & oldLogRouters & dcId;
|
||||
ar & tLogs & satelliteTLogs & proxies & resolvers & storageServers & oldLogRouters & dcId & satelliteFallback;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -936,25 +936,16 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
|
|||
g_simulator.remoteTLogPolicy = simconfig.db.getRemoteTLogPolicy();
|
||||
g_simulator.usableRegions = simconfig.db.usableRegions;
|
||||
|
||||
if(simconfig.db.regions.size() == 2) {
|
||||
g_simulator.primaryDcId = simconfig.db.regions[0].dcId;
|
||||
g_simulator.remoteDcId = simconfig.db.regions[1].dcId;
|
||||
g_simulator.hasSatelliteReplication = simconfig.db.regions[0].satelliteTLogReplicationFactor > 0;
|
||||
ASSERT((!simconfig.db.regions[0].satelliteTLogPolicy && !simconfig.db.regions[1].satelliteTLogPolicy) || simconfig.db.regions[0].satelliteTLogPolicy->info() == simconfig.db.regions[1].satelliteTLogPolicy->info());
|
||||
g_simulator.satelliteTLogPolicy = simconfig.db.regions[0].satelliteTLogPolicy;
|
||||
g_simulator.satelliteTLogWriteAntiQuorum = simconfig.db.regions[0].satelliteTLogWriteAntiQuorum;
|
||||
|
||||
for(auto s : simconfig.db.regions[0].satellites) {
|
||||
g_simulator.primarySatelliteDcIds.push_back(s.dcId);
|
||||
}
|
||||
for(auto s : simconfig.db.regions[1].satellites) {
|
||||
g_simulator.remoteSatelliteDcIds.push_back(s.dcId);
|
||||
}
|
||||
} else if(simconfig.db.regions.size() == 1) {
|
||||
if(simconfig.db.regions.size() > 0) {
|
||||
g_simulator.primaryDcId = simconfig.db.regions[0].dcId;
|
||||
g_simulator.hasSatelliteReplication = simconfig.db.regions[0].satelliteTLogReplicationFactor > 0;
|
||||
g_simulator.satelliteTLogPolicy = simconfig.db.regions[0].satelliteTLogPolicy;
|
||||
g_simulator.satelliteTLogWriteAntiQuorum = simconfig.db.regions[0].satelliteTLogWriteAntiQuorum;
|
||||
if(simconfig.db.regions[0].satelliteTLogUsableDcsFallback > 0) {
|
||||
g_simulator.satelliteTLogPolicy = simconfig.db.regions[0].satelliteTLogPolicyFallback;
|
||||
g_simulator.satelliteTLogWriteAntiQuorum = simconfig.db.regions[0].satelliteTLogWriteAntiQuorumFallback;
|
||||
} else {
|
||||
g_simulator.satelliteTLogPolicy = simconfig.db.regions[0].satelliteTLogPolicy;
|
||||
g_simulator.satelliteTLogWriteAntiQuorum = simconfig.db.regions[0].satelliteTLogWriteAntiQuorum;
|
||||
}
|
||||
|
||||
for(auto s : simconfig.db.regions[0].satellites) {
|
||||
g_simulator.primarySatelliteDcIds.push_back(s.dcId);
|
||||
|
@ -963,7 +954,16 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
|
|||
g_simulator.hasSatelliteReplication = false;
|
||||
g_simulator.satelliteTLogWriteAntiQuorum = 0;
|
||||
}
|
||||
|
||||
|
||||
if(simconfig.db.regions.size() == 2) {
|
||||
g_simulator.remoteDcId = simconfig.db.regions[1].dcId;
|
||||
ASSERT((!simconfig.db.regions[0].satelliteTLogPolicy && !simconfig.db.regions[1].satelliteTLogPolicy) || simconfig.db.regions[0].satelliteTLogPolicy->info() == simconfig.db.regions[1].satelliteTLogPolicy->info());
|
||||
|
||||
for(auto s : simconfig.db.regions[1].satellites) {
|
||||
g_simulator.remoteSatelliteDcIds.push_back(s.dcId);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(g_simulator.storagePolicy && g_simulator.tLogPolicy);
|
||||
ASSERT(!g_simulator.hasSatelliteReplication || g_simulator.satelliteTLogPolicy);
|
||||
TraceEvent("SimulatorConfig").detail("ConfigString", printable(StringRef(startingConfigString)));
|
||||
|
|
|
@ -1237,6 +1237,7 @@ static int getExtraTLogEligibleMachines(vector<std::pair<WorkerInterface, Proces
|
|||
int extraTlogEligibleMachines = std::numeric_limits<int>::max();
|
||||
for(auto& region : configuration.regions) {
|
||||
extraTlogEligibleMachines = std::min<int>( extraTlogEligibleMachines, dcId_machine[region.dcId].size() - std::max(configuration.remoteTLogReplicationFactor, std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize) ) );
|
||||
//FIXME: does not take into account fallback satellite policies
|
||||
if(region.satelliteTLogReplicationFactor > 0) {
|
||||
int totalSatelliteEligible = 0;
|
||||
for(auto& sat : region.satellites) {
|
||||
|
|
|
@ -1617,9 +1617,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
if(region.satelliteTLogReplicationFactor > 0) {
|
||||
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
|
||||
logSystem->tLogs[1]->tLogWriteAntiQuorum = region.satelliteTLogWriteAntiQuorum;
|
||||
logSystem->tLogs[1]->tLogReplicationFactor = region.satelliteTLogReplicationFactor;
|
||||
logSystem->tLogs[1]->tLogPolicy = region.satelliteTLogPolicy;
|
||||
if(recr.satelliteFallback) {
|
||||
logSystem->tLogs[1]->tLogWriteAntiQuorum = region.satelliteTLogWriteAntiQuorumFallback;
|
||||
logSystem->tLogs[1]->tLogReplicationFactor = region.satelliteTLogReplicationFactorFallback;
|
||||
logSystem->tLogs[1]->tLogPolicy = region.satelliteTLogPolicyFallback;
|
||||
} else {
|
||||
logSystem->tLogs[1]->tLogWriteAntiQuorum = region.satelliteTLogWriteAntiQuorum;
|
||||
logSystem->tLogs[1]->tLogReplicationFactor = region.satelliteTLogReplicationFactor;
|
||||
logSystem->tLogs[1]->tLogPolicy = region.satelliteTLogPolicy;
|
||||
}
|
||||
logSystem->tLogs[1]->isLocal = true;
|
||||
logSystem->tLogs[1]->locality = tagLocalitySatellite;
|
||||
logSystem->tLogs[1]->startVersion = oldLogSystem->knownCommittedVersion + 1;
|
||||
|
|
Loading…
Reference in New Issue