Merge branch 'master' of github.com:apple/foundationdb into feature-redwood

Stephen Atherton 2018-06-17 14:55:05 -07:00
commit 90c8288c68
26 changed files with 484 additions and 274 deletions

View File

@ -150,7 +150,12 @@
],
"log_replication_factor":3,
"log_write_anti_quorum":0,
"log_fault_tolerance":2
"log_fault_tolerance":2,
"remote_log_replication_factor":3,
"remote_log_fault_tolerance":2,
"satellite_log_replication_factor":3,
"satellite_log_write_anti_quorum":0,
"satellite_log_fault_tolerance":2
}
],
"fault_tolerance":{
@ -188,6 +193,7 @@
"incompatible_connections":[
],
"datacenter_version_difference":0,
"database_available":true,
"database_locked":false,
"generation":2,
@ -231,6 +237,7 @@
"$enum":[
"unreachable_master_worker",
"unreadable_configuration",
"full_replication_timeout",
"client_issues",
"unreachable_processes",
"immediate_priority_transaction_start_probe_timeout",
@ -332,25 +339,32 @@
},
"cluster_controller_timestamp":1415650089,
"protocol_version":"fdb00a400050001",
"configuration":{
"full_replication":true,
"configuration":{
"log_anti_quorum":0,
"log_replicas":2,
"log_replication_policy":"(zoneid^3x1)",
"redundancy_mode":"single",
"regions":[{
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
}],
"satellite_redundancy_mode":"one_satellite_single",
"satellite_log_replicas":1,
"satellite_usable_dcs":1,
"satellite_anti_quorum":0,
"satellite_log_policy":"(zoneid^3x1)",
"satellite_logs":2
}],
"remote_redundancy_mode":"remote_single",
"remote_log_replicas":3,
"remote_logs":5,
"storage_quorum":1,
"storage_replicas":1,
"resolvers":1,
"redundancy":{
"factor":{
"$enum":[
"single",
"double",
"triple",
"custom",
"two_datacenter",
"three_datacenter",
"three_data_hall",
"fast_recovery_double",
"fast_recovery_triple"
]
}
},
"storage_policy":"(zoneid^3x1)",
"tlog_policy":"(zoneid^2x1)",
"storage_replication_policy":"(zoneid^3x1)",
"logs":2,
"storage_engine":{
"$enum":[
@ -367,6 +381,9 @@
"address":"10.0.4.1"
}
],
"auto_proxies":3,
"auto_resolvers":1,
"auto_logs":3,
"proxies":5
},
"data":{

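To make the status-schema additions above concrete, here is a minimal illustrative instance of the new fields as they could appear in a status document (all values are invented; only the field names come from the schema above):

    "datacenter_version_difference": 0,
    "full_replication": true,
    "logs": [{
        "log_replication_factor": 3,
        "log_write_anti_quorum": 0,
        "log_fault_tolerance": 2,
        "remote_log_replication_factor": 3,
        "remote_log_fault_tolerance": 2,
        "satellite_log_replication_factor": 3,
        "satellite_log_write_anti_quorum": 0,
        "satellite_log_fault_tolerance": 2
    }]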
View File

@ -58,10 +58,27 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
regions->clear();
for (StatusObjectReader dc : regionArray) {
RegionInfo info;
std::string idStr;
dc.get("id", idStr);
info.dcId = idStr;
dc.get("priority", info.priority);
json_spirit::mArray datacenters;
dc.get("datacenters", datacenters);
int nonSatelliteDatacenters = 0;
for (StatusObjectReader s : datacenters) {
std::string idStr;
if (s.has("satellite") && s.last().get_int() == 1) {
SatelliteInfo satInfo;
s.get("id", idStr);
satInfo.dcId = idStr;
s.get("priority", satInfo.priority);
info.satellites.push_back(satInfo);
} else {
if (nonSatelliteDatacenters > 0) throw invalid_option();
nonSatelliteDatacenters++;
s.get("id", idStr);
info.dcId = idStr;
s.get("priority", info.priority);
}
}
std::sort(info.satellites.begin(), info.satellites.end(), SatelliteInfo::sort_by_priority() );
if (nonSatelliteDatacenters != 1) throw invalid_option();
dc.tryGet("satellite_logs", info.satelliteDesiredTLogCount);
std::string satelliteReplication;
if(dc.tryGet("satellite_redundancy_mode", satelliteReplication)) {
@ -97,18 +114,6 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
dc.tryGet("satellite_log_replicas", info.satelliteTLogReplicationFactor);
dc.tryGet("satellite_usable_dcs", info.satelliteTLogUsableDcs);
dc.tryGet("satellite_anti_quorum", info.satelliteTLogWriteAntiQuorum);
json_spirit::mArray satellites;
if( dc.tryGet("satellites", satellites) ) {
for (StatusObjectReader s : satellites) {
SatelliteInfo satInfo;
std::string sidStr;
s.get("id", sidStr);
satInfo.dcId = sidStr;
s.get("priority", satInfo.priority);
info.satellites.push_back(satInfo);
}
std::sort(info.satellites.begin(), info.satellites.end(), SatelliteInfo::sort_by_priority() );
}
regions->push_back(info);
}
std::sort(regions->begin(), regions->end(), RegionInfo::sort_by_priority() );
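As a worked example of the format change this parser implements: the block removed above read satellites from a separate per-region "satellites" array, whereas the new code expects every datacenter, satellites included, in a single "datacenters" array, with satellites marked by "satellite": 1. A hypothetical region like the following would parse into one primary dcId plus one satellite (satellites sorted by priority):

    {
        "datacenters": [
            { "id": "0", "priority": 2 },
            { "id": "2", "priority": 1, "satellite": 1 }
        ],
        "satellite_redundancy_mode": "one_satellite_double",
        "satellite_logs": 2
    }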
@ -155,24 +160,22 @@ bool DatabaseConfiguration::isValid() const {
getDesiredRemoteLogs() >= 1 &&
remoteTLogReplicationFactor >= 0 &&
regions.size() <= 2 &&
( remoteTLogReplicationFactor == 0 || ( remoteTLogPolicy && regions.size() == 2 && durableStorageQuorum == storageTeamSize ) ) ) ) {
( remoteTLogReplicationFactor == 0 || ( remoteTLogPolicy && regions.size() == 2 && durableStorageQuorum == storageTeamSize ) ) &&
( regions.size() == 0 || regions[0].priority >= 0 ) ) ) {
return false;
}
std::set<Key> dcIds;
std::set<int> priorities;
dcIds.insert(Key());
for(auto& r : regions) {
if( !(!dcIds.count(r.dcId) &&
!priorities.count(r.priority) &&
r.satelliteTLogReplicationFactor >= 0 &&
r.satelliteTLogWriteAntiQuorum >= 0 &&
r.satelliteTLogUsableDcs >= 0 &&
r.satelliteTLogUsableDcs >= 1 &&
( r.satelliteTLogReplicationFactor == 0 || ( r.satelliteTLogPolicy && r.satellites.size() ) ) ) ) {
return false;
}
dcIds.insert(r.dcId);
priorities.insert(r.priority);
for(auto& s : r.satellites) {
if(dcIds.count(s.dcId)) {
return false;
@ -246,44 +249,47 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
if(regions.size()) {
StatusArray regionArr;
for(auto& r : regions) {
StatusObject regionObj;
StatusArray dcArr;
StatusObject dcObj;
dcObj["id"] = r.dcId.toString();
dcObj["priority"] = r.priority;
dcArr.push_back(dcObj);
if(r.satelliteTLogReplicationFactor == 1 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0) {
dcObj["satellite_redundancy_mode"] = "one_satellite_single";
regionObj["satellite_redundancy_mode"] = "one_satellite_single";
} else if(r.satelliteTLogReplicationFactor == 2 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0) {
dcObj["satellite_redundancy_mode"] = "one_satellite_double";
regionObj["satellite_redundancy_mode"] = "one_satellite_double";
} else if(r.satelliteTLogReplicationFactor == 3 && r.satelliteTLogUsableDcs == 1 && r.satelliteTLogWriteAntiQuorum == 0) {
dcObj["satellite_redundancy_mode"] = "one_satellite_triple";
regionObj["satellite_redundancy_mode"] = "one_satellite_triple";
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 0) {
dcObj["satellite_redundancy_mode"] = "two_satellite_safe";
regionObj["satellite_redundancy_mode"] = "two_satellite_safe";
} else if(r.satelliteTLogReplicationFactor == 4 && r.satelliteTLogUsableDcs == 2 && r.satelliteTLogWriteAntiQuorum == 2) {
dcObj["satellite_redundancy_mode"] = "two_satellite_fast";
regionObj["satellite_redundancy_mode"] = "two_satellite_fast";
} else if(r.satelliteTLogReplicationFactor != 0) {
dcObj["satellite_log_replicas"] = r.satelliteTLogReplicationFactor;
dcObj["satellite_usable_dcs"] = r.satelliteTLogUsableDcs;
dcObj["satellite_anti_quorum"] = r.satelliteTLogWriteAntiQuorum;
if(r.satelliteTLogPolicy) dcObj["satellite_log_policy"] = r.satelliteTLogPolicy->info();
regionObj["satellite_log_replicas"] = r.satelliteTLogReplicationFactor;
regionObj["satellite_usable_dcs"] = r.satelliteTLogUsableDcs;
regionObj["satellite_anti_quorum"] = r.satelliteTLogWriteAntiQuorum;
if(r.satelliteTLogPolicy) regionObj["satellite_log_policy"] = r.satelliteTLogPolicy->info();
}
if( r.satelliteDesiredTLogCount != -1 ) {
dcObj["satellite_logs"] = r.satelliteDesiredTLogCount;
regionObj["satellite_logs"] = r.satelliteDesiredTLogCount;
}
if(r.satellites.size()) {
StatusArray satellitesArr;
for(auto& s : r.satellites) {
StatusObject satObj;
satObj["id"] = s.dcId.toString();
satObj["priority"] = s.priority;
satObj["satellite"] = 1;
satellitesArr.push_back(satObj);
dcArr.push_back(satObj);
}
dcObj["satellites"] = satellitesArr;
}
regionArr.push_back(dcObj);
regionObj["datacenters"] = dcArr;
regionArr.push_back(regionObj);
}
result["regions"] = regionArr;
}

View File

@ -57,7 +57,7 @@ struct RegionInfo {
std::vector<SatelliteInfo> satellites;
RegionInfo() : priority(0), satelliteDesiredTLogCount(-1), satelliteTLogReplicationFactor(0), satelliteTLogWriteAntiQuorum(0), satelliteTLogUsableDcs(0) {}
RegionInfo() : priority(0), satelliteDesiredTLogCount(-1), satelliteTLogReplicationFactor(0), satelliteTLogWriteAntiQuorum(0), satelliteTLogUsableDcs(1) {}
struct sort_by_priority {
bool operator ()(RegionInfo const&a, RegionInfo const& b) const { return a.priority > b.priority; }

View File

@ -33,7 +33,7 @@ typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;
enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4, tagLocalityInvalid = -99 }; //The TLog and LogRouter require these numbers to be as compact as possible
enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4, tagLocalitySatellite = -5, tagLocalityInvalid = -99 }; //The TLog and LogRouter require these numbers to be as compact as possible
#pragma pack(push, 1)
struct Tag {

View File

@ -258,6 +258,8 @@ int decodeDatacenterReplicasValue( ValueRef const& value ) {
return s;
}
const KeyRef primaryDatacenterKey = LiteralStringRef("\xff/primaryDatacenter");
// serverListKeys.contains(k) iff k.startsWith( serverListKeys.begin ) because '/'+1 == '0'
const KeyRangeRef serverListKeys(
LiteralStringRef("\xff/serverList/"),

View File

@ -86,6 +86,8 @@ const Value datacenterReplicasValue( int const& );
Optional<Value> decodeDatacenterReplicasKey( KeyRef const& );
int decodeDatacenterReplicasValue( ValueRef const& );
extern const KeyRef primaryDatacenterKey;
// "\xff/serverList/[[serverID]]" := "[[StorageServerInterface]]"
// Storage servers are listed here when they are recruited - always before assigning them keys
// Storage servers removed from here are never replaced. The same fdbserver, if re-recruited, will always

View File

@ -89,11 +89,12 @@ public:
std::map<NetworkAddress, std::string> traceLogGroupMap;
Promise<Void> forceMasterFailure;
int64_t masterRegistrationCount;
bool recoveryStalled;
DatabaseConfiguration config; // Asynchronously updated via master registration
DatabaseConfiguration fullyRecoveredConfig;
Database db;
DBInfo() : masterRegistrationCount(0),
DBInfo() : masterRegistrationCount(0), recoveryStalled(false),
clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ),
serverInfo( new AsyncVar<ServerDBInfo>( ServerDBInfo( LiteralStringRef("DB") ) ) ),
db( DatabaseContext::create( clientInfo, Future<Void>(), LocalityData(), true, TaskDefaultEndpoint, true ) ) // SOMEDAY: Locality!
@ -512,15 +513,31 @@ public:
std::vector<std::pair<WorkerInterface, ProcessClass>> satelliteLogs;
if(region.satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
for(auto& s : region.satellites) {
satelliteDCs.insert(s.dcId);
}
//FIXME: recruitment does not respect usable_dcs, i.e. if usable_dcs is 1 we should recruit all tlogs in one datacenter
satelliteLogs = getWorkersForTlogs( req.configuration, region.satelliteTLogReplicationFactor, req.configuration.getDesiredSatelliteLogs(dcId), region.satelliteTLogPolicy, id_used, false, satelliteDCs );
int startDC = 0;
loop {
if(startDC > 0 && startDC >= region.satellites.size() + 1 - region.satelliteTLogUsableDcs) {
throw no_more_servers();
}
for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].first);
try {
std::set<Optional<Key>> satelliteDCs;
for(int s = startDC; s < std::min<int>(startDC + region.satelliteTLogUsableDcs, region.satellites.size()); s++) {
satelliteDCs.insert(region.satellites[s].dcId);
}
satelliteLogs = getWorkersForTlogs( req.configuration, region.satelliteTLogReplicationFactor, req.configuration.getDesiredSatelliteLogs(dcId), region.satelliteTLogPolicy, id_used, false, satelliteDCs );
for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].first);
}
break;
} catch (Error &e) {
if(e.code() != error_code_no_more_servers) {
throw;
}
}
startDC++;
}
}
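In effect, the recruitment loop above slides a window of satelliteTLogUsableDcs consecutive satellite datacenters and retries in the next window whenever recruitment fails with no_more_servers. For a hypothetical region with three satellites S0, S1, S2 and satelliteTLogUsableDcs = 2, the attempts are:

    startDC = 0 -> recruit from {S0, S1}
    startDC = 1 -> recruit from {S1, S2}
    startDC = 2 -> 2 >= 3 + 1 - 2, so the guard throws no_more_servers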
@ -556,35 +573,39 @@ public:
RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) {
if(req.configuration.regions.size() > 1) {
std::vector<RegionInfo> regions = req.configuration.regions;
if(regions[0].priority == regions[1].priority && clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get()) {
std::swap(regions[0], regions[1]);
}
bool setPrimaryDesired = false;
try {
auto reply = findWorkersForConfiguration(req, req.configuration.regions[0].dcId);
auto reply = findWorkersForConfiguration(req, regions[0].dcId);
setPrimaryDesired = true;
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.regions[0].dcId);
dcPriority.push_back(req.configuration.regions[1].dcId);
dcPriority.push_back(regions[0].dcId);
dcPriority.push_back(regions[1].dcId);
desiredDcIds.set(dcPriority);
if(reply.isError()) {
throw reply.getError();
} else if(clusterControllerDcId.present() && req.configuration.regions[0].dcId == clusterControllerDcId.get()) {
} else if(clusterControllerDcId.present() && regions[0].dcId == clusterControllerDcId.get()) {
return reply.get();
}
throw no_more_servers();
} catch( Error& e ) {
if (e.code() != error_code_no_more_servers) {
if (e.code() != error_code_no_more_servers || regions[1].priority < 0 || now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) {
throw;
}
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e);
auto reply = findWorkersForConfiguration(req, req.configuration.regions[1].dcId);
auto reply = findWorkersForConfiguration(req, regions[1].dcId);
if(!setPrimaryDesired) {
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.regions[1].dcId);
dcPriority.push_back(req.configuration.regions[0].dcId);
dcPriority.push_back(regions[1].dcId);
dcPriority.push_back(regions[0].dcId);
desiredDcIds.set(dcPriority);
}
if(reply.isError()) {
throw reply.getError();
} else if(clusterControllerDcId.present() && req.configuration.regions[1].dcId == clusterControllerDcId.get()) {
} else if(clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get()) {
return reply.get();
}
throw;
@ -690,36 +711,51 @@ public:
}
}
void checkPrimaryDC() {
if(db.config.regions.size() > 1 && clusterControllerDcId.present() && db.config.regions[0].dcId != clusterControllerDcId.get()) {
try {
std::map< Optional<Standalone<StringRef>>, int> id_used;
getWorkerForRoleInDatacenter(db.config.regions[0].dcId, ProcessClass::ClusterController, ProcessClass::ExcludeFit, db.config, id_used, true);
getWorkerForRoleInDatacenter(db.config.regions[0].dcId, ProcessClass::Master, ProcessClass::ExcludeFit, db.config, id_used, true);
void checkRegions(const std::vector<RegionInfo>& regions) {
if(desiredDcIds.get().present() && desiredDcIds.get().get().size() == 2 && desiredDcIds.get().get()[0].get() == regions[0].dcId && desiredDcIds.get().get()[1].get() == regions[1].dcId) {
return;
}
try {
std::map< Optional<Standalone<StringRef>>, int> id_used;
getWorkerForRoleInDatacenter(regions[0].dcId, ProcessClass::ClusterController, ProcessClass::ExcludeFit, db.config, id_used, true);
getWorkerForRoleInDatacenter(regions[0].dcId, ProcessClass::Master, ProcessClass::ExcludeFit, db.config, id_used, true);
std::set<Optional<Key>> primaryDC;
primaryDC.insert(db.config.regions[0].dcId);
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true, primaryDC);
std::set<Optional<Key>> primaryDC;
primaryDC.insert(regions[0].dcId);
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.desiredTLogCount, db.config.tLogPolicy, id_used, true, primaryDC);
if(db.config.regions[0].satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
for(auto &s : db.config.regions[0].satellites) {
satelliteDCs.insert(s.dcId);
}
getWorkersForTlogs(db.config, db.config.regions[0].satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(db.config.regions[0].dcId), db.config.regions[0].satelliteTLogPolicy, id_used, true, satelliteDCs);
if(regions[0].satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
for(auto &s : regions[0].satellites) {
satelliteDCs.insert(s.dcId);
}
getWorkersForTlogs(db.config, regions[0].satelliteTLogReplicationFactor, db.config.getDesiredSatelliteLogs(regions[0].dcId), regions[0].satelliteTLogPolicy, id_used, true, satelliteDCs);
}
getWorkerForRoleInDatacenter( db.config.regions[0].dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
getWorkerForRoleInDatacenter( db.config.regions[0].dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
getWorkerForRoleInDatacenter( regions[0].dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
getWorkerForRoleInDatacenter( regions[0].dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
vector<Optional<Key>> dcPriority;
dcPriority.push_back(db.config.regions[0].dcId);
dcPriority.push_back(db.config.regions[1].dcId);
desiredDcIds.set(dcPriority);
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
throw;
vector<Optional<Key>> dcPriority;
dcPriority.push_back(regions[0].dcId);
dcPriority.push_back(regions[1].dcId);
desiredDcIds.set(dcPriority);
} catch( Error &e ) {
if(e.code() != error_code_no_more_servers) {
throw;
}
}
}
void checkRecoveryStalled() {
if(db.serverInfo->get().recoveryState < RecoveryState::RECOVERY_TRANSACTION && db.recoveryStalled ) {
if(db.config.regions.size() > 1 && clusterControllerDcId.present()) {
auto regions = db.config.regions;
if(clusterControllerDcId.get() == regions[0].dcId) {
std::swap(regions[0], regions[1]);
}
ASSERT(clusterControllerDcId.get() == regions[1].dcId);
checkRegions(regions);
}
}
}
@ -732,7 +768,10 @@ public:
return false;
}
checkPrimaryDC();
if(db.config.regions.size() > 1 && clusterControllerDcId.present() && db.config.regions[0].priority > db.config.regions[1].priority &&
db.config.regions[0].dcId != clusterControllerDcId.get() && versionDifferenceUpdated && datacenterVersionDifference < SERVER_KNOBS->MAX_VERSION_DIFFERENCE) {
checkRegions(db.config.regions);
}
// Get master process
auto masterWorker = id_worker.find(dbi.master.locality.processId());
@ -755,10 +794,11 @@ public:
if ( tlogWorker->second.priorityInfo.isExcluded )
return true;
if(logSet.isLocal && logSet.hasBestPolicy > HasBestPolicyNone) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else if(logSet.isLocal) {
if(logSet.isLocal && logSet.locality == tagLocalitySatellite) {
satellite_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
else if(logSet.isLocal) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else {
remote_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
@ -917,9 +957,11 @@ public:
DBInfo db;
Database cx;
double startTime;
Version datacenterVersionDifference;
bool versionDifferenceUpdated;
explicit ClusterControllerData( ClusterControllerFullInterface ccInterface )
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now())
: id(ccInterface.id()), ac(false), betterMasterExistsChecker(Void()), gotProcessClasses(false), gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0), versionDifferenceUpdated(false)
{
auto serverInfo = db.serverInfo->get();
serverInfo.id = g_random->randomUniqueID();
@ -987,6 +1029,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
iMaster = newMaster.get();
db->masterRegistrationCount = 0;
db->recoveryStalled = false;
db->forceMasterFailure = Promise<Void>();
auto dbInfo = ServerDBInfo( LiteralStringRef("DB") );
@ -1182,6 +1225,8 @@ ACTOR Future<Void> doCheckOutstandingMasterRequests( ClusterControllerData* self
}
void checkOutstandingMasterRequests( ClusterControllerData* self ) {
self->checkRecoveryStalled();
if( !self->betterMasterExistsChecker.isReady() )
return;
@ -1465,6 +1510,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
}
db->masterRegistrationCount = req.registrationCount;
db->recoveryStalled = req.recoveryStalled;
if ( req.configuration.present() ) {
db->config = req.configuration.get();
@ -1705,7 +1751,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
}
}
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections)));
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections, self->datacenterVersionDifference)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();
@ -1893,6 +1939,65 @@ ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
}
}
ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *self ) {
loop {
self->versionDifferenceUpdated = false;
if(self->db.serverInfo->get().recoveryState >= RecoveryState::FULLY_RECOVERED && self->db.config.remoteTLogReplicationFactor == 0) {
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = 0;
Void _ = wait(self->db.serverInfo->onChange());
continue;
}
state Optional<TLogInterface> primaryLog;
state Optional<TLogInterface> remoteLog;
if(self->db.serverInfo->get().recoveryState == RecoveryState::REMOTE_RECOVERED) {
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
primaryLog = tLog.interf();
break;
}
}
}
if(!logSet.isLocal) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
remoteLog = tLog.interf();
break;
}
}
}
}
}
if(!primaryLog.present() || !remoteLog.present()) {
Void _ = wait(self->db.serverInfo->onChange());
continue;
}
state Future<Void> onChange = self->db.serverInfo->onChange();
loop {
state Future<TLogQueuingMetricsReply> primaryMetrics = primaryLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() );
state Future<TLogQueuingMetricsReply> remoteMetrics = remoteLog.get().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() );
Void _ = wait( ( success(primaryMetrics) && success(remoteMetrics) ) || onChange );
if(onChange.isReady()) {
break;
}
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = primaryMetrics.get().v - remoteMetrics.get().v;
Void _ = wait( delay(SERVER_KNOBS->VERSION_LAG_METRIC_INTERVAL) || onChange );
if(onChange.isReady()) {
break;
}
}
}
}
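Given the knob values added in this commit (VERSIONS_PER_SECOND = 1e6, MAX_VERSION_DIFFERENCE = 20 * VERSIONS_PER_SECOND), the difference this actor publishes can be read as replication lag in seconds, since versions advance at roughly VERSIONS_PER_SECOND:

    lag (seconds) ~= datacenterVersionDifference / VERSIONS_PER_SECOND
    at the MAX_VERSION_DIFFERENCE threshold: (20 * 1e6) / 1e6 = 20 seconds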
ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf, Future<Void> leaderFail, ServerCoordinators coordinators ) {
state ClusterControllerData self( interf );
state Future<Void> coordinationPingDelay = delay( SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY );
@ -1910,6 +2015,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
addActor.send( monitorClientTxnInfoConfigs(&self.db) );
addActor.send( updatedChangingDatacenters(&self) );
addActor.send( updatedChangedDatacenters(&self) );
addActor.send( updateDatacenterVersionDifference(&self) );
//printf("%s: I am the cluster controller\n", g_network->getLocalAddress().toString().c_str());
loop choose {

View File

@ -202,6 +202,7 @@ struct RegisterMasterRequest {
Optional<DatabaseConfiguration> configuration;
vector<UID> priorCommittedLogServers;
RecoveryState recoveryState;
bool recoveryStalled;
ReplyPromise<Void> reply;
@ -210,7 +211,7 @@ struct RegisterMasterRequest {
template <class Ar>
void serialize( Ar& ar ) {
ASSERT( ar.protocolVersion() >= 0x0FDB00A200040001LL );
ar & dbName & id & mi & logSystemConfig & proxies & resolvers & recoveryCount & registrationCount & configuration & priorCommittedLogServers & recoveryState & reply;
ar & dbName & id & mi & logSystemConfig & proxies & resolvers & recoveryCount & registrationCount & configuration & priorCommittedLogServers & recoveryState & recoveryStalled & reply;
}
};

View File

@ -41,20 +41,19 @@ struct CoreTLogSet {
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
bool operator == (CoreTLogSet const& rhs) const {
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && hasBestPolicy == rhs.hasBestPolicy &&
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal &&
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
}
template <class Archive>
void serialize(Archive& ar) {
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality & startVersion;
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
}
};
@ -110,7 +109,7 @@ struct DBCoreState {
template <class Archive>
void serialize(Archive& ar) {
//FIXME: remove when we no longer need to test upgrades from 4.X releases
if(ar.protocolVersion() < 0x0FDB00A460010001LL) {
if(g_network->isSimulated() && ar.protocolVersion() < 0x0FDB00A460010001LL) {
TraceEvent("ElapsedTime").detail("SimTime", now()).detail("RealTime", 0).detail("RandomUnseed", 0);
flushAndExit(0);
}

View File

@ -55,8 +55,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
// Versions
init( MAX_VERSIONS_IN_FLIGHT, 100000000 );
init( VERSIONS_PER_SECOND, 1000000 );
init( VERSIONS_PER_SECOND, 1e6 );
init( MAX_VERSIONS_IN_FLIGHT, 100 * VERSIONS_PER_SECOND );
init( MAX_READ_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_READ_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.1 * VERSIONS_PER_SECOND); else if( randomize && BUGGIFY ) MAX_READ_TRANSACTION_LIFE_VERSIONS = 10 * VERSIONS_PER_SECOND;
init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 1 * VERSIONS_PER_SECOND);
init( MAX_COMMIT_BATCH_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) MAX_COMMIT_BATCH_INTERVAL = 2.0; // Each master proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
@ -256,6 +256,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MIN_BALANCE_DIFFERENCE, 10000 );
init( SECONDS_BEFORE_NO_FAILURE_DELAY, 8 * 3600 );
init( MAX_TXS_SEND_MEMORY, 1e7 ); if( randomize && BUGGIFY ) MAX_TXS_SEND_MEMORY = 1e5;
init( MAX_RECOVERY_VERSIONS, 200 * VERSIONS_PER_SECOND ); if( randomize && BUGGIFY ) MAX_RECOVERY_VERSIONS = VERSIONS_PER_SECOND;
init( MAX_RECOVERY_TIME, 20.0 ); if( randomize && BUGGIFY ) MAX_RECOVERY_TIME = 1.0;
// Resolver
init( SAMPLE_OFFSET_PER_KEY, 100 );
@ -276,6 +278,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
init( CHECK_BETTER_MASTER_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) CHECK_BETTER_MASTER_INTERVAL = 0.001;
init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0;
init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::GoodFit );
init( EXPECTED_TLOG_FITNESS, ProcessClass::GoodFit );

View File

@ -60,11 +60,10 @@ public:
int64_t MAX_QUEUE_COMMIT_BYTES;
// Versions
int VERSIONS_PER_SECOND;
int MAX_VERSIONS_IN_FLIGHT;
int MAX_READ_TRANSACTION_LIFE_VERSIONS;
int MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
int VERSIONS_PER_SECOND;
double MAX_COMMIT_BATCH_INTERVAL; // Each master proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
// Data distribution queue
@ -199,6 +198,8 @@ public:
int64_t MIN_BALANCE_DIFFERENCE;
double SECONDS_BEFORE_NO_FAILURE_DELAY;
int64_t MAX_TXS_SEND_MEMORY;
int64_t MAX_RECOVERY_VERSIONS;
double MAX_RECOVERY_TIME;
// Resolver
int64_t SAMPLE_OFFSET_PER_KEY;
@ -219,6 +220,8 @@ public:
double WORKER_FAILURE_TIME;
double CHECK_BETTER_MASTER_INTERVAL;
double INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
double VERSION_LAG_METRIC_INTERVAL;
int64_t MAX_VERSION_DIFFERENCE;
// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)

View File

@ -105,7 +105,6 @@ struct LogRouterData {
//setup just enough of a logSet to be able to call getPushLocations
logSet.logServers.resize(req.tLogLocalities.size());
logSet.tLogPolicy = req.tLogPolicy;
logSet.hasBestPolicy = req.hasBestPolicy;
logSet.locality = req.locality;
logSet.updateLocalitySet(req.tLogLocalities);
@ -413,7 +412,7 @@ ACTOR Future<Void> logRouter(
Reference<AsyncVar<ServerDBInfo>> db)
{
try {
TraceEvent("LogRouterStart", interf.id()).detail("Start", req.startVersion).detail("Tag", req.routerTag.toString()).detail("Localities", req.tLogLocalities.size()).detail("HasBestPolicy", req.hasBestPolicy).detail("Locality", req.locality);
TraceEvent("LogRouterStart", interf.id()).detail("Start", req.startVersion).detail("Tag", req.routerTag.toString()).detail("Localities", req.tLogLocalities.size()).detail("Locality", req.locality);
state Future<Void> core = logRouterCore(interf, req, db);
loop choose{
when(Void _ = wait(core)) { return Void(); }

View File

@ -44,12 +44,11 @@ public:
std::vector<int> logIndexArray;
std::map<int,LocalityEntry> logEntryMap;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;
std::vector<Future<TLogLockResult>> replies;
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
std::string logRouterString() {
std::string result;
@ -74,17 +73,8 @@ public:
}
int bestLocationFor( Tag tag ) {
if(hasBestPolicy == HasBestPolicyNone) {
return -1;
} else if(hasBestPolicy == HasBestPolicyId) {
//This policy supports upgrades from 5.X
if(tag == txsTag) return txsTagOld % logServers.size();
return tag.id % logServers.size();
} else {
//Unsupported policy
ASSERT(false);
throw internal_error();
}
if(tag == txsTag) return txsTagOld % logServers.size();
return tag.id % logServers.size();
}
void updateLocalitySet() {
@ -127,11 +117,9 @@ public:
alsoServers.clear();
resultEntries.clear();
if(hasBestPolicy) {
for(auto& t : tags) {
if(t.locality == locality || t.locality == tagLocalitySpecial || locality == tagLocalitySpecial || (isLocal && t.locality == tagLocalityLogRouter)) {
newLocations.push_back(bestLocationFor(t));
}
for(auto& t : tags) {
if(locality == tagLocalitySpecial || t.locality == locality || t.locality < 0) {
newLocations.push_back(bestLocationFor(t));
}
}
@ -490,7 +478,8 @@ struct ILogSystem {
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable)
virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags ) = 0;
virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config,
LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags, Reference<AsyncVar<bool>> const& recruitmentStalled ) = 0;
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns an ILogSystem representing a new epoch immediately following this one. The new epoch is only provisional until the caller updates the coordinated DBCoreState

View File

@ -55,8 +55,6 @@ protected:
Optional<Interface> iface;
};
enum { HasBestPolicyNone = 0, HasBestPolicyId = 1 };
struct TLogSet {
std::vector<OptionalInterface<TLogInterface>> tLogs;
std::vector<OptionalInterface<TLogInterface>> logRouters;
@ -64,18 +62,17 @@ struct TLogSet {
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
std::string toString() const {
return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBestPolicy, logRouters.size(), describe(tLogs).c_str(), locality);
return format("anti: %d replication: %d local: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, logRouters.size(), describe(tLogs).c_str(), locality);
}
bool operator == ( const TLogSet& rhs ) const {
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBestPolicy != rhs.hasBestPolicy ||
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal ||
startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) {
return false;
}
@ -96,7 +93,7 @@ struct TLogSet {
}
bool isEqualIds(TLogSet const& r) const {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBestPolicy != r.hasBestPolicy || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
return false;
}
if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) {
@ -112,7 +109,7 @@ struct TLogSet {
template <class Ar>
void serialize( Ar& ar ) {
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality & startVersion;
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
}
};

View File

@ -762,28 +762,32 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) {
StatusObject primaryObj;
primaryObj["id"] = "0";
primaryObj["priority"] = 0;
StatusObject primaryDcObj;
primaryDcObj["id"] = "0";
primaryDcObj["priority"] = 2;
StatusArray primaryDcArr;
primaryDcArr.push_back(primaryDcObj);
StatusObject remoteObj;
remoteObj["id"] = "1";
remoteObj["priority"] = 1;
StatusObject remoteDcObj;
remoteDcObj["id"] = "1";
remoteDcObj["priority"] = 1;
StatusArray remoteDcArr;
remoteDcArr.push_back(remoteDcObj);
bool needsRemote = generateFearless;
if(generateFearless) {
StatusObject primarySatelliteObj;
primarySatelliteObj["id"] = "2";
primarySatelliteObj["priority"] = 1;
StatusArray primarySatellitesArr;
primarySatellitesArr.push_back(primarySatelliteObj);
primaryObj["satellites"] = primarySatellitesArr;
primarySatelliteObj["satellite"] = 1;
primaryDcArr.push_back(primarySatelliteObj);
StatusObject remoteSatelliteObj;
remoteSatelliteObj["id"] = "3";
remoteSatelliteObj["priority"] = 1;
StatusArray remoteSatellitesArr;
remoteSatellitesArr.push_back(remoteSatelliteObj);
remoteObj["satellites"] = remoteSatellitesArr;
remoteSatelliteObj["satellite"] = 1;
remoteDcArr.push_back(remoteSatelliteObj);
int satellite_replication_type = g_random->randomInt(0,5);
switch (satellite_replication_type) {
@ -823,7 +827,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
primaryObj["satellite_logs"] = logs;
remoteObj["satellite_logs"] = logs;
}
int remote_replication_type = g_random->randomInt(0,5);
switch (remote_replication_type) {
case 0: {
@ -858,6 +862,9 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
if (g_random->random01() < 0.25) db.remoteDesiredTLogCount = g_random->randomInt(1,7);
}
primaryObj["datacenters"] = primaryDcArr;
remoteObj["datacenters"] = remoteDcArr;
StatusArray regionArr;
regionArr.push_back(primaryObj);
if(needsRemote || g_random->random01() < 0.5) {
@ -866,8 +873,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication) {
set_config("regions=" + json_spirit::write_string(json_spirit::mValue(regionArr), json_spirit::Output_options::none));
}
if(generateFearless && minimumReplication > 1) {
//low latency tests in fearless configurations need 4 machines per datacenter (3 for triple replication, 1 that is down during failures).
machine_count = 16;
} else if(generateFearless) {

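For reference, the fearless configuration constructed above now serializes to a regions document shaped like the following (the ids and priorities come from the code above; the satellite_* and remote redundancy settings vary with the randomized choices and are omitted here):

    [
        { "datacenters": [
            { "id": "0", "priority": 2 },
            { "id": "2", "priority": 1, "satellite": 1 }
        ] },
        { "datacenters": [
            { "id": "1", "priority": 1 },
            { "id": "3", "priority": 1, "satellite": 1 }
        ] }
    ]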
View File

@ -1068,8 +1068,9 @@ ACTOR static Future<StatusObject> latencyProbeFetcher(Database cx, StatusArray *
return statusObj;
}
ACTOR static Future<Optional<DatabaseConfiguration>> loadConfiguration(Database cx, StatusArray *messages, std::set<std::string> *status_incomplete_reasons){
ACTOR static Future<std::pair<Optional<DatabaseConfiguration>,Optional<bool>>> loadConfiguration(Database cx, StatusArray *messages, std::set<std::string> *status_incomplete_reasons){
state Optional<DatabaseConfiguration> result;
state Optional<bool> fullReplication;
state Transaction tr(cx);
state Future<Void> getConfTimeout = delay(5.0);
@ -1090,7 +1091,34 @@ ACTOR static Future<Optional<DatabaseConfiguration>> loadConfiguration(Database
result = configuration;
}
when(Void _ = wait(getConfTimeout)) {
messages->push_back(makeMessage("unreadable_configuration", "Unable to read database configuration."));
if(!result.present()) {
messages->push_back(makeMessage("unreadable_configuration", "Unable to read database configuration."));
} else {
messages->push_back(makeMessage("full_replication_timeout", "Unable to read datacenter replicas."));
}
break;
}
}
ASSERT(result.present());
state std::vector<Future<Optional<Value>>> replicasFutures;
for(auto& region : result.get().regions) {
replicasFutures.push_back(tr.get(datacenterReplicasKeyFor(region.dcId)));
}
choose {
when( Void _ = wait( waitForAll(replicasFutures) ) ) {
int unreplicated = 0;
for(int i = 0; i < result.get().regions.size(); i++) {
if( !replicasFutures[i].get().present() || decodeDatacenterReplicasValue(replicasFutures[i].get().get()) < result.get().storageTeamSize ) {
unreplicated++;
}
}
fullReplication = (!unreplicated || (result.get().remoteTLogReplicationFactor == 0 && unreplicated < result.get().regions.size()));
}
when(Void _ = wait(getConfTimeout)) {
messages->push_back(makeMessage("full_replication_timeout", "Unable to read datacenter replicas."));
}
}
break;
@ -1099,7 +1127,7 @@ ACTOR static Future<Optional<DatabaseConfiguration>> loadConfiguration(Database
Void _ = wait(tr.onError(e));
}
}
return result;
return std::make_pair(result, fullReplication);
}
static StatusObject configurationFetcher(Optional<DatabaseConfiguration> conf, ServerCoordinators coordinators, std::set<std::string> *incomplete_reasons) {
@ -1283,17 +1311,35 @@ ACTOR static Future<vector<std::pair<TLogInterface, std::string>>> getTLogsAndMe
return results;
}
static std::set<StringRef> getTLogEligibleMachines(vector<std::pair<WorkerInterface, ProcessClass>> workers, DatabaseConfiguration configuration) {
std::set<StringRef> tlogEligibleMachines;
static int getExtraTLogEligibleMachines(vector<std::pair<WorkerInterface, ProcessClass>> workers, DatabaseConfiguration configuration) {
std::set<StringRef> allMachines;
std::map<Key,std::set<StringRef>> dcId_machine;
for(auto worker : workers) {
if(worker.second.machineClassFitness(ProcessClass::TLog) < ProcessClass::NeverAssign
&& !configuration.isExcludedServer(worker.first.address()))
{
tlogEligibleMachines.insert(worker.first.locality.zoneId().get());
allMachines.insert(worker.first.locality.zoneId().get());
if(worker.first.locality.dcId().present()) {
dcId_machine[worker.first.locality.dcId().get()].insert(worker.first.locality.zoneId().get());
}
}
}
return tlogEligibleMachines;
if(configuration.regions.size() == 0) {
return allMachines.size() - std::max( configuration.remoteTLogReplicationFactor, std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize) );
}
int extraTlogEligibleMachines = std::numeric_limits<int>::max();
for(auto& region : configuration.regions) {
extraTlogEligibleMachines = std::min<int>( extraTlogEligibleMachines, dcId_machine[region.dcId].size() - std::max( configuration.remoteTLogReplicationFactor, std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize) ) );
if(region.satelliteTLogReplicationFactor > 0) {
int totalSatelliteEligible = 0;
for(auto& sat : region.satellites) {
totalSatelliteEligible += dcId_machine[sat.dcId].size();
}
extraTlogEligibleMachines = std::min<int>( extraTlogEligibleMachines, totalSatelliteEligible - region.satelliteTLogReplicationFactor );
}
}
return extraTlogEligibleMachines;
}
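A worked example of the new computation, with hypothetical numbers: suppose two regions whose primary datacenters have 6 and 5 TLog-eligible machines, tLogReplicationFactor = storageTeamSize = remoteTLogReplicationFactor = 3, and each region has two satellite datacenters with 2 eligible machines apiece and satellite_log_replicas = 3. Then:

    primary slack:   min(6 - 3, 5 - 3) = 2
    satellite slack: (2 + 2) - 3 = 1 (per region)
    extraTlogEligibleMachines = min(2, 1) = 1

so one machine can be lost before TLog recruitment is constrained; this value feeds max_machine_failures_without_losing_availability below, capped by the data-loss bound.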
ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker,
@ -1465,12 +1511,20 @@ static StatusArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<
}
}
maxFaultTolerance = std::max(maxFaultTolerance, it.tLogs[i].tLogReplicationFactor - 1 - it.tLogs[i].tLogWriteAntiQuorum - failedLogs);
//FIXME: add information for remote and satellites
if(i==0) {
if(it.tLogs[i].isLocal && it.tLogs[i].locality == tagLocalitySatellite) {
statusObj["satellite_log_replication_factor"] = it.tLogs[i].tLogReplicationFactor;
statusObj["satellite_log_write_anti_quorum"] = it.tLogs[i].tLogWriteAntiQuorum;
statusObj["satellite_log_fault_tolerance"] = it.tLogs[i].tLogReplicationFactor - 1 - it.tLogs[i].tLogWriteAntiQuorum - failedLogs;
}
else if(it.tLogs[i].isLocal) {
statusObj["log_replication_factor"] = it.tLogs[i].tLogReplicationFactor;
statusObj["log_write_anti_quorum"] = it.tLogs[i].tLogWriteAntiQuorum;
statusObj["log_fault_tolerance"] = it.tLogs[i].tLogReplicationFactor - 1 - it.tLogs[i].tLogWriteAntiQuorum - failedLogs;
}
else {
statusObj["remote_log_replication_factor"] = it.tLogs[i].tLogReplicationFactor;
statusObj["remote_log_fault_tolerance"] = it.tLogs[i].tLogReplicationFactor - 1 - failedLogs;
}
}
*oldLogFaultTolerance = std::min(*oldLogFaultTolerance, maxFaultTolerance);
statusObj["logs"] = logsObj;
@ -1487,7 +1541,7 @@ static StatusObject faultToleranceStatusFetcher(DatabaseConfiguration configurat
static StatusObject faultToleranceStatusFetcher(DatabaseConfiguration configuration, ServerCoordinators coordinators, std::vector<std::pair<WorkerInterface, ProcessClass>>& workers, int numTLogEligibleMachines, int minReplicasRemaining) {
static StatusObject faultToleranceStatusFetcher(DatabaseConfiguration configuration, ServerCoordinators coordinators, std::vector<std::pair<WorkerInterface, ProcessClass>>& workers, int extraTlogEligibleMachines, int minReplicasRemaining) {
StatusObject statusObj;
// without losing data
@ -1528,7 +1582,7 @@ static StatusObject faultToleranceStatusFetcher(DatabaseConfiguration configurat
statusObj["max_machine_failures_without_losing_data"] = std::max(machineFailuresWithoutLosingData, 0);
// without losing availablity
statusObj["max_machine_failures_without_losing_availability"] = std::max(std::min(numTLogEligibleMachines - configuration.minMachinesRequiredPerDatacenter(), machineFailuresWithoutLosingData), 0);
statusObj["max_machine_failures_without_losing_availability"] = std::max(std::min(extraTlogEligibleMachines, machineFailuresWithoutLosingData), 0);
return statusObj;
}
@ -1696,7 +1750,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
ClientVersionMap clientVersionMap,
std::map<NetworkAddress, std::string> traceLogGroupMap,
ServerCoordinators coordinators,
std::vector<NetworkAddress> incompatibleConnections )
std::vector<NetworkAddress> incompatibleConnections,
Version datacenterVersionDifference )
{
// since we no longer offer multi-database support, all databases must be named DB
state std::string dbName = "DB";
@ -1776,11 +1831,17 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["protocol_version"] = format("%llx", currentProtocolVersion);
state Optional<DatabaseConfiguration> configuration = Optional<DatabaseConfiguration>();
state Optional<DatabaseConfiguration> configuration;
state Optional<bool> fullReplication;
if(!(recoveryStateStatus.count("name") && recoveryStateStatus["name"] == RecoveryStatus::names[RecoveryStatus::configuration_missing])) {
Optional<DatabaseConfiguration> _configuration = wait(loadConfiguration(cx, &messages, &status_incomplete_reasons));
configuration = _configuration;
std::pair<Optional<DatabaseConfiguration>,Optional<bool>> loadResults = wait(loadConfiguration(cx, &messages, &status_incomplete_reasons));
configuration = loadResults.first;
fullReplication = loadResults.second;
}
if(fullReplication.present()) {
statusObj["full_replication"] = fullReplication.get();
}
statusObj["machines"] = machineStatusFetcher(mMetrics, workers, configuration, &status_incomplete_reasons);
@ -1817,8 +1878,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
if(configuration.present()) {
std::set<StringRef> tlogEligibleMachines = getTLogEligibleMachines(workers, configuration.get());
statusObj["fault_tolerance"] = faultToleranceStatusFetcher(configuration.get(), coordinators, workers, tlogEligibleMachines.size(), minReplicasRemaining);
int extraTlogEligibleMachines = getExtraTLogEligibleMachines(workers, configuration.get());
statusObj["fault_tolerance"] = faultToleranceStatusFetcher(configuration.get(), coordinators, workers, extraTlogEligibleMachines, minReplicasRemaining);
}
StatusObject configObj = configurationFetcher(configuration, coordinators, &status_incomplete_reasons);
@ -1880,6 +1941,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
incompatibleConnectionsArray.push_back(it.toString());
}
statusObj["incompatible_connections"] = incompatibleConnectionsArray;
statusObj["datacenter_version_difference"] = datacenterVersionDifference;
if (!recoveryStateStatus.empty())
statusObj["recovery_state"] = recoveryStateStatus;

View File

@ -32,6 +32,7 @@ typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > Clie
std::string extractAttribute( std::string const& expanded, std::string const& attributeToExtract );
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<std::pair<WorkerInterface, ProcessClass>> const& workers,
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections );
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap,
ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections, Version const& datacenterVersionDifference );
#endif

View File

@ -1621,8 +1621,10 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
if (fFormat.get().present() && !persistFormatReadableRange.contains( fFormat.get().get() )) {
//FIXME: remove when we no longer need to test upgrades from 4.X releases
TraceEvent("ElapsedTime").detail("SimTime", now()).detail("RealTime", 0).detail("RandomUnseed", 0);
flushAndExit(0);
if(g_network->isSimulated()) {
TraceEvent("ElapsedTime").detail("SimTime", now()).detail("RealTime", 0).detail("RandomUnseed", 0);
flushAndExit(0);
}
TraceEvent(SevError, "UnsupportedDBFormat", self->dbgid).detail("Format", printable(fFormat.get().get())).detail("Expected", persistFormat.value.toString());
throw worker_recovery_failed();
@ -1917,7 +1919,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
self->newLogData.trigger();
if(req.isPrimary && !logData->stopped && logData->unrecoveredBefore <= req.recoverAt) {
if(req.recoverFrom.logRouterTags > 0 && req.locality != tagLocalityInvalid) {
if(req.recoverFrom.logRouterTags > 0 && req.locality != tagLocalitySatellite) {
logData->logRouterPopToVersion = req.recoverAt;
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);

View File

@ -150,7 +150,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->updateLocalitySet();
@ -176,7 +175,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogData.tLogPolicy;
logSet->tLogLocalities = tLogData.tLogLocalities;
logSet->isLocal = tLogData.isLocal;
logSet->hasBestPolicy = tLogData.hasBestPolicy;
logSet->locality = tLogData.locality;
logSet->startVersion = tLogData.startVersion;
//logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs
@ -211,7 +209,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
@ -237,7 +234,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBestPolicy = tLogSet.hasBestPolicy;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
@ -272,7 +268,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBestPolicy = t->hasBestPolicy;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
newState.tLogs.push_back(coreSet);
@ -294,7 +289,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBestPolicy = t->hasBestPolicy;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
newState.oldTLogData[i].tLogs.push_back(coreSet);
@ -414,17 +408,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore, bool throwIfDead ) {
int bestSet = -1;
int nextBestSet = -1;
int bestSet = 0;
std::vector<Reference<LogSet>> localSets;
Version lastBegin = 0;
for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size() && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || tag.locality == tagLocalityLogRouter)) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality || tag.locality < 0)) {
lastBegin = std::max(lastBegin, log->startVersion);
localSets.push_back(log);
if(log->hasBestPolicy) {
if(log->locality != tagLocalitySatellite) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
}
}
}
@ -434,17 +426,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(begin >= lastBegin) {
TraceEvent("TLogPeekAllCurrentOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", bestSet >= 0 ? localSets[bestSet]->logServerString() : "no best set");
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, end, parallelGetMore ) );
TraceEvent("TLogPeekAllCurrentOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localSets[bestSet]->logServerString());
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, end, parallelGetMore ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
if(lastBegin < end) {
TraceEvent("TLogPeekAllAddingCurrent", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", bestSet >= 0 ? localSets[bestSet]->logServerString() : "no best set");
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, lastBegin, end, parallelGetMore)) );
TraceEvent("TLogPeekAllAddingCurrent", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localSets[bestSet]->logServerString());
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, lastBegin, end, parallelGetMore)) );
}
int i = 0;
while(begin < lastBegin) {
@ -460,17 +450,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
int bestOldSet = -1;
int nextBestOldSet = -1;
int bestOldSet = 0;
std::vector<Reference<LogSet>> localOldSets;
Version thisBegin = begin;
for(auto& log : oldLogData[i].tLogs) {
if(log->isLocal && log->logServers.size() && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || tag.locality == tagLocalityLogRouter)) {
if(log->isLocal && log->logServers.size() && (log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || log->locality == tag.locality || tag.locality < 0)) {
thisBegin = std::max(thisBegin, log->startVersion);
localOldSets.push_back(log);
if(log->hasBestPolicy) {
if(log->locality != tagLocalitySatellite) {
bestOldSet = localOldSets.size()-1;
nextBestOldSet = bestOldSet;
}
}
}
@ -486,10 +474,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(thisBegin < lastBegin) {
if(thisBegin < end) {
TraceEvent("TLogPeekAllAddingOld", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end)
.detail("BestLogs", bestOldSet >= 0 ? localOldSets[bestOldSet]->logServerString() : "no best set").detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin);
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet == -1 ? nextBestOldSet : bestOldSet,
bestOldSet >= 0 ? localOldSets[bestOldSet]->bestLocationFor( tag ) : -1, tag, thisBegin, std::min(lastBegin, end), parallelGetMore)) );
TraceEvent("TLogPeekAllAddingOld", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localOldSets[bestOldSet]->logServerString()).detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin);
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet, localOldSets[bestOldSet]->bestLocationFor( tag ), tag, thisBegin, std::min(lastBegin, end), parallelGetMore)) );
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
}
lastBegin = thisBegin;
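Each pass through this loop contributes one cursor per old epoch and records the hand-off point in epochEnds. Sketched as comments (the merging cursor that consumes both vectors sits outside this hunk):

// cursors[0] : [lastBegin, end)                    current generation's log sets
// cursors[1] : [thisBegin, min(lastBegin, end))    most recent old epoch
// ...                                              older epochs as begin walks backwards
// epochEnds[i] marks where cursor i+1 stops and cursor i takes over, so the
// merged stream is contiguous from the caller's begin up to end.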
@@ -596,11 +582,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Reference<IPeekCursor> peekLocal( UID dbgid, Tag tag, Version begin, Version end ) {
ASSERT(tag.locality >= 0 || tag.locality == tagLocalityUpgraded);
int bestSet = -1;
bool foundSpecial = false;
for(int t = 0; t < tLogs.size(); t++) {
if(tLogs[t]->logServers.size() && tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || (tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
if( tLogs[t]->locality == tagLocalitySpecial ) {
if(tLogs[t]->logServers.size() && (tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || tLogs[t]->locality == tag.locality)) {
if( tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}
bestSet = t;
@@ -633,8 +621,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int bestOldSet = -1;
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
if(oldLogData[i].tLogs[t]->logServers.size() && oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || (oldLogData[i].tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial ) {
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == tag.locality)) {
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}
bestOldSet = t;
@@ -709,27 +697,24 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if( found ) {
if(stopped) {
int bestSet = -1;
int nextBestSet = -1;
std::vector<Reference<LogSet>> localSets;
int bestSet = 0;
for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size()) {
TraceEvent("TLogPeekLogRouterLocalSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LogServers", log->logServerString());
localSets.push_back(log);
if(log->hasBestPolicy) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
if(log->locality != tagLocalitySatellite) {
bestSet = localSets.size() - 1;
}
}
}
TraceEvent("TLogPeekLogRouterSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
//FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), false ) );
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, getPeekEnd(), false ) );
} else {
for( auto& log : tLogs ) {
if( log->logServers.size() && log->isLocal && log->hasBestPolicy ) {
if(log->logServers.size() && log->isLocal && log->locality != tagLocalitySatellite) {
TraceEvent("TLogPeekLogRouterBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LogId", log->logServers[log->bestLocationFor( tag )]->get().id());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( log->logServers[log->bestLocationFor( tag )], tag, begin, getPeekEnd(), false, false ) );
}
@@ -751,24 +736,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if( found ) {
int bestSet = -1;
int nextBestSet = -1;
int bestSet = 0;
std::vector<Reference<LogSet>> localSets;
for(auto& log : old.tLogs) {
if(log->isLocal && log->logServers.size()) {
TraceEvent("TLogPeekLogRouterOldLocalSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LogServers", log->logServerString());
localSets.push_back(log);
if(log->hasBestPolicy) {
if(log->locality != tagLocalitySatellite) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
}
}
}
TraceEvent("TLogPeekLogRouterOldSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("OldEpoch", old.epochEnd).detail("PreviousEpochEndVersion", previousEpochEndVersion.present() ? previousEpochEndVersion.get() : -1).detail("FirstOld", firstOld);
//FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, firstOld && previousEpochEndVersion.present() ? previousEpochEndVersion.get() + 1 : old.epochEnd, false ) );
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, firstOld && previousEpochEndVersion.present() ? previousEpochEndVersion.get() + 1 : old.epochEnd, false ) );
}
firstOld = false;
}
@@ -812,12 +794,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
ASSERT(popLocality == tagLocalityInvalid);
for(auto& t : tLogs) {
for(auto& log : t->logServers) {
Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first;
if (prev < upTo)
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion);
if (prev == 0)
actors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
if(t->locality == tagLocalitySpecial || t->locality == tag.locality || tag.locality < 0) {
for(auto& log : t->logServers) {
Version prev = outstandingPops[std::make_pair(log->get().id(),tag)].first;
if (prev < upTo)
outstandingPops[std::make_pair(log->get().id(),tag)] = std::make_pair(upTo, knownCommittedVersion);
if (prev == 0)
actors.add( popFromLog( this, log, tag, 1.0 ) ); //< FIXME: knob
}
}
}
}
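Two things change in the rewritten pop path: pops now fan out only to log sets whose locality can actually hold the tag (the same predicate as the peek paths), and outstandingPops deduplicates work per (log id, tag) so the long-lived popFromLog actor is spawned exactly once per pair. Condensed, under those assumptions:

auto key = std::make_pair( log->get().id(), tag );
Version prev = outstandingPops[key].first;
if( prev < upTo )   // remember the newest pop request for this (log, tag)
    outstandingPops[key] = std::make_pair( upTo, knownCommittedVersion );
if( prev == 0 )     // first request for the pair: start the draining actor
    actors.add( popFromLog( this, log, tag, 1.0 ) );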
@@ -924,10 +908,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return waitForAll(lockResults);
}
virtual Future<Reference<ILogSystem>> newEpoch( RecruitFromConfigurationReply const& recr, Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags ) {
virtual Future<Reference<ILogSystem>> newEpoch( RecruitFromConfigurationReply const& recr, Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags, Reference<AsyncVar<bool>> const& recruitmentStalled ) {
// Call only after end_epoch() has successfully completed. Returns a new epoch immediately following this one. The new epoch
// is only provisional until the caller updates the coordinated DBCoreState
return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality, allTags );
return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality, allTags, recruitmentStalled );
}
virtual LogSystemConfig getLogSystemConfig() {
@@ -948,7 +932,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBestPolicy = logSet->hasBestPolicy;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
@@ -975,7 +958,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBestPolicy = logSet->hasBestPolicy;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
@@ -1211,7 +1193,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = coreSet.tLogPolicy;
logSet->tLogLocalities = coreSet.tLogLocalities;
logSet->isLocal = coreSet.isLocal;
logSet->hasBestPolicy = coreSet.hasBestPolicy;
logSet->locality = coreSet.locality;
logSet->startVersion = coreSet.startVersion;
logFailed.push_back(failed);
@@ -1235,7 +1216,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogPolicy = log.tLogPolicy;
logSet->tLogLocalities = log.tLogLocalities;
logSet->isLocal = log.isLocal;
logSet->hasBestPolicy = log.hasBestPolicy;
logSet->locality = log.locality;
logSet->startVersion = log.startVersion;
}
@@ -1327,7 +1307,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
ACTOR static Future<Void> recruitOldLogRouters( TagPartitionedLogSystem* self, vector<WorkerInterface> workers, LogEpoch recoveryCount, int8_t locality, Version startVersion,
std::vector<LocalityData> tLogLocalities, IRepPolicyRef tLogPolicy, int32_t hasBestPolicy, bool forRemote ) {
std::vector<LocalityData> tLogLocalities, IRepPolicyRef tLogPolicy, bool forRemote ) {
state vector<vector<Future<TLogInterface>>> logRouterInitializationReplies;
state vector<Future<TLogInterface>> allReplies;
int nextRouter = 0;
@@ -1377,7 +1357,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.startVersion = lastStart;
req.tLogLocalities = tLogLocalities;
req.tLogPolicy = tLogPolicy;
req.hasBestPolicy = hasBestPolicy;
req.locality = locality;
auto reply = transformErrors( throwErrorOr( workers[nextRouter].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() );
logRouterInitializationReplies.back().push_back( reply );
@@ -1427,7 +1406,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.startVersion = lastStart;
req.tLogLocalities = tLogLocalities;
req.tLogPolicy = tLogPolicy;
req.hasBestPolicy = hasBestPolicy;
req.locality = locality;
auto reply = transformErrors( throwErrorOr( workers[nextRouter].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() );
logRouterInitializationReplies.back().push_back( reply );
@@ -1504,7 +1482,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
logSet->tLogPolicy = configuration.remoteTLogPolicy;
logSet->isLocal = false;
logSet->hasBestPolicy = HasBestPolicyId;
logSet->locality = remoteLocality;
logSet->startVersion = oldLogSystem->knownCommittedVersion + 1;
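// Reading this hunk with the next one: the remote set initially starts at
// knownCommittedVersion + 1; the elided lines can evidently pull startVersion
// earlier, and in that case recruitOldLogRouters (below) recruits routers to
// cover the older versions for the remote DC.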
@@ -1532,7 +1509,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Future<Void> oldRouterRecruitment = Void();
if(logSet->startVersion < oldLogSystem->knownCommittedVersion + 1) {
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(self, remoteWorkers.logRouters, recoveryCount, remoteLocality, logSet->startVersion, localities, logSet->tLogPolicy, logSet->hasBestPolicy, true);
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(self, remoteWorkers.logRouters, recoveryCount, remoteLocality, logSet->startVersion, localities, logSet->tLogPolicy, true);
}
state vector<Future<TLogInterface>> logRouterInitializationReplies;
@@ -1543,11 +1520,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.startVersion = std::max(self->tLogs[0]->startVersion, logSet->startVersion);
req.tLogLocalities = localities;
req.tLogPolicy = logSet->tLogPolicy;
req.hasBestPolicy = logSet->hasBestPolicy;
req.locality = remoteLocality;
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
}
std::vector<Tag> localTags;
for(auto& tag : allTags) {
if(remoteLocality == tagLocalitySpecial || remoteLocality == tag.locality || tag.locality < 0) {
localTags.push_back(tag);
}
}
state vector<Future<TLogInterface>> remoteTLogInitializationReplies;
vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() );
for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) {
@@ -1561,7 +1544,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.remoteTag = Tag(tagLocalityRemoteLog, i);
req.locality = remoteLocality;
req.isPrimary = false;
req.allTags = allTags;
req.allTags = localTags;
req.startVersion = logSet->startVersion;
req.logRouterTags = 0;
}
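The localTags loop above reappears almost verbatim in the primary recruitment hunk further down; both paths now hand each tLog only the tags its locality can host. As a standalone helper (hypothetical name, same predicate as the diff):

static std::vector<Tag> filterLocalTags( std::vector<Tag> const& allTags, int8_t locality ) {
    std::vector<Tag> localTags;
    for( auto const& tag : allTags ) {
        // keep tags whose locality matches, plus special/negative localities
        if( locality == tagLocalitySpecial || locality == tag.locality || tag.locality < 0 )
            localTags.push_back( tag );
    }
    return localTags;
}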
@@ -1595,7 +1578,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return Void();
}
ACTOR static Future<Reference<ILogSystem>> newEpoch( Reference<TagPartitionedLogSystem> oldLogSystem, RecruitFromConfigurationReply recr, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> allTags ) {
ACTOR static Future<Reference<ILogSystem>> newEpoch( Reference<TagPartitionedLogSystem> oldLogSystem, RecruitFromConfigurationReply recr, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount,
int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> allTags, Reference<AsyncVar<bool>> recruitmentStalled ) {
state double startTime = now();
state Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) );
logSystem->logSystemType = 2;
@@ -1609,7 +1593,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy;
logSystem->tLogs[0]->isLocal = true;
logSystem->tLogs[0]->hasBestPolicy = HasBestPolicyId;
logSystem->tLogs[0]->locality = primaryLocality;
state RegionInfo region = configuration.getRegion(recr.dcId);
@@ -1620,8 +1603,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogs[1]->tLogReplicationFactor = region.satelliteTLogReplicationFactor;
logSystem->tLogs[1]->tLogPolicy = region.satelliteTLogPolicy;
logSystem->tLogs[1]->isLocal = true;
logSystem->tLogs[1]->hasBestPolicy = HasBestPolicyNone;
logSystem->tLogs[1]->locality = tagLocalityInvalid;
logSystem->tLogs[1]->locality = tagLocalitySatellite;
logSystem->tLogs[1]->startVersion = oldLogSystem->knownCommittedVersion + 1;
logSystem->expectedLogSets++;
}
@@ -1651,6 +1633,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(oldLogSystem->lockResults[lockNum].isCurrent && oldLogSystem->lockResults[lockNum].logSet->isLocal) {
break;
}
state Future<Void> stalledAfter = setAfter(recruitmentStalled, SERVER_KNOBS->MAX_RECOVERY_TIME, true);
loop {
auto versions = TagPartitionedLogSystem::getDurableVersion(logSystem->dbgid, oldLogSystem->lockResults[lockNum]);
if(versions.present()) {
@@ -1659,6 +1642,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
Void _ = wait( TagPartitionedLogSystem::getDurableVersionChanged(oldLogSystem->lockResults[lockNum]) );
}
stalledAfter.cancel();
break;
}
lockNum++;
@@ -1673,14 +1657,30 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Future<Void> oldRouterRecruitment = Never();
TraceEvent("NewEpochStartVersion", oldLogSystem->getDebugID()).detail("StartVersion", logSystem->tLogs[0]->startVersion).detail("EpochEnd", oldLogSystem->knownCommittedVersion + 1).detail("Locality", primaryLocality).detail("OldLogRouterTags", oldLogSystem->logRouterTags);
if(oldLogSystem->logRouterTags > 0 || logSystem->tLogs[0]->startVersion < oldLogSystem->knownCommittedVersion + 1) {
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(oldLogSystem.getPtr(), recr.oldLogRouters, recoveryCount, primaryLocality, logSystem->tLogs[0]->startVersion, localities, logSystem->tLogs[0]->tLogPolicy, logSystem->tLogs[0]->hasBestPolicy, false);
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(oldLogSystem.getPtr(), recr.oldLogRouters, recoveryCount, primaryLocality, logSystem->tLogs[0]->startVersion, localities, logSystem->tLogs[0]->tLogPolicy, false);
if(oldLogSystem->knownCommittedVersion - logSystem->tLogs[0]->startVersion > SERVER_KNOBS->MAX_RECOVERY_VERSIONS) {
//make sure we can recover in the other DC.
for(auto& lockResult : oldLogSystem->lockResults) {
if(lockResult.logSet->locality == remoteLocality) {
if( TagPartitionedLogSystem::getDurableVersion(logSystem->dbgid, lockResult).present() ) {
recruitmentStalled->set(true);
}
}
}
}
} else {
oldLogSystem->logSystemConfigChanged.trigger();
}
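// The stall check above is deliberately narrow: recruitmentStalled is only
// set when the primary would have to replay more than MAX_RECOVERY_VERSIONS
// and the remote locality already has a durable version to recover from, so
// the flag signals that finishing recovery in the other DC looks feasible.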
std::vector<Tag> localTags;
for(auto& tag : allTags) {
if(primaryLocality == tagLocalitySpecial || primaryLocality == tag.locality || tag.locality < 0) {
localTags.push_back(tag);
}
}
state vector<Future<TLogInterface>> initializationReplies;
vector< InitializeTLogRequest > reqs( recr.tLogs.size() );
for( int i = 0; i < recr.tLogs.size(); i++ ) {
InitializeTLogRequest &req = reqs[i];
req.recruitmentID = logSystem->recruitmentID;
@@ -1692,7 +1692,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.locality = primaryLocality;
req.remoteTag = Tag(tagLocalityRemoteLog, i);
req.isPrimary = true;
req.allTags = allTags;
req.allTags = localTags;
req.startVersion = logSystem->tLogs[0]->startVersion;
req.logRouterTags = logSystem->logRouterTags;
}
@@ -1706,7 +1706,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
reqs[ logSystem->tLogs[0]->bestLocationFor( tag ) ].recoverTags.push_back( tag );
}
std::vector<int> locations;
for( Tag tag : allTags ) {
for( Tag tag : localTags ) {
locations.clear();
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
for(int loc : locations)
@@ -1719,9 +1719,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state std::vector<Future<Void>> recoveryComplete;
if(region.satelliteTLogReplicationFactor > 0) {
std::vector<Tag> satelliteTags;
satelliteTags.push_back(txsTag);
state vector<Future<TLogInterface>> satelliteInitializationReplies;
vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() );
for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) {
InitializeTLogRequest &req = sreqs[i];
req.recruitmentID = logSystem->recruitmentID;
@@ -1730,10 +1732,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.recoverAt = oldLogSystem->epochEndVersion.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
req.epoch = recoveryCount;
req.locality = tagLocalityInvalid;
req.locality = tagLocalitySatellite;
req.remoteTag = Tag();
req.isPrimary = true;
req.allTags = allTags;
req.allTags = satelliteTags;
req.startVersion = oldLogSystem->knownCommittedVersion + 1;
req.logRouterTags = logSystem->logRouterTags;
}
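Net effect of the last two hunks on satellite recruitment: satellites now carry an explicit locality and are initialized with only the transaction-state tag, rather than every tag in the system. Condensed (names from this diff):

std::vector<Tag> satelliteTags;
satelliteTags.push_back( txsTag );     // satellites index only the txs tag
req.locality = tagLocalitySatellite;   // previously tagLocalityInvalid
req.allTags  = satelliteTags;          // previously allTags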

View File

@@ -98,13 +98,12 @@ struct InitializeLogRouterRequest {
Version startVersion;
std::vector<LocalityData> tLogLocalities;
IRepPolicyRef tLogPolicy;
int32_t hasBestPolicy;
int8_t locality;
ReplyPromise<struct TLogInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
ar & recoveryCount & routerTag & startVersion & tLogLocalities & tLogPolicy & hasBestPolicy & locality & reply;
ar & recoveryCount & routerTag & startVersion & tLogLocalities & tLogPolicy & locality & reply;
}
};

View File

@@ -219,6 +219,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
std::set<UID> resolverNeedingChanges;
PromiseStream<Future<Void>> addActor;
Reference<AsyncVar<bool>> recruitmentStalled;
MasterData(
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
@@ -246,7 +247,8 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
txnStateStore(0),
memoryLimit(2e9),
addActor(addActor),
hasConfiguration(false)
hasConfiguration(false),
recruitmentStalled( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) )
{
}
~MasterData() { if(txnStateStore) txnStateStore->close(); }
@@ -311,10 +313,10 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() ) ) );
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags ) );
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
} else {
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial, self->allTags ) );
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
}
return Void();
@@ -395,31 +397,31 @@ ACTOR Future<Void> updateLogsValue( Reference<MasterData> self, Database cx ) {
try {
Optional<Standalone<StringRef>> value = wait( tr.get(logsKey) );
ASSERT(value.present());
std::vector<OptionalInterface<TLogInterface>> logConf;
auto logs = decodeLogsValue(value.get());
for(auto& log : self->logSystem->getLogSystemConfig().tLogs) {
for(auto& tl : log.tLogs) {
logConf.push_back(tl);
}
std::set<UID> logIds;
for(auto& log : logs.first) {
logIds.insert(log.first);
}
bool match = (logs.first.size() == logConf.size());
if(match) {
for(int i = 0; i < logs.first.size(); i++) {
if(logs.first[i].first != logConf[i].id()) {
match = false;
bool found = false;
for(auto& logSet : self->logSystem->getLogSystemConfig().tLogs) {
for(auto& log : logSet.tLogs) {
if(logIds.count(log.id())) {
found = true;
break;
}
}
if(found) {
break;
}
}
if(!match) {
if(!found) {
TEST(true); //old master attempted to change logsKey
return Void();
}
//FIXME: include remote logs in the log key
tr.set(logsKey, self->logSystem->getLogsValue());
Void _ = wait( tr.commit() );
return Void();
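The rewritten check above is deliberately weaker than the old exact, ordered comparison: it only requires that at least one tLog id from the current log system already appear in the stored logsKey. If none do, a newer master has replaced the key and this (now old) master backs off; any overlap means the key still belongs to this generation and can safely be overwritten with the full value (the FIXME notes that remote logs are not yet included in the key). A sketch of just the overlap test, assuming the decoded key yields (UID, value) pairs as in the loop above:

std::set<UID> storedIds;
for( auto& log : logs.first )
    storedIds.insert( log.first );
bool overlap = false;
for( auto& logSet : self->logSystem->getLogSystemConfig().tLogs )
    for( auto& log : logSet.tLogs )
        if( storedIds.count( log.id() ) ) { overlap = true; break; }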
@@ -442,6 +444,7 @@ Future<Void> sendMasterRegistration( MasterData* self, LogSystemConfig const& lo
masterReq.registrationCount = ++self->registrationCount;
masterReq.priorCommittedLogServers = priorCommittedLogServers;
masterReq.recoveryState = self->recoveryState;
masterReq.recoveryStalled = self->recruitmentStalled->get();
return brokenPromiseToNever( self->clusterController.registerMaster.getReply( masterReq ) );
}
@@ -704,7 +707,7 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
ACTOR Future<Void> triggerUpdates( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
loop {
Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->cstate.fullyRecovered.getFuture() );
Void _ = wait( oldLogSystem->onLogSystemConfigChange() || self->cstate.fullyRecovered.getFuture() || self->recruitmentStalled->onChange() );
if(self->cstate.fullyRecovered.isSet())
return Void();
@@ -1192,6 +1195,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
tr.set(recoveryCommitRequest.arena, backupVersionKey, backupVersionValue);
tr.set(recoveryCommitRequest.arena, coordinatorsKey, self->coordinators.ccf->getConnectionString().toString());
tr.set(recoveryCommitRequest.arena, logsKey, self->logSystem->getLogsValue());
tr.set(recoveryCommitRequest.arena, primaryDatacenterKey, self->myInterface.locality.dcId().present() ? self->myInterface.locality.dcId().get() : StringRef());
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore, NULL, NULL);
mmApplied = tr.mutations.size();

View File

@@ -57,7 +57,7 @@ struct StatusWorkload : TestWorkload {
return Void();
}
virtual Future<Void> start(Database const& cx) {
//if (clientId != 0)
if (clientId != 0)
return Void();
Reference<Cluster> cluster = cx->cluster;
if (!cluster) {

View File

@@ -739,6 +743,13 @@ Future<Void> delayedAsyncVar( Reference<AsyncVar<T>> in, Reference<AsyncVar<T>>
}
}
ACTOR template <class T>
Future<Void> setAfter( Reference<AsyncVar<T>> var, double time, T val ) {
Void _ = wait( delay( time ) );
var->set( val );
return Void();
}
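// Usage sketch for setAfter, mirroring newEpoch in this commit: arm a timer
// that flips the flag, do the bounded work, and cancel the timer if the work
// finishes first (doRecoveryWork is a hypothetical placeholder).
//
//   state Future<Void> stalledAfter = setAfter( recruitmentStalled, SERVER_KNOBS->MAX_RECOVERY_TIME, true );
//   Void _ = wait( doRecoveryWork() );
//   stalledAfter.cancel();   // completed in time; the flag is never set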
Future<bool> allTrue( const std::vector<Future<bool>>& all );
Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output );
Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long