Merge pull request #1320 from bnamasivayam/dc-as-satellite-config

Support config where the primary and remote DC's can be used as satel…
This commit is contained in:
Evan Tschannen 2019-03-19 15:49:24 -07:00 committed by GitHub
commit 20764efa24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 52 deletions

View File

@ -24,6 +24,9 @@ Features
* Added support for IPv6. `(PR #1176) https://github.com/apple/foundationdb/pull/1178`_
* FDB can now simultaneously listen to TLS and unencrypted ports to facilitate smoother migration to TLS. `(PR #1157) https://github.com/apple/foundationdb/pull/1157`_
* Added `DISABLE_POSIX_KERNEL_AIO` knob to fallback to libeio instead of kernel async I/O (KAIO) for systems that do not support KAIO or O_DIRECT flag. `(PR #1283) https://github.com/apple/foundationdb/pull/1283`_
* Added support for a config where the primary and remote DC's can be used as satellites. `(PR #1320) https://github.com/apple/foundationdb/pull/1320`_
* Added support for restoring multiple key ranges in a single restore job. `(PR #1190) https://github.com/apple/foundationdb/pull/1190`_
* Depracated transaction option TRANSACTION_LOGGING_ENABLE. Added two new transaction options DEBUG_TRANSACTION_IDENTIFIER and LOG_TRANSACTION that sets an identifier for the transaction and logs the transaction to the trace file respectively. `(PR #1200) https://github.com/apple/foundationdb/pull/1200`_
Performance
-----------

View File

@ -205,11 +205,14 @@ bool DatabaseConfiguration::isValid() const {
return false;
}
dcIds.insert(r.dcId);
std::set<Key> satelliteDcIds;
satelliteDcIds.insert(Key());
satelliteDcIds.insert(r.dcId);
for(auto& s : r.satellites) {
if(dcIds.count(s.dcId)) {
if (satelliteDcIds.count(s.dcId)) {
return false;
}
dcIds.insert(s.dcId);
satelliteDcIds.insert(s.dcId);
}
}

View File

@ -275,7 +275,7 @@ public:
return results;
}
std::vector<WorkerDetails> getWorkersForTlogs( DatabaseConfiguration const& conf, int32_t required, int32_t desired, Reference<IReplicationPolicy> const& policy, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>() ) {
std::vector<WorkerDetails> getWorkersForTlogs( DatabaseConfiguration const& conf, int32_t required, int32_t desired, Reference<IReplicationPolicy> const& policy, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>(), std::vector<UID> exclusionWorkerIds = {}) {
std::map<std::pair<ProcessClass::Fitness,bool>, vector<WorkerDetails>> fitness_workers;
std::vector<WorkerDetails> results;
std::vector<LocalityData> unavailableLocals;
@ -285,14 +285,15 @@ public:
logServerSet = Reference<LocalitySet>(new LocalityMap<WorkerDetails>());
logServerMap = (LocalityMap<WorkerDetails>*) logServerSet.getPtr();
for( auto& it : id_worker ) {
auto fitness = it.second.details.processClass.machineClassFitness( ProcessClass::TLog );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.details.interf.locality.dcId())) ) {
fitness_workers[ std::make_pair(fitness,it.second.details.degraded) ].push_back(it.second.details);
}
else {
unavailableLocals.push_back(it.second.details.interf.locality);
if (std::find(exclusionWorkerIds.begin(), exclusionWorkerIds.end(), it.second.details.interf.id()) == exclusionWorkerIds.end()) {
auto fitness = it.second.details.processClass.machineClassFitness(ProcessClass::TLog);
if (workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.details.interf.locality.dcId()))) {
fitness_workers[std::make_pair(fitness, it.second.details.degraded)].push_back(it.second.details);
}
else {
unavailableLocals.push_back(it.second.details.interf.locality);
}
}
}
@ -355,7 +356,7 @@ public:
TraceEvent(SevWarn, "GetTLogTeamFailed").detail("Policy", policy->info()).detail("Processes", logServerSet->size()).detail("Workers", id_worker.size()).detail("FitnessGroups", fitness_workers.size())
.detail("TLogZones", ::describeZones(tLocalities)).detail("TLogDataHalls", ::describeDataHalls(tLocalities)).detail("MissingZones", ::describeZones(unavailableLocals))
.detail("MissingDataHalls", ::describeDataHalls(unavailableLocals)).detail("Required", required).detail("DesiredLogs", desired).detail("RatingTests",SERVER_KNOBS->POLICY_RATING_TESTS)
.detail("CheckStable", checkStable).detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS).backtrace();
.detail("CheckStable", checkStable).detail("NumExclusionWorkers", exclusionWorkerIds.size()).detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS).backtrace();
logServerSet->clear();
logServerSet.clear();
@ -376,7 +377,7 @@ public:
}
//FIXME: This logic will fallback unnecessarily when usable dcs > 1 because it does not check all combinations of potential satellite locations
std::vector<WorkerDetails> getWorkersForSatelliteLogs( const DatabaseConfiguration& conf, const RegionInfo& region, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool& satelliteFallback, bool checkStable = false ) {
std::vector<WorkerDetails> getWorkersForSatelliteLogs( const DatabaseConfiguration& conf, const RegionInfo& region, const RegionInfo& remoteRegion, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool& satelliteFallback, bool checkStable = false ) {
int startDC = 0;
loop {
if(startDC > 0 && startDC >= region.satellites.size() + 1 - (satelliteFallback ? region.satelliteTLogUsableDcsFallback : region.satelliteTLogUsableDcs)) {
@ -392,15 +393,26 @@ public:
}
try {
bool remoteDCUsedAsSatellite = false;
std::set<Optional<Key>> satelliteDCs;
for(int s = startDC; s < std::min<int>(startDC + (satelliteFallback ? region.satelliteTLogUsableDcsFallback : region.satelliteTLogUsableDcs), region.satellites.size()); s++) {
satelliteDCs.insert(region.satellites[s].dcId);
if (region.satellites[s].dcId == remoteRegion.dcId) {
remoteDCUsedAsSatellite = true;
}
}
std::vector<UID> exclusionWorkerIds;
// FIXME: If remote DC is used as satellite then this logic only ensures that required number of remote TLogs can be recruited. It does not balance the number of desired TLogs
// across the satellite and remote sides.
if (remoteDCUsedAsSatellite) {
std::map< Optional<Standalone<StringRef>>, int> tmpIdUsed;
auto remoteLogs = getWorkersForTlogs(conf, conf.getRemoteTLogReplicationFactor(), conf.getRemoteTLogReplicationFactor(), conf.getRemoteTLogPolicy(), tmpIdUsed, false, { remoteRegion.dcId }, {});
std::transform(remoteLogs.begin(), remoteLogs.end(), std::back_inserter(exclusionWorkerIds), [](const WorkerDetails &in) { return in.interf.id(); });
}
if(satelliteFallback) {
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactorFallback, conf.getDesiredSatelliteLogs(region.dcId)*region.satelliteTLogUsableDcsFallback/region.satelliteTLogUsableDcs, region.satelliteTLogPolicyFallback, id_used, checkStable, satelliteDCs );
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactorFallback, conf.getDesiredSatelliteLogs(region.dcId)*region.satelliteTLogUsableDcsFallback/region.satelliteTLogUsableDcs, region.satelliteTLogPolicyFallback, id_used, checkStable, satelliteDCs, exclusionWorkerIds);
} else {
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactor, conf.getDesiredSatelliteLogs(region.dcId), region.satelliteTLogPolicy, id_used, checkStable, satelliteDCs );
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactor, conf.getDesiredSatelliteLogs(region.dcId), region.satelliteTLogPolicy, id_used, checkStable, satelliteDCs, exclusionWorkerIds);
}
} catch (Error &e) {
if(e.code() != error_code_no_more_servers) {
@ -560,7 +572,7 @@ public:
std::set<Optional<Key>> remoteDC;
remoteDC.insert(req.dcId);
auto remoteLogs = getWorkersForTlogs( req.configuration, req.configuration.getRemoteTLogReplicationFactor(), req.configuration.getDesiredRemoteLogs(), req.configuration.getRemoteTLogPolicy(), id_used, false, remoteDC );
auto remoteLogs = getWorkersForTlogs( req.configuration, req.configuration.getRemoteTLogReplicationFactor(), req.configuration.getDesiredRemoteLogs(), req.configuration.getRemoteTLogPolicy(), id_used, false, remoteDC, req.exclusionWorkerIds );
for(int i = 0; i < remoteLogs.size(); i++) {
result.remoteTLogs.push_back(remoteLogs[i].interf);
}
@ -602,10 +614,13 @@ public:
result.dcId = dcId;
RegionInfo region;
RegionInfo remoteRegion;
for(auto& r : req.configuration.regions) {
if(r.dcId == dcId.get()) {
region = r;
break;
}
else {
remoteRegion = r;
}
}
@ -623,7 +638,7 @@ public:
std::vector<WorkerDetails> satelliteLogs;
if(region.satelliteTLogReplicationFactor > 0) {
satelliteLogs = getWorkersForSatelliteLogs( req.configuration, region, id_used, result.satelliteFallback );
satelliteLogs = getWorkersForSatelliteLogs( req.configuration, region, remoteRegion, id_used, result.satelliteFallback );
for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].interf);
}
@ -821,7 +836,7 @@ public:
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.getDesiredLogs(), db.config.tLogPolicy, id_used, true, primaryDC);
if(regions[0].satelliteTLogReplicationFactor > 0) {
bool satelliteFallback = false;
getWorkersForSatelliteLogs(db.config, regions[0], id_used, satelliteFallback, true);
getWorkersForSatelliteLogs(db.config, regions[0], regions[1], id_used, satelliteFallback, true);
}
getWorkerForRoleInDatacenter( regions[0].dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
@ -960,12 +975,14 @@ public:
std::set<Optional<Key>> remoteDC;
RegionInfo region;
RegionInfo remoteRegion;
if(db.config.regions.size() && clusterControllerDcId.present()) {
primaryDC.insert(clusterControllerDcId);
for(auto& r : db.config.regions) {
if(r.dcId != clusterControllerDcId.get()) {
ASSERT(remoteDC.empty());
remoteDC.insert(r.dcId);
remoteRegion = r;
} else {
ASSERT(region.dcId == StringRef());
region = r;
@ -975,7 +992,8 @@ public:
// Check tLog fitness
RoleFitness oldTLogFit(tlogs, ProcessClass::TLog);
RoleFitness newTLogFit(getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.getDesiredLogs(), db.config.tLogPolicy, id_used, true, primaryDC), ProcessClass::TLog);
auto newTLogs = getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.getDesiredLogs(), db.config.tLogPolicy, id_used, true, primaryDC);
RoleFitness newTLogFit(newTLogs, ProcessClass::TLog);
if(oldTLogFit < newTLogFit) return false;
@ -990,7 +1008,8 @@ public:
RoleFitness oldSatelliteTLogFit(satellite_tlogs, ProcessClass::TLog);
bool newSatelliteFallback = false;
RoleFitness newSatelliteTLogFit(region.satelliteTLogReplicationFactor > 0 ? getWorkersForSatelliteLogs(db.config, region, id_used, newSatelliteFallback, true) : satellite_tlogs, ProcessClass::TLog);
auto newSatelliteTLogs = region.satelliteTLogReplicationFactor > 0 ? getWorkersForSatelliteLogs(db.config, region, remoteRegion, id_used, newSatelliteFallback, true) : satellite_tlogs;
RoleFitness newSatelliteTLogFit(newSatelliteTLogs, ProcessClass::TLog);
if(oldSatelliteTLogFit < newSatelliteTLogFit)
return false;
@ -998,9 +1017,13 @@ public:
return false;
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog);
std::vector<UID> exclusionWorkerIds;
auto fn = [](const WorkerDetails &in) { return in.interf.id(); };
std::transform(newTLogs.begin(), newTLogs.end(), std::back_inserter(exclusionWorkerIds), fn);
std::transform(newSatelliteTLogs.begin(), newSatelliteTLogs.end(), std::back_inserter(exclusionWorkerIds), fn);
RoleFitness newRemoteTLogFit(
(db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) ?
getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC)
getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC, exclusionWorkerIds)
: remote_tlogs, ProcessClass::TLog);
if(oldRemoteTLogFit < newRemoteTLogFit) return false;
int oldRouterCount = oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,oldTLogFit.count));

View File

@ -104,14 +104,15 @@ struct RecruitRemoteFromConfigurationRequest {
DatabaseConfiguration configuration;
Optional<Key> dcId;
int logRouterCount;
std::vector<UID> exclusionWorkerIds;
ReplyPromise< struct RecruitRemoteFromConfigurationReply > reply;
RecruitRemoteFromConfigurationRequest() {}
RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId, int logRouterCount) : configuration(configuration), dcId(dcId), logRouterCount(logRouterCount) {}
RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId, int logRouterCount, const std::vector<UID> &exclusionWorkerIds) : configuration(configuration), dcId(dcId), logRouterCount(logRouterCount), exclusionWorkerIds(exclusionWorkerIds){}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, configuration, dcId, logRouterCount, reply);
serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, reply);
}
};

View File

@ -165,15 +165,18 @@ public:
bool foundDuplicate = false;
std::set<Optional<Key>> zones;
std::set<Optional<Key>> dcs;
for(auto& loc : tLogLocalities) {
if(zones.count(loc.zoneId())) {
foundDuplicate = true;
break;
}
zones.insert(loc.zoneId());
zones.insert(loc.dcId());
}
bool moreThanOneDC = dcs.size() > 1 ? true : false;
TraceEvent(((maxUsed - minUsed > 1) || (maxUsedBest - minUsedBest > 1)) ? (g_network->isSimulated() && !foundDuplicate ? SevError : SevWarnAlways) : SevInfo, "CheckSatelliteTagLocations").detail("MinUsed", minUsed).detail("MaxUsed", maxUsed).detail("MinUsedBest", minUsedBest).detail("MaxUsedBest", maxUsedBest).detail("DuplicateZones", foundDuplicate);
TraceEvent(((maxUsed - minUsed > 1) || (maxUsedBest - minUsedBest > 1)) ? (g_network->isSimulated() && !foundDuplicate && !moreThanOneDC ? SevError : SevWarnAlways) : SevInfo, "CheckSatelliteTagLocations").detail("MinUsed", minUsed).detail("MaxUsed", maxUsed).detail("MinUsedBest", minUsedBest).detail("MaxUsedBest", maxUsedBest).detail("DuplicateZones", foundDuplicate).detail("NumOfDCs", dcs.size());
}
int bestLocationFor( Tag tag ) {

View File

@ -882,32 +882,6 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
bool needsRemote = generateFearless;
if(generateFearless) {
StatusObject primarySatelliteObj;
primarySatelliteObj["id"] = "2";
primarySatelliteObj["priority"] = 1;
primarySatelliteObj["satellite"] = 1;
primaryDcArr.push_back(primarySatelliteObj);
StatusObject remoteSatelliteObj;
remoteSatelliteObj["id"] = "3";
remoteSatelliteObj["priority"] = 1;
remoteSatelliteObj["satellite"] = 1;
remoteDcArr.push_back(remoteSatelliteObj);
if(datacenters > 4) {
StatusObject primarySatelliteObjB;
primarySatelliteObjB["id"] = "4";
primarySatelliteObjB["priority"] = 1;
primarySatelliteObjB["satellite"] = 1;
primaryDcArr.push_back(primarySatelliteObjB);
StatusObject remoteSatelliteObjB;
remoteSatelliteObjB["id"] = "5";
remoteSatelliteObjB["priority"] = 1;
remoteSatelliteObjB["satellite"] = 1;
remoteDcArr.push_back(remoteSatelliteObjB);
}
if(datacenters > 4) {
//FIXME: we cannot use one satellite replication with more than one satellite per region because canKillProcesses does not respect usable_dcs
int satellite_replication_type = g_random->randomInt(0,3);
@ -1013,6 +987,36 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
if (g_random->random01() < 0.25) db.desiredLogRouterCount = g_random->randomInt(1,7);
if (g_random->random01() < 0.25) db.remoteDesiredTLogCount = g_random->randomInt(1,7);
bool useNormalDCsAsSatellites = datacenters > 4 && minimumRegions < 2 && g_random->random01() < 0.3;
StatusObject primarySatelliteObj;
primarySatelliteObj["id"] = useNormalDCsAsSatellites ? "1" : "2";
primarySatelliteObj["priority"] = 1;
primarySatelliteObj["satellite"] = 1;
primaryDcArr.push_back(primarySatelliteObj);
StatusObject remoteSatelliteObj;
remoteSatelliteObj["id"] = useNormalDCsAsSatellites ? "0" : "3";
remoteSatelliteObj["priority"] = 1;
remoteSatelliteObj["satellite"] = 1;
remoteDcArr.push_back(remoteSatelliteObj);
if (datacenters > 4) {
StatusObject primarySatelliteObjB;
primarySatelliteObjB["id"] = useNormalDCsAsSatellites ? "2" : "4";
primarySatelliteObjB["priority"] = 1;
primarySatelliteObjB["satellite"] = 1;
primaryDcArr.push_back(primarySatelliteObjB);
StatusObject remoteSatelliteObjB;
remoteSatelliteObjB["id"] = useNormalDCsAsSatellites ? "2" : "5";
remoteSatelliteObjB["priority"] = 1;
remoteSatelliteObjB["satellite"] = 1;
remoteDcArr.push_back(remoteSatelliteObjB);
}
if (useNormalDCsAsSatellites) {
datacenters = 3;
}
}
primaryObj["datacenters"] = primaryDcArr;

View File

@ -320,7 +320,10 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
TraceEvent(SevWarn, "UnknownRemoteDCID", self->dbgid).detail("RemoteId", printable(remoteDcId)).detail("Loc", loc);
}
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size())) ) ) );
std::vector<UID> exclusionWorkerIds;
std::transform(recr.tLogs.begin(), recr.tLogs.end(), std::back_inserter(exclusionWorkerIds), [](const WorkerInterface &in) { return in.id(); });
std::transform(recr.satelliteTLogs.begin(), recr.satelliteTLogs.end(), std::back_inserter(exclusionWorkerIds), [](const WorkerInterface &in) { return in.id(); });
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size())), exclusionWorkerIds) ) );
self->primaryLocality = self->dcId_locality[recr.dcId];
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );