passed the first tests that kill entire datacenters

added configuration options for the remote data center and satellite data centers
updated cluster controller recruitment logic
refactored how the master writes core state
updated log recovery and log system peeking
Evan Tschannen 2017-09-07 15:32:08 -07:00
parent c22708b6d6
commit ea26bc1c43
19 changed files with 993 additions and 476 deletions
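
As a rough illustration of the configuration options this commit introduces, here is a minimal standalone sketch of how the new remote and satellite tokens could expand into configuration key/value pairs. The token and key names are taken from the configForToken() changes in the first file below; everything else (the helper name, the "conf/" stand-in prefix, the re-implemented isInteger, and the omission of policy serialization) is a hypothetical simplification, not the actual implementation.

#include <iostream>
#include <map>
#include <string>

// Hypothetical mock-up of configForToken() for the tokens added in this commit.
static bool isInteger(const std::string& s) {
    return !s.empty() && s.find_first_not_of("0123456789") == std::string::npos;
}

std::map<std::string, std::string> sketchConfigForToken(const std::string& mode) {
    std::map<std::string, std::string> out;
    const std::string p = "conf/"; // stand-in for the real configuration key prefix

    size_t pos = mode.find('=');
    if (pos != std::string::npos) {
        std::string key = mode.substr(0, pos);
        std::string value = mode.substr(pos + 1);
        // New numeric knobs accepted alongside logs/proxies/resolvers.
        if ((key == "remote_logs" || key == "satellite_logs") && isInteger(value))
            out[p + key] = value;
        // New datacenter options; the satellite lists are comma separated.
        if (key == "primary_dc" || key == "remote_dc" ||
            key == "primary_satellite_dcs" || key == "remote_satellite_dcs")
            out[p + key] = value;
        return out;
    }

    // One of the new remote redundancy modes: replica counts for the remote DC.
    if (mode == "remote_double") {
        out[p + "remote_storage_replicas"] = out[p + "remote_storage_quorum"] = "2";
        out[p + "remote_log_replicas"] = out[p + "log_routers"] = "2";
    }
    // One of the new satellite modes: anti-quorum, usable DCs, and log replicas.
    if (mode == "one_satellite_double") {
        out[p + "satellite_anti_quorum"] = "0";
        out[p + "satellite_usable_dcs"] = "1";
        out[p + "satellite_log_replicas"] = "2";
    }
    return out;
}

int main() {
    for (std::string token :
         {"remote_double", "one_satellite_double", "primary_satellite_dcs=dc2,dc4"})
        for (const auto& kv : sketchConfigForToken(token))
            std::cout << token << " -> " << kv.first << " = " << kv.second << "\n";
    return 0;
}

The real configForToken() additionally serializes the remote and satellite replication policies (remote_storage_policy, remote_log_policy, satellite_log_policy), as shown in the diff below.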

View File

@ -65,7 +65,11 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
std::string key = mode.substr(0, pos);
std::string value = mode.substr(pos+1);
if( (key == "logs" || key == "proxies" || key == "resolvers") && isInteger(value) ) {
if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "satellite_logs") && isInteger(value) ) {
out[p+key] = value;
}
if( key == "primary_dc" || key == "remote_dc" || key == "primary_satellite_dcs" || key == "remote_satellite_dcs" ) {
out[p+key] = value;
}
@ -87,48 +91,40 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
return out;
}
std::string redundancy, log_replicas, remote_replicas;
std::string redundancy, log_replicas;
IRepPolicyRef storagePolicy;
IRepPolicyRef tLogPolicy;
IRepPolicyRef remoteTLogPolicy;
//FIXME: add modes for real remote policies
bool redundancySpecified = true;
if (mode == "single") {
redundancy="1";
log_replicas="1";
remote_replicas="1";
storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyOne());
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyOne());
} else if(mode == "double" || mode == "fast_recovery_double") {
redundancy="2";
log_replicas="2";
remote_replicas="2";
storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "triple" || mode == "fast_recovery_triple") {
redundancy="3";
log_replicas="3";
remote_replicas="3";
storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "two_datacenter") {
redundancy="3";
log_replicas="3";
remote_replicas="3";
storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "three_datacenter") {
redundancy="3";
log_replicas="3";
remote_replicas="3";
storagePolicy = tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAnd({
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAnd({
IRepPolicyRef(new PolicyAcross(3, "dcid", IRepPolicyRef(new PolicyOne()))),
IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())))
}));
} else if(mode == "three_data_hall") {
redundancy="3";
log_replicas="4";
remote_replicas="4";
storagePolicy = IRepPolicyRef(new PolicyAcross(3, "data_hall", IRepPolicyRef(new PolicyOne())));
tLogPolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
));
} else
@ -137,7 +133,6 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
out[p+"storage_replicas"] =
out[p+"storage_quorum"] = redundancy;
out[p+"log_replicas"] = log_replicas;
out[p+"remote_log_replication"] = remote_replicas;
out[p+"log_anti_quorum"] = "0";
BinaryWriter policyWriter(IncludeVersion());
@ -147,11 +142,92 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
policyWriter = BinaryWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, tLogPolicy);
out[p+"log_replication_policy"] = policyWriter.toStringRef().toString();
return out;
}
std::string remote_redundancy, remote_log_replicas;
IRepPolicyRef remoteStoragePolicy;
IRepPolicyRef remoteTLogPolicy;
bool remoteRedundancySpecified = true;
if (mode == "remote_single") {
remote_redundancy="1";
remote_log_replicas="1";
remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyOne());
} else if(mode == "remote_double") {
remote_redundancy="2";
remote_log_replicas="2";
remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "remote_triple") {
remote_redundancy="3";
remote_log_replicas="3";
remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "remote_three_data_hall") {
remote_redundancy="3";
remote_log_replicas="4";
remoteStoragePolicy = IRepPolicyRef(new PolicyAcross(3, "data_hall", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
));
} else
remoteRedundancySpecified = false;
if (remoteRedundancySpecified) {
out[p+"remote_storage_replicas"] =
out[p+"remote_storage_quorum"] = remote_redundancy;
out[p+"remote_log_replicas"] =
out[p+"log_routers"] = remote_log_replicas;
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, remoteStoragePolicy);
out[p+"remote_storage_policy"] = policyWriter.toStringRef().toString();
policyWriter = BinaryWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, remoteTLogPolicy);
out[p+"remote_replication_policy"] = policyWriter.toStringRef().toString();
out[p+"remote_log_policy"] = policyWriter.toStringRef().toString();
return out;
}
std::string satellite_log_replicas, satellite_anti_quorum, satellite_usable_dcs;
IRepPolicyRef satelliteTLogPolicy;
bool satelliteRedundancySpecified = true;
if (mode == "one_satellite_single") {
satellite_anti_quorum="0";
satellite_usable_dcs="1";
satellite_log_replicas="1";
satelliteTLogPolicy = IRepPolicyRef(new PolicyOne());
} else if(mode == "one_satellite_double") {
satellite_anti_quorum="0";
satellite_usable_dcs="1";
satellite_log_replicas="2";
satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "one_satellite_triple") {
satellite_anti_quorum="0";
satellite_usable_dcs="1";
satellite_log_replicas="3";
satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "two_satellite_safe") {
satellite_anti_quorum="0";
satellite_usable_dcs="2";
satellite_log_replicas="4";
satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
));
} else if(mode == "two_satellite_fast") {
satellite_anti_quorum="2";
satellite_usable_dcs="2";
satellite_log_replicas="4";
satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
));
} else
satelliteRedundancySpecified = false;
if (satelliteRedundancySpecified) {
out[p+"satellite_anti_quorum"] = satellite_anti_quorum;
out[p+"satellite_usable_dcs"] = satellite_usable_dcs;
out[p+"satellite_log_replicas"] = satellite_log_replicas;
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, satelliteTLogPolicy);
out[p+"satellite_log_policy"] = policyWriter.toStringRef().toString();
return out;
}

View File

@ -108,6 +108,23 @@ ProcessClass::Fitness ProcessClass::machineClassFitness( ClusterRole role ) {
default:
return ProcessClass::WorstFit;
}
case ProcessClass::LogRouter:
switch( _class ) {
case ProcessClass::LogRouterClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::ResolutionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::TransactionClass:
return ProcessClass::BestOtherFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::TesterClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
default:
return ProcessClass::NeverAssign;
}

View File

@ -26,9 +26,9 @@
struct ProcessClass {
// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, InvalidClass = -1 };
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, LogRouterClass, InvalidClass = -1 };
enum Fitness { BestFit, GoodFit, BestOtherFit, UnsetFit, WorstFit, NeverAssign };
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver };
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter };
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };
int16_t _class;
int16_t _source;
@ -46,6 +46,7 @@ public:
else if (s=="unset") _class = UnsetClass;
else if (s=="stateless") _class = StatelessClass;
else if (s=="log") _class = LogClass;
else if (s=="router") _class = LogRouterClass;
else _class = InvalidClass;
}
@ -59,6 +60,7 @@ public:
else if (classStr=="unset") _class = UnsetClass;
else if (classStr=="stateless") _class = StatelessClass;
else if (classStr=="log") _class = LogClass;
else if (classStr=="router") _class = LogRouterClass;
else _class = InvalidClass;
if (sourceStr=="command_line") _source = CommandLineSource;
@ -87,6 +89,7 @@ public:
case TesterClass: return "test";
case StatelessClass: return "stateless";
case LogClass: return "log";
case LogRouterClass: return "router";
default: return "invalid";
}
}

View File

@ -89,6 +89,7 @@ public:
case ProcessClass::TesterClass: return false;
case ProcessClass::StatelessClass: return false;
case ProcessClass::LogClass: return true;
case ProcessClass::LogRouterClass: return false;
default: return false;
}
}
@ -207,8 +208,8 @@ public:
std::map<NetworkAddress, ProcessInfo*> currentlyRebootingProcesses;
class ClusterConnectionString* extraDB;
IRepPolicyRef storagePolicy;
IRepPolicyRef tLogPolicy;
int tLogWriteAntiQuorum;
IRepPolicyRef tLogPolicy;
int tLogWriteAntiQuorum;
//Used by workloads that perform reconfigurations
int testerCount;

View File

@ -152,13 +152,13 @@ public:
std::pair<WorkerInterface, ProcessClass> getStorageWorker( RecruitStorageRequest const& req ) {
std::set<Optional<Standalone<StringRef>>> excludedMachines( req.excludeMachines.begin(), req.excludeMachines.end() );
std::set<Optional<Standalone<StringRef>>> excludedDCs( req.excludeDCs.begin(), req.excludeDCs.end() );
std::set<Optional<Standalone<StringRef>>> includeDCs( req.includeDCs.begin(), req.includeDCs.end() );
std::set<AddressExclusion> excludedAddresses( req.excludeAddresses.begin(), req.excludeAddresses.end() );
for( auto& it : id_worker )
if( workerAvailable( it.second, false ) &&
!excludedMachines.count(it.second.interf.locality.zoneId()) &&
!excludedDCs.count(it.second.interf.locality.dcId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.interf.address()) &&
it.second.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit ) {
return std::make_pair(it.second.interf, it.second.processClass);
@ -171,7 +171,7 @@ public:
ProcessClass::Fitness fit = it.second.processClass.machineClassFitness( ProcessClass::Storage );
if( workerAvailable( it.second, false ) &&
!excludedMachines.count(it.second.interf.locality.zoneId()) &&
!excludedDCs.count(it.second.interf.locality.dcId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.interf.address()) &&
fit < bestFit ) {
bestFit = fit;
@ -211,7 +211,7 @@ public:
throw no_more_servers();
}
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDatacenters( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<NetworkAddress> additionalExlusions = std::set<NetworkAddress>() )
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogs( DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>(), std::set<NetworkAddress> additionalExlusions = std::set<NetworkAddress>() )
{
std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::vector<std::pair<WorkerInterface, ProcessClass>> results;
@ -225,7 +225,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( ProcessClass::TLog );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && !additionalExlusions.count(it.second.interf.address()) && fitness != ProcessClass::NeverAssign ) {
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && !additionalExlusions.count(it.second.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.interf.locality.dcId())) ) {
fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass));
}
else {
@ -420,7 +420,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
throw no_more_servers();
}
vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, WorkerFitnessInfo minWorker, bool checkStable = false ) {
vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
vector<std::pair<WorkerInterface, ProcessClass>> results;
if (amount <= 0)
@ -428,7 +428,8 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && it.second.interf.id() != minWorker.worker.first.id() && (fitness < minWorker.fitness || (fitness == minWorker.fitness && id_used[it.first] <= minWorker.used)) && it.second.interf.locality.dcId()==dcId ) {
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && it.second.interf.locality.dcId() == dcId &&
( !minWorker.present() || ( it.second.interf.id() != minWorker.get().worker.first.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass));
}
}
@ -548,78 +549,161 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
return result;
}
RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) {
RecruitFromConfigurationReply result;
RecruitRemoteFromConfigurationReply findRemoteWorkersForConfiguration( RecruitRemoteFromConfigurationRequest const& req ) {
RecruitRemoteFromConfigurationReply result;
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++;
auto tlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used );
std::set<NetworkAddress> additionalExclusions;
for(int i = 0; i < tlogs.size(); i++) {
result.tLogs.push_back(tlogs[i].first);
additionalExclusions.insert(tlogs[i].first.address());
std::set<Optional<Key>> remoteDC;
remoteDC.insert(req.dcId);
auto remoteLogs = getWorkersForTlogs( req.configuration, id_used, false, remoteDC );
for(int i = 0; i < remoteLogs.size(); i++) {
result.remoteTLogs.push_back(remoteLogs[i].first);
}
std::map< Optional<Standalone<StringRef>>, int> id_used2;
auto remoteTlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used2, false, additionalExclusions );
for(int i = 0; i < remoteTlogs.size(); i++) {
result.remoteTLogs.push_back(remoteTlogs[i].first);
result.logRouters.push_back(remoteTlogs[i].first);
auto logRouters = getWorkersForRoleInDatacenter( req.dcId, ProcessClass::LogRouter, req.configuration.logRouterCount, req.configuration, id_used );
for(int i = 0; i < logRouters.size(); i++) {
result.logRouters.push_back(logRouters[i].first);
}
auto datacenters = getDatacenters( req.configuration );
InDatacenterFitness bestFitness;
int numEquivalent = 1;
for(auto dcId : datacenters ) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, used );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, used, first_resolver );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto fitness = InDatacenterFitness(proxies, resolvers);
if(fitness < bestFitness) {
bestFitness = fitness;
numEquivalent = 1;
result.resolvers = vector<WorkerInterface>();
result.proxies = vector<WorkerInterface>();
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
} else if( fitness == bestFitness && g_random->random01() < 1.0/++numEquivalent ) {
result.resolvers = vector<WorkerInterface>();
result.proxies = vector<WorkerInterface>();
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
}
}
ASSERT(bestFitness != InDatacenterFitness());
TraceEvent("findWorkersForConfig").detail("replication", req.configuration.tLogReplicationFactor)
.detail("desiredLogs", req.configuration.getDesiredLogs()).detail("actualLogs", result.tLogs.size())
.detail("desiredProxies", req.configuration.getDesiredProxies()).detail("actualProxies", result.proxies.size())
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
//FIXME: fitness for logs is wrong
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
( AcrossDatacenterFitness(remoteLogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ) ) {
throw operation_failed();
}
return result;
}
RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req, Optional<Key> dcId ) {
RecruitFromConfigurationReply result;
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++;
ASSERT(dcId == req.configuration.primaryDcId || dcId == req.configuration.remoteDcId);
std::set<Optional<Key>> primaryDC;
primaryDC.insert(dcId == req.configuration.primaryDcId ? req.configuration.primaryDcId : req.configuration.remoteDcId);
result.remoteDcId = dcId == req.configuration.primaryDcId ? req.configuration.remoteDcId : req.configuration.primaryDcId;
auto tlogs = getWorkersForTlogs( req.configuration, id_used, false, primaryDC );
for(int i = 0; i < tlogs.size(); i++) {
result.tLogs.push_back(tlogs[i].first);
}
if(req.configuration.satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
if( dcId == req.configuration.primaryDcId ) {
satelliteDCs.insert( req.configuration.primarySatelliteDcIds.begin(), req.configuration.primarySatelliteDcIds.end() );
} else {
satelliteDCs.insert( req.configuration.remoteSatelliteDcIds.begin(), req.configuration.remoteSatelliteDcIds.end() );
}
auto satelliteLogs = getWorkersForTlogs( req.configuration, id_used, false, satelliteDCs );
for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].first);
}
}
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, id_used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, id_used );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, id_used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, id_used, first_resolver );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto fitness = InDatacenterFitness(proxies, resolvers);
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
//FIXME: fitness for logs is wrong
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
fitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
return result;
}
RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) {
if(req.configuration.remoteTLogReplicationFactor > 0) {
try {
return findWorkersForConfiguration(req, req.configuration.primaryDcId);
} catch( Error& e ) {
if (e.code() == error_code_no_more_servers || e.code() == error_code_operation_failed) {
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e);
return findWorkersForConfiguration(req, req.configuration.remoteDcId);
} else {
throw;
}
}
} else {
RecruitFromConfigurationReply result;
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[masterProcessId]++;
auto tlogs = getWorkersForTlogs( req.configuration, id_used );
for(int i = 0; i < tlogs.size(); i++) {
result.tLogs.push_back(tlogs[i].first);
}
auto datacenters = getDatacenters( req.configuration );
InDatacenterFitness bestFitness;
int numEquivalent = 1;
for(auto dcId : datacenters ) {
auto used = id_used;
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration, used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration, used );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, used, first_resolver );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto fitness = InDatacenterFitness(proxies, resolvers);
if(fitness < bestFitness) {
bestFitness = fitness;
numEquivalent = 1;
result.resolvers = vector<WorkerInterface>();
result.proxies = vector<WorkerInterface>();
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
} else if( fitness == bestFitness && g_random->random01() < 1.0/++numEquivalent ) {
result.resolvers = vector<WorkerInterface>();
result.proxies = vector<WorkerInterface>();
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
}
}
ASSERT(bestFitness != InDatacenterFitness());
TraceEvent("findWorkersForConfig").detail("replication", req.configuration.tLogReplicationFactor)
.detail("desiredLogs", req.configuration.getDesiredLogs()).detail("actualLogs", result.tLogs.size())
.detail("desiredProxies", req.configuration.getDesiredProxies()).detail("actualProxies", result.proxies.size())
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
return result;
}
}
bool betterMasterExists() {
ServerDBInfo dbi = db.serverInfo->get();
std::map< Optional<Standalone<StringRef>>, int> id_used;
@ -653,7 +737,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
tlogProcessClasses.push_back(tlogWorker->second.processClass);
}
AcrossDatacenterFitness oldAcrossFit(dbi.logSystemConfig.tLogs[0].tLogs, tlogProcessClasses);
AcrossDatacenterFitness newAcrossFit(getWorkersForTlogsAcrossDatacenters(db.config, id_used, true));
AcrossDatacenterFitness newAcrossFit(getWorkersForTlogs(db.config, id_used, true));
if(oldAcrossFit < newAcrossFit) return false;
@ -713,8 +797,10 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
Standalone<RangeResultRef> lastProcessClasses;
bool gotProcessClasses;
Optional<Standalone<StringRef>> masterProcessId;
Optional<Standalone<StringRef>> masterDcId;
UID id;
std::vector<RecruitFromConfigurationRequest> outstandingRecruitmentRequests;
std::vector<RecruitRemoteFromConfigurationRequest> outstandingRemoteRecruitmentRequests;
std::vector<std::pair<RecruitStorageRequest, double>> outstandingStorageRequests;
ActorCollection ac;
UpdateWorkerList updateWorkerList;
@ -771,6 +857,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
rmq.lifetime = db->serverInfo->get().masterLifetime;
cluster->masterProcessId = masterWorker.first.locality.processId();
cluster->masterDcId = masterWorker.first.locality.dcId();
ErrorOr<MasterInterface> newMaster = wait( masterWorker.first.master.tryGetReply( rmq ) );
if (newMaster.present()) {
TraceEvent("CCWDB", cluster->id).detail("Recruited", newMaster.get().id());
@ -915,6 +1002,24 @@ void checkOutstandingRecruitmentRequests( ClusterControllerData* self ) {
}
}
void checkOutstandingRemoteRecruitmentRequests( ClusterControllerData* self ) {
for( int i = 0; i < self->outstandingRemoteRecruitmentRequests.size(); i++ ) {
RecruitRemoteFromConfigurationRequest& req = self->outstandingRemoteRecruitmentRequests[i];
try {
req.reply.send( self->findRemoteWorkersForConfiguration( req ) );
std::swap( self->outstandingRemoteRecruitmentRequests[i--], self->outstandingRemoteRecruitmentRequests.back() );
self->outstandingRemoteRecruitmentRequests.pop_back();
} catch (Error& e) {
if (e.code() == error_code_no_more_servers || e.code() == error_code_operation_failed) {
TraceEvent(SevWarn, "RecruitRemoteTLogMatchingSetNotAvailable", self->id).error(e);
} else {
TraceEvent(SevError, "RecruitRemoteTLogsRequestError", self->id).error(e);
throw;
}
}
}
}
void checkOutstandingStorageRequests( ClusterControllerData* self ) {
for( int i = 0; i < self->outstandingStorageRequests.size(); i++ ) {
auto& req = self->outstandingStorageRequests[i];
@ -948,12 +1053,15 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) {
ACTOR Future<Void> doCheckOutstandingMasterRequests( ClusterControllerData* self ) {
Void _ = wait( delay(SERVER_KNOBS->CHECK_BETTER_MASTER_INTERVAL) );
//FIXME: re-enable betterMasterExists
/*
if (self->betterMasterExists()) {
if (!self->db.forceMasterFailure.isSet()) {
self->db.forceMasterFailure.send( Void() );
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
}
}
*/
return Void();
}
@ -966,6 +1074,7 @@ void checkOutstandingMasterRequests( ClusterControllerData* self ) {
void checkOutstandingRequests( ClusterControllerData* self ) {
checkOutstandingRecruitmentRequests( self );
checkOutstandingRemoteRecruitmentRequests( self );
checkOutstandingStorageRequests( self );
checkOutstandingMasterRequests( self );
}
@ -1196,6 +1305,30 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
}
}
ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData* self, RecruitRemoteFromConfigurationRequest req ) {
// At the moment this doesn't really need to be an actor (it always completes immediately)
TEST(true); //ClusterController RecruitTLogsRequest
loop {
try {
req.reply.send( self->findRemoteWorkersForConfiguration( req ) );
return Void();
} catch (Error& e) {
if (e.code() == error_code_no_more_servers && now() - self->startTime >= SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
self->outstandingRemoteRecruitmentRequests.push_back( req );
TraceEvent(SevWarn, "RecruitRemoteFromConfigurationNotAvailable", self->id).error(e);
return Void();
} else if(e.code() == error_code_operation_failed || e.code() == error_code_no_more_servers) {
//recruitment not good enough, try again
}
else {
TraceEvent(SevError, "RecruitRemoteFromConfigurationError", self->id).error(e);
throw; // goodbye, cluster controller
}
}
Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
}
}
void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest const& req ) {
req.reply.send( Void() );
@ -1518,6 +1651,9 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
addActor.send( clusterRecruitFromConfiguration( &self, req ) );
}
when( RecruitRemoteFromConfigurationRequest req = waitNext( interf.recruitRemoteFromConfiguration.getFuture() ) ) {
addActor.send( clusterRecruitRemoteFromConfiguration( &self, req ) );
}
when( RecruitStorageRequest req = waitNext( interf.recruitStorage.getFuture() ) ) {
clusterRecruitStorage( &self, req );
}

View File

@ -35,6 +35,7 @@
struct ClusterControllerFullInterface {
ClusterInterface clientInterface;
RequestStream< struct RecruitFromConfigurationRequest > recruitFromConfiguration;
RequestStream< struct RecruitRemoteFromConfigurationRequest > recruitRemoteFromConfiguration;
RequestStream< struct RecruitStorageRequest > recruitStorage;
RequestStream< struct RegisterWorkerRequest > registerWorker;
RequestStream< struct GetWorkersRequest > getWorkers;
@ -48,6 +49,7 @@ struct ClusterControllerFullInterface {
void initEndpoints() {
clientInterface.initEndpoints();
recruitFromConfiguration.getEndpoint( TaskClusterController );
recruitRemoteFromConfiguration.getEndpoint( TaskClusterController );
recruitStorage.getEndpoint( TaskClusterController );
registerWorker.getEndpoint( TaskClusterController );
getWorkers.getEndpoint( TaskClusterController );
@ -58,7 +60,7 @@ struct ClusterControllerFullInterface {
template <class Ar>
void serialize( Ar& ar ) {
ASSERT( ar.protocolVersion() >= 0x0FDB00A200040001LL );
ar & clientInterface & recruitFromConfiguration & recruitStorage & registerWorker & getWorkers & registerMaster & getServerDBInfo;
ar & clientInterface & recruitFromConfiguration & recruitRemoteFromConfiguration & recruitStorage & registerWorker & getWorkers & registerMaster & getServerDBInfo;
}
};
@ -78,14 +80,39 @@ struct RecruitFromConfigurationRequest {
struct RecruitFromConfigurationReply {
vector<WorkerInterface> tLogs;
vector<WorkerInterface> remoteTLogs;
vector<WorkerInterface> logRouters;
vector<WorkerInterface> satelliteTLogs;
vector<WorkerInterface> proxies;
vector<WorkerInterface> resolvers;
Optional<Key> remoteDcId;
template <class Ar>
void serialize( Ar& ar ) {
ar & tLogs & remoteTLogs & logRouters & proxies & resolvers;
ar & tLogs & satelliteTLogs & proxies & resolvers & remoteDcId;
}
};
struct RecruitRemoteFromConfigurationRequest {
DatabaseConfiguration configuration;
Optional<Key> dcId;
ReplyPromise< struct RecruitRemoteFromConfigurationReply > reply;
RecruitRemoteFromConfigurationRequest() {}
explicit RecruitRemoteFromConfigurationRequest(DatabaseConfiguration const& configuration, Optional<Key> const& dcId)
: configuration(configuration), dcId(dcId) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & configuration & dcId & reply;
}
};
struct RecruitRemoteFromConfigurationReply {
vector<WorkerInterface> remoteTLogs;
vector<WorkerInterface> logRouters;
template <class Ar>
void serialize( Ar& ar ) {
ar & remoteTLogs & logRouters;
}
};
@ -102,13 +129,13 @@ struct RecruitStorageReply {
struct RecruitStorageRequest {
std::vector<Optional<Standalone<StringRef>>> excludeMachines; //< Don't recruit any of these machines
std::vector<AddressExclusion> excludeAddresses; //< Don't recruit any of these addresses
std::vector<Optional<Standalone<StringRef>>> excludeDCs; //< Don't recruit from any of these data centers
std::vector<Optional<Standalone<StringRef>>> includeDCs;
bool criticalRecruitment; //< True if machine classes are to be ignored
ReplyPromise< RecruitStorageReply > reply;
template <class Ar>
void serialize( Ar& ar ) {
ar & excludeMachines & excludeAddresses & excludeDCs & criticalRecruitment & reply;
ar & excludeMachines & excludeAddresses & includeDCs & criticalRecruitment & reply;
}
};

View File

@ -300,8 +300,19 @@ struct MovableCoordinatedStateImpl {
}
};
void MovableCoordinatedState::operator=(MovableCoordinatedState&& av) {
if(impl) {
delete impl;
}
impl = av.impl;
av.impl = 0;
}
MovableCoordinatedState::MovableCoordinatedState( class ServerCoordinators const& coord ) : impl( new MovableCoordinatedStateImpl(coord) ) {}
MovableCoordinatedState::~MovableCoordinatedState() { delete impl; }
MovableCoordinatedState::~MovableCoordinatedState() {
if(impl) {
delete impl;
}
}
Future<Value> MovableCoordinatedState::read() { return MovableCoordinatedStateImpl::read(impl); }
Future<Void> MovableCoordinatedState::onConflict() { return impl->onConflict(); }
Future<Void> MovableCoordinatedState::setExclusive(Value v) { return impl->setExclusive(v); }

View File

@ -60,6 +60,7 @@ private:
class MovableCoordinatedState : NonCopyable {
public:
MovableCoordinatedState( class ServerCoordinators const& );
void operator=(MovableCoordinatedState&& av);
~MovableCoordinatedState();
Future<Value> read();

View File

@ -42,17 +42,18 @@ struct CoreTLogSet {
IRepPolicyRef tLogPolicy;
bool isLocal;
bool hasBest;
int8_t locality;
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true) {}
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {}
bool operator == (CoreTLogSet const& rhs) const {
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal &&
hasBest == rhs.hasBest && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
hasBest == rhs.hasBest && locality == rhs.locality && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
}
template <class Archive>
void serialize(Archive& ar) {
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest;
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest & locality;
}
};

View File

@ -437,9 +437,7 @@ struct DDTeamCollection {
PromiseStream<Future<Void>> addActor;
Database cx;
UID masterId;
int teamSize;
IRepPolicyRef replicationPolicy;
KeyValueStoreType storeType;
DatabaseConfiguration configuration;
bool doBuildTeams;
Future<Void> teamBuilder;
@ -479,12 +477,10 @@ struct DDTeamCollection {
MoveKeysLock const& lock,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
int teamSize,
IRepPolicyRef replicationPolicy,
KeyValueStoreType storeType,
DatabaseConfiguration configuration,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges )
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
teamSize( teamSize ), replicationPolicy(replicationPolicy), storeType( storeType ), serverChanges(serverChanges),
configuration(configuration), serverChanges(serverChanges),
initialFailureReactionDelay( delay( BUGGIFY ? 0 : SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ),
initializationDoneActor(logOnCompletion(initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
unhealthyServers(0)
@ -682,10 +678,10 @@ struct DDTeamCollection {
void addSubsetOfEmergencyTeams() {
for( int i = 0; i < teams.size(); i++ ) {
if( teams[i]->servers.size() > teamSize ) {
if( teams[i]->servers.size() > configuration.storageTeamSize ) {
auto& serverIds = teams[i]->getServerIDs();
bool foundTeam = false;
for( int j = 0; j < std::max( 1, (int)(serverIds.size() - teamSize + 1) ) && !foundTeam; j++ ) {
for( int j = 0; j < std::max( 1, (int)(serverIds.size() - configuration.storageTeamSize + 1) ) && !foundTeam; j++ ) {
auto& serverTeams = server_info[serverIds[j]]->teams;
for( int k = 0; k < serverTeams.size(); k++ ) {
auto &testTeam = serverTeams[k]->getServerIDs();
@ -703,7 +699,7 @@ struct DDTeamCollection {
}
}
if( !foundTeam ) {
addTeam(serverIds.begin(), serverIds.begin() + teamSize );
addTeam(serverIds.begin(), serverIds.begin() + configuration.storageTeamSize );
}
}
}
@ -724,7 +720,7 @@ struct DDTeamCollection {
void evaluateTeamQuality() {
int teamCount = teams.size(), serverCount = allServers.size();
double teamsPerServer = (double)teamCount * teamSize / serverCount;
double teamsPerServer = (double)teamCount * configuration.storageTeamSize / serverCount;
ASSERT( serverCount == server_info.size() );
@ -801,8 +797,8 @@ struct DDTeamCollection {
Void _ = wait( yield( TaskDataDistributionLaunch ) );
// Add team, if valid
if(history->size() == self->teamSize) {
auto valid = self->replicationPolicy->validate(*history, processes);
if(history->size() == self->configuration.storageTeamSize) {
auto valid = self->configuration.storagePolicy->validate(*history, processes);
if(!valid) {
return Void();
}
@ -858,8 +854,8 @@ struct DDTeamCollection {
}
}
if(totalServers.size() < teamSize ) {
TraceEvent(SevWarn, "DataDistributionBuildTeams", masterId).detail("Reason","Not enough servers for a team").detail("Servers",totalServers.size()).detail("teamSize", teamSize);
if(totalServers.size() < configuration.storageTeamSize ) {
TraceEvent(SevWarn, "DataDistributionBuildTeams", masterId).detail("Reason","Not enough servers for a team").detail("Servers",totalServers.size()).detail("teamSize", configuration.storageTeamSize);
return addedTeams;
}
@ -894,7 +890,7 @@ struct DDTeamCollection {
int maxAttempts = SERVER_KNOBS->BEST_OF_AMT;
for( int i = 0; i < maxAttempts && i < 100; i++) {
team.clear();
auto success = totalServers.selectReplicas(replicationPolicy, forcedAttributes, team);
auto success = totalServers.selectReplicas(configuration.storagePolicy, forcedAttributes, team);
if(!success) {
break;
}
@ -902,7 +898,7 @@ struct DDTeamCollection {
if(forcedAttributes.size() > 0) {
team.push_back((UID*)totalServers.getObject(forcedAttributes[0]));
}
if( team.size() != teamSize) {
if( team.size() != configuration.storageTeamSize) {
maxAttempts += 1;
}
@ -917,7 +913,7 @@ struct DDTeamCollection {
}
}
if( bestTeam.size() == teamSize) {
if( bestTeam.size() == configuration.storageTeamSize) {
vector<UID> processIDs;
for (auto process = bestTeam.begin(); process < bestTeam.end(); process++) {
@ -935,7 +931,7 @@ struct DDTeamCollection {
TraceEvent(SevWarn, "DataDistributionBuildTeams", masterId).detail("Reason","Unable to make desiredTeams");
break;
}
if(++loopCount > 2*teamsToBuild*(teamSize+1) ) {
if(++loopCount > 2*teamsToBuild*(configuration.storageTeamSize+1) ) {
break;
}
}
@ -967,7 +963,7 @@ struct DDTeamCollection {
uniqueMachines = machines.size();
// If there are too few machines to even build teams or there are too few represented datacenters, build no new teams
if( uniqueMachines >= self->teamSize ) {
if( uniqueMachines >= self->configuration.storageTeamSize ) {
desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER*serverCount;
// Count only properly sized teams against the desired number of teams. This is to prevent "emergency" merged teams (see MoveKeys)
@ -976,13 +972,13 @@ struct DDTeamCollection {
// Also exclude teams who have members in the wrong configuration, since we don't want these teams either
int teamCount = 0;
for(int i = 0; i < self->teams.size(); i++) {
if( self->teams[i]->getServerIDs().size() == self->teamSize && !self->teams[i]->isWrongConfiguration() ) {
if( self->teams[i]->getServerIDs().size() == self->configuration.storageTeamSize && !self->teams[i]->isWrongConfiguration() ) {
teamCount++;
}
}
TraceEvent("BuildTeamsBegin", self->masterId).detail("DesiredTeams", desiredTeams).detail("UniqueMachines", uniqueMachines)
.detail("TeamSize", self->teamSize).detail("Servers", serverCount)
.detail("TeamSize", self->configuration.storageTeamSize).detail("Servers", serverCount)
.detail("CurrentTrackedTeams", self->teams.size()).detail("TeamCount", teamCount);
if( desiredTeams > teamCount ) {
@ -996,18 +992,23 @@ struct DDTeamCollection {
state int teamsToBuild = desiredTeams - teamCount;
state vector<std::vector<UID>> builtTeams;
int addedTeams = wait( self->addAllTeams( self, desiredServerVector, &builtTeams, teamsToBuild ) );
if( addedTeams < teamsToBuild ) {
for( int i = 0; i < builtTeams.size(); i++ ) {
std::sort(builtTeams[i].begin(), builtTeams[i].end());
self->addTeam( builtTeams[i].begin(), builtTeams[i].end() );
}
TraceEvent("AddAllTeams", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", builtTeams.size());
}
else {
if( self->configuration.storageTeamSize > 3) {
int addedTeams = self->addTeamsBestOf( teamsToBuild );
TraceEvent("AddTeamsBestOf", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", addedTeams);
} else {
int addedTeams = wait( self->addAllTeams( self, desiredServerVector, &builtTeams, teamsToBuild ) );
if( addedTeams < teamsToBuild ) {
for( int i = 0; i < builtTeams.size(); i++ ) {
std::sort(builtTeams[i].begin(), builtTeams[i].end());
self->addTeam( builtTeams[i].begin(), builtTeams[i].end() );
}
TraceEvent("AddAllTeams", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", builtTeams.size());
}
else {
int addedTeams = self->addTeamsBestOf( teamsToBuild );
TraceEvent("AddTeamsBestOf", self->masterId).detail("CurrentTeams", self->teams.size()).detail("AddedTeams", addedTeams);
}
}
}
}
@ -1116,7 +1117,7 @@ struct DDTeamCollection {
ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistributionTeam> team) {
state int lastServersLeft = team->getServerIDs().size();
state bool lastAnyUndesired = false;
state bool wrongSize = team->getServerIDs().size() != self->teamSize;
state bool wrongSize = team->getServerIDs().size() != self->configuration.storageTeamSize;
state bool lastReady = self->initialFailureReactionDelay.isReady();
state bool lastHealthy = team->isHealthy();
state bool lastOptimal = team->isOptimal();
@ -1156,12 +1157,12 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
}
int serversLeft = teamLocality->size();
bool matchesPolicy = self->replicationPolicy->validate(teamLocality->getEntries(), teamLocality);
bool matchesPolicy = self->configuration.storagePolicy->validate(teamLocality->getEntries(), teamLocality);
if( !self->initialFailureReactionDelay.isReady() )
change.push_back( self->initialFailureReactionDelay );
bool recheck = lastReady != self->initialFailureReactionDelay.isReady() && ( !matchesPolicy || anyUndesired || team->getServerIDs().size() != self->teamSize );
bool recheck = lastReady != self->initialFailureReactionDelay.isReady() && ( !matchesPolicy || anyUndesired || team->getServerIDs().size() != self->configuration.storageTeamSize );
lastReady = self->initialFailureReactionDelay.isReady();
if( serversLeft != lastServersLeft || anyUndesired != lastAnyUndesired || anyWrongConfiguration != lastWrongConfiguration || wrongSize || recheck ) {
@ -1170,7 +1171,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
.detail("lastServersLeft", lastServersLeft).detail("ContainsUndesiredServer", anyUndesired)
.detail("HealthyTeamsCount", self->healthyTeamCount).detail("IsWrongConfiguration", anyWrongConfiguration);
bool healthy = matchesPolicy && !anyUndesired && team->getServerIDs().size() == self->teamSize && team->getServerIDs().size() == serversLeft;
bool healthy = matchesPolicy && !anyUndesired && team->getServerIDs().size() == self->configuration.storageTeamSize && team->getServerIDs().size() == serversLeft;
team->setHealthy( healthy ); // Unhealthy teams won't be chosen by bestTeam
team->setWrongConfiguration( anyWrongConfiguration );
@ -1216,7 +1217,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
wrongSize = false;
state int lastPriority = team->getPriority();
if( serversLeft < self->teamSize ) {
if( serversLeft < self->configuration.storageTeamSize ) {
if( serversLeft == 0 )
team->setPriority( PRIORITY_TEAM_0_LEFT );
else if( serversLeft == 1 )
@ -1226,7 +1227,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
else
team->setPriority( PRIORITY_TEAM_UNHEALTHY );
}
else if ( team->getServerIDs().size() != self->teamSize )
else if ( team->getServerIDs().size() != self->configuration.storageTeamSize )
team->setPriority( PRIORITY_TEAM_UNHEALTHY );
else if( anyUndesired )
team->setPriority( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
@ -1444,8 +1445,8 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
//Returns the KeyValueStoreType of server if it is different from self->storeType
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection *self, TCServerInfo *server) {
state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(TaskDataDistribution)));
if(type == self->storeType)
Void _ = wait(Never());
if(type == self->configuration.storageServerStoreType && ( server->lastKnownInterface.locality.dcId() == self->configuration.primaryDcId || server->lastKnownInterface.locality.dcId() == self->configuration.remoteDcId ) )
Void _ = wait(Future<Void>(Never()));
return type;
}
@ -1469,7 +1470,7 @@ ACTOR Future<Void> storageServerTracker(
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;
state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
state bool hasWrongStoreType = false;
state bool hasWrongStoreTypeOrDC = false;
changes.send( std::make_pair(server->id, server->lastKnownInterface) );
@ -1525,7 +1526,7 @@ ACTOR Future<Void> storageServerTracker(
}
//If this storage server has the wrong key-value store type, then mark it undesired so it will be replaced with a server having the correct type
if(hasWrongStoreType) {
if(hasWrongStoreTypeOrDC) {
TraceEvent(SevWarn, "UndesiredStorageServer", masterId).detail("Server", server->id).detail("StoreType", "?");
status.isUndesired = true;
status.isWrongConfiguration = true;
@ -1547,7 +1548,7 @@ ACTOR Future<Void> storageServerTracker(
failureTracker = storageServerFailureTracker( cx, server->lastKnownInterface, statusMap, &status, serverFailures, &self->unhealthyServers, masterId );
//We need to recruit new storage servers if the key value store type has changed
if(hasWrongStoreType)
if(hasWrongStoreTypeOrDC)
self->restartRecruiting.trigger();
if( lastIsUndesired && !status.isUndesired )
@ -1583,7 +1584,7 @@ ACTOR Future<Void> storageServerTracker(
//Restart the storeTracker for the new interface
storeTracker = keyValueStoreTypeTracker(self, server);
hasWrongStoreType = false;
hasWrongStoreTypeOrDC = false;
self->restartTeamBuilder.trigger();
if(restartRecruiting)
self->restartRecruiting.trigger();
@ -1595,11 +1596,11 @@ ACTOR Future<Void> storageServerTracker(
TraceEvent("KeyValueStoreTypeChanged", masterId)
.detail("ServerID", server->id)
.detail("StoreType", type.toString())
.detail("DesiredType", self->storeType.toString());
.detail("DesiredType", self->configuration.storageServerStoreType.toString());
TEST(true); //KeyValueStore type changed
storeTracker = Never();
hasWrongStoreType = true;
hasWrongStoreTypeOrDC = true;
}
when( Void _ = wait( server->wakeUpTracker.getFuture() ) ) {
server->wakeUpTracker = Promise<Void>();
@ -1649,7 +1650,7 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection *self, RecruitStorageRepl
state UID interfaceId = g_random->randomUniqueID();
InitializeStorageRequest isr;
isr.storeType = self->storeType;
isr.storeType = self->configuration.storageServerStoreType;
isr.seedTag = invalidTag;
isr.reqId = g_random->randomUniqueID();
isr.interfaceId = interfaceId;
@ -1721,6 +1722,9 @@ ACTOR Future<Void> storageRecruiter( DDTeamCollection *self, Reference<AsyncVar<
rsr.excludeAddresses.push_back(it);
}
rsr.includeDCs.push_back(self->configuration.primaryDcId);
rsr.includeDCs.push_back(self->configuration.remoteDcId);
TraceEvent(rsr.criticalRecruitment ? SevWarn : SevInfo, "DDRecruiting").detail("State", "Sending request to CC")
.detail("Exclusions", rsr.excludeAddresses.size()).detail("Critical", rsr.criticalRecruitment);
@ -1760,13 +1764,11 @@ ACTOR Future<Void> dataDistributionTeamCollection(
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<RelocateShard> output,
UID masterId, int teamSize,
IRepPolicyRef replicationPolicy,
KeyValueStoreType storeType,
UID masterId, DatabaseConfiguration configuration,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
Future<Void> readyToStart )
{
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, teamSize, replicationPolicy, storeType, serverChanges );
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, serverChanges );
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@ -2095,7 +2097,7 @@ ACTOR Future<Void> dataDistribution(
TraceEvent("DDInitGotInitialDD", mi.id()).detail("b","").detail("e", "").detail("src", "[no items]").detail("dest", "[no items]").trackLatest("InitialDD");
}
if (initData->mode) break;
if (initData->mode && false) break; //FIXME: re-enable data distribution
TraceEvent("DataDistributionDisabled", mi.id());
TraceEvent("MovingData", mi.id())
@ -2129,7 +2131,7 @@ ACTOR Future<Void> dataDistribution(
actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tci, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, configuration.storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tci, cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration.storageTeamSize, configuration.storagePolicy, configuration.storageServerStoreType, serverChanges, readyToStart.getFuture() ), "DDTeamCollection", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tci, cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, serverChanges, readyToStart.getFuture() ), "DDTeamCollection", mi.id(), &normalDDQueueErrors() ) );
Void _ = wait( waitForAll( actors ) );
return Void();
@ -2154,15 +2156,17 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
false
);
DatabaseConfiguration conf;
conf.storageTeamSize = teamSize;
conf.storagePolicy = policy;
DDTeamCollection* collection = new DDTeamCollection(
database,
UID(0, 0),
MoveKeysLock(),
PromiseStream<RelocateShard>(),
Reference<ShardsAffectedByTeamFailure>(new ShardsAffectedByTeamFailure()),
teamSize,
policy,
KeyValueStoreType(),
conf,
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>()
);

View File

@ -34,11 +34,23 @@ void DatabaseConfiguration::resetInternal() {
autoMasterProxyCount = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES;
autoResolverCount = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS;
autoDesiredTLogCount = CLIENT_KNOBS->DEFAULT_AUTO_LOGS;
storagePolicy = IRepPolicyRef();
tLogPolicy = IRepPolicyRef();
remoteTLogCount = 0;
remoteTLogReplicationFactor = 0;
remoteTLogPolicy = IRepPolicyRef();
primaryDcId = remoteDcId = Optional<Standalone<StringRef>>();
tLogPolicy = storagePolicy = remoteTLogPolicy = remoteStoragePolicy = satelliteTLogPolicy = IRepPolicyRef();
remoteDesiredTLogCount = remoteTLogReplicationFactor = remoteDurableStorageQuorum = remoteStorageTeamSize = satelliteDesiredTLogCount = satelliteTLogReplicationFactor = satelliteTLogWriteAntiQuorum = satelliteTLogUsableDcs = logRouterCount = 0;
primarySatelliteDcIds.clear();
remoteSatelliteDcIds.clear();
}
void parse( std::vector<Optional<Standalone<StringRef>>>* dcs, ValueRef const& v ) {
int lastBegin = 0;
for(int i = 0; i < v.size(); i++) {
if(v[i] == ',') {
dcs->push_back(v.substr(lastBegin,i));
lastBegin = i + 1;
}
}
dcs->push_back(v.substr(lastBegin));
}
void parse( int* i, ValueRef const& v ) {
@ -53,8 +65,10 @@ void parseReplicationPolicy(IRepPolicyRef* policy, ValueRef const& v) {
void DatabaseConfiguration::setDefaultReplicationPolicy() {
storagePolicy = IRepPolicyRef(new PolicyAcross(storageTeamSize, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteStoragePolicy = IRepPolicyRef(new PolicyAcross(remoteStorageTeamSize, "zoneid", IRepPolicyRef(new PolicyOne())));
tLogPolicy = IRepPolicyRef(new PolicyAcross(tLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(remoteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(satelliteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
}
bool DatabaseConfiguration::isValid() const {
@ -74,9 +88,19 @@ bool DatabaseConfiguration::isValid() const {
autoDesiredTLogCount >= 1 &&
storagePolicy &&
tLogPolicy &&
remoteTLogCount >= 0 &&
remoteTLogReplicationFactor >=0 &&
remoteTLogPolicy;
remoteDesiredTLogCount >= 0 &&
remoteTLogReplicationFactor >= 0 &&
remoteTLogPolicy &&
( remoteTLogReplicationFactor == 0 || ( primaryDcId.present() && remoteDcId.present() && remoteDurableStorageQuorum >= 1 && logRouterCount >= 1 ) ) &&
remoteStoragePolicy &&
remoteDurableStorageQuorum <= remoteStorageTeamSize &&
satelliteDesiredTLogCount >= 0 &&
satelliteTLogReplicationFactor >= 0 &&
satelliteTLogWriteAntiQuorum >= 0 &&
satelliteTLogUsableDcs >= 0 &&
( satelliteTLogReplicationFactor == 0 || ( primarySatelliteDcIds.size() && remoteSatelliteDcIds.size() && remoteTLogReplicationFactor > 0 ) ) &&
satelliteTLogPolicy &&
logRouterCount >= 0;
}
std::map<std::string, std::string> DatabaseConfiguration::toMap() const {
@ -108,6 +132,56 @@ std::map<std::string, std::string> DatabaseConfiguration::toMap() const {
result["storage_engine"] = "memory";
else
result["storage_engine"] = "custom";
if(primaryDcId.present()) {
result["primary_dc"] = printable(primaryDcId.get());
}
if(remoteDcId.present()) {
result["remote_dc"] = printable(remoteDcId.get());
}
if(primarySatelliteDcIds.size()) {
std::string primaryDcStr = "";
bool first = true;
for(auto& it : primarySatelliteDcIds) {
if(it.present()) {
if(!first) {
primaryDcStr += ",";
}
first = false;
primaryDcStr += printable(it.get());
}
}
result["primary_satellite_dcs"] = primaryDcStr;
}
if(remoteSatelliteDcIds.size()) {
std::string remoteDcStr = "";
bool first = true;
for(auto& it : remoteSatelliteDcIds) {
if(it.present()) {
if(!first) {
remoteDcStr += ",";
}
first = false;
remoteDcStr += printable(it.get());
}
}
result["remote_satellite_dcs"] = remoteDcStr;
}
if(satelliteTLogReplicationFactor > 0) {
result["satellite_replication"] = format("%d", satelliteTLogReplicationFactor);
}
if( remoteDurableStorageQuorum == remoteStorageTeamSize && remoteDurableStorageQuorum > 0) {
if( remoteTLogReplicationFactor == 1 && remoteDurableStorageQuorum == 1 )
result["remote_redundancy_mode"] = "remote_single";
else if( remoteTLogReplicationFactor == 2 && remoteDurableStorageQuorum == 2 )
result["remote_redundancy_mode"] = "remote_double";
else if( remoteTLogReplicationFactor == 3 && remoteDurableStorageQuorum == 3 )
result["remote_redundancy_mode"] = "remote_triple";
else
result["remote_redundancy_mode"] = "custom";
}
}
return result;
@ -144,9 +218,22 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
else if (ck == LiteralStringRef("auto_logs")) parse(&autoDesiredTLogCount, value);
else if (ck == LiteralStringRef("storage_replication_policy")) parseReplicationPolicy(&storagePolicy, value);
else if (ck == LiteralStringRef("log_replication_policy")) parseReplicationPolicy(&tLogPolicy, value);
else if (ck == LiteralStringRef("remote_logs")) parse(&remoteTLogCount, value);
else if (ck == LiteralStringRef("remote_log_replication")) parse(&remoteTLogReplicationFactor, value);
else if (ck == LiteralStringRef("remote_replication_policy")) parseReplicationPolicy(&remoteTLogPolicy, value);
else if (ck == LiteralStringRef("remote_logs")) parse(&remoteDesiredTLogCount, value);
else if (ck == LiteralStringRef("remote_log_replicas")) parse(&remoteTLogReplicationFactor, value);
else if (ck == LiteralStringRef("remote_log_policy")) parseReplicationPolicy(&remoteTLogPolicy, value);
else if (ck == LiteralStringRef("remote_storage_policy")) parseReplicationPolicy(&remoteStoragePolicy, value);
else if (ck == LiteralStringRef("satellite_log_policy")) parseReplicationPolicy(&satelliteTLogPolicy, value);
else if (ck == LiteralStringRef("remote_storage_quorum")) parse(&remoteDurableStorageQuorum, value);
else if (ck == LiteralStringRef("remote_storage_replicas")) parse(&remoteStorageTeamSize, value);
else if (ck == LiteralStringRef("satellite_logs")) parse(&satelliteDesiredTLogCount, value);
else if (ck == LiteralStringRef("satellite_log_replicas")) parse(&satelliteTLogReplicationFactor, value);
else if (ck == LiteralStringRef("satellite_anti_quorum")) parse(&satelliteTLogWriteAntiQuorum, value);
else if (ck == LiteralStringRef("satellite_usable_dcs")) parse(&satelliteTLogUsableDcs, value);
else if (ck == LiteralStringRef("primary_dc")) primaryDcId = value;
else if (ck == LiteralStringRef("remote_dc")) remoteDcId = value;
else if (ck == LiteralStringRef("primary_satellite_dcs")) parse(&primarySatelliteDcIds, value);
else if (ck == LiteralStringRef("remote_satellite_dcs")) parse(&remoteSatelliteDcIds, value);
else if (ck == LiteralStringRef("log_routers")) parse(&logRouterCount, value);
else return false;
return true; // All of the above options currently require recovery to take effect
}
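// A minimal, standalone sketch of what the new parse() overload above does for the
// primary_satellite_dcs / remote_satellite_dcs values: split a comma-separated list of
// datacenter ids. std::string stands in for ValueRef/StringRef here, so this is
// illustrative only and not part of the change itself.
#include <string>
#include <vector>

std::vector<std::string> splitDcList( std::string const& v ) {
	std::vector<std::string> dcs;
	size_t lastBegin = 0;
	for( size_t i = 0; i < v.size(); i++ ) {
		if( v[i] == ',' ) {
			dcs.push_back( v.substr( lastBegin, i - lastBegin ) );
			lastBegin = i + 1;
		}
	}
	dcs.push_back( v.substr( lastBegin ) );
	return dcs; // splitDcList("dc2,dc3") == {"dc2", "dc3"}
}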

View File

@ -42,14 +42,21 @@ struct DatabaseConfiguration {
std::string toString() const;
std::map<std::string, std::string> toMap() const;
int expectedLogSets() {
int result = 1;
if( satelliteTLogReplicationFactor > 0) {
result++;
}
if( remoteTLogReplicationFactor > 0) {
result++;
}
return result;
}
// SOMEDAY: think about changing storageTeamSize to durableStorageQuorum
int32_t minMachinesRequired() const { return std::max(tLogReplicationFactor*2, storageTeamSize); }
int32_t maxMachineFailuresTolerated() const { return std::min(tLogReplicationFactor - 1 - tLogWriteAntiQuorum, durableStorageQuorum - 1); }
// Redundancy Levels
IRepPolicyRef storagePolicy;
// MasterProxy Servers
int32_t masterProxyCount;
int32_t autoMasterProxyCount;
@ -59,21 +66,41 @@ struct DatabaseConfiguration {
int32_t autoResolverCount;
// TLogs
IRepPolicyRef tLogPolicy;
int32_t desiredTLogCount;
int32_t autoDesiredTLogCount;
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
KeyValueStoreType tLogDataStoreType;
IRepPolicyRef tLogPolicy;
int32_t remoteTLogCount;
int32_t remoteTLogReplicationFactor;
IRepPolicyRef remoteTLogPolicy;
Optional<Standalone<StringRef>> primaryDcId;
// Storage servers
// Storage Servers
IRepPolicyRef storagePolicy;
int32_t durableStorageQuorum;
int32_t storageTeamSize;
KeyValueStoreType storageServerStoreType;
// Remote TLogs
int32_t remoteDesiredTLogCount;
int32_t remoteTLogReplicationFactor;
int32_t logRouterCount;
IRepPolicyRef remoteTLogPolicy;
Optional<Standalone<StringRef>> remoteDcId;
// Remote Storage Servers
IRepPolicyRef remoteStoragePolicy;
int32_t remoteDurableStorageQuorum;
int32_t remoteStorageTeamSize;
// Satellite TLogs
IRepPolicyRef satelliteTLogPolicy;
int32_t satelliteDesiredTLogCount;
int32_t satelliteTLogReplicationFactor;
int32_t satelliteTLogWriteAntiQuorum;
int32_t satelliteTLogUsableDcs;
std::vector<Optional<Standalone<StringRef>>> primarySatelliteDcIds;
std::vector<Optional<Standalone<StringRef>>> remoteSatelliteDcIds;
// Excluded servers (no state should be here)
bool isExcludedServer( NetworkAddress ) const;
std::set<AddressExclusion> getExcludedServers() const;

View File

@ -384,8 +384,8 @@ ACTOR Future<Void> logRouterCore(
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t recoveryCount, TLogInterface myInterface, int logSet) {
loop{
if (db->get().recoveryCount >= recoveryCount && logSet < db->get().logSystemConfig.tLogs.size() &&
!std::count(db->get().logSystemConfig.tLogs[logSet].logRouters.begin(), db->get().logSystemConfig.tLogs[logSet].logRouters.end(), myInterface.id())) {
if (db->get().recoveryCount >= recoveryCount && ( logSet >= db->get().logSystemConfig.expectedLogSets || ( logSet < db->get().logSystemConfig.tLogs.size() &&
!std::count(db->get().logSystemConfig.tLogs[logSet].logRouters.begin(), db->get().logSystemConfig.tLogs[logSet].logRouters.end(), myInterface.id()) ) )) {
throw worker_removed();
}
Void _ = wait(db->onChange());
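// Small illustration of how the expectedLogSets() helper added to DatabaseConfiguration
// interacts with the widened checkRemoved() condition above: once the recoveryCount check
// passes, a log router recruited into a log set index that this configuration can never
// produce is removed even before logSystemConfig.tLogs contains that set. The replication
// factors below are assumed values, chosen only for the example.
int expectedLogSetsFor( int satelliteTLogReplicationFactor, int remoteTLogReplicationFactor ) {
	int result = 1;                                    // primary tlogs
	if( satelliteTLogReplicationFactor > 0 ) result++; // satellite tlogs
	if( remoteTLogReplicationFactor > 0 ) result++;    // remote tlogs
	return result;
}
// expectedLogSetsFor(2, 3) == 3, so a router with logSet == 3 satisfies
// "logSet >= db->get().logSystemConfig.expectedLogSets" and throws worker_removed().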

View File

@ -51,8 +51,9 @@ public:
std::map<int,LocalityEntry> logEntryMap;
bool isLocal;
bool hasBest;
int8_t locality;
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true) {}
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {}
int bestLocationFor( Tag tag ) {
return hasBest ? tag.id % logServers.size() : -1;
@ -100,7 +101,9 @@ public:
if(hasBest) {
for(auto& t : tags) {
newLocations.push_back(bestLocationFor(t));
if(t.locality == locality || t.locality == tagLocalitySpecial) {
newLocations.push_back(bestLocationFor(t));
}
}
}
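// Rough sketch of the tag filter added to the hasBest branch above: a tag is pushed to
// this LogSet's best location only when the tag's locality matches the set's locality, or
// the tag carries the special locality. MiniTag and the parameters are simplified stand-ins
// for the real Tag / tagLocalitySpecial types, used here only for illustration.
#include <cstdint>

struct MiniTag { int8_t locality; uint16_t id; };

bool pushesToSet( MiniTag const& t, int8_t setLocality, int8_t specialLocality ) {
	return t.locality == setLocality || t.locality == specialLocality;
}
// A tag carrying a remote dc's locality is skipped by the primary set, while a tag with
// the special locality is still routed to every set that has a best location.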
@ -498,7 +501,7 @@ struct ILogSystem {
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable)
virtual Future<Reference<ILogSystem>> newEpoch( vector<WorkerInterface> availableLogServers, vector<WorkerInterface> availableRemoteLogServers, vector<WorkerInterface> availableLogRouters, DatabaseConfiguration const& config, LogEpoch recoveryCount ) = 0;
virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality ) = 0;
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns an ILogSystem representing a new epoch immediately following this one. The new epoch is only provisional until the caller updates the coordinated DBCoreState

View File

@ -61,17 +61,18 @@ struct TLogSet {
int32_t tLogWriteAntiQuorum, tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
int8_t locality;
bool isLocal;
bool hasBest;
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true) {}
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBest(true), locality(-99) {}
std::string toString() const {
return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBest, logRouters.size(), describe(tLogs).c_str());
return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBest, logRouters.size(), describe(tLogs).c_str(), locality);
}
bool operator == ( const TLogSet& rhs ) const {
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBest != rhs.hasBest || tLogs.size() != rhs.tLogs.size()) {
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBest != rhs.hasBest || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality) {
return false;
}
if ((tLogPolicy && !rhs.tLogPolicy) || (!tLogPolicy && rhs.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != rhs.tLogPolicy->info()))) {
@ -86,7 +87,7 @@ struct TLogSet {
}
bool isEqualIds(TLogSet const& r) const {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBest != r.hasBest || tLogs.size() != r.tLogs.size()) {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBest != r.hasBest || tLogs.size() != r.tLogs.size() || locality != r.locality) {
return false;
}
if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) {
@ -102,7 +103,7 @@ struct TLogSet {
template <class Ar>
void serialize( Ar& ar ) {
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest;
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBest & locality;
}
};
@ -142,8 +143,10 @@ struct LogSystemConfig {
int logSystemType;
std::vector<TLogSet> tLogs;
std::vector<OldTLogConf> oldTLogs;
int expectedLogSets;
int minRouters;
LogSystemConfig() : logSystemType(0) {}
LogSystemConfig() : logSystemType(0), minRouters(0), expectedLogSets(0) {}
std::string toString() const {
return format("type: %d oldGenerations: %d %s", logSystemType, oldTLogs.size(), describe(tLogs).c_str());
@ -164,11 +167,11 @@ struct LogSystemConfig {
bool operator == ( const LogSystemConfig& rhs ) const { return isEqual(rhs); }
bool isEqual(LogSystemConfig const& r) const {
return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs;
return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && minRouters == r.minRouters && expectedLogSets == r.expectedLogSets;
}
bool isEqualIds(LogSystemConfig const& r) const {
if( logSystemType!=r.logSystemType || tLogs.size() != r.tLogs.size() || oldTLogs.size() != r.oldTLogs.size() ) {
if( logSystemType!=r.logSystemType || tLogs.size() != r.tLogs.size() || oldTLogs.size() != r.oldTLogs.size() || minRouters != r.minRouters || expectedLogSets != r.expectedLogSets ) {
return false;
}
for(int i = 0; i < tLogs.size(); i++ ) {
@ -198,7 +201,7 @@ struct LogSystemConfig {
template <class Ar>
void serialize( Ar& ar ) {
ar & logSystemType & tLogs & oldTLogs;
ar & logSystemType & tLogs & oldTLogs & minRouters & expectedLogSets;
}
};

View File

@ -455,26 +455,33 @@ ArenaReader* ILogSystem::SetPeekCursor::reader() { return serverCursors[currentS
void ILogSystem::SetPeekCursor::calcHasMessage() {
if(nextVersion.present()) serverCursors[bestSet][bestServer]->advanceTo( nextVersion.get() );
if( serverCursors[bestSet][bestServer]->hasMessage() ) {
messageVersion = serverCursors[bestSet][bestServer]->version();
currentSet = bestSet;
currentCursor = bestServer;
hasNextMessage = true;
if(bestSet >= 0 && bestServer >= 0) {
if(nextVersion.present()) {
//TraceEvent("LPC_calcNext").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage).detail("nextVersion", nextVersion.get().toString());
serverCursors[bestSet][bestServer]->advanceTo( nextVersion.get() );
}
if( serverCursors[bestSet][bestServer]->hasMessage() ) {
messageVersion = serverCursors[bestSet][bestServer]->version();
currentSet = bestSet;
currentCursor = bestServer;
hasNextMessage = true;
for (auto& cursors : serverCursors) {
for(auto& c : cursors) {
c->advanceTo(messageVersion);
//TraceEvent("LPC_calc1").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage);
for (auto& cursors : serverCursors) {
for(auto& c : cursors) {
c->advanceTo(messageVersion);
}
}
return;
}
return;
}
auto bestVersion = serverCursors[bestSet][bestServer]->version();
for (auto& cursors : serverCursors) {
for (auto& c : cursors) {
c->advanceTo(bestVersion);
auto bestVersion = serverCursors[bestSet][bestServer]->version();
for (auto& cursors : serverCursors) {
for (auto& c : cursors) {
c->advanceTo(bestVersion);
}
}
}
@ -482,8 +489,10 @@ void ILogSystem::SetPeekCursor::calcHasMessage() {
if(useBestSet) {
updateMessage(bestSet, false); // Use Quorum logic
//TraceEvent("LPC_calc2").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage);
if(!hasNextMessage) {
updateMessage(bestSet, true);
//TraceEvent("LPC_calc3").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage);
}
} else {
for(int i = 0; i < logSets.size() && !hasNextMessage; i++) {
@ -491,12 +500,13 @@ void ILogSystem::SetPeekCursor::calcHasMessage() {
updateMessage(i, false); // Use Quorum logic
}
}
//TraceEvent("LPC_calc4").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage);
for(int i = 0; i < logSets.size() && !hasNextMessage; i++) {
if(i != bestSet) {
updateMessage(i, true);
}
}
//TraceEvent("LPC_calc5").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage);
}
}
@ -508,6 +518,7 @@ void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) {
auto& serverCursor = serverCursors[logIdx][i];
if (nextVersion.present()) serverCursor->advanceTo(nextVersion.get());
sortedVersions.push_back(std::pair<LogMessageVersion, int>(serverCursor->version(), i));
//TraceEvent("LPC_update1").detail("ver", messageVersion.toString()).detail("tag", tag).detail("hasNextMessage", hasNextMessage).detail("serverVer", serverCursor->version().toString()).detail("i", i);
}
if(usePolicy) {
@ -529,13 +540,14 @@ void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) {
messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor)].first;
}
for(int i = 0; i < serverCursors[logIdx].size(); i++) {
auto& c = serverCursors[logIdx][i];
auto start = c->version();
c->advanceTo(messageVersion);
if( start < messageVersion && messageVersion < c->version() ) {
advancedPast = true;
TEST(true); //Merge peek cursor advanced past desired sequence
for (auto& cursors : serverCursors) {
for (auto& c : cursors) {
auto start = c->version();
c->advanceTo(messageVersion);
if( start < messageVersion && messageVersion < c->version() ) {
advancedPast = true;
TEST(true); //Merge peek cursor advanced past desired sequence
}
}
}
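// Simplified model of the version updateMessage() picks above when it is not using the
// replication policy: sort the per-server cursor versions of one log set and take the
// same index as sortedVersions.size() - (logServers.size() + 1 - tLogReplicationFactor),
// assuming one cursor per log server. Inputs below are illustrative.
#include <algorithm>
#include <vector>

int pickMessageVersion( std::vector<int> versions, int tLogReplicationFactor ) {
	std::sort( versions.begin(), versions.end() );
	return versions[ versions.size() - ( versions.size() + 1 - tLogReplicationFactor ) ];
}
// pickMessageVersion({10, 12, 15}, 3) == 15 and pickMessageVersion({10, 12, 15}, 2) == 12:
// with fewer replicas required, an earlier version can already be treated as readable.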
@ -587,6 +599,7 @@ ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer
//TraceEvent("LPC_getMore1", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag);
if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isActive()) {
ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage());
//TraceEvent("LPC_getMore2", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag);
Void _ = wait( self->serverCursors[self->bestSet][self->bestServer]->getMore() || self->serverCursors[self->bestSet][self->bestServer]->onFailed() );
self->useBestSet = true;
} else {
@ -602,6 +615,7 @@ ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer
bestSetValid = self->localityGroup.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->localityGroup.validate(self->logSets[self->bestSet]->tLogPolicy);
}
if(bestSetValid) {
//TraceEvent("LPC_getMore3", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag);
vector<Future<Void>> q;
for (auto& c : self->serverCursors[self->bestSet]) {
if (!c->hasMessage()) {
@ -613,6 +627,7 @@ ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer
} else {
//FIXME: this will peek way too many cursors when satellites exist, and does not need to peek bestSet cursors since we cannot get any more data from them
vector<Future<Void>> q;
//TraceEvent("LPC_getMore4", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag);
for(auto& cursors : self->serverCursors) {
for (auto& c :cursors) {
if (!c->hasMessage()) {

View File

@ -1141,7 +1141,7 @@ static StatusObject configurationFetcher(Optional<DatabaseConfiguration> conf, S
else if (configuration.autoDesiredTLogCount != CLIENT_KNOBS->DEFAULT_AUTO_LOGS)
statusObj["auto_logs"] = configuration.autoDesiredTLogCount;
statusObj["remote_logs"] = configuration.remoteTLogCount;
statusObj["remote_logs"] = configuration.remoteDesiredTLogCount;
if(configuration.storagePolicy) {
statusObj["storage_policy"] = configuration.storagePolicy->info();

View File

@ -53,6 +53,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
UID dbgid;
int logSystemType;
std::vector<Reference<LogSet>> tLogs;
int expectedLogSets;
int minRouters;
// new members
@ -61,6 +62,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Future<Void> remoteRecovery;
Future<Void> remoteRecoveryComplete;
bool recoveryCompleteWrittenToCoreState;
bool remoteLogsWrittenToCoreState;
Optional<Version> epochEndVersion;
std::set< Tag > epochEndTags;
@ -70,7 +72,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
ActorCollection actors;
std::vector<OldLogData> oldLogData;
TagPartitionedLogSystem( UID dbgid, LocalityData locality ) : dbgid(dbgid), locality(locality), actors(false), recoveryCompleteWrittenToCoreState(false), logSystemType(0), minRouters(std::numeric_limits<int>::max()) {}
TagPartitionedLogSystem( UID dbgid, LocalityData locality ) : dbgid(dbgid), locality(locality), actors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), minRouters(std::numeric_limits<int>::max()), expectedLogSets(0) {}
virtual void stopRejoins() {
rejoins = Future<Void>();
@ -109,6 +111,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality) );
logSystem->tLogs.resize(lsConf.tLogs.size());
logSystem->expectedLogSets = lsConf.expectedLogSets;
logSystem->minRouters = lsConf.minRouters;
for( int i = 0; i < lsConf.tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->tLogs[i] = logSet;
@ -125,9 +129,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBest = tLogSet.hasBest;
logSet->locality = tLogSet.locality;
logSet->updateLocalitySet();
if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min<int>(logSystem->minRouters, logSet->logRouters.size());
}
logSystem->oldLogData.resize(lsConf.oldTLogs.size());
@ -149,6 +152,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogLocalities = tLogData.tLogLocalities;
logSet->isLocal = tLogData.isLocal;
logSet->hasBest = tLogData.hasBest;
logSet->locality = tLogData.locality;
//logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs
}
logSystem->oldLogData[i].epochEnd = lsConf.oldTLogs[i].epochEnd;
@ -181,8 +185,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBest = tLogSet.hasBest;
logSet->locality = tLogSet.locality;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min<int>(logSystem->minRouters, logSet->logRouters.size());
}
//logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd;
@ -205,6 +209,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBest = tLogSet.hasBest;
logSet->locality = tLogSet.locality;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
}
logSystem->oldLogData[i-1].epochEnd = lsConf.oldTLogs[i].epochEnd;
@ -231,6 +236,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBest = t->hasBest;
coreSet.locality = t->locality;
newState.tLogs.push_back(coreSet);
}
@ -249,6 +255,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBest = t->hasBest;
coreSet.locality = t->locality;
newState.oldTLogData[i].tLogs.push_back(coreSet);
}
newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd;
@ -259,15 +266,29 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual Future<Void> onCoreStateChanged() {
ASSERT(recoveryComplete.isValid());
if( recoveryComplete.isReady() )
ASSERT(recoveryComplete.isValid() && remoteRecovery.isValid() );
if( recoveryComplete.isReady() && remoteRecovery.isReady() ) {
return Never();
return recoveryComplete;
}
if( remoteRecovery.isReady() ) {
return recoveryComplete;
}
if( recoveryComplete.isReady() ) {
return remoteRecovery;
}
return recoveryComplete || remoteRecovery;
}
virtual void coreStateWritten( DBCoreState const& newState ) {
if( !newState.oldTLogData.size() )
if( !newState.oldTLogData.size() ) {
recoveryCompleteWrittenToCoreState = true;
}
for(auto& t : newState.tLogs) {
if(!t.isLocal) {
remoteLogsWrittenToCoreState = true;
break;
}
}
}
virtual Future<Void> onError() {
@ -327,14 +348,31 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(tLogs.size() < 1) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
int bestSet = -1;
for(int t = 0; t < tLogs.size(); t++) {
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) {
bestSet = t;
break;
}
}
if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) {
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, 0, tLogs[0]->logServers.size() ? tLogs[0]->bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) );
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, std::max(0,bestSet),
bestSet >= 0 ? ( tLogs[bestSet]->logServers.size() ? tLogs[bestSet]->bestLocationFor( tag ) : -1 ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, 0, tLogs[0]->logServers.size() ? tLogs[0]->bestLocationFor( tag ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) );
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, std::max(0,bestSet),
bestSet >= 0 ? ( tLogs[bestSet]->logServers.size() ? tLogs[bestSet]->bestLocationFor( tag ) : -1 ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) );
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 0, oldLogData[i].tLogs[0]->logServers.size() ? oldLogData[i].tLogs[0]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) );
int bestOldSet = -1;
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
if(oldLogData[i].tLogs[t]->hasBest && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) {
bestOldSet = t;
break;
}
}
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, std::max(0,bestOldSet),
bestOldSet >= 0 ? ( oldLogData[i].tLogs[bestOldSet]->logServers.size() ? oldLogData[i].tLogs[bestOldSet]->bestLocationFor( tag ) : -1 ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) );
epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd));
}
@ -352,27 +390,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
tLogs[0]->logServers[tLogs[0]->bestLocationFor( tag )] :
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
} else {
if(tLogs.size() < 2) {
int bestSet = -1;
for(int t = 0; t < tLogs.size(); t++) {
if(tLogs[t]->hasBest && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial)) {
bestSet = t;
break;
}
}
if(tLogs.size() < 1 || bestSet < 0) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ?
tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] :
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
} else {
TEST(true); //peekSingle used during non-copying tlog recovery
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
cursors.push_back( Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ?
tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] :
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, oldLogData[0].epochEnd, getPeekEnd(), false, false) ) );
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1]->logServers.size() ? oldLogData[i].tLogs[1]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, false)) );
epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd));
}
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
}
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers.size() ?
tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )] :
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
}
@ -444,33 +476,38 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return waitForAll(lockResults);
}
virtual Future<Reference<ILogSystem>> newEpoch( vector<WorkerInterface> availableLogServers, vector<WorkerInterface> availableRemoteLogServers, vector<WorkerInterface> availableLogRouters, DatabaseConfiguration const& config, LogEpoch recoveryCount ) {
virtual Future<Reference<ILogSystem>> newEpoch( RecruitFromConfigurationReply const& recr, Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality ) {
// Call only after end_epoch() has successfully completed. Returns a new epoch immediately following this one. The new epoch
// is only provisional until the caller updates the coordinated DBCoreState
return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), availableLogServers, availableRemoteLogServers, availableLogRouters, config, recoveryCount );
return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality );
}
virtual LogSystemConfig getLogSystemConfig() {
LogSystemConfig logSystemConfig;
logSystemConfig.logSystemType = logSystemType;
logSystemConfig.expectedLogSets = expectedLogSets;
logSystemConfig.minRouters = minRouters;
logSystemConfig.tLogs.resize(tLogs.size());
for( int i = 0; i < tLogs.size(); i++ ) {
TLogSet& log = logSystemConfig.tLogs[i];
Reference<LogSet> logSet = tLogs[i];
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBest = logSet->hasBest;
if(logSet->isLocal || remoteLogsWrittenToCoreState) {
logSystemConfig.tLogs.push_back(TLogSet());
TLogSet& log = logSystemConfig.tLogs.back();
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBest = logSet->hasBest;
log.locality = logSet->locality;
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
}
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
}
for( int i = 0; i < logSet->logRouters.size(); i++ ) {
log.logRouters.push_back(logSet->logRouters[i]->get());
for( int i = 0; i < logSet->logRouters.size(); i++ ) {
log.logRouters.push_back(logSet->logRouters[i]->get());
}
}
}
@ -488,6 +525,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBest = logSet->hasBest;
log.locality = logSet->locality;
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
@ -507,8 +545,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<std::pair<UID, NetworkAddress>> logs;
vector<std::pair<UID, NetworkAddress>> oldLogs;
for(auto& t : tLogs) {
for( int i = 0; i < t->logServers.size(); i++ ) {
logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress()));
if(t->isLocal || remoteLogsWrittenToCoreState) {
for( int i = 0; i < t->logServers.size(); i++ ) {
logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress()));
}
}
}
if(!recoveryCompleteWrittenToCoreState) {
@ -632,6 +672,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogLocalities = coreSet.tLogLocalities;
logSet->isLocal = coreSet.isLocal;
logSet->hasBest = coreSet.hasBest;
logSet->locality = coreSet.locality;
logFailed.push_back(failed);
}
oldLogData.resize(prevState.oldTLogData.size());
@ -654,6 +695,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogLocalities = log.tLogLocalities;
logSet->isLocal = log.isLocal;
logSet->hasBest = log.hasBest;
logSet->locality = log.locality;
}
oldData.epochEnd = old.epochEnd;
}
@ -861,23 +903,32 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, vector<WorkerInterface> remoteTLogWorkers, vector<WorkerInterface> logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, uint16_t minTag, int logNum )
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, uint16_t minTag, int8_t remoteLocality )
{
state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );
state Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSet->tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
logSet->tLogPolicy = configuration.remoteTLogPolicy;
logSet->isLocal = false;
logSet->hasBest = true;
logSet->locality = remoteLocality;
//recruit temporary log routers and update registration with them
state int tempLogRouters = std::max<int>(logRouterWorkers.size(), minTag + 1);
state int tempLogRouters = std::max<int>(remoteWorkers.logRouters.size(), minTag + 1);
state vector<Future<TLogInterface>> logRouterInitializationReplies;
for( int i = 0; i < tempLogRouters; i++) {
InitializeLogRouterRequest req;
req.recoveryCount = recoveryCount;
req.routerTag = Tag(tagLocalityLogRouter, i);
req.logSet = logNum;
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( logRouterWorkers[i%logRouterWorkers.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
req.logSet = self->tLogs.size();
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
}
Void _ = wait( waitForAll(logRouterInitializationReplies) );
for( int i = 0; i < logRouterInitializationReplies.size(); i++ ) {
self->tLogs[logNum]->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(logRouterInitializationReplies[i].get()) ) ) );
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(logRouterInitializationReplies[i].get()) ) ) );
}
state double startTime = now();
@ -885,9 +936,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state vector<Future<TLogInterface>> remoteTLogInitializationReplies;
vector< InitializeTLogRequest > remoteTLogReqs( remoteTLogWorkers.size() );
vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() );
for( int i = 0; i < remoteTLogWorkers.size(); i++ ) {
for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) {
InitializeTLogRequest &req = remoteTLogReqs[i];
req.recruitmentID = remoteRecruitmentID;
req.storeType = configuration.tLogDataStoreType;
@ -898,60 +949,71 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.remoteTag = Tag(tagLocalityRemoteLog, i);
}
self->tLogs[logNum]->tLogLocalities.resize( remoteTLogWorkers.size() );
self->tLogs[logNum]->logServers.resize( remoteTLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
self->tLogs[logNum]->updateLocalitySet(remoteTLogWorkers);
logSet->tLogLocalities.resize( remoteWorkers.remoteTLogs.size() );
logSet->logServers.resize( remoteWorkers.remoteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSet->updateLocalitySet(remoteWorkers.remoteTLogs);
vector<int> locations;
for( Tag tag : oldLogSystem->epochEndTags ) {
locations.clear();
self->tLogs[logNum]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
logSet->getPushLocations( vector<Tag>(1, tag), locations, 0 );
for(int loc : locations)
remoteTLogReqs[ loc ].recoverTags.push_back( tag );
}
for( int i = 0; i < remoteTLogWorkers.size(); i++ )
remoteTLogInitializationReplies.push_back( transformErrors( throwErrorOr( remoteTLogWorkers[i].tLog.getReplyUnlessFailedFor( remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ )
remoteTLogInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor( remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
Void _ = wait( waitForAll(remoteTLogInitializationReplies) );
for( int i = 0; i < remoteTLogInitializationReplies.size(); i++ ) {
self->tLogs[logNum]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(remoteTLogInitializationReplies[i].get()) ) );
self->tLogs[logNum]->tLogLocalities[i] = remoteTLogWorkers[i].locality;
logSet->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(remoteTLogInitializationReplies[i].get()) ) );
logSet->tLogLocalities[i] = remoteWorkers.remoteTLogs[i].locality;
}
std::vector<Future<Void>> recoveryComplete;
for( int i = 0; i < self->tLogs[logNum]->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( self->tLogs[logNum]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
for( int i = 0; i < logSet->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( logSet->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
self->remoteRecoveryComplete = waitForAll(recoveryComplete);
self->tLogs[logNum]->logRouters.resize(logRouterWorkers.size());
logSet->logRouters.resize(remoteWorkers.remoteTLogs.size());
self->tLogs.push_back( logSet );
return Void();
}
ACTOR static Future<Reference<ILogSystem>> newEpoch(
Reference<TagPartitionedLogSystem> oldLogSystem, vector<WorkerInterface> tLogWorkers, vector<WorkerInterface> remoteTLogWorkers, vector<WorkerInterface> logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount )
ACTOR static Future<Reference<ILogSystem>> newEpoch( Reference<TagPartitionedLogSystem> oldLogSystem, RecruitFromConfigurationReply recr, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality )
{
state double startTime = now();
state Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) );
state UID recruitmentID = g_random->randomUniqueID();
logSystem->logSystemType = 2;
logSystem->tLogs.resize(2);
logSystem->minRouters = configuration.logRouterCount;
logSystem->expectedLogSets = 1;
logSystem->tLogs[0] = Reference<LogSet>( new LogSet() );
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum;
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy;
logSystem->tLogs[0]->isLocal = true;
logSystem->tLogs[0]->hasBest = true;
logSystem->tLogs[1] = Reference<LogSet>( new LogSet() );
logSystem->tLogs[1]->tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
logSystem->tLogs[1]->tLogPolicy = configuration.remoteTLogPolicy;
logSystem->tLogs[1]->isLocal = false;
logSystem->tLogs[1]->hasBest = true;
logSystem->tLogs[0]->locality = primaryLocality;
if(configuration.satelliteTLogReplicationFactor > 0) {
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
logSystem->tLogs[1]->tLogWriteAntiQuorum = configuration.satelliteTLogWriteAntiQuorum;
logSystem->tLogs[1]->tLogReplicationFactor = configuration.satelliteTLogReplicationFactor;
logSystem->tLogs[1]->tLogPolicy = configuration.satelliteTLogPolicy;
logSystem->tLogs[1]->isLocal = true;
logSystem->tLogs[1]->hasBest = false;
logSystem->tLogs[1]->locality = -99;
logSystem->expectedLogSets++;
}
if(configuration.remoteTLogReplicationFactor > 0) {
logSystem->expectedLogSets++;
}
if(oldLogSystem->tLogs.size()) {
logSystem->oldLogData.push_back(OldLogData());
logSystem->oldLogData[0].tLogs = oldLogSystem->tLogs;
@ -963,9 +1025,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
state vector<Future<TLogInterface>> initializationReplies;
vector< InitializeTLogRequest > reqs( tLogWorkers.size() );
vector< InitializeTLogRequest > reqs( recr.tLogs.size() );
for( int i = 0; i < tLogWorkers.size(); i++ ) {
for( int i = 0; i < recr.tLogs.size(); i++ ) {
InitializeTLogRequest &req = reqs[i];
req.recruitmentID = recruitmentID;
req.storeType = configuration.tLogDataStoreType;
@ -975,9 +1037,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.epoch = recoveryCount;
}
logSystem->tLogs[0]->tLogLocalities.resize( tLogWorkers.size() );
logSystem->tLogs[0]->logServers.resize( tLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[0]->updateLocalitySet(tLogWorkers);
logSystem->tLogs[0]->tLogLocalities.resize( recr.tLogs.size() );
logSystem->tLogs[0]->logServers.resize( recr.tLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[0]->updateLocalitySet(recr.tLogs);
std::vector<int> locations;
state uint16_t minTag = 0;
@ -989,25 +1051,68 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
reqs[ loc ].recoverTags.push_back( tag );
}
for( int i = 0; i < tLogWorkers.size(); i++ )
initializationReplies.push_back( transformErrors( throwErrorOr( tLogWorkers[i].tLog.getReplyUnlessFailedFor( reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
for( int i = 0; i < recr.tLogs.size(); i++ )
initializationReplies.push_back( transformErrors( throwErrorOr( recr.tLogs[i].tLog.getReplyUnlessFailedFor( reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
state std::vector<Future<Void>> recoveryComplete;
if(configuration.satelliteTLogReplicationFactor > 0) {
state vector<Future<TLogInterface>> satelliteInitializationReplies;
vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() );
for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) {
InitializeTLogRequest &req = sreqs[i];
req.recruitmentID = recruitmentID;
req.storeType = configuration.tLogDataStoreType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverAt = oldLogSystem->epochEndVersion.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
req.epoch = recoveryCount;
}
logSystem->tLogs[1]->tLogLocalities.resize( recr.satelliteTLogs.size() );
logSystem->tLogs[1]->logServers.resize( recr.satelliteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[1]->updateLocalitySet(recr.satelliteTLogs);
for( Tag tag : oldLogSystem->getEpochEndTags() ) {
minTag = std::min(minTag, tag.id);
locations.clear();
logSystem->tLogs[1]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
for(int loc : locations)
sreqs[ loc ].recoverTags.push_back( tag );
}
for( int i = 0; i < recr.satelliteTLogs.size(); i++ )
satelliteInitializationReplies.push_back( transformErrors( throwErrorOr( recr.satelliteTLogs[i].tLog.getReplyUnlessFailedFor( sreqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
Void _ = wait( waitForAll( satelliteInitializationReplies ) );
for( int i = 0; i < satelliteInitializationReplies.size(); i++ ) {
logSystem->tLogs[1]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(satelliteInitializationReplies[i].get()) ) );
logSystem->tLogs[1]->tLogLocalities[i] = recr.satelliteTLogs[i].locality;
}
for( int i = 0; i < logSystem->tLogs[1]->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[1]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
}
Void _ = wait( waitForAll( initializationReplies ) );
for( int i = 0; i < initializationReplies.size(); i++ ) {
logSystem->tLogs[0]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );
logSystem->tLogs[0]->tLogLocalities[i] = tLogWorkers[i].locality;
logSystem->tLogs[0]->tLogLocalities[i] = recr.tLogs[i].locality;
}
//Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running recoveries causing tests to timeout
if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed();
std::vector<Future<Void>> recoveryComplete;
for( int i = 0; i < logSystem->tLogs[0]->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
logSystem->recoveryComplete = waitForAll(recoveryComplete);
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, minTag, 1);
Void _ = wait(logSystem->remoteRecovery);
if(configuration.remoteTLogReplicationFactor > 0) {
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, minTag, remoteLocality);
}
return logSystem;
}
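// Compact sketch of the bestSet selection that peek() and peekSingle() above now share:
// scan the log sets for one that hasBest and whose locality matches the tag (or the tag
// uses the special locality), instead of hard-coding set 0 or set 1. MiniLogSet is a
// simplified stand-in used only for this illustration.
#include <cstdint>
#include <vector>

struct MiniLogSet { bool hasBest; int8_t locality; };

int findBestSet( std::vector<MiniLogSet> const& sets, int8_t tagLocality, int8_t specialLocality ) {
	for( int t = 0; t < (int)sets.size(); t++ ) {
		if( sets[t].hasBest && ( sets[t].locality == tagLocality || tagLocality == specialLocality ) )
			return t;
	}
	return -1; // peekSingle() falls back to an empty ServerPeekCursor when nothing matches
}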

View File

@ -59,6 +59,94 @@ struct ProxyVersionReplies {
ProxyVersionReplies() : latestRequestNum(0) {}
};
ACTOR Future<Void> masterTerminateOnConflict( UID dbgid, Promise<Void> fullyRecovered, Future<Void> onConflict, Future<Void> switchedState ) {
choose {
when( Void _ = wait(onConflict) ) {
if (!fullyRecovered.isSet()) {
TraceEvent("MasterTerminated", dbgid).detail("Reason", "Conflict");
TEST(true); // Coordinated state conflict, master dying
throw worker_removed();
}
return Void();
}
when( Void _ = wait(switchedState) ) {
return Void();
}
}
}
class ReusableCoordinatedState : NonCopyable {
public:
Promise<Void> fullyRecovered;
DBCoreState prevDBState;
DBCoreState myDBState;
ReusableCoordinatedState( ServerCoordinators const& coordinators, PromiseStream<Future<Void>> const& addActor, UID const& dbgid ) : coordinators(coordinators), cstate(coordinators), addActor(addActor), dbgid(dbgid) {}
Future<Void> read() {
return _read(this);
}
Future<Void> write(DBCoreState newState, bool finalWrite = false) {
return _write(this, newState, finalWrite);
}
Future<Void> move( ClusterConnectionString const& nc ) {
return cstate.move(nc);
}
private:
MovableCoordinatedState cstate;
ServerCoordinators coordinators;
PromiseStream<Future<Void>> addActor;
Promise<Void> switchedState;
UID dbgid;
ACTOR Future<Void> _read(ReusableCoordinatedState* self) {
Value prevDBStateRaw = wait( self->cstate.read() );
self->addActor.send( masterTerminateOnConflict( self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture() ) );
if( prevDBStateRaw.size() ) {
self->prevDBState = BinaryReader::fromStringRef<DBCoreState>(prevDBStateRaw, IncludeVersion());
self->myDBState = self->prevDBState;
}
return Void();
}
ACTOR Future<Void> _write(ReusableCoordinatedState* self, DBCoreState newState, bool finalWrite) {
try {
Void _ = wait( self->cstate.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) );
} catch (Error& e) {
TEST(true); // Master displaced during writeMasterState
throw;
}
self->myDBState = newState;
if(!finalWrite) {
self->switchedState.send(Void());
self->cstate = MovableCoordinatedState(self->coordinators);
Value rereadDBStateRaw = wait( self->cstate.read() );
DBCoreState readState;
if( rereadDBStateRaw.size() )
readState = BinaryReader::fromStringRef<DBCoreState>(rereadDBStateRaw, IncludeVersion());
if( readState != newState ) {
TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "CStateChanged");
TEST(true); // Coordinated state changed between writing and reading, master dying
throw worker_removed();
}
self->switchedState = Promise<Void>();
self->addActor.send( masterTerminateOnConflict( self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture() ) );
} else {
self->fullyRecovered.send(Void());
}
return Void();
}
};
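// Hedged sketch of the intended calling sequence for ReusableCoordinatedState above: one
// read of the previous coordinated state, any number of intermediate writes (each of which
// re-arms the conflict watcher on a fresh MovableCoordinatedState), and a final write that
// fulfills fullyRecovered. This actor shows ordering only; it is not code from the change.
ACTOR Future<Void> exampleRecoverySequence( ReusableCoordinatedState* cstate, DBCoreState transitionState, DBCoreState finalState ) {
	Void _ = wait( cstate->read() );                    // populates prevDBState / myDBState
	Void _ = wait( cstate->write( transitionState ) );  // intermediate write, finalWrite == false
	Void _ = wait( cstate->write( finalState, true ) ); // final write, sends fullyRecovered
	return Void();
}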
struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
UID dbgid;
@ -67,14 +155,6 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
recoveryTransactionVersion; // The first version in this epoch
double lastCommitTime;
DBCoreState prevDBState;
// prevDBState is the coordinated state (contents of this->cstate) for the database as of the
// beginning of recovery. If our recovery succeeds, it will be the penultimate state in the consistent chain.
Optional<DBCoreState> myDBState;
// myDBState present only after recovery succeeds. It is the state that we wrote to this->cstate
// after recovering and is the final state in the consistent chain.
DatabaseConfiguration originalConfiguration;
DatabaseConfiguration configuration;
@ -86,6 +166,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
LogSystemDiskQueueAdapter* txnStateLogAdapter;
IKeyValueStore* txnStateStore;
int64_t memoryLimit;
std::map<Optional<Value>,int8_t> dcId_locality;
vector< MasterProxyInterface > proxies;
vector< MasterProxyInterface > provisionalProxies;
@ -99,9 +180,8 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
MasterInterface myInterface;
ClusterControllerFullInterface clusterController; // If the cluster controller changes, this master will die, so this is immutable.
MovableCoordinatedState cstate1; // Stores serialized DBCoreState, this one kills previous tlog before recruiting new ones
MovableCoordinatedState cstate2; // Stores serialized DBCoreState, this one contains oldTlogs while we recover from them
MovableCoordinatedState cstate3; // Stores serialized DBCoreState, this is the final one
ReusableCoordinatedState cstate;
AsyncVar<bool> cstateUpdated;
Reference<AsyncVar<ServerDBInfo>> dbInfo;
int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController
@ -111,7 +191,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Version resolverChangesVersion;
std::set<UID> resolverNeedingChanges;
Promise<Void> fullyRecovered;
PromiseStream<Future<Void>> addActor;
MasterData(
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
@ -119,14 +199,13 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
ServerCoordinators const& coordinators,
ClusterControllerFullInterface const& clusterController,
Standalone<StringRef> const& dbName,
Standalone<StringRef> const& dbId
Standalone<StringRef> const& dbId,
PromiseStream<Future<Void>> const& addActor
)
: dbgid( myInterface.id() ),
myInterface(myInterface),
dbInfo(dbInfo),
cstate1(coordinators),
cstate2(coordinators),
cstate3(coordinators),
cstate(coordinators, addActor, dbgid),
coordinators(coordinators),
clusterController(clusterController),
dbName( dbName ),
@ -138,64 +217,14 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
version(invalidVersion),
lastVersionTime(0),
txnStateStore(0),
memoryLimit(2e9)
memoryLimit(2e9),
cstateUpdated(false),
addActor(addActor)
{
}
~MasterData() { if(txnStateStore) txnStateStore->close(); }
};
ACTOR Future<Void> writeTransitionMasterState( Reference<MasterData> self, bool skipTransition ) {
state DBCoreState newState;
self->logSystem->toCoreState( newState );
newState.recoveryCount = self->prevDBState.recoveryCount + 1;
ASSERT( newState.tLogs[0].tLogWriteAntiQuorum == self->configuration.tLogWriteAntiQuorum && newState.tLogs[0].tLogReplicationFactor == self->configuration.tLogReplicationFactor );
try {
Void _ = wait( self->cstate2.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) );
} catch (Error& e) {
TEST(true); // Master displaced during writeMasterState
throw;
}
if( !skipTransition ) {
Value rereadDBStateRaw = wait( self->cstate3.read() );
DBCoreState readState;
if( rereadDBStateRaw.size() )
readState = BinaryReader::fromStringRef<DBCoreState>(rereadDBStateRaw, IncludeVersion());
if( readState != newState ) {
TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "CStateChanged");
TEST(true); // Coordinated state changed between writing and reading, master dying
throw worker_removed();
}
}
self->logSystem->coreStateWritten(newState);
self->myDBState = newState;
return Void();
}
ACTOR Future<Void> writeRecoveredMasterState( Reference<MasterData> self ) {
state DBCoreState newState = self->myDBState.get();
self->logSystem->toCoreState( newState );
ASSERT( newState.tLogs[0].tLogWriteAntiQuorum == self->configuration.tLogWriteAntiQuorum && newState.tLogs[0].tLogReplicationFactor == self->configuration.tLogReplicationFactor );
try {
Void _ = wait( self->cstate3.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) );
} catch (Error& e) {
TEST(true); // Master displaced during writeMasterState
throw;
}
self->logSystem->coreStateWritten(newState);
self->myDBState = newState;
return Void();
}
ACTOR Future<Void> newProxies( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits ) {
self->proxies.clear();
@ -206,7 +235,7 @@ ACTOR Future<Void> newProxies( Reference<MasterData> self, Future< RecruitFromCo
for( int i = 0; i < workers.size(); i++ ) {
InitializeMasterProxyRequest req;
req.master = self->myInterface;
req.recoveryCount = self->prevDBState.recoveryCount + 1;
req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
req.recoveryTransactionVersion = self->recoveryTransactionVersion;
req.firstProxy = i == 0;
TraceEvent("ProxyReplies",self->dbgid).detail("workerID",workers[i].id());
@ -228,7 +257,7 @@ ACTOR Future<Void> newResolvers( Reference<MasterData> self, Future< RecruitFrom
state vector<Future<ResolverInterface>> initializationReplies;
for( int i = 0; i < workers.size(); i++ ) {
InitializeResolverRequest req;
req.recoveryCount = self->prevDBState.recoveryCount + 1;
req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
req.proxyCount = recr.proxies.size();
req.resolverCount = recr.resolvers.size();
TraceEvent("ResolverReplies",self->dbgid).detail("workerID",workers[i].id());
@ -242,9 +271,16 @@ ACTOR Future<Void> newResolvers( Reference<MasterData> self, Future< RecruitFrom
}
ACTOR Future<Void> newTLogServers( Reference<MasterData> self, Future< RecruitFromConfigurationReply > recruits, Reference<ILogSystem> oldLogSystem ) {
RecruitFromConfigurationReply recr = wait( recruits );
if(!self->dcId_locality.count(self->configuration.primaryDcId) || !self->dcId_locality.count(self->configuration.remoteDcId)) {
TraceEvent(SevWarnAlways, "UnknownDCID", self->dbgid).detail("primaryFound", self->dcId_locality.count(self->configuration.primaryDcId)).detail("remoteFound", self->dcId_locality.count(self->configuration.remoteDcId)).detail("primaryId", printable(self->configuration.primaryDcId)).detail("remoteId", printable(self->configuration.remoteDcId));
Void _ = wait( Future<Void>(Never()) );
}
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr.tLogs, recr.remoteTLogs, recr.logRouters, self->configuration, self->prevDBState.recoveryCount + 1 ) );
RecruitFromConfigurationReply recr = wait( recruits );
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, recr.remoteDcId ) ) );
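// If the remote recruitment came back from the configured remote datacenter, the configured
// primary stays primary for the new epoch; otherwise the two datacenters swap roles and the
// configured remote acts as primary.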
Optional<Key> primaryDcId = recr.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId;
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[primaryDcId], self->dcId_locality[recr.remoteDcId] ) );
self->logSystem = newLogSystem;
return Void();
@ -252,21 +288,23 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, Future< RecruitFr
ACTOR Future<Void> newSeedServers( Reference<MasterData> self, vector<StorageServerInterface>* servers ) {
// This is only necessary if the database is at version 0
// FIXME: the seed servers do not respect the storage policy
servers->clear();
if (self->lastEpochEnd) return Void();
state std::map<Optional<Value>,Tag> dcId_tags;
state int8_t nextLocality = 0;
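// Seed recruitment now spans both regions: the first storageTeamSize servers are requested
// from the primary datacenter and the remainder from the remote datacenter (see includeDCs
// below). The dcId_tags / nextLocality bookkeeping assigns each datacenter its own tag
// locality, which is copied into dcId_locality once recruitment finishes.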
while( servers->size() < self->configuration.storageTeamSize ) {
while( servers->size() < self->configuration.storageTeamSize + self->configuration.remoteStorageTeamSize ) {
try {
RecruitStorageRequest req;
req.criticalRecruitment = true;
req.includeDCs.push_back( servers->size() < self->configuration.storageTeamSize ? self->configuration.primaryDcId : self->configuration.remoteDcId);
for(auto s = servers->begin(); s != servers->end(); ++s)
req.excludeMachines.push_back(s->locality.zoneId());
TraceEvent("MasterRecruitingInitialStorageServer", self->dbgid)
.detail("ExcludingMachines", req.excludeMachines.size())
.detail("ExcludingDataCenters", req.excludeDCs.size());
.detail("DataCenters", req.includeDCs.size());
state RecruitStorageReply candidateWorker = wait( brokenPromiseToNever( self->clusterController.recruitStorage.getReply( req ) ) );
@ -305,6 +343,11 @@ ACTOR Future<Void> newSeedServers( Reference<MasterData> self, vector<StorageSer
}
}
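// Remember which tag locality was chosen for each datacenter; newTLogServers looks these up
// when building the next log system epoch, and readTransactionSystemState reloads them from
// the tag locality list on later recoveries.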
self->dcId_locality.clear();
for(auto& it : dcId_tags) {
self->dcId_locality[it.first] = it.second.locality;
}
TraceEvent("MasterRecruitedInitialStorageServers", self->dbgid)
.detail("TargetCount", self->configuration.storageTeamSize)
.detail("Servers", describe(*servers));
@ -328,16 +371,6 @@ Future<Void> waitResolverFailure( vector<ResolverInterface> const& resolvers ) {
return tagError<Void>(quorum( failed, 1 ), master_resolver_failed());
}
ACTOR Future<Void> masterTerminateOnConflict( Reference<MasterData> self, Future<Void> onConflict ) {
Void _ = wait( onConflict );
if (!self->fullyRecovered.isSet()) {
TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "Conflict");
TEST(true); // Coordinated state conflict, master dying
throw worker_removed();
}
return Void();
}
ACTOR Future<Void> updateLogsValue( Reference<MasterData> self, Database cx ) {
state Transaction tr(cx);
loop {
@ -405,14 +438,14 @@ ACTOR Future<Void> updateRegistration( Reference<MasterData> self, Reference<ILo
trigger = self->registrationTrigger.onTrigger();
TraceEvent("MasterUpdateRegistration", self->dbgid).detail("RecoveryCount", self->myDBState.present() ? self->myDBState.get().recoveryCount : self->prevDBState.recoveryCount).detail("logs", describe(logSystem->getLogSystemConfig().tLogs));
TraceEvent("MasterUpdateRegistration", self->dbgid).detail("RecoveryCount", self->cstate.myDBState.recoveryCount).detail("logs", describe(logSystem->getLogSystemConfig().tLogs));
if (!self->myDBState.present()) {
if (!self->cstateUpdated.get()) {
//FIXME: prior committed tlogs should include
Void _ = wait(sendMasterRegistration(self.getPtr(), logSystem->getLogSystemConfig(), self->provisionalProxies, self->resolvers, self->prevDBState.recoveryCount, self->prevDBState.getPriorCommittedLogServers() ));
Void _ = wait(sendMasterRegistration(self.getPtr(), logSystem->getLogSystemConfig(), self->provisionalProxies, self->resolvers, self->cstate.myDBState.recoveryCount, self->cstate.prevDBState.getPriorCommittedLogServers() ));
} else {
updateLogsKey = updateLogsValue(self, cx);
Void _ = wait( sendMasterRegistration( self.getPtr(), logSystem->getLogSystemConfig(), self->proxies, self->resolvers, self->myDBState.get().recoveryCount, vector<UID>() ) );
Void _ = wait( sendMasterRegistration( self.getPtr(), logSystem->getLogSystemConfig(), self->proxies, self->resolvers, self->cstate.myDBState.recoveryCount, vector<UID>() ) );
}
}
}
@ -474,7 +507,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
RecoveryStatus::RecoveryStatus status;
if (self->configuration.initialized)
status = RecoveryStatus::configuration_invalid;
else if (!self->prevDBState.tLogs.size())
else if (!self->cstate.prevDBState.tLogs.size())
status = RecoveryStatus::configuration_never_created;
else
status = RecoveryStatus::configuration_missing;
@ -496,7 +529,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
.detail("storeType", self->configuration.storageServerStoreType)
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
RecruitFromConfigurationReply recruits = wait(
state RecruitFromConfigurationReply recruits = wait(
brokenPromiseToNever( self->clusterController.recruitFromConfiguration.getReply(
RecruitFromConfigurationRequest( self->configuration ) ) ) );
@ -510,35 +543,8 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
// Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand new database we are sort of lying that we are
// past the recruitment phase. In a perfect world we would split that up so that the recruitment part happens above (in parallel with recruiting the transaction servers?).
Void _ = wait( newProxies( self, recruits ) && newResolvers( self, recruits ) && newTLogServers( self, recruits, oldLogSystem ) && newSeedServers( self, seedServers ) );
return Void();
}
ACTOR Future<Void> rewriteMasterState( Reference<MasterData> self ) {
state DBCoreState newState = self->prevDBState;
newState.recoveryCount++;
try {
Void _ = wait( self->cstate1.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) );
} catch (Error& e) {
TEST(true); // Master displaced during rewriteMasterState
throw;
}
self->prevDBState = newState;
Value rereadDBStateRaw = wait( self->cstate2.read() );
DBCoreState readState;
if( rereadDBStateRaw.size() )
readState = BinaryReader::fromStringRef<DBCoreState>(rereadDBStateRaw, IncludeVersion());
if( readState != newState ) {
TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "CStateChanged");
TEST(true); // Coordinated state changed between writing and reading, master dying
throw worker_removed();
}
Void _ = wait( newSeedServers( self, seedServers ) );
Void _ = wait( newProxies( self, recruits ) && newResolvers( self, recruits ) && newTLogServers( self, recruits, oldLogSystem ) );
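// The seed servers are now recruited to completion before the transaction subsystem, because
// on a brand-new database newSeedServers is what populates self->dcId_locality, which
// newTLogServers consults before starting the new epoch.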
return Void();
}
@ -573,6 +579,12 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
self->originalConfiguration = self->configuration;
TraceEvent("MasterRecoveredConfig", self->dbgid).detail("conf", self->configuration.toString()).trackLatest("RecoveredConfig");
Standalone<VectorRef<KeyValueRef>> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) );
self->dcId_locality.clear();
for(auto& kv : rawLocalities) {
self->dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
}
//auto kvs = self->txnStateStore->readRange( systemKeys );
//for( auto & kv : kvs.get() )
// TraceEvent("MasterRecoveredTXS", self->dbgid).detail("K", printable(kv.key)).detail("V", printable(kv.value));
@ -635,8 +647,8 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
ACTOR Future<Void> triggerUpdates( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
loop {
Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->fullyRecovered.getFuture() );
if(self->fullyRecovered.isSet())
Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->cstate.fullyRecovered.getFuture() );
if(self->cstate.fullyRecovered.isSet())
return Void();
self->registrationTrigger.trigger();
@ -921,18 +933,14 @@ static std::set<int> const& normalMasterErrors() {
return s;
}
ACTOR Future<Void> changeCoordinators( Reference<MasterData> self, bool skippedTransition ) {
Void _ = wait( self->fullyRecovered.getFuture() );
ACTOR Future<Void> changeCoordinators( Reference<MasterData> self ) {
Void _ = wait( self->cstate.fullyRecovered.getFuture() );
loop {
ChangeCoordinatorsRequest req = waitNext( self->myInterface.changeCoordinators.getFuture() );
state ChangeCoordinatorsRequest changeCoordinatorsRequest = req;
try {
if( skippedTransition ) {
Void _ = wait( self->cstate2.move( ClusterConnectionString( changeCoordinatorsRequest.newConnectionString.toString() ) ) );
} else {
Void _ = wait( self->cstate3.move( ClusterConnectionString( changeCoordinatorsRequest.newConnectionString.toString() ) ) );
}
Void _ = wait( self->cstate.move( ClusterConnectionString( changeCoordinatorsRequest.newConnectionString.toString() ) ) );
}
catch(Error &e) {
if(e.code() != error_code_actor_cancelled)
@ -952,32 +960,39 @@ ACTOR Future<Void> rejoinRequestHandler( Reference<MasterData> self ) {
}
}
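// trackTlogRecovery repeatedly snapshots the log system into a DBCoreState and writes it to
// the coordinators: each successful write sets cstateUpdated so masterCore can make the
// recovery transaction durable, and once no old tlog generations remain and the expected
// number of log sets is present it performs the final write and finishes recovery.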
ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems, bool skipTransition ) {
ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems ) {
state Future<Void> rejoinRequests = Never();
state DBRecoveryCount recoverCount = self->cstate.myDBState.recoveryCount + 1;
loop {
DBCoreState coreState;
self->logSystem->toCoreState( coreState );
if( !self->fullyRecovered.isSet() && !coreState.oldTLogData.size() ) {
if( !skipTransition ) {
Void _ = wait( writeRecoveredMasterState(self) );
self->registrationTrigger.trigger();
}
state DBCoreState newState;
self->logSystem->toCoreState( newState );
newState.recoveryCount = recoverCount;
ASSERT( newState.tLogs[0].tLogWriteAntiQuorum == self->configuration.tLogWriteAntiQuorum && newState.tLogs[0].tLogReplicationFactor == self->configuration.tLogReplicationFactor );
state bool finalUpdate = !newState.oldTLogData.size() && newState.tLogs.size() == self->configuration.expectedLogSets();
Void _ = wait( self->cstate.write(newState, finalUpdate) );
self->cstateUpdated.set(true);
self->logSystem->coreStateWritten(newState);
self->registrationTrigger.trigger();
if( finalUpdate ) {
TraceEvent("MasterFullyRecovered", self->dbgid);
oldLogSystems->get()->stopRejoins();
rejoinRequests = rejoinRequestHandler(self);
self->fullyRecovered.send(Void());
return Void();
}
Void _ = wait( self->logSystem->onCoreStateChanged() );
}
}
ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<Void>> addActor )
ACTOR Future<Void> masterCore( Reference<MasterData> self )
{
state TraceInterval recoveryInterval("MasterRecovery");
addActor.send( waitFailureServer(self->myInterface.waitFailure.getFuture()) );
self->addActor.send( waitFailureServer(self->myInterface.waitFailure.getFuture()) );
TraceEvent( recoveryInterval.begin(), self->dbgid );
@ -987,30 +1002,25 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_coordinated_state])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
Value prevDBStateRaw = wait( self->cstate1.read() );
addActor.send( masterTerminateOnConflict( self, self->cstate1.onConflict() ) );
if( prevDBStateRaw.size() )
self->prevDBState = BinaryReader::fromStringRef<DBCoreState>(prevDBStateRaw, IncludeVersion());
Void _ = wait( self->cstate.read() );
self->recoveryState = RecoveryState::LOCKING_CSTATE;
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::locking_coordinated_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_coordinated_state])
.detail("TLogs", self->prevDBState.tLogs.size())
.detail("MyRecoveryCount", self->prevDBState.recoveryCount+2)
.detail("StateSize", prevDBStateRaw.size())
.detail("TLogs", self->cstate.prevDBState.tLogs.size())
.detail("MyRecoveryCount", self->cstate.prevDBState.recoveryCount+2)
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality);
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality);
Void _ = wait( rewriteMasterState( self ) || recoverAndEndEpoch );
DBCoreState newState = self->cstate.myDBState;
newState.recoveryCount++;
Void _ = wait( self->cstate.write(newState) || recoverAndEndEpoch );
self->recoveryState = RecoveryState::RECRUITING;
addActor.send( masterTerminateOnConflict( self, self->cstate2.onConflict() ) );
state vector<StorageServerInterface> seedServers;
state vector<Standalone<CommitTransactionRef>> initialConfChanges;
state Future<Void> logChanges;
@ -1098,7 +1108,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
state Future<Void> proxyFailure = waitProxyFailure( self->proxies );
state Future<Void> providingVersions = provideVersions(self);
addActor.send( reportErrors(updateRegistration(self, self->logSystem), "updateRegistration", self->dbgid) );
self->addActor.send( reportErrors(updateRegistration(self, self->logSystem), "updateRegistration", self->dbgid) );
self->registrationTrigger.trigger();
Void _ = wait(discardCommit(self->txnStateStore, self->txnStateLogAdapter));
@ -1132,21 +1142,17 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
// we made to the new Tlogs (self->recoveryTransactionVersion), and only our own semi-commits can come between our
// first commit and the next new TLogs
DBCoreState coreState;
self->logSystem->toCoreState( coreState );
state bool skipTransition = !coreState.oldTLogData.size();
self->addActor.send( trackTlogRecovery(self, oldLogSystems) );
debug_advanceMaxCommittedVersion(UID(), self->recoveryTransactionVersion);
Void _ = wait( writeTransitionMasterState( self, skipTransition ) );
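// Instead of writing the transitional core state inline, masterCore now simply waits until
// trackTlogRecovery has completed at least one coordinated-state write before treating the
// recovery transaction as durable.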
while(!self->cstateUpdated.get()) {
Void _ = wait(self->cstateUpdated.onChange());
}
debug_advanceMinCommittedVersion(UID(), self->recoveryTransactionVersion);
if( !skipTransition )
addActor.send( masterTerminateOnConflict( self, self->cstate3.onConflict() ) );
if( debugResult )
TraceEvent(SevError, "DBRecoveryDurabilityError");
TraceEvent("MasterCommittedTLogs", self->dbgid).detail("TLogs", self->logSystem->describe()).detail("RecoveryCount", self->myDBState.get().recoveryCount).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
TraceEvent("MasterCommittedTLogs", self->dbgid).detail("TLogs", self->logSystem->describe()).detail("RecoveryCount", self->cstate.myDBState.recoveryCount).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
TraceEvent(recoveryInterval.end(), self->dbgid).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
@ -1157,23 +1163,18 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("storeType", self->configuration.storageServerStoreType)
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
// Now that recovery is complete, we register ourselves with the cluster controller, so that the client and server information
// it hands out can be updated
self->registrationTrigger.trigger();
// Now that the master is recovered we can start auxiliary services that happen to run here
{
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
state double lastLimited = 0;
addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, self->myInterface, self->configuration, ddStorageServerChanges, self->logSystem, self->recoveryTransactionVersion, &lastLimited ), "DataDistribution", self->dbgid, &normalMasterErrors() ) );
addActor.send( reportErrors( rateKeeper( self->dbInfo, ddStorageServerChanges, self->myInterface.getRateInfo.getFuture(), self->dbName, self->configuration, &lastLimited ), "Ratekeeper", self->dbgid) );
self->addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, self->myInterface, self->configuration, ddStorageServerChanges, self->logSystem, self->recoveryTransactionVersion, &lastLimited ), "DataDistribution", self->dbgid, &normalMasterErrors() ) );
self->addActor.send( reportErrors( rateKeeper( self->dbInfo, ddStorageServerChanges, self->myInterface.getRateInfo.getFuture(), self->dbName, self->configuration, &lastLimited ), "Ratekeeper", self->dbgid) );
}
if( self->resolvers.size() > 1 )
addActor.send( resolutionBalancing(self) );
self->addActor.send( resolutionBalancing(self) );
addActor.send( changeCoordinators(self, skipTransition) );
addActor.send( trackTlogRecovery(self, oldLogSystems, skipTransition) );
self->addActor.send( changeCoordinators(self) );
loop choose {
when( Void _ = wait( tlogFailure ) ) { throw internal_error(); }
@ -1185,17 +1186,16 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
ACTOR Future<Void> masterServer( MasterInterface mi, Reference<AsyncVar<ServerDBInfo>> db, ServerCoordinators coordinators, LifetimeToken lifetime )
{
state PromiseStream<Future<Void>> addActor;
state Future<Void> collection = actorCollection( addActor.getFuture() );
state Future<Void> onDBChange = Void();
state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, db->get().dbName, LiteralStringRef("") ) );
state PromiseStream<Future<Void>> addActor;
state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, db->get().dbName, LiteralStringRef(""), addActor ) );
state Future<Void> collection = actorCollection( self->addActor.getFuture() );
TEST( !lifetime.isStillValid( db->get().masterLifetime, mi.id()==db->get().master.id() ) ); // Master born doomed
TraceEvent("MasterLifetime", self->dbgid).detail("LifetimeToken", lifetime.toString());
try {
state Future<Void> core = masterCore( self, addActor );
state Future<Void> core = masterCore( self );
loop choose {
when (Void _ = wait( core )) { break; }
when (Void _ = wait( onDBChange )) {