From ea26bc1c43920d3fe3945e7ee14e60ff7af81647 Mon Sep 17 00:00:00 2001
From: Evan Tschannen
Date: Thu, 7 Sep 2017 15:32:08 -0700
Subject: [PATCH] passed first tests which kill entire datacenters

added configuration options for the remote data center and satellite data centers
updated cluster controller recruitment logic
refactors how master writes core state
updated log recovery, and log system peeking
---
 fdbclient/ManagementAPI.actor.cpp           | 114 +++++--
 fdbrpc/Locality.cpp                         |  17 +
 fdbrpc/Locality.h                           |   7 +-
 fdbrpc/simulator.h                          |   5 +-
 fdbserver/ClusterController.actor.cpp       | 268 +++++++++++----
 fdbserver/ClusterRecruitmentInterface.h     |  39 ++-
 fdbserver/CoordinatedState.actor.cpp        |  13 +-
 fdbserver/CoordinatedState.h                |   1 +
 fdbserver/DBCoreState.h                     |   7 +-
 fdbserver/DataDistribution.actor.cpp        | 116 +++----
 fdbserver/DatabaseConfiguration.cpp         | 109 +++++-
 fdbserver/DatabaseConfiguration.h           |  43 ++-
 fdbserver/LogRouter.actor.cpp               |   4 +-
 fdbserver/LogSystem.h                       |   9 +-
 fdbserver/LogSystemConfig.h                 |  21 +-
 fdbserver/LogSystemPeekCursor.actor.cpp     |  63 ++--
 fdbserver/Status.actor.cpp                  |   2 +-
 fdbserver/TagPartitionedLogSystem.actor.cpp | 277 ++++++++++-----
 fdbserver/masterserver.actor.cpp            | 354 ++++++++++----------
 19 files changed, 993 insertions(+), 476 deletions(-)

diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp
index f3be1296e2..e16a484e16 100644
--- a/fdbclient/ManagementAPI.actor.cpp
+++ b/fdbclient/ManagementAPI.actor.cpp
@@ -65,7 +65,11 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
 		std::string key = mode.substr(0, pos);
 		std::string value = mode.substr(pos+1);

-		if( (key == "logs" || key == "proxies" || key == "resolvers") && isInteger(value) ) {
+		if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "satellite_logs") && isInteger(value) ) {
+			out[p+key] = value;
+		}
+
+		if( key == "primary_dc" || key == "remote_dc" || key == "primary_satellite_dcs" || key == "remote_satellite_dcs" ) {
 			out[p+key] = value;
 		}

@@ -87,48 +91,40 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
 		return out;
 	}

-	std::string redundancy, log_replicas, remote_replicas;
+	std::string redundancy, log_replicas;
 	IRepPolicyRef storagePolicy;
 	IRepPolicyRef tLogPolicy;
-	IRepPolicyRef remoteTLogPolicy;
-	//FIXME: add modes for real remote policies
-
+	bool redundancySpecified = true;
 	if (mode == "single") {
 		redundancy="1";
 		log_replicas="1";
-		remote_replicas="1";
-		storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyOne());
+		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyOne());
 	} else if(mode == "double" || mode == "fast_recovery_double") {
 		redundancy="2";
 		log_replicas="2";
-		remote_replicas="2";
-		storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
+		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
 	} else if(mode == "triple" || mode == "fast_recovery_triple") {
 		redundancy="3";
 		log_replicas="3";
-		remote_replicas="3";
-		storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
+		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
 	} else if(mode == "two_datacenter") {
 		redundancy="3";
 		log_replicas="3";
-		remote_replicas="3";
-		storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
+		storagePolicy =
tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne()))); } else if(mode == "three_datacenter") { redundancy="3"; log_replicas="3"; - remote_replicas="3"; - storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAnd({ + storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAnd({ IRepPolicyRef(new PolicyAcross(3, "dcid", IRepPolicyRef(new PolicyOne()))), IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne()))) })); } else if(mode == "three_data_hall") { redundancy="3"; log_replicas="4"; - remote_replicas="4"; storagePolicy = IRepPolicyRef(new PolicyAcross(3, "data_hall", IRepPolicyRef(new PolicyOne()))); - tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall", + tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall", IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne()))) )); } else @@ -137,7 +133,6 @@ std::map configForToken( std::string const& mode ) { out[p+"storage_replicas"] = out[p+"storage_quorum"] = redundancy; out[p+"log_replicas"] = log_replicas; - out[p+"remote_log_replication"] = remote_replicas; out[p+"log_anti_quorum"] = "0"; BinaryWriter policyWriter(IncludeVersion()); @@ -147,11 +142,92 @@ std::map configForToken( std::string const& mode ) { policyWriter = BinaryWriter(IncludeVersion()); serializeReplicationPolicy(policyWriter, tLogPolicy); out[p+"log_replication_policy"] = policyWriter.toStringRef().toString(); + return out; + } + + std::string remote_redundancy, remote_log_replicas; + IRepPolicyRef remoteStoragePolicy; + IRepPolicyRef remoteTLogPolicy; + bool remoteRedundancySpecified = true; + if (mode == "remote_single") { + remote_redundancy="1"; + remote_log_replicas="1"; + remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyOne()); + } else if(mode == "remote_double") { + remote_redundancy="2"; + remote_log_replicas="2"; + remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne()))); + } else if(mode == "remote_triple") { + remote_redundancy="3"; + remote_log_replicas="3"; + remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne()))); + } else if(mode == "remote_three_data_hall") { + remote_redundancy="3"; + remote_log_replicas="4"; + remoteStoragePolicy = IRepPolicyRef(new PolicyAcross(3, "data_hall", IRepPolicyRef(new PolicyOne()))); + remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall", + IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne()))) + )); + } else + remoteRedundancySpecified = false; + if (remoteRedundancySpecified) { + out[p+"remote_storage_replicas"] = + out[p+"remote_storage_quorum"] = remote_redundancy; + out[p+"remote_log_replicas"] = + out[p+"log_routers"] = remote_log_replicas; + + BinaryWriter policyWriter(IncludeVersion()); + serializeReplicationPolicy(policyWriter, remoteStoragePolicy); + out[p+"remote_storage_policy"] = policyWriter.toStringRef().toString(); policyWriter = BinaryWriter(IncludeVersion()); serializeReplicationPolicy(policyWriter, remoteTLogPolicy); - out[p+"remote_replication_policy"] = policyWriter.toStringRef().toString(); + out[p+"remote_log_policy"] = policyWriter.toStringRef().toString(); + return out; + } + std::string satellite_log_replicas, satellite_anti_quorum, satellite_usable_dcs; + IRepPolicyRef satelliteTLogPolicy; + bool satelliteRedundancySpecified = true; + if (mode == "one_satellite_single") { + satellite_anti_quorum="0"; + 
satellite_usable_dcs="1"; + satellite_log_replicas="1"; + satelliteTLogPolicy = IRepPolicyRef(new PolicyOne()); + } else if(mode == "one_satellite_double") { + satellite_anti_quorum="0"; + satellite_usable_dcs="1"; + satellite_log_replicas="2"; + satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne()))); + } else if(mode == "one_satellite_triple") { + satellite_anti_quorum="0"; + satellite_usable_dcs="1"; + satellite_log_replicas="3"; + satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne()))); + } else if(mode == "two_satellite_safe") { + satellite_anti_quorum="0"; + satellite_usable_dcs="2"; + satellite_log_replicas="4"; + satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid", + IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne()))) + )); + } else if(mode == "two_satellite_fast") { + satellite_anti_quorum="2"; + satellite_usable_dcs="2"; + satellite_log_replicas="4"; + satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid", + IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne()))) + )); + } else + satelliteRedundancySpecified = false; + if (satelliteRedundancySpecified) { + out[p+"satellite_anti_quorum"] = satellite_anti_quorum; + out[p+"satellite_usable_dcs"] = satellite_usable_dcs; + out[p+"satellite_log_replicas"] = satellite_log_replicas; + + BinaryWriter policyWriter(IncludeVersion()); + serializeReplicationPolicy(policyWriter, satelliteTLogPolicy); + out[p+"satellite_log_policy"] = policyWriter.toStringRef().toString(); return out; } diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp index 4418a162d3..ecb5b9191f 100644 --- a/fdbrpc/Locality.cpp +++ b/fdbrpc/Locality.cpp @@ -108,6 +108,23 @@ ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) { default: return ProcessClass::WorstFit; } + case ProcessClass::LogRouter: + switch( _class ) { + case ProcessClass::LogRouterClass: + return ProcessClass::BestFit; + case ProcessClass::StatelessClass: + return ProcessClass::GoodFit; + case ProcessClass::ResolutionClass: + return ProcessClass::BestOtherFit; + case ProcessClass::TransactionClass: + return ProcessClass::BestOtherFit; + case ProcessClass::UnsetClass: + return ProcessClass::UnsetFit; + case ProcessClass::TesterClass: + return ProcessClass::NeverAssign; + default: + return ProcessClass::WorstFit; + } default: return ProcessClass::NeverAssign; } diff --git a/fdbrpc/Locality.h b/fdbrpc/Locality.h index bafae3f90b..eec0ecc81e 100644 --- a/fdbrpc/Locality.h +++ b/fdbrpc/Locality.h @@ -26,9 +26,9 @@ struct ProcessClass { // This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items! 
- enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, InvalidClass = -1 }; + enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, LogRouterClass, InvalidClass = -1 }; enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, NeverAssign }; - enum ClusterRole { Storage, TLog, Proxy, Master, Resolver }; + enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter }; enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 }; int16_t _class; int16_t _source; @@ -46,6 +46,7 @@ public: else if (s=="unset") _class = UnsetClass; else if (s=="stateless") _class = StatelessClass; else if (s=="log") _class = LogClass; + else if (s=="router") _class = LogRouterClass; else _class = InvalidClass; } @@ -59,6 +60,7 @@ public: else if (classStr=="unset") _class = UnsetClass; else if (classStr=="stateless") _class = StatelessClass; else if (classStr=="log") _class = LogClass; + else if (classStr=="router") _class = LogRouterClass; else _class = InvalidClass; if (sourceStr=="command_line") _source = CommandLineSource; @@ -87,6 +89,7 @@ public: case TesterClass: return "test"; case StatelessClass: return "stateless"; case LogClass: return "log"; + case LogRouterClass: return "router"; default: return "invalid"; } } diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 0fbdf54a81..6395514d65 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -89,6 +89,7 @@ public: case ProcessClass::TesterClass: return false; case ProcessClass::StatelessClass: return false; case ProcessClass::LogClass: return true; + case ProcessClass::LogRouterClass: return false; default: return false; } } @@ -207,8 +208,8 @@ public: std::map currentlyRebootingProcesses; class ClusterConnectionString* extraDB; IRepPolicyRef storagePolicy; - IRepPolicyRef tLogPolicy; - int tLogWriteAntiQuorum; + IRepPolicyRef tLogPolicy; + int tLogWriteAntiQuorum; //Used by workloads that perform reconfigurations int testerCount; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 75b556ce7d..975f9963e7 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -152,13 +152,13 @@ public: std::pair getStorageWorker( RecruitStorageRequest const& req ) { std::set>> excludedMachines( req.excludeMachines.begin(), req.excludeMachines.end() ); - std::set>> excludedDCs( req.excludeDCs.begin(), req.excludeDCs.end() ); + std::set>> includeDCs( req.includeDCs.begin(), req.includeDCs.end() ); std::set excludedAddresses( req.excludeAddresses.begin(), req.excludeAddresses.end() ); for( auto& it : id_worker ) if( workerAvailable( it.second, false ) && !excludedMachines.count(it.second.interf.locality.zoneId()) && - !excludedDCs.count(it.second.interf.locality.dcId()) && + ( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) && !addressExcluded(excludedAddresses, it.second.interf.address()) && it.second.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit ) { return std::make_pair(it.second.interf, it.second.processClass); @@ -171,7 +171,7 @@ public: ProcessClass::Fitness fit = it.second.processClass.machineClassFitness( ProcessClass::Storage ); if( workerAvailable( it.second, false ) && !excludedMachines.count(it.second.interf.locality.zoneId()) && - !excludedDCs.count(it.second.interf.locality.dcId()) 
&& + ( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) && !addressExcluded(excludedAddresses, it.second.interf.address()) && fit < bestFit ) { bestFit = fit; @@ -211,7 +211,7 @@ public: throw no_more_servers(); } -std::vector> getWorkersForTlogsAcrossDatacenters( DatabaseConfiguration const& conf, std::map< Optional>, int>& id_used, bool checkStable = false, std::set additionalExlusions = std::set() ) +std::vector> getWorkersForTlogs( DatabaseConfiguration const& conf, std::map< Optional>, int>& id_used, bool checkStable = false, std::set> dcIds = std::set>(), std::set additionalExlusions = std::set() ) { std::map>> fitness_workers; std::vector> results; @@ -225,7 +225,7 @@ std::vector> getWorkersForTlogsAcrossDa for( auto& it : id_worker ) { auto fitness = it.second.processClass.machineClassFitness( ProcessClass::TLog ); - if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && !additionalExlusions.count(it.second.interf.address()) && fitness != ProcessClass::NeverAssign ) { + if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && !additionalExlusions.count(it.second.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.interf.locality.dcId())) ) { fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass)); } else { @@ -420,7 +420,7 @@ std::vector> getWorkersForTlogsAcrossDa throw no_more_servers(); } - vector> getWorkersForRoleInDatacenter(Optional> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional>, int>& id_used, WorkerFitnessInfo minWorker, bool checkStable = false ) { + vector> getWorkersForRoleInDatacenter(Optional> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional>, int>& id_used, Optional minWorker = Optional(), bool checkStable = false ) { std::map, vector>> fitness_workers; vector> results; if (amount <= 0) @@ -428,7 +428,8 @@ std::vector> getWorkersForTlogsAcrossDa for( auto& it : id_worker ) { auto fitness = it.second.processClass.machineClassFitness( role ); - if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && it.second.interf.id() != minWorker.worker.first.id() && (fitness < minWorker.fitness || (fitness == minWorker.fitness && id_used[it.first] <= minWorker.used)) && it.second.interf.locality.dcId()==dcId ) { + if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && it.second.interf.locality.dcId() == dcId && + ( !minWorker.present() || ( it.second.interf.id() != minWorker.get().worker.first.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) { fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass)); } } @@ -548,78 +549,161 @@ std::vector> getWorkersForTlogsAcrossDa return result; } - RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) { - RecruitFromConfigurationReply result; + RecruitRemoteFromConfigurationReply findRemoteWorkersForConfiguration( RecruitRemoteFromConfigurationRequest const& req ) { + RecruitRemoteFromConfigurationReply result; std::map< Optional>, int> id_used; - id_used[masterProcessId]++; - auto tlogs = 
getWorkersForTlogsAcrossDatacenters( req.configuration, id_used ); - std::set additionalExclusions; - for(int i = 0; i < tlogs.size(); i++) { - result.tLogs.push_back(tlogs[i].first); - additionalExclusions.insert(tlogs[i].first.address()); + std::set> remoteDC; + remoteDC.insert(req.dcId); + + auto remoteLogs = getWorkersForTlogs( req.configuration, id_used, false, remoteDC ); + for(int i = 0; i < remoteLogs.size(); i++) { + result.remoteTLogs.push_back(remoteLogs[i].first); } - std::map< Optional>, int> id_used2; - auto remoteTlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used2, false, additionalExclusions ); - for(int i = 0; i < remoteTlogs.size(); i++) { - result.remoteTLogs.push_back(remoteTlogs[i].first); - result.logRouters.push_back(remoteTlogs[i].first); + auto logRouters = getWorkersForRoleInDatacenter( req.dcId, ProcessClass::LogRouter, req.configuration.logRouterCount, req.configuration, id_used ); + for(int i = 0; i < logRouters.size(); i++) { + result.logRouters.push_back(logRouters[i].first); } - auto datacenters = getDatacenters( req.configuration ); - - InDatacenterFitness bestFitness; - int numEquivalent = 1; - - for(auto dcId : datacenters ) { - auto used = id_used; - auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, used ); - auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, used ); - - auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, used, first_proxy ); - auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, used, first_resolver ); - - proxies.push_back(first_proxy.worker); - resolvers.push_back(first_resolver.worker); - - auto fitness = InDatacenterFitness(proxies, resolvers); - if(fitness < bestFitness) { - bestFitness = fitness; - numEquivalent = 1; - result.resolvers = vector(); - result.proxies = vector(); - for(int i = 0; i < resolvers.size(); i++) - result.resolvers.push_back(resolvers[i].first); - for(int i = 0; i < proxies.size(); i++) - result.proxies.push_back(proxies[i].first); - } else if( fitness == bestFitness && g_random->random01() < 1.0/++numEquivalent ) { - result.resolvers = vector(); - result.proxies = vector(); - for(int i = 0; i < resolvers.size(); i++) - result.resolvers.push_back(resolvers[i].first); - for(int i = 0; i < proxies.size(); i++) - result.proxies.push_back(proxies[i].first); - } - } - - ASSERT(bestFitness != InDatacenterFitness()); - - TraceEvent("findWorkersForConfig").detail("replication", req.configuration.tLogReplicationFactor) - .detail("desiredLogs", req.configuration.getDesiredLogs()).detail("actualLogs", result.tLogs.size()) - .detail("desiredProxies", req.configuration.getDesiredProxies()).detail("actualProxies", result.proxies.size()) - .detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size()); - + //FIXME: fitness for logs is wrong if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY && - ( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) || - bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), 
req.configuration.getDesiredResolvers()) ) ) { + ( AcrossDatacenterFitness(remoteLogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ) ) { throw operation_failed(); } return result; } + RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req, Optional dcId ) { + RecruitFromConfigurationReply result; + std::map< Optional>, int> id_used; + id_used[masterProcessId]++; + ASSERT(dcId == req.configuration.primaryDcId || dcId == req.configuration.remoteDcId); + std::set> primaryDC; + primaryDC.insert(dcId == req.configuration.primaryDcId ? req.configuration.primaryDcId : req.configuration.remoteDcId); + result.remoteDcId = dcId == req.configuration.primaryDcId ? req.configuration.remoteDcId : req.configuration.primaryDcId; + + auto tlogs = getWorkersForTlogs( req.configuration, id_used, false, primaryDC ); + for(int i = 0; i < tlogs.size(); i++) { + result.tLogs.push_back(tlogs[i].first); + } + + if(req.configuration.satelliteTLogReplicationFactor > 0) { + std::set> satelliteDCs; + if( dcId == req.configuration.primaryDcId ) { + satelliteDCs.insert( req.configuration.primarySatelliteDcIds.begin(), req.configuration.primarySatelliteDcIds.end() ); + } else { + satelliteDCs.insert( req.configuration.remoteSatelliteDcIds.begin(), req.configuration.remoteSatelliteDcIds.end() ); + } + auto satelliteLogs = getWorkersForTlogs( req.configuration, id_used, false, satelliteDCs ); + + for(int i = 0; i < satelliteLogs.size(); i++) { + result.satelliteTLogs.push_back(satelliteLogs[i].first); + } + } + + auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, id_used ); + auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, id_used ); + + auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, id_used, first_proxy ); + auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, id_used, first_resolver ); + + proxies.push_back(first_proxy.worker); + resolvers.push_back(first_resolver.worker); + + auto fitness = InDatacenterFitness(proxies, resolvers); + for(int i = 0; i < resolvers.size(); i++) + result.resolvers.push_back(resolvers[i].first); + for(int i = 0; i < proxies.size(); i++) + result.proxies.push_back(proxies[i].first); + + //FIXME: fitness for logs is wrong + if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY && + ( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) || + fitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) { + throw operation_failed(); + } + + return result; + } + + RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) { + if(req.configuration.remoteTLogReplicationFactor > 0) { + try { + return findWorkersForConfiguration(req, req.configuration.primaryDcId); + } catch( Error& e ) { + if (e.code() == error_code_no_more_servers || e.code() == error_code_operation_failed) { + TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e); + return 
findWorkersForConfiguration(req, req.configuration.remoteDcId); + } else { + throw; + } + } + } else { + RecruitFromConfigurationReply result; + std::map< Optional>, int> id_used; + id_used[masterProcessId]++; + + auto tlogs = getWorkersForTlogs( req.configuration, id_used ); + for(int i = 0; i < tlogs.size(); i++) { + result.tLogs.push_back(tlogs[i].first); + } + + auto datacenters = getDatacenters( req.configuration ); + + InDatacenterFitness bestFitness; + int numEquivalent = 1; + + for(auto dcId : datacenters ) { + auto used = id_used; + auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, used ); + auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, used ); + + auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, used, first_proxy ); + auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, used, first_resolver ); + + proxies.push_back(first_proxy.worker); + resolvers.push_back(first_resolver.worker); + + auto fitness = InDatacenterFitness(proxies, resolvers); + if(fitness < bestFitness) { + bestFitness = fitness; + numEquivalent = 1; + result.resolvers = vector(); + result.proxies = vector(); + for(int i = 0; i < resolvers.size(); i++) + result.resolvers.push_back(resolvers[i].first); + for(int i = 0; i < proxies.size(); i++) + result.proxies.push_back(proxies[i].first); + } else if( fitness == bestFitness && g_random->random01() < 1.0/++numEquivalent ) { + result.resolvers = vector(); + result.proxies = vector(); + for(int i = 0; i < resolvers.size(); i++) + result.resolvers.push_back(resolvers[i].first); + for(int i = 0; i < proxies.size(); i++) + result.proxies.push_back(proxies[i].first); + } + } + + ASSERT(bestFitness != InDatacenterFitness()); + + TraceEvent("findWorkersForConfig").detail("replication", req.configuration.tLogReplicationFactor) + .detail("desiredLogs", req.configuration.getDesiredLogs()).detail("actualLogs", result.tLogs.size()) + .detail("desiredProxies", req.configuration.getDesiredProxies()).detail("actualProxies", result.proxies.size()) + .detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size()); + + if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY && + ( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) || + bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) { + throw operation_failed(); + } + + return result; + } + } + bool betterMasterExists() { ServerDBInfo dbi = db.serverInfo->get(); std::map< Optional>, int> id_used; @@ -653,7 +737,7 @@ std::vector> getWorkersForTlogsAcrossDa tlogProcessClasses.push_back(tlogWorker->second.processClass); } AcrossDatacenterFitness oldAcrossFit(dbi.logSystemConfig.tLogs[0].tLogs, tlogProcessClasses); - AcrossDatacenterFitness newAcrossFit(getWorkersForTlogsAcrossDatacenters(db.config, id_used, true)); + AcrossDatacenterFitness newAcrossFit(getWorkersForTlogs(db.config, id_used, true)); if(oldAcrossFit < newAcrossFit) return false; @@ -713,8 +797,10 @@ std::vector> getWorkersForTlogsAcrossDa 
Standalone lastProcessClasses; bool gotProcessClasses; Optional> masterProcessId; + Optional> masterDcId; UID id; std::vector outstandingRecruitmentRequests; + std::vector outstandingRemoteRecruitmentRequests; std::vector> outstandingStorageRequests; ActorCollection ac; UpdateWorkerList updateWorkerList; @@ -771,6 +857,7 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster rmq.lifetime = db->serverInfo->get().masterLifetime; cluster->masterProcessId = masterWorker.first.locality.processId(); + cluster->masterDcId = masterWorker.first.locality.dcId(); ErrorOr newMaster = wait( masterWorker.first.master.tryGetReply( rmq ) ); if (newMaster.present()) { TraceEvent("CCWDB", cluster->id).detail("Recruited", newMaster.get().id()); @@ -915,6 +1002,24 @@ void checkOutstandingRecruitmentRequests( ClusterControllerData* self ) { } } +void checkOutstandingRemoteRecruitmentRequests( ClusterControllerData* self ) { + for( int i = 0; i < self->outstandingRemoteRecruitmentRequests.size(); i++ ) { + RecruitRemoteFromConfigurationRequest& req = self->outstandingRemoteRecruitmentRequests[i]; + try { + req.reply.send( self->findRemoteWorkersForConfiguration( req ) ); + std::swap( self->outstandingRemoteRecruitmentRequests[i--], self->outstandingRemoteRecruitmentRequests.back() ); + self->outstandingRemoteRecruitmentRequests.pop_back(); + } catch (Error& e) { + if (e.code() == error_code_no_more_servers || e.code() == error_code_operation_failed) { + TraceEvent(SevWarn, "RecruitRemoteTLogMatchingSetNotAvailable", self->id).error(e); + } else { + TraceEvent(SevError, "RecruitRemoteTLogsRequestError", self->id).error(e); + throw; + } + } + } +} + void checkOutstandingStorageRequests( ClusterControllerData* self ) { for( int i = 0; i < self->outstandingStorageRequests.size(); i++ ) { auto& req = self->outstandingStorageRequests[i]; @@ -948,12 +1053,15 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) { ACTOR Future doCheckOutstandingMasterRequests( ClusterControllerData* self ) { Void _ = wait( delay(SERVER_KNOBS->CHECK_BETTER_MASTER_INTERVAL) ); + //FIXME: re-enable betterMasterExists + /* if (self->betterMasterExists()) { if (!self->db.forceMasterFailure.isSet()) { self->db.forceMasterFailure.send( Void() ); TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id()); } } + */ return Void(); } @@ -966,6 +1074,7 @@ void checkOutstandingMasterRequests( ClusterControllerData* self ) { void checkOutstandingRequests( ClusterControllerData* self ) { checkOutstandingRecruitmentRequests( self ); + checkOutstandingRemoteRecruitmentRequests( self ); checkOutstandingStorageRequests( self ); checkOutstandingMasterRequests( self ); } @@ -1196,6 +1305,30 @@ ACTOR Future clusterRecruitFromConfiguration( ClusterControllerData* self, } } +ACTOR Future clusterRecruitRemoteFromConfiguration( ClusterControllerData* self, RecruitRemoteFromConfigurationRequest req ) { + // At the moment this doesn't really need to be an actor (it always completes immediately) + TEST(true); //ClusterController RecruitTLogsRequest + loop { + try { + req.reply.send( self->findRemoteWorkersForConfiguration( req ) ); + return Void(); + } catch (Error& e) { + if (e.code() == error_code_no_more_servers && now() - self->startTime >= SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) { + self->outstandingRemoteRecruitmentRequests.push_back( req ); + TraceEvent(SevWarn, "RecruitRemoteFromConfigurationNotAvailable", self->id).error(e); + return Void(); + } else if(e.code() 
== error_code_operation_failed || e.code() == error_code_no_more_servers) { + //recruitment not good enough, try again + } + else { + TraceEvent(SevError, "RecruitRemoteFromConfigurationError", self->id).error(e); + throw; // goodbye, cluster controller + } + } + Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) ); + } +} + void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest const& req ) { req.reply.send( Void() ); @@ -1518,6 +1651,9 @@ ACTOR Future clusterControllerCore( ClusterControllerFullInterface interf, when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) { addActor.send( clusterRecruitFromConfiguration( &self, req ) ); } + when( RecruitRemoteFromConfigurationRequest req = waitNext( interf.recruitRemoteFromConfiguration.getFuture() ) ) { + addActor.send( clusterRecruitRemoteFromConfiguration( &self, req ) ); + } when( RecruitStorageRequest req = waitNext( interf.recruitStorage.getFuture() ) ) { clusterRecruitStorage( &self, req ); } diff --git a/fdbserver/ClusterRecruitmentInterface.h b/fdbserver/ClusterRecruitmentInterface.h index 322fd81df8..0b88c7efeb 100644 --- a/fdbserver/ClusterRecruitmentInterface.h +++ b/fdbserver/ClusterRecruitmentInterface.h @@ -35,6 +35,7 @@ struct ClusterControllerFullInterface { ClusterInterface clientInterface; RequestStream< struct RecruitFromConfigurationRequest > recruitFromConfiguration; + RequestStream< struct RecruitRemoteFromConfigurationRequest > recruitRemoteFromConfiguration; RequestStream< struct RecruitStorageRequest > recruitStorage; RequestStream< struct RegisterWorkerRequest > registerWorker; RequestStream< struct GetWorkersRequest > getWorkers; @@ -48,6 +49,7 @@ struct ClusterControllerFullInterface { void initEndpoints() { clientInterface.initEndpoints(); recruitFromConfiguration.getEndpoint( TaskClusterController ); + recruitRemoteFromConfiguration.getEndpoint( TaskClusterController ); recruitStorage.getEndpoint( TaskClusterController ); registerWorker.getEndpoint( TaskClusterController ); getWorkers.getEndpoint( TaskClusterController ); @@ -58,7 +60,7 @@ struct ClusterControllerFullInterface { template void serialize( Ar& ar ) { ASSERT( ar.protocolVersion() >= 0x0FDB00A200040001LL ); - ar & clientInterface & recruitFromConfiguration & recruitStorage & registerWorker & getWorkers & registerMaster & getServerDBInfo; + ar & clientInterface & recruitFromConfiguration & recruitRemoteFromConfiguration & recruitStorage & registerWorker & getWorkers & registerMaster & getServerDBInfo; } }; @@ -78,14 +80,39 @@ struct RecruitFromConfigurationRequest { struct RecruitFromConfigurationReply { vector tLogs; - vector remoteTLogs; - vector logRouters; + vector satelliteTLogs; vector proxies; vector resolvers; + Optional remoteDcId; template void serialize( Ar& ar ) { - ar & tLogs & remoteTLogs & logRouters & proxies & resolvers; + ar & tLogs & satelliteTLogs & proxies & resolvers & remoteDcId; + } +}; + +struct RecruitRemoteFromConfigurationRequest { + DatabaseConfiguration configuration; + Optional dcId; + ReplyPromise< struct RecruitRemoteFromConfigurationReply > reply; + + RecruitRemoteFromConfigurationRequest() {} + explicit RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional const& dcId) + : configuration(configuration), dcId(dcId) {} + + template + void serialize( Ar& ar ) { + ar & configuration & dcId & reply; + } +}; + +struct RecruitRemoteFromConfigurationReply { + vector remoteTLogs; + vector logRouters; + + 
template + void serialize( Ar& ar ) { + ar & remoteTLogs & logRouters; } }; @@ -102,13 +129,13 @@ struct RecruitStorageReply { struct RecruitStorageRequest { std::vector>> excludeMachines; //< Don't recruit any of these machines std::vector excludeAddresses; //< Don't recruit any of these addresses - std::vector>> excludeDCs; //< Don't recruit from any of these data centers + std::vector>> includeDCs; bool criticalRecruitment; //< True if machine classes are to be ignored ReplyPromise< RecruitStorageReply > reply; template void serialize( Ar& ar ) { - ar & excludeMachines & excludeAddresses & excludeDCs & criticalRecruitment & reply; + ar & excludeMachines & excludeAddresses & includeDCs & criticalRecruitment & reply; } }; diff --git a/fdbserver/CoordinatedState.actor.cpp b/fdbserver/CoordinatedState.actor.cpp index 59f26cf87d..a1db7cf017 100644 --- a/fdbserver/CoordinatedState.actor.cpp +++ b/fdbserver/CoordinatedState.actor.cpp @@ -300,8 +300,19 @@ struct MovableCoordinatedStateImpl { } }; +void MovableCoordinatedState::operator=(MovableCoordinatedState&& av) { + if(impl) { + delete impl; + } + impl = av.impl; + av.impl = 0; +} MovableCoordinatedState::MovableCoordinatedState( class ServerCoordinators const& coord ) : impl( new MovableCoordinatedStateImpl(coord) ) {} -MovableCoordinatedState::~MovableCoordinatedState() { delete impl; } +MovableCoordinatedState::~MovableCoordinatedState() { + if(impl) { + delete impl; + } +} Future MovableCoordinatedState::read() { return MovableCoordinatedStateImpl::read(impl); } Future MovableCoordinatedState::onConflict() { return impl->onConflict(); } Future MovableCoordinatedState::setExclusive(Value v) { return impl->setExclusive(v); } diff --git a/fdbserver/CoordinatedState.h b/fdbserver/CoordinatedState.h index 81c7c234bd..17517d62a9 100644 --- a/fdbserver/CoordinatedState.h +++ b/fdbserver/CoordinatedState.h @@ -60,6 +60,7 @@ private: class MovableCoordinatedState : NonCopyable { public: MovableCoordinatedState( class ServerCoordinators const& ); + void operator=(MovableCoordinatedState&& av); ~MovableCoordinatedState(); Future read(); diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index 12e236f5ea..4d9aba3e26 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -42,17 +42,18 @@ struct CoreTLogSet { IRepPolicyRef tLogPolicy; bool isLocal; bool hasBest; + int8_t locality; - CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true) {} + CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {} bool operator == (CoreTLogSet const& rhs) const { return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && - hasBest == rhs.hasBest && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); + hasBest == rhs.hasBest && locality == rhs.locality && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info()))); } template void serialize(Archive& ar) { - ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest; + ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest & locality; } }; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 5d76856e0f..e75ccbb6a0 100644 --- 
a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -437,9 +437,7 @@ struct DDTeamCollection { PromiseStream> addActor; Database cx; UID masterId; - int teamSize; - IRepPolicyRef replicationPolicy; - KeyValueStoreType storeType; + DatabaseConfiguration configuration; bool doBuildTeams; Future teamBuilder; @@ -479,12 +477,10 @@ struct DDTeamCollection { MoveKeysLock const& lock, PromiseStream const& output, Reference const& shardsAffectedByTeamFailure, - int teamSize, - IRepPolicyRef replicationPolicy, - KeyValueStoreType storeType, + DatabaseConfiguration configuration, PromiseStream< std::pair> > const& serverChanges ) :cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ), - teamSize( teamSize ), replicationPolicy(replicationPolicy), storeType( storeType ), serverChanges(serverChanges), + configuration(configuration), serverChanges(serverChanges), initialFailureReactionDelay( delay( BUGGIFY ? 0 : SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ), initializationDoneActor(logOnCompletion(initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ), unhealthyServers(0) @@ -682,10 +678,10 @@ struct DDTeamCollection { void addSubsetOfEmergencyTeams() { for( int i = 0; i < teams.size(); i++ ) { - if( teams[i]->servers.size() > teamSize ) { + if( teams[i]->servers.size() > configuration.storageTeamSize ) { auto& serverIds = teams[i]->getServerIDs(); bool foundTeam = false; - for( int j = 0; j < std::max( 1, (int)(serverIds.size() - teamSize + 1) ) && !foundTeam; j++ ) { + for( int j = 0; j < std::max( 1, (int)(serverIds.size() - configuration.storageTeamSize + 1) ) && !foundTeam; j++ ) { auto& serverTeams = server_info[serverIds[j]]->teams; for( int k = 0; k < serverTeams.size(); k++ ) { auto &testTeam = serverTeams[k]->getServerIDs(); @@ -703,7 +699,7 @@ struct DDTeamCollection { } } if( !foundTeam ) { - addTeam(serverIds.begin(), serverIds.begin() + teamSize ); + addTeam(serverIds.begin(), serverIds.begin() + configuration.storageTeamSize ); } } } @@ -724,7 +720,7 @@ struct DDTeamCollection { void evaluateTeamQuality() { int teamCount = teams.size(), serverCount = allServers.size(); - double teamsPerServer = (double)teamCount * teamSize / serverCount; + double teamsPerServer = (double)teamCount * configuration.storageTeamSize / serverCount; ASSERT( serverCount == server_info.size() ); @@ -801,8 +797,8 @@ struct DDTeamCollection { Void _ = wait( yield( TaskDataDistributionLaunch ) ); // Add team, if valid - if(history->size() == self->teamSize) { - auto valid = self->replicationPolicy->validate(*history, processes); + if(history->size() == self->configuration.storageTeamSize) { + auto valid = self->configuration.storagePolicy->validate(*history, processes); if(!valid) { return Void(); } @@ -858,8 +854,8 @@ struct DDTeamCollection { } } - if(totalServers.size() < teamSize ) { - TraceEvent(SevWarn, "DataDistributionBuildTeams", masterId).detail("Reason","Not enough servers for a team").detail("Servers",totalServers.size()).detail("teamSize", teamSize); + if(totalServers.size() < configuration.storageTeamSize ) { + TraceEvent(SevWarn, "DataDistributionBuildTeams", masterId).detail("Reason","Not enough servers for a team").detail("Servers",totalServers.size()).detail("teamSize", configuration.storageTeamSize); return addedTeams; } @@ -894,7 
+890,7 @@ struct DDTeamCollection { int maxAttempts = SERVER_KNOBS->BEST_OF_AMT; for( int i = 0; i < maxAttempts && i < 100; i++) { team.clear(); - auto success = totalServers.selectReplicas(replicationPolicy, forcedAttributes, team); + auto success = totalServers.selectReplicas(configuration.storagePolicy, forcedAttributes, team); if(!success) { break; } @@ -902,7 +898,7 @@ struct DDTeamCollection { if(forcedAttributes.size() > 0) { team.push_back((UID*)totalServers.getObject(forcedAttributes[0])); } - if( team.size() != teamSize) { + if( team.size() != configuration.storageTeamSize) { maxAttempts += 1; } @@ -917,7 +913,7 @@ struct DDTeamCollection { } } - if( bestTeam.size() == teamSize) { + if( bestTeam.size() == configuration.storageTeamSize) { vector processIDs; for (auto process = bestTeam.begin(); process < bestTeam.end(); process++) { @@ -935,7 +931,7 @@ struct DDTeamCollection { TraceEvent(SevWarn, "DataDistributionBuildTeams", masterId).detail("Reason","Unable to make desiredTeams"); break; } - if(++loopCount > 2*teamsToBuild*(teamSize+1) ) { + if(++loopCount > 2*teamsToBuild*(configuration.storageTeamSize+1) ) { break; } } @@ -967,7 +963,7 @@ struct DDTeamCollection { uniqueMachines = machines.size(); // If there are too few machines to even build teams or there are too few represented datacenters, build no new teams - if( uniqueMachines >= self->teamSize ) { + if( uniqueMachines >= self->configuration.storageTeamSize ) { desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER*serverCount; // Count only properly sized teams against the desired number of teams. This is to prevent "emergency" merged teams (see MoveKeys) @@ -976,13 +972,13 @@ struct DDTeamCollection { // Also exclude teams who have members in the wrong configuration, since we don't want these teams either int teamCount = 0; for(int i = 0; i < self->teams.size(); i++) { - if( self->teams[i]->getServerIDs().size() == self->teamSize && !self->teams[i]->isWrongConfiguration() ) { + if( self->teams[i]->getServerIDs().size() == self->configuration.storageTeamSize && !self->teams[i]->isWrongConfiguration() ) { teamCount++; } } TraceEvent("BuildTeamsBegin", self->masterId).detail("DesiredTeams", desiredTeams).detail("UniqueMachines", uniqueMachines) - .detail("TeamSize", self->teamSize).detail("Servers", serverCount) + .detail("TeamSize", self->configuration.storageTeamSize).detail("Servers", serverCount) .detail("CurrentTrackedTeams", self->teams.size()).detail("TeamCount", teamCount); if( desiredTeams > teamCount ) { @@ -996,18 +992,23 @@ struct DDTeamCollection { state int teamsToBuild = desiredTeams - teamCount; state vector> builtTeams; - int addedTeams = wait( self->addAllTeams( self, desiredServerVector, &builtTeams, teamsToBuild ) ); - - if( addedTeams < teamsToBuild ) { - for( int i = 0; i < builtTeams.size(); i++ ) { - std::sort(builtTeams[i].begin(), builtTeams[i].end()); - self->addTeam( builtTeams[i].begin(), builtTeams[i].end() ); - } - TraceEvent("AddAllTeams", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", builtTeams.size()); - } - else { + if( self->configuration.storageTeamSize > 3) { int addedTeams = self->addTeamsBestOf( teamsToBuild ); TraceEvent("AddTeamsBestOf", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", addedTeams); + } else { + int addedTeams = wait( self->addAllTeams( self, desiredServerVector, &builtTeams, teamsToBuild ) ); + + if( addedTeams < teamsToBuild ) { + for( int i = 0; i < builtTeams.size(); i++ ) { + 
std::sort(builtTeams[i].begin(), builtTeams[i].end()); + self->addTeam( builtTeams[i].begin(), builtTeams[i].end() ); + } + TraceEvent("AddAllTeams", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", builtTeams.size()); + } + else { + int addedTeams = self->addTeamsBestOf( teamsToBuild ); + TraceEvent("AddTeamsBestOf", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", addedTeams); + } } } } @@ -1116,7 +1117,7 @@ struct DDTeamCollection { ACTOR Future teamTracker( DDTeamCollection *self, Reference team) { state int lastServersLeft = team->getServerIDs().size(); state bool lastAnyUndesired = false; - state bool wrongSize = team->getServerIDs().size() != self->teamSize; + state bool wrongSize = team->getServerIDs().size() != self->configuration.storageTeamSize; state bool lastReady = self->initialFailureReactionDelay.isReady(); state bool lastHealthy = team->isHealthy(); state bool lastOptimal = team->isOptimal(); @@ -1156,12 +1157,12 @@ ACTOR Future teamTracker( DDTeamCollection *self, Referencesize(); - bool matchesPolicy = self->replicationPolicy->validate(teamLocality->getEntries(), teamLocality); + bool matchesPolicy = self->configuration.storagePolicy->validate(teamLocality->getEntries(), teamLocality); if( !self->initialFailureReactionDelay.isReady() ) change.push_back( self->initialFailureReactionDelay ); - bool recheck = lastReady != self->initialFailureReactionDelay.isReady() && ( !matchesPolicy || anyUndesired || team->getServerIDs().size() != self->teamSize ); + bool recheck = lastReady != self->initialFailureReactionDelay.isReady() && ( !matchesPolicy || anyUndesired || team->getServerIDs().size() != self->configuration.storageTeamSize ); lastReady = self->initialFailureReactionDelay.isReady(); if( serversLeft != lastServersLeft || anyUndesired != lastAnyUndesired || anyWrongConfiguration != lastWrongConfiguration || wrongSize || recheck ) { @@ -1170,7 +1171,7 @@ ACTOR Future teamTracker( DDTeamCollection *self, ReferencehealthyTeamCount).detail("IsWrongConfiguration", anyWrongConfiguration); - bool healthy = matchesPolicy && !anyUndesired && team->getServerIDs().size() == self->teamSize && team->getServerIDs().size() == serversLeft; + bool healthy = matchesPolicy && !anyUndesired && team->getServerIDs().size() == self->configuration.storageTeamSize && team->getServerIDs().size() == serversLeft; team->setHealthy( healthy ); // Unhealthy teams won't be chosen by bestTeam team->setWrongConfiguration( anyWrongConfiguration ); @@ -1216,7 +1217,7 @@ ACTOR Future teamTracker( DDTeamCollection *self, ReferencegetPriority(); - if( serversLeft < self->teamSize ) { + if( serversLeft < self->configuration.storageTeamSize ) { if( serversLeft == 0 ) team->setPriority( PRIORITY_TEAM_0_LEFT ); else if( serversLeft == 1 ) @@ -1226,7 +1227,7 @@ ACTOR Future teamTracker( DDTeamCollection *self, ReferencesetPriority( PRIORITY_TEAM_UNHEALTHY ); } - else if ( team->getServerIDs().size() != self->teamSize ) + else if ( team->getServerIDs().size() != self->configuration.storageTeamSize ) team->setPriority( PRIORITY_TEAM_UNHEALTHY ); else if( anyUndesired ) team->setPriority( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER ); @@ -1444,8 +1445,8 @@ ACTOR Future serverMetricsPolling( TCServerInfo *server) { //Returns the KeyValueStoreType of server if it is different from self->storeType ACTOR Future keyValueStoreTypeTracker(DDTeamCollection *self, TCServerInfo *server) { state KeyValueStoreType type = 
wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID(TaskDataDistribution))); - if(type == self->storeType) - Void _ = wait(Never()); + if(type == self->configuration.storageServerStoreType && ( server->lastKnownInterface.locality.dcId() == self->configuration.primaryDcId || server->lastKnownInterface.locality.dcId() == self->configuration.remoteDcId ) ) + Void _ = wait(Future(Never())); return type; } @@ -1469,7 +1470,7 @@ ACTOR Future storageServerTracker( state Future> interfaceChanged = server->onInterfaceChanged; state Future storeTracker = keyValueStoreTypeTracker( self, server ); - state bool hasWrongStoreType = false; + state bool hasWrongStoreTypeOrDC = false; changes.send( std::make_pair(server->id, server->lastKnownInterface) ); @@ -1525,7 +1526,7 @@ ACTOR Future storageServerTracker( } //If this storage server has the wrong key-value store type, then mark it undesired so it will be replaced with a server having the correct type - if(hasWrongStoreType) { + if(hasWrongStoreTypeOrDC) { TraceEvent(SevWarn, "UndesiredStorageServer", masterId).detail("Server", server->id).detail("StoreType", "?"); status.isUndesired = true; status.isWrongConfiguration = true; @@ -1547,7 +1548,7 @@ ACTOR Future storageServerTracker( failureTracker = storageServerFailureTracker( cx, server->lastKnownInterface, statusMap, &status, serverFailures, &self->unhealthyServers, masterId ); //We need to recruit new storage servers if the key value store type has changed - if(hasWrongStoreType) + if(hasWrongStoreTypeOrDC) self->restartRecruiting.trigger(); if( lastIsUndesired && !status.isUndesired ) @@ -1583,7 +1584,7 @@ ACTOR Future storageServerTracker( //Restart the storeTracker for the new interface storeTracker = keyValueStoreTypeTracker(self, server); - hasWrongStoreType = false; + hasWrongStoreTypeOrDC = false; self->restartTeamBuilder.trigger(); if(restartRecruiting) self->restartRecruiting.trigger(); @@ -1595,11 +1596,11 @@ ACTOR Future storageServerTracker( TraceEvent("KeyValueStoreTypeChanged", masterId) .detail("ServerID", server->id) .detail("StoreType", type.toString()) - .detail("DesiredType", self->storeType.toString()); + .detail("DesiredType", self->configuration.storageServerStoreType.toString()); TEST(true); //KeyValueStore type changed storeTracker = Never(); - hasWrongStoreType = true; + hasWrongStoreTypeOrDC = true; } when( Void _ = wait( server->wakeUpTracker.getFuture() ) ) { server->wakeUpTracker = Promise(); @@ -1649,7 +1650,7 @@ ACTOR Future initializeStorage( DDTeamCollection *self, RecruitStorageRepl state UID interfaceId = g_random->randomUniqueID(); InitializeStorageRequest isr; - isr.storeType = self->storeType; + isr.storeType = self->configuration.storageServerStoreType; isr.seedTag = invalidTag; isr.reqId = g_random->randomUniqueID(); isr.interfaceId = interfaceId; @@ -1721,6 +1722,9 @@ ACTOR Future storageRecruiter( DDTeamCollection *self, Referenceconfiguration.primaryDcId); + rsr.includeDCs.push_back(self->configuration.remoteDcId); + TraceEvent(rsr.criticalRecruitment ? 
SevWarn : SevInfo, "DDRecruiting").detail("State", "Sending request to CC") .detail("Exclusions", rsr.excludeAddresses.size()).detail("Critical", rsr.criticalRecruitment); @@ -1760,13 +1764,11 @@ ACTOR Future dataDistributionTeamCollection( Reference shardsAffectedByTeamFailure, MoveKeysLock lock, PromiseStream output, - UID masterId, int teamSize, - IRepPolicyRef replicationPolicy, - KeyValueStoreType storeType, + UID masterId, DatabaseConfiguration configuration, PromiseStream< std::pair> > serverChanges, Future readyToStart ) { - state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, teamSize, replicationPolicy, storeType, serverChanges ); + state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, serverChanges ); state Future loggingTrigger = Void(); state PromiseStream serverRemoved; @@ -2095,7 +2097,7 @@ ACTOR Future dataDistribution( TraceEvent("DDInitGotInitialDD", mi.id()).detail("b","").detail("e", "").detail("src", "[no items]").detail("dest", "[no items]").trackLatest("InitialDD"); } - if (initData->mode) break; + if (initData->mode && false) break; //FIXME: re-enable data distribution TraceEvent("DataDistributionDisabled", mi.id()); TraceEvent("MovingData", mi.id()) @@ -2129,7 +2131,7 @@ ACTOR Future dataDistribution( actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) ); actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) ); actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tci, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, configuration.storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) ); - actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tci, cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration.storageTeamSize, configuration.storagePolicy, configuration.storageServerStoreType, serverChanges, readyToStart.getFuture() ), "DDTeamCollection", mi.id(), &normalDDQueueErrors() ) ); + actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tci, cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, serverChanges, readyToStart.getFuture() ), "DDTeamCollection", mi.id(), &normalDDQueueErrors() ) ); Void _ = wait( waitForAll( actors ) ); return Void(); @@ -2154,15 +2156,17 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro false ); + DatabaseConfiguration conf; + conf.storageTeamSize = teamSize; + conf.storagePolicy = policy; + DDTeamCollection* collection = new DDTeamCollection( database, UID(0, 0), MoveKeysLock(), PromiseStream(), Reference(new ShardsAffectedByTeamFailure()), - teamSize, - policy, - KeyValueStoreType(), + conf, PromiseStream>>() ); diff --git a/fdbserver/DatabaseConfiguration.cpp b/fdbserver/DatabaseConfiguration.cpp index 5a1508da7d..0c43cf3fc4 100644 --- a/fdbserver/DatabaseConfiguration.cpp +++ b/fdbserver/DatabaseConfiguration.cpp @@ -34,11 +34,23 @@ void DatabaseConfiguration::resetInternal() { autoMasterProxyCount = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES; autoResolverCount = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS; autoDesiredTLogCount = CLIENT_KNOBS->DEFAULT_AUTO_LOGS; - storagePolicy = IRepPolicyRef(); - tLogPolicy = IRepPolicyRef(); - remoteTLogCount 
= 0; - remoteTLogReplicationFactor = 0; - remoteTLogPolicy = IRepPolicyRef(); + primaryDcId = remoteDcId = Optional>(); + tLogPolicy = storagePolicy = remoteTLogPolicy = remoteStoragePolicy = satelliteTLogPolicy = IRepPolicyRef(); + + remoteDesiredTLogCount = remoteTLogReplicationFactor = remoteDurableStorageQuorum = remoteStorageTeamSize = satelliteDesiredTLogCount = satelliteTLogReplicationFactor = satelliteTLogWriteAntiQuorum = satelliteTLogUsableDcs = logRouterCount = 0; + primarySatelliteDcIds.clear(); + remoteSatelliteDcIds.clear(); +} + +void parse( std::vector>>* dcs, ValueRef const& v ) { + int lastBegin = 0; + for(int i = 0; i < v.size(); i++) { + if(v[i] == ',') { + dcs->push_back(v.substr(lastBegin,i)); + lastBegin = i + 1; + } + } + dcs->push_back(v.substr(lastBegin)); } void parse( int* i, ValueRef const& v ) { @@ -53,8 +65,10 @@ void parseReplicationPolicy(IRepPolicyRef* policy, ValueRef const& v) { void DatabaseConfiguration::setDefaultReplicationPolicy() { storagePolicy = IRepPolicyRef(new PolicyAcross(storageTeamSize, "zoneid", IRepPolicyRef(new PolicyOne()))); + remoteStoragePolicy = IRepPolicyRef(new PolicyAcross(remoteStorageTeamSize, "zoneid", IRepPolicyRef(new PolicyOne()))); tLogPolicy = IRepPolicyRef(new PolicyAcross(tLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne()))); remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(remoteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne()))); + satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(satelliteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne()))); } bool DatabaseConfiguration::isValid() const { @@ -74,9 +88,19 @@ bool DatabaseConfiguration::isValid() const { autoDesiredTLogCount >= 1 && storagePolicy && tLogPolicy && - remoteTLogCount >= 0 && - remoteTLogReplicationFactor >=0 && - remoteTLogPolicy; + remoteDesiredTLogCount >= 0 && + remoteTLogReplicationFactor >= 0 && + remoteTLogPolicy && + ( remoteTLogReplicationFactor == 0 || ( primaryDcId.present() && remoteDcId.present() && remoteDurableStorageQuorum >= 1 && logRouterCount >= 1 ) ) && + remoteStoragePolicy && + remoteDurableStorageQuorum <= remoteStorageTeamSize && + satelliteDesiredTLogCount >= 0 && + satelliteTLogReplicationFactor >= 0 && + satelliteTLogWriteAntiQuorum >= 0 && + satelliteTLogUsableDcs >= 0 && + ( satelliteTLogReplicationFactor == 0 || ( primarySatelliteDcIds.size() && remoteSatelliteDcIds.size() && remoteTLogReplicationFactor > 0 ) ) && + satelliteTLogPolicy && + logRouterCount >= 0; } std::map DatabaseConfiguration::toMap() const { @@ -108,6 +132,56 @@ std::map DatabaseConfiguration::toMap() const { result["storage_engine"] = "memory"; else result["storage_engine"] = "custom"; + + if(primaryDcId.present()) { + result["primary_dc"] = printable(primaryDcId.get()); + } + if(remoteDcId.present()) { + result["remote_dc"] = printable(remoteDcId.get()); + } + if(primarySatelliteDcIds.size()) { + std::string primaryDcStr = ""; + bool first = true; + for(auto& it : primarySatelliteDcIds) { + if(it.present()) { + if(!first) { + primaryDcStr += ","; + first = false; + } + primaryDcStr += printable(it.get()); + } + } + result["primary_satellite_dcs"] = primaryDcStr; + } + if(remoteSatelliteDcIds.size()) { + std::string remoteDcStr = ""; + bool first = true; + for(auto& it : remoteSatelliteDcIds) { + if(it.present()) { + if(!first) { + remoteDcStr += ","; + first = false; + } + remoteDcStr += printable(it.get()); + } + } + result["remote_satellite_dcs"] = remoteDcStr; + } + + if(satelliteTLogReplicationFactor > 0) { + 
result["satellite_replication"] = format("%d", satelliteTLogReplicationFactor); + } + + if( remoteDurableStorageQuorum == remoteStorageTeamSize && remoteDurableStorageQuorum > 0) { + if( remoteTLogReplicationFactor == 1 && remoteDurableStorageQuorum == 1 ) + result["remote_redundancy_mode"] = "remote_single"; + else if( remoteTLogReplicationFactor == 2 && remoteDurableStorageQuorum == 2 ) + result["remote_redundancy_mode"] = "remote_double"; + else if( remoteTLogReplicationFactor == 3 && remoteDurableStorageQuorum == 3 ) + result["remote_redundancy_mode"] = "remote_triple"; + else + result["remote_redundancy_mode"] = "custom"; + } } return result; @@ -144,9 +218,22 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) { else if (ck == LiteralStringRef("auto_logs")) parse(&autoDesiredTLogCount, value); else if (ck == LiteralStringRef("storage_replication_policy")) parseReplicationPolicy(&storagePolicy, value); else if (ck == LiteralStringRef("log_replication_policy")) parseReplicationPolicy(&tLogPolicy, value); - else if (ck == LiteralStringRef("remote_logs")) parse(&remoteTLogCount, value); - else if (ck == LiteralStringRef("remote_log_replication")) parse(&remoteTLogReplicationFactor, value); - else if (ck == LiteralStringRef("remote_replication_policy")) parseReplicationPolicy(&remoteTLogPolicy, value); + else if (ck == LiteralStringRef("remote_logs")) parse(&remoteDesiredTLogCount, value); + else if (ck == LiteralStringRef("remote_log_replicas")) parse(&remoteTLogReplicationFactor, value); + else if (ck == LiteralStringRef("remote_log_policy")) parseReplicationPolicy(&remoteTLogPolicy, value); + else if (ck == LiteralStringRef("remote_storage_policy")) parseReplicationPolicy(&remoteStoragePolicy, value); + else if (ck == LiteralStringRef("satellite_log_policy")) parseReplicationPolicy(&satelliteTLogPolicy, value); + else if (ck == LiteralStringRef("remote_storage_quorum")) parse(&remoteDurableStorageQuorum, value); + else if (ck == LiteralStringRef("remote_storage_replicas")) parse(&remoteStorageTeamSize, value); + else if (ck == LiteralStringRef("satellite_logs")) parse(&satelliteDesiredTLogCount, value); + else if (ck == LiteralStringRef("satellite_log_replicas")) parse(&satelliteTLogReplicationFactor, value); + else if (ck == LiteralStringRef("satellite_anti_quorum")) parse(&satelliteTLogWriteAntiQuorum, value); + else if (ck == LiteralStringRef("satellite_usable_dcs")) parse(&satelliteTLogUsableDcs, value); + else if (ck == LiteralStringRef("primary_dc")) primaryDcId = value; + else if (ck == LiteralStringRef("remote_dc")) remoteDcId = value; + else if (ck == LiteralStringRef("primary_satellite_dcs")) parse(&primarySatelliteDcIds, value); + else if (ck == LiteralStringRef("remote_satellite_dcs")) parse(&remoteSatelliteDcIds, value); + else if (ck == LiteralStringRef("log_routers")) parse(&logRouterCount, value); else return false; return true; // All of the above options currently require recovery to take effect } diff --git a/fdbserver/DatabaseConfiguration.h b/fdbserver/DatabaseConfiguration.h index 5d1c7d9343..40f4c2d66f 100644 --- a/fdbserver/DatabaseConfiguration.h +++ b/fdbserver/DatabaseConfiguration.h @@ -42,14 +42,21 @@ struct DatabaseConfiguration { std::string toString() const; std::map toMap() const; + int expectedLogSets() { + int result = 1; + if( satelliteTLogReplicationFactor > 0) { + result++; + } + if( remoteTLogReplicationFactor > 0) { + result++; + } + return result; + } // SOMEDAY: think about changing storageTeamSize to durableStorageQuorum 
int32_t minMachinesRequired() const { return std::max(tLogReplicationFactor*2, storageTeamSize); } int32_t maxMachineFailuresTolerated() const { return std::min(tLogReplicationFactor - 1 - tLogWriteAntiQuorum, durableStorageQuorum - 1); } - // Redundancy Levels - IRepPolicyRef storagePolicy; - // MasterProxy Servers int32_t masterProxyCount; int32_t autoMasterProxyCount; @@ -59,21 +66,41 @@ struct DatabaseConfiguration { int32_t autoResolverCount; // TLogs + IRepPolicyRef tLogPolicy; int32_t desiredTLogCount; int32_t autoDesiredTLogCount; int32_t tLogWriteAntiQuorum; int32_t tLogReplicationFactor; KeyValueStoreType tLogDataStoreType; - IRepPolicyRef tLogPolicy; - int32_t remoteTLogCount; - int32_t remoteTLogReplicationFactor; - IRepPolicyRef remoteTLogPolicy; + Optional> primaryDcId; - // Storage servers + // Storage Servers + IRepPolicyRef storagePolicy; int32_t durableStorageQuorum; int32_t storageTeamSize; KeyValueStoreType storageServerStoreType; + // Remote TLogs + int32_t remoteDesiredTLogCount; + int32_t remoteTLogReplicationFactor; + int32_t logRouterCount; + IRepPolicyRef remoteTLogPolicy; + Optional> remoteDcId; + + // Remote Storage Servers + IRepPolicyRef remoteStoragePolicy; + int32_t remoteDurableStorageQuorum; + int32_t remoteStorageTeamSize; + + // Satellite TLogs + IRepPolicyRef satelliteTLogPolicy; + int32_t satelliteDesiredTLogCount; + int32_t satelliteTLogReplicationFactor; + int32_t satelliteTLogWriteAntiQuorum; + int32_t satelliteTLogUsableDcs; + std::vector>> primarySatelliteDcIds; + std::vector>> remoteSatelliteDcIds; + // Excluded servers (no state should be here) bool isExcludedServer( NetworkAddress ) const; std::set getExcludedServers() const; diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 09fc395511..64e163195e 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -384,8 +384,8 @@ ACTOR Future logRouterCore( ACTOR Future checkRemoved(Reference> db, uint64_t recoveryCount, TLogInterface myInterface, int logSet) { loop{ - if (db->get().recoveryCount >= recoveryCount && logSet < db->get().logSystemConfig.tLogs.size() && - !std::count(db->get().logSystemConfig.tLogs[logSet].logRouters.begin(), db->get().logSystemConfig.tLogs[logSet].logRouters.end(), myInterface.id())) { + if (db->get().recoveryCount >= recoveryCount && ( logSet >= db->get().logSystemConfig.expectedLogSets || ( logSet < db->get().logSystemConfig.tLogs.size() && + !std::count(db->get().logSystemConfig.tLogs[logSet].logRouters.begin(), db->get().logSystemConfig.tLogs[logSet].logRouters.end(), myInterface.id()) ) )) { throw worker_removed(); } Void _ = wait(db->onChange()); diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 2ad5441205..a97e815ce3 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -51,8 +51,9 @@ public: std::map logEntryMap; bool isLocal; bool hasBest; + int8_t locality; - LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true) {} + LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {} int bestLocationFor( Tag tag ) { return hasBest ? 
tag.id % logServers.size() : -1; @@ -100,7 +101,9 @@ public: if(hasBest) { for(auto& t : tags) { - newLocations.push_back(bestLocationFor(t)); + if(t.locality == locality || t.locality == tagLocalitySpecial) { + newLocations.push_back(bestLocationFor(t)); + } } } @@ -498,7 +501,7 @@ struct ILogSystem { // Call only on an ILogSystem obtained from recoverAndEndEpoch() // Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable) - virtual Future> newEpoch( vector availableLogServers, vector availableRemoteLogServers, vector availableLogRouters, DatabaseConfiguration const& config, LogEpoch recoveryCount ) = 0; + virtual Future> newEpoch( struct RecruitFromConfigurationReply const& recr, Future const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality ) = 0; // Call only on an ILogSystem obtained from recoverAndEndEpoch() // Returns an ILogSystem representing a new epoch immediately following this one. The new epoch is only provisional until the caller updates the coordinated DBCoreState diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index 46f8ea6fb4..1dcafa65bb 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -61,17 +61,18 @@ struct TLogSet { int32_t tLogWriteAntiQuorum, tLogReplicationFactor; std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers IRepPolicyRef tLogPolicy; + int8_t locality; bool isLocal; bool hasBest; - TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true) {} + TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {} std::string toString() const { - return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBest, logRouters.size(), describe(tLogs).c_str()); + return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBest, logRouters.size(), describe(tLogs).c_str(), locality); } bool operator == ( const TLogSet& rhs ) const { - if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBest != rhs.hasBest || tLogs.size() != rhs.tLogs.size()) { + if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBest != rhs.hasBest || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality) { return false; } if ((tLogPolicy && !rhs.tLogPolicy) || (!tLogPolicy && rhs.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != rhs.tLogPolicy->info()))) { @@ -86,7 +87,7 @@ struct TLogSet { } bool isEqualIds(TLogSet const& r) const { - if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBest != r.hasBest || tLogs.size() != r.tLogs.size()) { + if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBest != r.hasBest || tLogs.size() != r.tLogs.size() || locality != r.locality) { return false; } if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) { @@ -102,7 +103,7 @@ struct TLogSet { template void serialize( Ar& ar ) { - ar 
& tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest; + ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest & locality; } }; @@ -142,8 +143,10 @@ struct LogSystemConfig { int logSystemType; std::vector tLogs; std::vector oldTLogs; + int expectedLogSets; + int minRouters; - LogSystemConfig() : logSystemType(0) {} + LogSystemConfig() : logSystemType(0), minRouters(0), expectedLogSets(0) {} std::string toString() const { return format("type: %d oldGenerations: %d %s", logSystemType, oldTLogs.size(), describe(tLogs).c_str()); @@ -164,11 +167,11 @@ struct LogSystemConfig { bool operator == ( const LogSystemConfig& rhs ) const { return isEqual(rhs); } bool isEqual(LogSystemConfig const& r) const { - return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs; + return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && minRouters == r.minRouters && expectedLogSets == r.expectedLogSets; } bool isEqualIds(LogSystemConfig const& r) const { - if( logSystemType!=r.logSystemType || tLogs.size() != r.tLogs.size() || oldTLogs.size() != r.oldTLogs.size() ) { + if( logSystemType!=r.logSystemType || tLogs.size() != r.tLogs.size() || oldTLogs.size() != r.oldTLogs.size() || minRouters != r.minRouters || expectedLogSets != r.expectedLogSets ) { return false; } for(int i = 0; i < tLogs.size(); i++ ) { @@ -198,7 +201,7 @@ struct LogSystemConfig { template void serialize( Ar& ar ) { - ar & logSystemType & tLogs & oldTLogs; + ar & logSystemType & tLogs & oldTLogs & minRouters & expectedLogSets; } }; diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 935cd8dc7e..3e5ea0a28a 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -455,26 +455,33 @@ ArenaReader* ILogSystem::SetPeekCursor::reader() { return serverCursors[currentS void ILogSystem::SetPeekCursor::calcHasMessage() { - if(nextVersion.present()) serverCursors[bestSet][bestServer]->advanceTo( nextVersion.get() ); - if( serverCursors[bestSet][bestServer]->hasMessage() ) { - messageVersion = serverCursors[bestSet][bestServer]->version(); - currentSet = bestSet; - currentCursor = bestServer; - hasNextMessage = true; + if(bestSet >= 0 && bestServer >= 0) { + if(nextVersion.present()) { + //TraceEvent("LPC_calcNext").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage).detail("nextVersion", nextVersion.get().toString()); + serverCursors[bestSet][bestServer]->advanceTo( nextVersion.get() ); + } + if( serverCursors[bestSet][bestServer]->hasMessage() ) { + messageVersion = serverCursors[bestSet][bestServer]->version(); + currentSet = bestSet; + currentCursor = bestServer; + hasNextMessage = true; - for (auto& cursors : serverCursors) { - for(auto& c : cursors) { - c->advanceTo(messageVersion); + //TraceEvent("LPC_calc1").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage); + + for (auto& cursors : serverCursors) { + for(auto& c : cursors) { + c->advanceTo(messageVersion); + } } + + return; } - return; - } - - auto bestVersion = serverCursors[bestSet][bestServer]->version(); - for (auto& cursors : serverCursors) { - for (auto& c : cursors) { - c->advanceTo(bestVersion); + auto bestVersion = serverCursors[bestSet][bestServer]->version(); + for (auto& cursors : serverCursors) { + for (auto& c : 
cursors) { + c->advanceTo(bestVersion); + } } } @@ -482,8 +489,10 @@ void ILogSystem::SetPeekCursor::calcHasMessage() { if(useBestSet) { updateMessage(bestSet, false); // Use Quorum logic + //TraceEvent("LPC_calc2").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage); if(!hasNextMessage) { updateMessage(bestSet, true); + //TraceEvent("LPC_calc3").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage); } } else { for(int i = 0; i < logSets.size() && !hasNextMessage; i++) { @@ -491,12 +500,13 @@ void ILogSystem::SetPeekCursor::calcHasMessage() { updateMessage(i, false); // Use Quorum logic } } - + //TraceEvent("LPC_calc4").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage); for(int i = 0; i < logSets.size() && !hasNextMessage; i++) { if(i != bestSet) { updateMessage(i, true); } } + //TraceEvent("LPC_calc5").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage); } } @@ -508,6 +518,7 @@ void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) { auto& serverCursor = serverCursors[logIdx][i]; if (nextVersion.present()) serverCursor->advanceTo(nextVersion.get()); sortedVersions.push_back(std::pair(serverCursor->version(), i)); + //TraceEvent("LPC_update1").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage).detail("serverVer", serverCursor->version().toString()).detail("i", i); } if(usePolicy) { @@ -529,13 +540,14 @@ void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) { messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor)].first; } - for(int i = 0; i < serverCursors[logIdx].size(); i++) { - auto& c = serverCursors[logIdx][i]; - auto start = c->version(); - c->advanceTo(messageVersion); - if( start < messageVersion && messageVersion < c->version() ) { - advancedPast = true; - TEST(true); //Merge peek cursor advanced past desired sequence + for (auto& cursors : serverCursors) { + for (auto& c : cursors) { + auto start = c->version(); + c->advanceTo(messageVersion); + if( start < messageVersion && messageVersion < c->version() ) { + advancedPast = true; + TEST(true); //Merge peek cursor advanced past desired sequence + } } } @@ -587,6 +599,7 @@ ACTOR Future setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer //TraceEvent("LPC_getMore1", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag); if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isActive()) { ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage()); + //TraceEvent("LPC_getMore2", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag); Void _ = wait( self->serverCursors[self->bestSet][self->bestServer]->getMore() || self->serverCursors[self->bestSet][self->bestServer]->onFailed() ); self->useBestSet = true; } else { @@ -602,6 +615,7 @@ ACTOR Future setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer bestSetValid = self->localityGroup.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->localityGroup.validate(self->logSets[self->bestSet]->tLogPolicy); } if(bestSetValid) { + //TraceEvent("LPC_getMore3", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag); vector> q; for (auto& c : 
self->serverCursors[self->bestSet]) { if (!c->hasMessage()) { @@ -613,6 +627,7 @@ ACTOR Future setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer } else { //FIXME: this will peeking way too many cursors when satellites exist, and does not need to peek bestSet cursors since we cannot get anymore data from them vector> q; + //TraceEvent("LPC_getMore4", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag); for(auto& cursors : self->serverCursors) { for (auto& c :cursors) { if (!c->hasMessage()) { diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 67e4d24528..6c417b27ca 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -1141,7 +1141,7 @@ static StatusObject configurationFetcher(Optional conf, S else if (configuration.autoDesiredTLogCount != CLIENT_KNOBS->DEFAULT_AUTO_LOGS) statusObj["auto_logs"] = configuration.autoDesiredTLogCount; - statusObj["remote_logs"] = configuration.remoteTLogCount; + statusObj["remote_logs"] = configuration.remoteDesiredTLogCount; if(configuration.storagePolicy) { statusObj["storage_policy"] = configuration.storagePolicy->info(); diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 87129aef09..24805f9d80 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -53,6 +53,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> tLogs; + int expectedLogSets; int minRouters; // new members @@ -61,6 +62,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted remoteRecovery; Future remoteRecoveryComplete; bool recoveryCompleteWrittenToCoreState; + bool remoteLogsWrittenToCoreState; Optional epochEndVersion; std::set< Tag > epochEndTags; @@ -70,7 +72,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted oldLogData; - TagPartitionedLogSystem( UID dbgid, LocalityData locality ) : dbgid(dbgid), locality(locality), actors(false), recoveryCompleteWrittenToCoreState(false), logSystemType(0), minRouters(std::numeric_limits::max()) {} + TagPartitionedLogSystem( UID dbgid, LocalityData locality ) : dbgid(dbgid), locality(locality), actors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), minRouters(std::numeric_limits::max()), expectedLogSets(0) {} virtual void stopRejoins() { rejoins = Future(); @@ -109,6 +111,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSystem( new TagPartitionedLogSystem(dbgid, locality) ); logSystem->tLogs.resize(lsConf.tLogs.size()); + logSystem->expectedLogSets = lsConf.expectedLogSets; + logSystem->minRouters = lsConf.minRouters; for( int i = 0; i < lsConf.tLogs.size(); i++ ) { Reference logSet = Reference( new LogSet() ); logSystem->tLogs[i] = logSet; @@ -125,9 +129,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities = tLogSet.tLogLocalities; logSet->isLocal = tLogSet.isLocal; logSet->hasBest = tLogSet.hasBest; + logSet->locality = tLogSet.locality; logSet->updateLocalitySet(); - - if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min(logSystem->minRouters, logSet->logRouters.size()); } logSystem->oldLogData.resize(lsConf.oldTLogs.size()); @@ -149,6 +152,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities = tLogData.tLogLocalities; logSet->isLocal = tLogData.isLocal; logSet->hasBest = tLogData.hasBest; + logSet->locality = tLogData.locality; //logSet.UpdateLocalitySet(); we do not 
update the locality set, since we never push to old logs } logSystem->oldLogData[i].epochEnd = lsConf.oldTLogs[i].epochEnd; @@ -181,8 +185,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities = tLogSet.tLogLocalities; logSet->isLocal = tLogSet.isLocal; logSet->hasBest = tLogSet.hasBest; + logSet->locality = tLogSet.locality; //logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs - if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min(logSystem->minRouters, logSet->logRouters.size()); } //logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd; @@ -205,6 +209,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities = tLogSet.tLogLocalities; logSet->isLocal = tLogSet.isLocal; logSet->hasBest = tLogSet.hasBest; + logSet->locality = tLogSet.locality; //logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs } logSystem->oldLogData[i-1].epochEnd = lsConf.oldTLogs[i].epochEnd; @@ -231,6 +236,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogPolicy; coreSet.isLocal = t->isLocal; coreSet.hasBest = t->hasBest; + coreSet.locality = t->locality; newState.tLogs.push_back(coreSet); } @@ -249,6 +255,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogPolicy; coreSet.isLocal = t->isLocal; coreSet.hasBest = t->hasBest; + coreSet.locality = t->locality; newState.oldTLogData[i].tLogs.push_back(coreSet); } newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd; @@ -259,15 +266,29 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted onCoreStateChanged() { - ASSERT(recoveryComplete.isValid()); - if( recoveryComplete.isReady() ) + ASSERT(recoveryComplete.isValid() && remoteRecovery.isValid() ); + if( recoveryComplete.isReady() && remoteRecovery.isReady() ) { return Never(); - return recoveryComplete; + } + if( remoteRecovery.isReady() ) { + return recoveryComplete; + } + if( recoveryComplete.isReady() ) { + return remoteRecovery; + } + return recoveryComplete || remoteRecovery; } virtual void coreStateWritten( DBCoreState const& newState ) { - if( !newState.oldTLogData.size() ) + if( !newState.oldTLogData.size() ) { recoveryCompleteWrittenToCoreState = true; + } + for(auto& t : newState.tLogs) { + if(!t.isLocal) { + remoteLogsWrittenToCoreState = true; + break; + } + } } virtual Future onError() { @@ -327,14 +348,31 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, false ) ); } + int bestSet = -1; + for(int t = 0; t < tLogs.size(); t++) { + if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) { + bestSet = t; + break; + } + } if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) { - return Reference( new ILogSystem::SetPeekCursor( tLogs, 0, tLogs[0]->logServers.size() ? tLogs[0]->bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) ); + return Reference( new ILogSystem::SetPeekCursor( tLogs, std::max(0,bestSet), + bestSet >= 0 ? ( tLogs[bestSet]->logServers.size() ? tLogs[bestSet]->bestLocationFor( tag ) : -1 ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) ); } else { std::vector< Reference > cursors; std::vector< LogMessageVersion > epochEnds; - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( tLogs, 0, tLogs[0]->logServers.size() ? 
tLogs[0]->bestLocationFor( tag ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) ); + cursors.push_back( Reference( new ILogSystem::SetPeekCursor( tLogs, std::max(0,bestSet), + bestSet >= 0 ? ( tLogs[bestSet]->logServers.size() ? tLogs[bestSet]->bestLocationFor( tag ) : -1 ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) ); for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) { - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 0, oldLogData[i].tLogs[0]->logServers.size() ? oldLogData[i].tLogs[0]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) ); + int bestOldSet = -1; + for(int t = 0; t < oldLogData[i].tLogs.size(); t++) { + if(oldLogData[i].tLogs[t]->hasBest && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) { + bestOldSet = t; + break; + } + } + cursors.push_back( Reference( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, std::max(0,bestOldSet), + bestOldSet >= 0 ? ( oldLogData[i].tLogs[bestOldSet]->logServers.size() ? oldLogData[i].tLogs[bestOldSet]->bestLocationFor( tag ) : -1 ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) ); epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd)); } @@ -352,27 +390,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers[tLogs[0]->bestLocationFor( tag )] : Reference>>(), tag, begin, getPeekEnd(), false, false ) ); } else { - if(tLogs.size() < 2) { + int bestSet = -1; + for(int t = 0; t < tLogs.size(); t++) { + if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) { + bestSet = t; + break; + } + } + + if(tLogs.size() < 1 || bestSet < 0) { return Reference( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, false ) ); } - if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) { - return Reference( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ? - tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] : - Reference>>(), tag, begin, getPeekEnd(), false, false ) ); - } else { - TEST(true); //peekSingle used during non-copying tlog recovery - std::vector< Reference > cursors; - std::vector< LogMessageVersion > epochEnds; - cursors.push_back( Reference( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ? - tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] : - Reference>>(), tag, oldLogData[0].epochEnd, getPeekEnd(), false, false) ) ); - for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) { - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1]->logServers.size() ? oldLogData[i].tLogs[1]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, false)) ); - epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd)); - } - - return Reference( new ILogSystem::MultiCursor(cursors, epochEnds) ); - } + + return Reference( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers.size() ? 
+ tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )] : + Reference>>(), tag, begin, getPeekEnd(), false, false ) ); } } @@ -444,33 +476,38 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> newEpoch( vector availableLogServers, vector availableRemoteLogServers, vector availableLogRouters, DatabaseConfiguration const& config, LogEpoch recoveryCount ) { + virtual Future> newEpoch( RecruitFromConfigurationReply const& recr, Future const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality ) { // Call only after end_epoch() has successfully completed. Returns a new epoch immediately following this one. The new epoch // is only provisional until the caller updates the coordinated DBCoreState - return newEpoch( Reference::addRef(this), availableLogServers, availableRemoteLogServers, availableLogRouters, config, recoveryCount ); + return newEpoch( Reference::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality ); } virtual LogSystemConfig getLogSystemConfig() { LogSystemConfig logSystemConfig; logSystemConfig.logSystemType = logSystemType; + logSystemConfig.expectedLogSets = expectedLogSets; + logSystemConfig.minRouters = minRouters; - logSystemConfig.tLogs.resize(tLogs.size()); for( int i = 0; i < tLogs.size(); i++ ) { - TLogSet& log = logSystemConfig.tLogs[i]; Reference logSet = tLogs[i]; - log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; - log.tLogReplicationFactor = logSet->tLogReplicationFactor; - log.tLogPolicy = logSet->tLogPolicy; - log.tLogLocalities = logSet->tLogLocalities; - log.isLocal = logSet->isLocal; - log.hasBest = logSet->hasBest; + if(logSet->isLocal || remoteLogsWrittenToCoreState) { + logSystemConfig.tLogs.push_back(TLogSet()); + TLogSet& log = logSystemConfig.tLogs.back(); + log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; + log.tLogReplicationFactor = logSet->tLogReplicationFactor; + log.tLogPolicy = logSet->tLogPolicy; + log.tLogLocalities = logSet->tLogLocalities; + log.isLocal = logSet->isLocal; + log.hasBest = logSet->hasBest; + log.locality = logSet->locality; - for( int i = 0; i < logSet->logServers.size(); i++ ) { - log.tLogs.push_back(logSet->logServers[i]->get()); - } + for( int i = 0; i < logSet->logServers.size(); i++ ) { + log.tLogs.push_back(logSet->logServers[i]->get()); + } - for( int i = 0; i < logSet->logRouters.size(); i++ ) { - log.logRouters.push_back(logSet->logRouters[i]->get()); + for( int i = 0; i < logSet->logRouters.size(); i++ ) { + log.logRouters.push_back(logSet->logRouters[i]->get()); + } } } @@ -488,6 +525,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities; log.isLocal = logSet->isLocal; log.hasBest = logSet->hasBest; + log.locality = logSet->locality; for( int i = 0; i < logSet->logServers.size(); i++ ) { log.tLogs.push_back(logSet->logServers[i]->get()); @@ -507,8 +545,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> logs; vector> oldLogs; for(auto& t : tLogs) { - for( int i = 0; i < t->logServers.size(); i++ ) { - logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress())); + if(t->isLocal || remoteLogsWrittenToCoreState) { + for( int i = 0; i < t->logServers.size(); i++ ) { + logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? 
t->logServers[i]->get().interf().address() : NetworkAddress())); + } } } if(!recoveryCompleteWrittenToCoreState) { @@ -632,6 +672,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities = coreSet.tLogLocalities; logSet->isLocal = coreSet.isLocal; logSet->hasBest = coreSet.hasBest; + logSet->locality = coreSet.locality; logFailed.push_back(failed); } oldLogData.resize(prevState.oldTLogData.size()); @@ -654,6 +695,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities = log.tLogLocalities; logSet->isLocal = log.isLocal; logSet->hasBest = log.hasBest; + logSet->locality = log.locality; } oldData.epochEnd = old.epochEnd; } @@ -861,23 +903,32 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted newRemoteEpoch( TagPartitionedLogSystem* self, Reference oldLogSystem, vector remoteTLogWorkers, vector logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, uint16_t minTag, int logNum ) + ACTOR static Future newRemoteEpoch( TagPartitionedLogSystem* self, Reference oldLogSystem, Future fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, uint16_t minTag, int8_t remoteLocality ) { + state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers ); + + state Reference logSet = Reference( new LogSet() ); + logSet->tLogReplicationFactor = configuration.remoteTLogReplicationFactor; + logSet->tLogPolicy = configuration.remoteTLogPolicy; + logSet->isLocal = false; + logSet->hasBest = true; + logSet->locality = remoteLocality; + //recruit temporary log routers and update registration with them - state int tempLogRouters = std::max(logRouterWorkers.size(), minTag + 1); + state int tempLogRouters = std::max(remoteWorkers.logRouters.size(), minTag + 1); state vector> logRouterInitializationReplies; for( int i = 0; i < tempLogRouters; i++) { InitializeLogRouterRequest req; req.recoveryCount = recoveryCount; req.routerTag = Tag(tagLocalityLogRouter, i); - req.logSet = logNum; - logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( logRouterWorkers[i%logRouterWorkers.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + req.logSet = self->tLogs.size(); + logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); } Void _ = wait( waitForAll(logRouterInitializationReplies) ); for( int i = 0; i < logRouterInitializationReplies.size(); i++ ) { - self->tLogs[logNum]->logRouters.push_back( Reference>>( new AsyncVar>( OptionalInterface(logRouterInitializationReplies[i].get()) ) ) ); + logSet->logRouters.push_back( Reference>>( new AsyncVar>( OptionalInterface(logRouterInitializationReplies[i].get()) ) ) ); } state double startTime = now(); @@ -885,9 +936,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> remoteTLogInitializationReplies; - vector< InitializeTLogRequest > remoteTLogReqs( remoteTLogWorkers.size() ); + vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() ); - for( int i = 0; i < remoteTLogWorkers.size(); i++ ) { + for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) { InitializeTLogRequest &req = remoteTLogReqs[i]; req.recruitmentID = remoteRecruitmentID; req.storeType = 
configuration.tLogDataStoreType; @@ -898,60 +949,71 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[logNum]->tLogLocalities.resize( remoteTLogWorkers.size() ); - self->tLogs[logNum]->logServers.resize( remoteTLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size - self->tLogs[logNum]->updateLocalitySet(remoteTLogWorkers); + logSet->tLogLocalities.resize( remoteWorkers.remoteTLogs.size() ); + logSet->logServers.resize( remoteWorkers.remoteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size + logSet->updateLocalitySet(remoteWorkers.remoteTLogs); vector locations; for( Tag tag : oldLogSystem->epochEndTags ) { locations.clear(); - self->tLogs[logNum]->getPushLocations( vector(1, tag), locations, 0 ); + logSet->getPushLocations( vector(1, tag), locations, 0 ); for(int loc : locations) remoteTLogReqs[ loc ].recoverTags.push_back( tag ); } - for( int i = 0; i < remoteTLogWorkers.size(); i++ ) - remoteTLogInitializationReplies.push_back( transformErrors( throwErrorOr( remoteTLogWorkers[i].tLog.getReplyUnlessFailedFor( remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) + remoteTLogInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor( remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); Void _ = wait( waitForAll(remoteTLogInitializationReplies) ); for( int i = 0; i < remoteTLogInitializationReplies.size(); i++ ) { - self->tLogs[logNum]->logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(remoteTLogInitializationReplies[i].get()) ) ); - self->tLogs[logNum]->tLogLocalities[i] = remoteTLogWorkers[i].locality; + logSet->logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(remoteTLogInitializationReplies[i].get()) ) ); + logSet->tLogLocalities[i] = remoteWorkers.remoteTLogs[i].locality; } std::vector> recoveryComplete; - for( int i = 0; i < self->tLogs[logNum]->logServers.size(); i++) - recoveryComplete.push_back( transformErrors( throwErrorOr( self->tLogs[logNum]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + for( int i = 0; i < logSet->logServers.size(); i++) + recoveryComplete.push_back( transformErrors( throwErrorOr( logSet->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); self->remoteRecoveryComplete = waitForAll(recoveryComplete); - self->tLogs[logNum]->logRouters.resize(logRouterWorkers.size()); + logSet->logRouters.resize(remoteWorkers.remoteTLogs.size()); + self->tLogs.push_back( logSet ); return Void(); } - ACTOR static Future> newEpoch( - Reference oldLogSystem, vector tLogWorkers, vector remoteTLogWorkers, vector logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount ) + ACTOR static Future> newEpoch( Reference oldLogSystem, RecruitFromConfigurationReply recr, Future fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality ) { state double 
startTime = now(); state Reference logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) ); state UID recruitmentID = g_random->randomUniqueID(); logSystem->logSystemType = 2; - logSystem->tLogs.resize(2); + logSystem->minRouters = configuration.logRouterCount; + logSystem->expectedLogSets = 1; - logSystem->tLogs[0] = Reference( new LogSet() ); + logSystem->tLogs.push_back( Reference( new LogSet() ) ); logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum; logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor; logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy; logSystem->tLogs[0]->isLocal = true; logSystem->tLogs[0]->hasBest = true; - - logSystem->tLogs[1] = Reference( new LogSet() ); - logSystem->tLogs[1]->tLogReplicationFactor = configuration.remoteTLogReplicationFactor; - logSystem->tLogs[1]->tLogPolicy = configuration.remoteTLogPolicy; - logSystem->tLogs[1]->isLocal = false; - logSystem->tLogs[1]->hasBest = true; + logSystem->tLogs[0]->locality = primaryLocality; + if(configuration.satelliteTLogReplicationFactor > 0) { + logSystem->tLogs.push_back( Reference( new LogSet() ) ); + logSystem->tLogs[1]->tLogWriteAntiQuorum = configuration.satelliteTLogWriteAntiQuorum; + logSystem->tLogs[1]->tLogReplicationFactor = configuration.satelliteTLogReplicationFactor; + logSystem->tLogs[1]->tLogPolicy = configuration.satelliteTLogPolicy; + logSystem->tLogs[1]->isLocal = true; + logSystem->tLogs[1]->hasBest = false; + logSystem->tLogs[1]->locality = -99; + logSystem->expectedLogSets++; + } + + if(configuration.remoteTLogReplicationFactor > 0) { + logSystem->expectedLogSets++; + } + if(oldLogSystem->tLogs.size()) { logSystem->oldLogData.push_back(OldLogData()); logSystem->oldLogData[0].tLogs = oldLogSystem->tLogs; @@ -963,9 +1025,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> initializationReplies; - vector< InitializeTLogRequest > reqs( tLogWorkers.size() ); + vector< InitializeTLogRequest > reqs( recr.tLogs.size() ); - for( int i = 0; i < tLogWorkers.size(); i++ ) { + for( int i = 0; i < recr.tLogs.size(); i++ ) { InitializeTLogRequest &req = reqs[i]; req.recruitmentID = recruitmentID; req.storeType = configuration.tLogDataStoreType; @@ -975,9 +1037,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0]->tLogLocalities.resize( tLogWorkers.size() ); - logSystem->tLogs[0]->logServers.resize( tLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size - logSystem->tLogs[0]->updateLocalitySet(tLogWorkers); + logSystem->tLogs[0]->tLogLocalities.resize( recr.tLogs.size() ); + logSystem->tLogs[0]->logServers.resize( recr.tLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size + logSystem->tLogs[0]->updateLocalitySet(recr.tLogs); std::vector locations; state uint16_t minTag = 0; @@ -989,25 +1051,68 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedTLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + for( int i = 0; i < recr.tLogs.size(); i++ ) + initializationReplies.push_back( transformErrors( throwErrorOr( recr.tLogs[i].tLog.getReplyUnlessFailedFor( reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + + state std::vector> recoveryComplete; + + if(configuration.satelliteTLogReplicationFactor > 0) { + state vector> 
satelliteInitializationReplies; + vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() ); + + for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) { + InitializeTLogRequest &req = sreqs[i]; + req.recruitmentID = recruitmentID; + req.storeType = configuration.tLogDataStoreType; + req.recoverFrom = oldLogSystem->getLogSystemConfig(); + req.recoverAt = oldLogSystem->epochEndVersion.get(); + req.knownCommittedVersion = oldLogSystem->knownCommittedVersion; + req.epoch = recoveryCount; + } + + logSystem->tLogs[1]->tLogLocalities.resize( recr.satelliteTLogs.size() ); + logSystem->tLogs[1]->logServers.resize( recr.satelliteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size + logSystem->tLogs[1]->updateLocalitySet(recr.satelliteTLogs); + + for( Tag tag : oldLogSystem->getEpochEndTags() ) { + minTag = std::min(minTag, tag.id); + locations.clear(); + logSystem->tLogs[1]->getPushLocations( vector(1, tag), locations, 0 ); + for(int loc : locations) + sreqs[ loc ].recoverTags.push_back( tag ); + } + + for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) + satelliteInitializationReplies.push_back( transformErrors( throwErrorOr( recr.satelliteTLogs[i].tLog.getReplyUnlessFailedFor( sreqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + + Void _ = wait( waitForAll( satelliteInitializationReplies ) ); + + for( int i = 0; i < satelliteInitializationReplies.size(); i++ ) { + logSystem->tLogs[1]->logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(satelliteInitializationReplies[i].get()) ) ); + logSystem->tLogs[1]->tLogLocalities[i] = recr.satelliteTLogs[i].locality; + } + + for( int i = 0; i < logSystem->tLogs[1]->logServers.size(); i++) + recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[1]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + } Void _ = wait( waitForAll( initializationReplies ) ); for( int i = 0; i < initializationReplies.size(); i++ ) { logSystem->tLogs[0]->logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(initializationReplies[i].get()) ) ); - logSystem->tLogs[0]->tLogLocalities[i] = tLogWorkers[i].locality; + logSystem->tLogs[0]->tLogLocalities[i] = recr.tLogs[i].locality; } //Don't force failure of recovery if it took us a long time to recover. 
This avoids multiple long running recoveries causing tests to timeout if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed(); - std::vector> recoveryComplete; for( int i = 0; i < logSystem->tLogs[0]->logServers.size(); i++) recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); logSystem->recoveryComplete = waitForAll(recoveryComplete); - logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, minTag, 1); - Void _ = wait(logSystem->remoteRecovery); + + if(configuration.remoteTLogReplicationFactor > 0) { + logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, minTag, remoteLocality); + } return logSystem; } diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 44f650df17..ca439852f9 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -59,6 +59,94 @@ struct ProxyVersionReplies { ProxyVersionReplies() : latestRequestNum(0) {} }; +ACTOR Future masterTerminateOnConflict( UID dbgid, Promise fullyRecovered, Future onConflict, Future switchedState ) { + choose { + when( Void _ = wait(onConflict) ) { + if (!fullyRecovered.isSet()) { + TraceEvent("MasterTerminated", dbgid).detail("Reason", "Conflict"); + TEST(true); // Coordinated state conflict, master dying + throw worker_removed(); + } + return Void(); + } + when( Void _ = wait(switchedState) ) { + return Void(); + } + } +} + +class ReusableCoordinatedState : NonCopyable { +public: + Promise fullyRecovered; + DBCoreState prevDBState; + DBCoreState myDBState; + + ReusableCoordinatedState( ServerCoordinators const& coordinators, PromiseStream> const& addActor, UID const& dbgid ) : coordinators(coordinators), cstate(coordinators), addActor(addActor), dbgid(dbgid) {} + + Future read() { + return _read(this); + } + + Future write(DBCoreState newState, bool finalWrite = false) { + return _write(this, newState, finalWrite); + } + + Future move( ClusterConnectionString const& nc ) { + return cstate.move(nc); + } + +private: + MovableCoordinatedState cstate; + ServerCoordinators coordinators; + PromiseStream> addActor; + Promise switchedState; + UID dbgid; + + ACTOR Future _read(ReusableCoordinatedState* self) { + Value prevDBStateRaw = wait( self->cstate.read() ); + self->addActor.send( masterTerminateOnConflict( self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture() ) ); + + if( prevDBStateRaw.size() ) { + self->prevDBState = BinaryReader::fromStringRef(prevDBStateRaw, IncludeVersion()); + self->myDBState = self->prevDBState; + } + + return Void(); + } + + ACTOR Future _write(ReusableCoordinatedState* self, DBCoreState newState, bool finalWrite) { + try { + Void _ = wait( self->cstate.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) ); + } catch (Error& e) { + TEST(true); // Master displaced during writeMasterState + throw; + } + + self->myDBState = newState; + + if(!finalWrite) { + self->switchedState.send(Void()); + self->cstate = MovableCoordinatedState(self->coordinators); + Value rereadDBStateRaw = wait( 
self->cstate.read() ); + DBCoreState readState; + if( rereadDBStateRaw.size() ) + readState = BinaryReader::fromStringRef(rereadDBStateRaw, IncludeVersion()); + + if( readState != newState ) { + TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "CStateChanged"); + TEST(true); // Coordinated state changed between writing and reading, master dying + throw worker_removed(); + } + self->switchedState = Promise(); + self->addActor.send( masterTerminateOnConflict( self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture() ) ); + } else { + self->fullyRecovered.send(Void()); + } + + return Void(); + } +}; + struct MasterData : NonCopyable, ReferenceCounted { UID dbgid; @@ -67,14 +155,6 @@ struct MasterData : NonCopyable, ReferenceCounted { recoveryTransactionVersion; // The first version in this epoch double lastCommitTime; - DBCoreState prevDBState; - // prevDBState is the coordinated state (contents of this->cstate) for the database as of the - // beginning of recovery. If our recovery succeeds, it will be the penultimate state in the consistent chain. - - Optional myDBState; - // myDBState present only after recovery succeeds. It is the state that we wrote to this->cstate - // after recovering and is the final state in the consistent chain. - DatabaseConfiguration originalConfiguration; DatabaseConfiguration configuration; @@ -86,6 +166,7 @@ struct MasterData : NonCopyable, ReferenceCounted { LogSystemDiskQueueAdapter* txnStateLogAdapter; IKeyValueStore* txnStateStore; int64_t memoryLimit; + std::map,int8_t> dcId_locality; vector< MasterProxyInterface > proxies; vector< MasterProxyInterface > provisionalProxies; @@ -99,9 +180,8 @@ struct MasterData : NonCopyable, ReferenceCounted { MasterInterface myInterface; ClusterControllerFullInterface clusterController; // If the cluster controller changes, this master will die, so this is immutable. 
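	// [Editor's note; not part of the original patch] The ReusableCoordinatedState class introduced
	// above replaces the three fixed-phase MovableCoordinatedState members (cstate1/cstate2/cstate3)
	// removed just below. Each non-final write() re-creates the underlying MovableCoordinatedState,
	// re-reads the stored value, verifies it still matches what was just written (otherwise the master
	// dies with worker_removed), and re-arms the conflict watcher; the final write fulfills
	// fullyRecovered. The new cstateUpdated flag records whether the first post-recruitment write has
	// completed, which updateRegistration() uses to decide whether the registration must still carry
	// the prior epoch's committed TLog servers.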
- MovableCoordinatedState cstate1; // Stores serialized DBCoreState, this one kills previous tlog before recruiting new ones - MovableCoordinatedState cstate2; // Stores serialized DBCoreState, this one contains oldTlogs while we recover from them - MovableCoordinatedState cstate3; // Stores serialized DBCoreState, this is the final one + ReusableCoordinatedState cstate; + AsyncVar cstateUpdated; Reference> dbInfo; int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController @@ -111,7 +191,7 @@ struct MasterData : NonCopyable, ReferenceCounted { Version resolverChangesVersion; std::set resolverNeedingChanges; - Promise fullyRecovered; + PromiseStream> addActor; MasterData( Reference> const& dbInfo, @@ -119,14 +199,13 @@ struct MasterData : NonCopyable, ReferenceCounted { ServerCoordinators const& coordinators, ClusterControllerFullInterface const& clusterController, Standalone const& dbName, - Standalone const& dbId + Standalone const& dbId, + PromiseStream> const& addActor ) : dbgid( myInterface.id() ), myInterface(myInterface), dbInfo(dbInfo), - cstate1(coordinators), - cstate2(coordinators), - cstate3(coordinators), + cstate(coordinators, addActor, dbgid), coordinators(coordinators), clusterController(clusterController), dbName( dbName ), @@ -138,64 +217,14 @@ struct MasterData : NonCopyable, ReferenceCounted { version(invalidVersion), lastVersionTime(0), txnStateStore(0), - memoryLimit(2e9) + memoryLimit(2e9), + cstateUpdated(false), + addActor(addActor) { } ~MasterData() { if(txnStateStore) txnStateStore->close(); } }; -ACTOR Future writeTransitionMasterState( Reference self, bool skipTransition ) { - state DBCoreState newState; - self->logSystem->toCoreState( newState ); - newState.recoveryCount = self->prevDBState.recoveryCount + 1; - - ASSERT( newState.tLogs[0].tLogWriteAntiQuorum == self->configuration.tLogWriteAntiQuorum && newState.tLogs[0].tLogReplicationFactor == self->configuration.tLogReplicationFactor ); - - try { - Void _ = wait( self->cstate2.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) ); - } catch (Error& e) { - TEST(true); // Master displaced during writeMasterState - throw; - } - - if( !skipTransition ) { - Value rereadDBStateRaw = wait( self->cstate3.read() ); - DBCoreState readState; - if( rereadDBStateRaw.size() ) - readState = BinaryReader::fromStringRef(rereadDBStateRaw, IncludeVersion()); - - if( readState != newState ) { - TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "CStateChanged"); - TEST(true); // Coordinated state changed between writing and reading, master dying - throw worker_removed(); - } - } - - self->logSystem->coreStateWritten(newState); - self->myDBState = newState; - - return Void(); -} - -ACTOR Future writeRecoveredMasterState( Reference self ) { - state DBCoreState newState = self->myDBState.get(); - self->logSystem->toCoreState( newState ); - - ASSERT( newState.tLogs[0].tLogWriteAntiQuorum == self->configuration.tLogWriteAntiQuorum && newState.tLogs[0].tLogReplicationFactor == self->configuration.tLogReplicationFactor ); - - try { - Void _ = wait( self->cstate3.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) ); - } catch (Error& e) { - TEST(true); // Master displaced during writeMasterState - throw; - } - - self->logSystem->coreStateWritten(newState); - self->myDBState = newState; - - return Void(); -} - ACTOR Future newProxies( Reference self, Future< RecruitFromConfigurationReply > recruits ) { self->proxies.clear(); @@ -206,7 +235,7 @@ ACTOR 
Future newProxies( Reference self, Future< RecruitFromCo for( int i = 0; i < workers.size(); i++ ) { InitializeMasterProxyRequest req; req.master = self->myInterface; - req.recoveryCount = self->prevDBState.recoveryCount + 1; + req.recoveryCount = self->cstate.myDBState.recoveryCount + 1; req.recoveryTransactionVersion = self->recoveryTransactionVersion; req.firstProxy = i == 0; TraceEvent("ProxyReplies",self->dbgid).detail("workerID",workers[i].id()); @@ -228,7 +257,7 @@ ACTOR Future newResolvers( Reference self, Future< RecruitFrom state vector> initializationReplies; for( int i = 0; i < workers.size(); i++ ) { InitializeResolverRequest req; - req.recoveryCount = self->prevDBState.recoveryCount + 1; + req.recoveryCount = self->cstate.myDBState.recoveryCount + 1; req.proxyCount = recr.proxies.size(); req.resolverCount = recr.resolvers.size(); TraceEvent("ResolverReplies",self->dbgid).detail("workerID",workers[i].id()); @@ -242,9 +271,16 @@ ACTOR Future newResolvers( Reference self, Future< RecruitFrom } ACTOR Future newTLogServers( Reference self, Future< RecruitFromConfigurationReply > recruits, Reference oldLogSystem ) { - RecruitFromConfigurationReply recr = wait( recruits ); + if(!self->dcId_locality.count(self->configuration.primaryDcId) || !self->dcId_locality.count(self->configuration.remoteDcId)) { + TraceEvent(SevWarnAlways, "UnknownDCID", self->dbgid).detail("primaryFound", self->dcId_locality.count(self->configuration.primaryDcId)).detail("remoteFound", self->dcId_locality.count(self->configuration.primaryDcId)).detail("primaryId", printable(self->configuration.primaryDcId)).detail("remoteId", printable(self->configuration.remoteDcId)); + Void _ = wait( Future(Never()) ); + } - Reference newLogSystem = wait( oldLogSystem->newEpoch( recr.tLogs, recr.remoteTLogs, recr.logRouters, self->configuration, self->prevDBState.recoveryCount + 1 ) ); + RecruitFromConfigurationReply recr = wait( recruits ); + Future fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, recr.remoteDcId ) ) ); + + Optional primaryDcId = recr.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId; + Reference newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[primaryDcId], self->dcId_locality[recr.remoteDcId] ) ); self->logSystem = newLogSystem; return Void(); @@ -252,21 +288,23 @@ ACTOR Future newTLogServers( Reference self, Future< RecruitFr ACTOR Future newSeedServers( Reference self, vector* servers ) { // This is only necessary if the database is at version 0 + // FIXME: the seed servers do not respect the storage policy servers->clear(); if (self->lastEpochEnd) return Void(); state std::map,Tag> dcId_tags; state int8_t nextLocality = 0; - while( servers->size() < self->configuration.storageTeamSize ) { + while( servers->size() < self->configuration.storageTeamSize + self->configuration.remoteStorageTeamSize ) { try { RecruitStorageRequest req; req.criticalRecruitment = true; + req.includeDCs.push_back( servers->size() < self->configuration.storageTeamSize ? 
self->configuration.primaryDcId : self->configuration.remoteDcId);
 
 	for(auto s = servers->begin(); s != servers->end(); ++s)
 		req.excludeMachines.push_back(s->locality.zoneId());
 
 	TraceEvent("MasterRecruitingInitialStorageServer", self->dbgid)
 		.detail("ExcludingMachines", req.excludeMachines.size())
-		.detail("ExcludingDataCenters", req.excludeDCs.size());
+		.detail("DataCenters", req.includeDCs.size());
 
 	state RecruitStorageReply candidateWorker = wait( brokenPromiseToNever( self->clusterController.recruitStorage.getReply( req ) ) );
@@ -305,6 +343,11 @@ ACTOR Future<Void> newSeedServers( Reference<MasterData> self, vector<StorageServerInterface>* servers ) {
 
+	self->dcId_locality.clear();
+	for(auto& it : dcId_tags) {
+		self->dcId_locality[it.first] = it.second.locality;
+	}
+
 	TraceEvent("MasterRecruitedInitialStorageServers", self->dbgid)
 		.detail("TargetCount", self->configuration.storageTeamSize)
 		.detail("Servers", describe(*servers));
@@ -328,16 +371,6 @@ Future<Void> waitResolverFailure( vector<ResolverInterface> const& resolvers ) {
 	return tagError(quorum( failed, 1 ), master_resolver_failed());
 }
 
-ACTOR Future<Void> masterTerminateOnConflict( Reference<MasterData> self, Future<Void> onConflict ) {
-	Void _ = wait( onConflict );
-	if (!self->fullyRecovered.isSet()) {
-		TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "Conflict");
-		TEST(true); // Coordinated state conflict, master dying
-		throw worker_removed();
-	}
-	return Void();
-}
-
 ACTOR Future<Void> updateLogsValue( Reference<MasterData> self, Database cx ) {
 	state Transaction tr(cx);
 	loop {
@@ -405,14 +438,14 @@ ACTOR Future<Void> updateRegistration( Reference<MasterData> self, Reference<ILogSystem> logSystem ) {
 		trigger = self->registrationTrigger.onTrigger();
 
-		TraceEvent("MasterUpdateRegistration", self->dbgid).detail("RecoveryCount", self->myDBState.present() ? self->myDBState.get().recoveryCount : self->prevDBState.recoveryCount).detail("logs", describe(logSystem->getLogSystemConfig().tLogs));
+		TraceEvent("MasterUpdateRegistration", self->dbgid).detail("RecoveryCount", self->cstate.myDBState.recoveryCount).detail("logs", describe(logSystem->getLogSystemConfig().tLogs));
 
-		if (!self->myDBState.present()) {
+		if (!self->cstateUpdated.get()) {
 			//FIXME: prior committed tlogs should include
-			Void _ = wait(sendMasterRegistration(self.getPtr(), logSystem->getLogSystemConfig(), self->provisionalProxies, self->resolvers, self->prevDBState.recoveryCount, self->prevDBState.getPriorCommittedLogServers() ));
+			Void _ = wait(sendMasterRegistration(self.getPtr(), logSystem->getLogSystemConfig(), self->provisionalProxies, self->resolvers, self->cstate.myDBState.recoveryCount, self->cstate.prevDBState.getPriorCommittedLogServers() ));
 		} else {
 			updateLogsKey = updateLogsValue(self, cx);
-			Void _ = wait( sendMasterRegistration( self.getPtr(), logSystem->getLogSystemConfig(), self->proxies, self->resolvers, self->myDBState.get().recoveryCount, vector<UID>() ) );
+			Void _ = wait( sendMasterRegistration( self.getPtr(), logSystem->getLogSystemConfig(), self->proxies, self->resolvers, self->cstate.myDBState.recoveryCount, vector<UID>() ) );
 		}
 	}
 }
@@ -474,7 +507,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<StorageServerInterface>* seedServers, Reference<ILogSystem> oldLogSystem ) {
 		if (!self->configuration.initialized)
 			status = RecoveryStatus::configuration_invalid;
-		else if (!self->prevDBState.tLogs.size())
+		else if (!self->cstate.prevDBState.tLogs.size())
 			status = RecoveryStatus::configuration_never_created;
 		else
 			status = RecoveryStatus::configuration_missing;
@@ -496,7 +529,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<StorageServerInterface>* seedServers, Reference<ILogSystem> oldLogSystem ) {
 		.detail("StoreType", self->configuration.storageServerStoreType)
 		.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
 
-	RecruitFromConfigurationReply recruits = wait(
+	state RecruitFromConfigurationReply recruits = wait(
 		brokenPromiseToNever( self->clusterController.recruitFromConfiguration.getReply(
 			RecruitFromConfigurationRequest( self->configuration ) ) ) );
@@ -510,35 +543,8 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<StorageServerInterface>* seedServers, Reference<ILogSystem> oldLogSystem ) {
-ACTOR Future<Void> rewriteMasterState( Reference<MasterData> self ) {
-	state DBCoreState newState = self->prevDBState;
-
-	newState.recoveryCount++;
-	try {
-		Void _ = wait( self->cstate1.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) );
-	} catch (Error& e) {
-		TEST(true); // Master displaced during rewriteMasterState
-		throw;
-	}
-
-	self->prevDBState = newState;
-
-	Value rereadDBStateRaw = wait( self->cstate2.read() );
-	DBCoreState readState;
-	if( rereadDBStateRaw.size() )
-		readState = BinaryReader::fromStringRef<DBCoreState>(rereadDBStateRaw, IncludeVersion());
-
-	if( readState != newState ) {
-		TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "CStateChanged");
-		TEST(true); // Coordinated state changed between writing and reading, master dying
-		throw worker_removed();
-	}
-
+	Void _ = wait( newSeedServers( self, seedServers ) );
+	Void _ = wait( newProxies( self, recruits ) && newResolvers( self, recruits ) && newTLogServers( self, recruits, oldLogSystem ) );
 	return Void();
 }
@@ -573,6 +579,12 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
 	self->originalConfiguration = self->configuration;
 	TraceEvent("MasterRecoveredConfig", self->dbgid).detail("conf", self->configuration.toString()).trackLatest("RecoveredConfig");
 
+	Standalone<VectorRef<KeyValueRef>> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) );
+	self->dcId_locality.clear();
+	for(auto& kv : rawLocalities) {
+		self->dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
+	}
+
 	//auto kvs = self->txnStateStore->readRange( systemKeys );
 	//for( auto & kv : kvs.get() )
 	//	TraceEvent("MasterRecoveredTXS", self->dbgid).detail("K", printable(kv.key)).detail("V", printable(kv.value));
@@ -635,8 +647,8 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
 
 ACTOR Future<Void> triggerUpdates( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
 	loop {
-		Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->fullyRecovered.getFuture() );
-		if(self->fullyRecovered.isSet())
+		Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->cstate.fullyRecovered.getFuture() );
+		if(self->cstate.fullyRecovered.isSet())
 			return Void();
 		self->registrationTrigger.trigger();
@@ -921,18 +933,14 @@ static std::set<int> const& normalMasterErrors() {
 	return s;
 }
 
-ACTOR Future<Void> changeCoordinators( Reference<MasterData> self, bool skippedTransition ) {
-	Void _ = wait( self->fullyRecovered.getFuture() );
+ACTOR Future<Void> changeCoordinators( Reference<MasterData> self ) {
+	Void _ = wait( self->cstate.fullyRecovered.getFuture() );
 
 	loop {
 		ChangeCoordinatorsRequest req = waitNext( self->myInterface.changeCoordinators.getFuture() );
 		state ChangeCoordinatorsRequest changeCoordinatorsRequest = req;
 		try {
-			if( skippedTransition ) {
-				Void _ = wait( self->cstate2.move( ClusterConnectionString( changeCoordinatorsRequest.newConnectionString.toString() ) ) );
-			} else {
-				Void _ = wait( self->cstate3.move( ClusterConnectionString( changeCoordinatorsRequest.newConnectionString.toString() ) ) );
-			}
+			Void _ = wait( self->cstate.move( ClusterConnectionString( changeCoordinatorsRequest.newConnectionString.toString() ) ) );
 		} catch(Error &e) {
 			if(e.code() != error_code_actor_cancelled)
@@ -952,32 +960,39 @@ ACTOR Future<Void> rejoinRequestHandler( Reference<MasterData> self ) {
 	}
 }
 
-ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems, bool skipTransition ) {
+ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems ) {
 	state Future<Void> rejoinRequests = Never();
-
+	state DBRecoveryCount recoverCount = self->cstate.myDBState.recoveryCount + 1;
 	loop {
-		DBCoreState coreState;
-		self->logSystem->toCoreState( coreState );
-		if( !self->fullyRecovered.isSet() && !coreState.oldTLogData.size() ) {
-			if( !skipTransition ) {
-				Void _ = wait( writeRecoveredMasterState(self) );
-				self->registrationTrigger.trigger();
-			}
+		state DBCoreState newState;
+		self->logSystem->toCoreState( newState );
+		newState.recoveryCount = recoverCount;
+
+		ASSERT( newState.tLogs[0].tLogWriteAntiQuorum == self->configuration.tLogWriteAntiQuorum && newState.tLogs[0].tLogReplicationFactor == self->configuration.tLogReplicationFactor );
+
+		state bool finalUpdate = !newState.oldTLogData.size() && newState.tLogs.size() == self->configuration.expectedLogSets();
+		Void _ = wait( self->cstate.write(newState, finalUpdate) );
+
+		self->cstateUpdated.set(true);
+		self->logSystem->coreStateWritten(newState);
+		self->registrationTrigger.trigger();
+
+		if( finalUpdate ) {
 			TraceEvent("MasterFullyRecovered", self->dbgid);
 			oldLogSystems->get()->stopRejoins();
 			rejoinRequests = rejoinRequestHandler(self);
-			self->fullyRecovered.send(Void());
+			return Void();
 		}
 
 		Void _ = wait( self->logSystem->onCoreStateChanged() );
 	}
 }
 
-ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
+ACTOR Future<Void> masterCore( Reference<MasterData> self )
 {
 	state TraceInterval recoveryInterval("MasterRecovery");
 
-	addActor.send( waitFailureServer(self->myInterface.waitFailure.getFuture()) );
+	self->addActor.send( waitFailureServer(self->myInterface.waitFailure.getFuture()) );
 
 	TraceEvent( recoveryInterval.begin(), self->dbgid );
@@ -987,30 +1002,25 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
 		.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
 
-	Value prevDBStateRaw = wait( self->cstate1.read() );
-	addActor.send( masterTerminateOnConflict( self, self->cstate1.onConflict() ) );
-
-	if( prevDBStateRaw.size() )
-		self->prevDBState = BinaryReader::fromStringRef<DBCoreState>(prevDBStateRaw, IncludeVersion());
+	Void _ = wait( self->cstate.read() );
 
 	self->recoveryState = RecoveryState::LOCKING_CSTATE;
 	TraceEvent("MasterRecoveryState", self->dbgid)
 		.detail("StatusCode", RecoveryStatus::locking_coordinated_state)
 		.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_coordinated_state])
-		.detail("TLogs", self->prevDBState.tLogs.size())
-		.detail("MyRecoveryCount", self->prevDBState.recoveryCount+2)
-		.detail("StateSize", prevDBStateRaw.size())
+		.detail("TLogs", self->cstate.prevDBState.tLogs.size())
+		.detail("MyRecoveryCount", self->cstate.prevDBState.recoveryCount+2)
 		.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
 
 	state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
-	state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality);
+	state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality);
 
-	Void _ = wait( rewriteMasterState( self ) || recoverAndEndEpoch );
+	DBCoreState newState = self->cstate.myDBState;
+	newState.recoveryCount++;
+	Void _ = wait( self->cstate.write(newState) || recoverAndEndEpoch );
 
 	self->recoveryState = RecoveryState::RECRUITING;
 
-	addActor.send( masterTerminateOnConflict( self, self->cstate2.onConflict() ) );
-
 	state vector<StorageServerInterface> seedServers;
 	state vector<Standalone<CommitTransactionRef>> initialConfChanges;
 	state Future<Void> logChanges;
@@ -1098,7 +1108,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
 	state Future<Void> proxyFailure = waitProxyFailure( self->proxies );
 	state Future<Void> providingVersions = provideVersions(self);
 
-	addActor.send( reportErrors(updateRegistration(self, self->logSystem), "updateRegistration", self->dbgid) );
+	self->addActor.send( reportErrors(updateRegistration(self, self->logSystem), "updateRegistration", self->dbgid) );
 	self->registrationTrigger.trigger();
 
 	Void _ = wait(discardCommit(self->txnStateStore, self->txnStateLogAdapter));
@@ -1132,21 +1142,17 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
 	// (at self->recoveryTransactionVersion), and only our own semi-commits can come between our
 	// first commit and the next new TLogs
 
-	DBCoreState coreState;
-	self->logSystem->toCoreState( coreState );
-	state bool skipTransition = !coreState.oldTLogData.size();
-
+	self->addActor.send( trackTlogRecovery(self, oldLogSystems) );
 	debug_advanceMaxCommittedVersion(UID(), self->recoveryTransactionVersion);
-	Void _ = wait( writeTransitionMasterState( self, skipTransition ) );
+	while(!self->cstateUpdated.get()) {
+		Void _ = wait(self->cstateUpdated.onChange());
+	}
 	debug_advanceMinCommittedVersion(UID(), self->recoveryTransactionVersion);
 
-	if( !skipTransition )
-		addActor.send( masterTerminateOnConflict( self, self->cstate3.onConflict() ) );
-
 	if( debugResult )
 		TraceEvent(SevError, "DBRecoveryDurabilityError");
 
-	TraceEvent("MasterCommittedTLogs", self->dbgid).detail("TLogs", self->logSystem->describe()).detail("RecoveryCount", self->myDBState.get().recoveryCount).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
+	TraceEvent("MasterCommittedTLogs", self->dbgid).detail("TLogs", self->logSystem->describe()).detail("RecoveryCount", self->cstate.myDBState.recoveryCount).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
 
 	TraceEvent(recoveryInterval.end(), self->dbgid).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
@@ -1157,23 +1163,18 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
 		.detail("StoreType", self->configuration.storageServerStoreType)
 		.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
 
-	// Now that recovery is complete, we register ourselves with the cluster controller, so that the client and server information
-	// it hands out can be updated
-	self->registrationTrigger.trigger();
-
 	// Now that the master is recovered we can start auxiliary services that happen to run here
 	{
 		PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
 		state double lastLimited = 0;
-		addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, self->myInterface, self->configuration, ddStorageServerChanges, self->logSystem, self->recoveryTransactionVersion, &lastLimited ), "DataDistribution", self->dbgid, &normalMasterErrors() ) );
-		addActor.send( reportErrors( rateKeeper( self->dbInfo, ddStorageServerChanges, self->myInterface.getRateInfo.getFuture(), self->dbName, self->configuration, &lastLimited ), "Ratekeeper", self->dbgid) );
+		self->addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, self->myInterface, self->configuration, ddStorageServerChanges, self->logSystem, self->recoveryTransactionVersion, &lastLimited ), "DataDistribution", self->dbgid, &normalMasterErrors() ) );
+		self->addActor.send( reportErrors( rateKeeper( self->dbInfo, ddStorageServerChanges, self->myInterface.getRateInfo.getFuture(), self->dbName, self->configuration, &lastLimited ), "Ratekeeper", self->dbgid) );
 	}
 
 	if( self->resolvers.size() > 1 )
-		addActor.send( resolutionBalancing(self) );
+		self->addActor.send( resolutionBalancing(self) );
 
-	addActor.send( changeCoordinators(self, skipTransition) );
-	addActor.send( trackTlogRecovery(self, oldLogSystems, skipTransition) );
+	self->addActor.send( changeCoordinators(self) );
 
 	loop choose {
 		when( Void _ = wait( tlogFailure ) ) { throw internal_error(); }
@@ -1185,17 +1186,16 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
 
 ACTOR Future<Void> masterServer( MasterInterface mi, Reference<AsyncVar<ServerDBInfo>> db, ServerCoordinators coordinators, LifetimeToken lifetime )
 {
-	state PromiseStream<Future<Void>> addActor;
-	state Future<Void> collection = actorCollection( addActor.getFuture() );
-	state Future<Void> onDBChange = Void();
-	state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, db->get().dbName, LiteralStringRef("") ) );
+	state PromiseStream<Future<Void>> addActor;
+	state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, db->get().dbName, LiteralStringRef(""), addActor ) );
+	state Future<Void> collection = actorCollection( self->addActor.getFuture() );
 
 	TEST( !lifetime.isStillValid( db->get().masterLifetime, mi.id()==db->get().master.id() ) );  // Master born doomed
 	TraceEvent("MasterLifetime", self->dbgid).detail("LifetimeToken", lifetime.toString());
 
 	try {
-		state Future<Void> core = masterCore( self, addActor );
+		state Future<Void> core = masterCore( self );
 		loop choose {
 			when (Void _ = wait( core )) { break; }
 			when (Void _ = wait( onDBChange )) {