Merge branch 'master' into removing-old-dc-code

This commit is contained in:
Evan Tschannen 2017-06-26 16:27:10 -07:00 committed by GitHub Enterprise
parent f21b2874a8
commit 9fd5955e92
7 changed files with 26 additions and 72 deletions

View File

@ -87,7 +87,7 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
return out;
}
std::string redundancy, log_replicas, log_recovery_anti_quorum, dc="1", minDC="1";
std::string redundancy, log_replicas, log_recovery_anti_quorum;
IRepPolicyRef storagePolicy;
IRepPolicyRef tLogPolicy;
@ -121,10 +121,10 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
storagePolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
tLogPolicy = IRepPolicyRef(new PolicyAcross(4, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "two_datacenter") {
redundancy="3"; log_replicas="3"; log_recovery_anti_quorum="0"; dc="2"; minDC="1";
redundancy="3"; log_replicas="3"; log_recovery_anti_quorum="0";
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "three_datacenter") {
redundancy="3"; log_replicas="3"; log_recovery_anti_quorum="0"; dc="3"; minDC="2";
redundancy="3"; log_replicas="3"; log_recovery_anti_quorum="0";
storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAnd({
IRepPolicyRef(new PolicyAcross(3, "dcid", IRepPolicyRef(new PolicyOne()))),
IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())))
@ -145,8 +145,6 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
out[p+"log_replicas"] = log_replicas;
out[p+"log_anti_quorum"] = "0";
out[p+"log_recovery_anti_quorum"] = log_recovery_anti_quorum;
out[p+"replica_datacenters"] = dc;
out[p+"min_replica_datacenters"] = minDC;
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, storagePolicy);
@ -214,9 +212,7 @@ ConfigurationResult::Type buildConfiguration( std::string const& configMode, std
bool isCompleteConfiguration( std::map<std::string, std::string> const& options ) {
std::string p = configKeysPrefix.toString();
return options.count( p+"min_replica_datacenters" ) == 1 &&
options.count( p+"replica_datacenters" ) == 1 &&
options.count( p+"log_replicas" ) == 1 &&
return options.count( p+"log_replicas" ) == 1 &&
options.count( p+"log_anti_quorum" ) == 1 &&
options.count( p+"storage_quorum" ) == 1 &&
options.count( p+"storage_replicas" ) == 1 &&

View File

@ -452,28 +452,23 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
ProcessClass::Fitness resolverFit;
int proxyCount;
int resolverCount;
int datacenters;
InDatacenterFitness( ProcessClass::Fitness proxyFit, ProcessClass::Fitness resolverFit, int proxyCount, int resolverCount, int datacenters)
: proxyFit(proxyFit), resolverFit(resolverFit), proxyCount(proxyCount), resolverCount(resolverCount), datacenters(datacenters) {}
InDatacenterFitness( ProcessClass::Fitness proxyFit, ProcessClass::Fitness resolverFit, int proxyCount, int resolverCount)
: proxyFit(proxyFit), resolverFit(resolverFit), proxyCount(proxyCount), resolverCount(resolverCount) {}
InDatacenterFitness() : proxyFit( ProcessClass::NeverAssign ), resolverFit( ProcessClass::NeverAssign ), datacenters(10000000) {}
InDatacenterFitness() : proxyFit( ProcessClass::NeverAssign ), resolverFit( ProcessClass::NeverAssign ) {}
InDatacenterFitness( vector<std::pair<WorkerInterface, ProcessClass>> proxies, vector<std::pair<WorkerInterface, ProcessClass>> resolvers ) {
std::set<Optional<Standalone<StringRef>>> dcs;
proxyFit = ProcessClass::BestFit;
resolverFit = ProcessClass::BestFit;
for(auto it: proxies) {
dcs.insert(it.first.locality.dcId());
proxyFit = std::max(proxyFit, it.second.machineClassFitness( ProcessClass::Proxy ));
}
for(auto it: resolvers) {
dcs.insert(it.first.locality.dcId());
resolverFit = std::max(resolverFit, it.second.machineClassFitness( ProcessClass::Resolver ));
}
proxyCount = proxies.size();
resolverCount = resolvers.size();
datacenters = dcs.size();
}
InDatacenterFitness( vector<MasterProxyInterface> proxies, vector<ResolverInterface> resolvers, vector<ProcessClass> proxyClasses, vector<ProcessClass> resolverClasses ) {
@ -491,12 +486,9 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
proxyCount = proxies.size();
resolverCount = resolvers.size();
datacenters = dcs.size();
}
bool operator < (InDatacenterFitness const& r) const {
if(datacenters != r.datacenters) return datacenters < r.datacenters;
int lmax = std::max(resolverFit,proxyFit);
int lmin = std::min(resolverFit,proxyFit);
int rmax = std::max(r.resolverFit,r.proxyFit);
@ -508,17 +500,16 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
return resolverCount > r.resolverCount;
}
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && datacenters == r.datacenters && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
};
struct AcrossDatacenterFitness {
ProcessClass::Fitness tlogFit;
int tlogCount;
int datacenters;
AcrossDatacenterFitness( ProcessClass::Fitness tlogFit, int tlogCount, int datacenters ) : tlogFit(tlogFit), tlogCount(tlogCount), datacenters(datacenters) {}
AcrossDatacenterFitness( ProcessClass::Fitness tlogFit, int tlogCount) : tlogFit(tlogFit), tlogCount(tlogCount) {}
AcrossDatacenterFitness() : tlogFit( ProcessClass::NeverAssign ), datacenters(0), tlogCount(0) {}
AcrossDatacenterFitness() : tlogFit( ProcessClass::NeverAssign ), tlogCount(0) {}
AcrossDatacenterFitness( vector<std::pair<WorkerInterface, ProcessClass>> tlogs ) {
std::set<Optional<Standalone<StringRef>>> dcs;
@ -527,7 +518,6 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
dcs.insert(it.first.locality.dcId());
tlogFit = std::max(tlogFit, it.second.machineClassFitness( ProcessClass::TLog ));
}
datacenters = dcs.size();
tlogCount = tlogs.size();
}
@ -539,17 +529,15 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
dcs.insert(tlogs[i].interf().locality.dcId());
tlogFit = std::max(tlogFit, processClasses[i].machineClassFitness( ProcessClass::TLog ));
}
datacenters = dcs.size();
tlogCount = tlogs.size();
}
bool operator < (AcrossDatacenterFitness const& r) const {
if(tlogFit != r.tlogFit) return tlogFit < r.tlogFit;
if(tlogCount != r.tlogCount) return tlogCount > r.tlogCount;
return datacenters > r.datacenters;
return tlogCount > r.tlogCount;
}
bool operator == (AcrossDatacenterFitness const& r) const { return datacenters == r.datacenters && tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
bool operator == (AcrossDatacenterFitness const& r) const { return tlogFit == r.tlogFit && tlogCount == r.tlogCount; }
};
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
@ -613,8 +601,8 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
.detail("desiredResolvers", req.configuration.getDesiredResolvers()).detail("actualResolvers", result.resolvers.size());
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), req.configuration.minDataCenters) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers(), 1) ) ) {
( AcrossDatacenterFitness(tlogs) > AcrossDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()) ||
bestFitness > InDatacenterFitness((ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_PROXY_FITNESS, (ProcessClass::Fitness)SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredProxies(), req.configuration.getDesiredResolvers()) ) ) {
throw operation_failed();
}
@ -697,10 +685,8 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
if(oldInFit < newInFit) return false;
if(oldMasterFit > newMasterFit || oldAcrossFit > newAcrossFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldAcrossFitD", oldAcrossFit.datacenters).detail("newAcrossFitD", newAcrossFit.datacenters)
.detail("oldAcrossFitC", oldAcrossFit.tlogCount).detail("newAcrossFitC", newAcrossFit.tlogCount)
.detail("oldAcrossFitT", oldAcrossFit.tlogFit).detail("newAcrossFitT", newAcrossFit.tlogFit)
.detail("oldInFitD", oldInFit.datacenters).detail("newInFitD", newInFit.datacenters)
.detail("oldInFitP", oldInFit.proxyFit).detail("newInFitP", newInFit.proxyFit)
.detail("oldInFitR", oldInFit.resolverFit).detail("newInFitR", newInFit.resolverFit)
.detail("oldInFitPC", oldInFit.proxyCount).detail("newInFitPC", newInFit.proxyCount)

View File

@ -438,7 +438,6 @@ struct DDTeamCollection {
Database cx;
UID masterId;
int teamSize;
int minDataCenters, desiredDataCenters;
IRepPolicyRef replicationPolicy;
KeyValueStoreType storeType;
@ -481,13 +480,11 @@ struct DDTeamCollection {
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
int teamSize,
int minDataCenters,
int desiredDataCenters,
IRepPolicyRef replicationPolicy,
KeyValueStoreType storeType,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges )
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
teamSize( teamSize ), minDataCenters( minDataCenters ), desiredDataCenters( desiredDataCenters ), replicationPolicy(replicationPolicy), storeType( storeType ), serverChanges(serverChanges),
teamSize( teamSize ), replicationPolicy(replicationPolicy), storeType( storeType ), serverChanges(serverChanges),
initialFailureReactionDelay( delay( BUGGIFY ? 0 : SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ),
initializationDoneActor(logOnCompletion(initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
unhealthyServers(0)
@ -958,7 +955,6 @@ struct DDTeamCollection {
int serverCount = 0;
int uniqueDataCenters = 0;
int uniqueMachines = 0;
std::set<Optional<Standalone<StringRef>>> dataCenters;
std::set<Optional<Standalone<StringRef>>> machines;
for(auto i = self->server_info.begin(); i != self->server_info.end(); ++i) {
@ -966,15 +962,12 @@ struct DDTeamCollection {
++serverCount;
LocalityData& serverLocation = i->second->lastKnownInterface.locality;
machines.insert( serverLocation.zoneId() );
// Only add the datacenter if it's set or we don't care (less than two datacenters targeted)
if( serverLocation.dcId().present() || self->desiredDataCenters < 2 )
dataCenters.insert( serverLocation.dcId() );
}
}
uniqueMachines = machines.size();
// If there are too few machines to even build teams or there are too few represented datacenters, build no new teams
if( uniqueMachines >= self->teamSize && dataCenters.size() >= self->minDataCenters ) {
if( uniqueMachines >= self->teamSize ) {
desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER*serverCount;
// Count only properly sized teams against the desired number of teams. This is to prevent "emergency" merged teams (see MoveKeys)
@ -989,7 +982,7 @@ struct DDTeamCollection {
}
TraceEvent("BuildTeamsBegin", self->masterId).detail("DesiredTeams", desiredTeams).detail("UniqueMachines", uniqueMachines)
.detail("TeamSize", self->teamSize).detail("Servers", serverCount).detail("DataCenters", dataCenters.size())
.detail("TeamSize", self->teamSize).detail("Servers", serverCount)
.detail("CurrentTrackedTeams", self->teams.size()).detail("TeamCount", teamCount);
if( desiredTeams > teamCount ) {
@ -1767,14 +1760,13 @@ ACTOR Future<Void> dataDistributionTeamCollection(
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<RelocateShard> output,
UID masterId, int teamSize, int minDataCenters, int desiredDataCenters,
UID masterId, int teamSize,
IRepPolicyRef replicationPolicy,
KeyValueStoreType storeType,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
Future<Void> readyToStart )
{
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, teamSize, minDataCenters,
desiredDataCenters, replicationPolicy, storeType, serverChanges );
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, teamSize, replicationPolicy, storeType, serverChanges );
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@ -2120,8 +2112,7 @@ ACTOR Future<Void> dataDistribution(
actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tci, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, configuration.storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tci, cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration.storageTeamSize, configuration.minDataCenters,
configuration.desiredDataCenters, configuration.storagePolicy, configuration.storageServerStoreType, serverChanges, readyToStart.getFuture() ), "DDTeamCollection", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tci, cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration.storageTeamSize, configuration.storagePolicy, configuration.storageServerStoreType, serverChanges, readyToStart.getFuture() ), "DDTeamCollection", mi.id(), &normalDDQueueErrors() ) );
Void _ = wait( waitForAll( actors ) );
return Void();
@ -2153,8 +2144,6 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
PromiseStream<RelocateShard>(),
Reference<ShardsAffectedByTeamFailure>(new ShardsAffectedByTeamFailure()),
teamSize,
-1,
-1,
policy,
KeyValueStoreType(),
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>()

View File

@ -29,7 +29,6 @@ DatabaseConfiguration::DatabaseConfiguration()
void DatabaseConfiguration::resetInternal() {
// does NOT reset rawConfiguration
initialized = false;
minDataCenters = desiredDataCenters = -1;
masterProxyCount = resolverCount = desiredTLogCount = tLogWriteAntiQuorum = tLogReplicationFactor = durableStorageQuorum = storageTeamSize = -1;
tLogDataStoreType = storageServerStoreType = KeyValueStoreType::END;
autoMasterProxyCount = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES;
@ -56,9 +55,6 @@ void DatabaseConfiguration::setDefaultReplicationPolicy() {
bool DatabaseConfiguration::isValid() const {
return initialized &&
minDataCenters >= 1 &&
desiredDataCenters >= 1 &&
minDataCenters <= desiredDataCenters &&
tLogWriteAntiQuorum >= 0 &&
tLogReplicationFactor >= 1 &&
durableStorageQuorum >= 1 &&
@ -84,16 +80,12 @@ std::map<std::string, std::string> DatabaseConfiguration::toMap() const {
if( tLogReplicationFactor == durableStorageQuorum &&
durableStorageQuorum == storageTeamSize &&
tLogWriteAntiQuorum == 0 ) {
if( durableStorageQuorum == 1 && desiredDataCenters == 1 && minDataCenters == 1 )
if( durableStorageQuorum == 1 )
result["redundancy_mode"] = "single";
else if( durableStorageQuorum == 2 && desiredDataCenters == 1 && minDataCenters == 1 )
else if( durableStorageQuorum == 2 )
result["redundancy_mode"] = "double";
else if( durableStorageQuorum == 3 && desiredDataCenters == 1 && minDataCenters == 1 )
else if( durableStorageQuorum == 3 )
result["redundancy_mode"] = "triple";
else if( durableStorageQuorum == 3 && desiredDataCenters == 2 && minDataCenters == 1 )
result["redundancy_mode"] = "two_datacenter";
else if( durableStorageQuorum == 3 && desiredDataCenters == 3 && minDataCenters == 2 )
result["redundancy_mode"] = "three_datacenter";
else
result["redundancy_mode"] = "custom";
} else
@ -129,8 +121,6 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
int type;
if (ck == LiteralStringRef("initialized")) initialized = true;
else if (ck == LiteralStringRef("min_replica_datacenters")) parse(&minDataCenters, value);
else if (ck == LiteralStringRef("replica_datacenters")) parse(&desiredDataCenters, value);
else if (ck == LiteralStringRef("proxies")) parse(&masterProxyCount, value);
else if (ck == LiteralStringRef("resolvers")) parse(&resolverCount, value);
else if (ck == LiteralStringRef("logs")) parse(&desiredTLogCount, value);

View File

@ -43,14 +43,11 @@ struct DatabaseConfiguration {
std::string toString() const;
std::map<std::string, std::string> toMap() const;
// SOMEDAY: think about changing desiredDataCenters to minDataCenters
// SOMEDAY: think about changing storageTeamSize to durableStorageQuorum
int32_t minMachinesRequired() const { return std::max(tLogReplicationFactor, std::max(storageTeamSize, desiredDataCenters)); }
int32_t minMachinesRequired() const { return std::max(tLogReplicationFactor, storageTeamSize); }
int32_t maxMachineFailuresTolerated() const { return std::min(tLogReplicationFactor - 1 - tLogWriteAntiQuorum, durableStorageQuorum - 1); }
// Redundancy Levels
int32_t minDataCenters;
int32_t desiredDataCenters;
IRepPolicyRef storagePolicy;
// MasterProxy Servers

View File

@ -818,8 +818,8 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
g_simulator.desiredCoordinators = coordinatorCount;
g_simulator.killableMachines = startingConfig.maxMachineFailuresTolerated();
g_simulator.neededDatacenters = startingConfig.minDataCenters;
g_simulator.killableDatacenters = startingConfig.minDataCenters - 1;
g_simulator.neededDatacenters = 1;
g_simulator.killableDatacenters = 0;
g_simulator.physicalDatacenters = dataCenters;
g_simulator.maxCoordinatorsInDatacenter = ((coordinatorCount-1)/dataCenters) + 1;
g_simulator.machinesNeededForProgress = startingConfig.minMachinesRequired() + nonVersatileMachines;

View File

@ -263,10 +263,6 @@ ACTOR Future<Void> newSeedServers( Reference<MasterData> self, vector<StorageSer
req.criticalRecruitment = true;
for(auto s = servers->begin(); s != servers->end(); ++s)
req.excludeMachines.push_back(s->locality.zoneId());
if( dataCenters.size() < self->configuration.minDataCenters ) {
for(auto dc = dataCenters.begin(); dc != dataCenters.end(); ++dc)
req.excludeDCs.push_back(*dc);
}
TraceEvent("MasterRecruitingInitialStorageServer", self->dbgid)
.detail("ExcludingMachines", req.excludeMachines.size())