removed the separately configurable storage team size for the remote data center, because it did not make sense
fix: the master did not monitor for the failure of remote logs
stop merge attempts while a data center is failed
fixed a variety of other problems with data distribution when a data center is failed
This commit is contained in:
parent 766964ff48
commit ebd94bb654
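The data distribution changes in the diff below hinge on one aggregate signal: each region's team collection publishes a zeroHealthyTeams flag, the new anyTrue() combinator ORs those flags into anyZeroHealthyTeams, and the shard tracker refuses to start merges while that aggregate is true. What follows is a minimal standalone sketch of that gating idea in plain C++ (not Flow actor code); the names RegionHealth, anyZeroHealthyTeams, and shouldAttemptMerge are illustrative only and are not APIs from this repository.

// Sketch only: models the "suppress merges while any region has zero healthy teams"
// logic this commit wires through data distribution. Plain C++11, no Flow actors.
#include <iostream>
#include <vector>

struct RegionHealth {
    int healthyTeamCount;
    explicit RegionHealth(int n) : healthyTeamCount(n) {}
    bool zeroHealthyTeams() const { return healthyTeamCount == 0; }
};

// Analogue of anyTrue(): true if any region currently reports zero healthy teams.
bool anyZeroHealthyTeams(const std::vector<RegionHealth>& regions) {
    for (const auto& r : regions)
        if (r.zeroHealthyTeams()) return true;
    return false;
}

// Analogue of the shardEvaluator change: only consider merging when the shard
// qualifies (small, low bandwidth) AND no region is missing healthy teams.
bool shouldAttemptMerge(bool shardWantsToMerge, const std::vector<RegionHealth>& regions) {
    return shardWantsToMerge && !anyZeroHealthyTeams(regions);
}

int main() {
    std::vector<RegionHealth> regions;
    regions.push_back(RegionHealth(5)); // primary DC: healthy teams exist
    regions.push_back(RegionHealth(0)); // remote DC failed: no healthy teams
    std::cout << shouldAttemptMerge(true, regions) << "\n"; // prints 0: merges held off
    regions[1].healthyTeamCount = 3;    // remote DC recovers
    std::cout << shouldAttemptMerge(true, regions) << "\n"; // prints 1: merges may resume
    return 0;
}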
@@ -155,25 +155,23 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
}
std::string remote_redundancy, remote_log_replicas;
IRepPolicyRef remoteStoragePolicy;
IRepPolicyRef remoteTLogPolicy;
bool remoteRedundancySpecified = true;
if (mode == "remote_single") {
remote_redundancy="1";
remote_log_replicas="1";
remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyOne());
remoteTLogPolicy = IRepPolicyRef(new PolicyOne());
} else if(mode == "remote_double") {
remote_redundancy="2";
remote_log_replicas="2";
remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "remote_triple") {
remote_redundancy="3";
remote_log_replicas="3";
remoteStoragePolicy = remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
} else if(mode == "remote_three_data_hall") {
remote_redundancy="3";
remote_log_replicas="4";
remoteStoragePolicy = IRepPolicyRef(new PolicyAcross(3, "data_hall", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
));
@@ -186,10 +184,6 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
out[p+"log_routers"] = remote_log_replicas;
BinaryWriter policyWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, remoteStoragePolicy);
out[p+"remote_storage_policy"] = policyWriter.toStringRef().toString();
policyWriter = BinaryWriter(IncludeVersion());
serializeReplicationPolicy(policyWriter, remoteTLogPolicy);
out[p+"remote_log_policy"] = policyWriter.toStringRef().toString();
return out;
@@ -1100,15 +1100,13 @@ public:
bool remoteTLogsDead = tLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteProcessesDead, tLogPolicy, remoteLocalitiesLeft, tLogWriteAntiQuorum, false) : remoteProcessesDead.validate(tLogPolicy);
if(!hasSatelliteReplication) {
tooManyDead = primaryTLogsDead || remoteTLogsDead ||
( ( primaryProcessesDead.validate(storagePolicy) || primaryProcessesDead.validate(remoteStoragePolicy) ) && ( remoteProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(remoteStoragePolicy) ) );
tooManyDead = primaryTLogsDead || remoteTLogsDead || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
notEnoughLeft = ( !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) ) && ( !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) );
} else {
bool primarySatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, primarySatelliteProcessesDead, satelliteTLogPolicy, primarySatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : primarySatelliteProcessesDead.validate(satelliteTLogPolicy);
bool remoteSatelliteTLogsDead = satelliteTLogWriteAntiQuorum ? !validateAllCombinations(badCombo, remoteSatelliteProcessesDead, satelliteTLogPolicy, remoteSatelliteLocalitiesLeft, satelliteTLogWriteAntiQuorum, false) : remoteSatelliteProcessesDead.validate(satelliteTLogPolicy);
tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) ||
( ( primaryProcessesDead.validate(storagePolicy) || primaryProcessesDead.validate(remoteStoragePolicy) ) && ( remoteProcessesDead.validate(storagePolicy) || remoteProcessesDead.validate(remoteStoragePolicy) ) );
tooManyDead = ( primaryTLogsDead && primarySatelliteTLogsDead ) || ( remoteTLogsDead && remoteSatelliteTLogsDead ) || ( primaryProcessesDead.validate(storagePolicy) && remoteProcessesDead.validate(storagePolicy) );
notEnoughLeft = ( !primaryProcessesLeft.validate(tLogPolicy) || !primaryProcessesLeft.validate(storagePolicy) || !primarySatelliteProcessesLeft.validate(satelliteTLogPolicy) ) && ( !remoteProcessesLeft.validate(tLogPolicy) || !remoteProcessesLeft.validate(storagePolicy) || !remoteSatelliteProcessesLeft.validate(satelliteTLogPolicy) );
}
}
@@ -292,7 +292,6 @@ public:
bool hasRemoteReplication;
IRepPolicyRef remoteTLogPolicy;
Optional<Standalone<StringRef>> remoteDcId;
IRepPolicyRef remoteStoragePolicy;
bool hasSatelliteReplication;
IRepPolicyRef satelliteTLogPolicy;
int32_t satelliteTLogWriteAntiQuorum;
@@ -632,7 +632,7 @@ public:
result.storageServers.push_back(primaryStorageServers[i].first);
if(req.configuration.remoteTLogReplicationFactor > 0) {
auto remoteStorageServers = getWorkersForSeedServers( req.configuration, req.configuration.remoteStoragePolicy, result.remoteDcId );
auto remoteStorageServers = getWorkersForSeedServers( req.configuration, req.configuration.storagePolicy, result.remoteDcId );
for(int i = 0; i < remoteStorageServers.size(); i++)
result.storageServers.push_back(remoteStorageServers[i].first);
}
@@ -118,6 +118,7 @@ public:
v.push_back(servers[i]->lastKnownInterface);
return v;
}
virtual int size() { return servers.size(); }
virtual vector<UID> const& getServerIDs() { return serverIDs; }
virtual void addDataInFlightToTeam( int64_t delta ) {
for(int i=0; i<servers.size(); i++)
@@ -470,7 +471,7 @@ struct DDTeamCollection {
Debouncer restartRecruiting;
int healthyTeamCount;
AsyncVar<bool> zeroHealthyTeams;
Reference<AsyncVar<bool>> zeroHealthyTeams;
int optimalTeamCount;
AsyncVar<bool> zeroOptimalTeams;
@@ -487,12 +488,12 @@ struct DDTeamCollection {
DatabaseConfiguration configuration,
std::vector<Optional<Key>> includedDCs,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges,
Future<Void> readyToStart )
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams )
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
configuration(configuration), serverChanges(serverChanges),
initialFailureReactionDelay( delay( BUGGIFY ? 0 : SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
unhealthyServers(0), includedDCs(includedDCs)
unhealthyServers(0), includedDCs(includedDCs), zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true)
{
TraceEvent("DDTrackerStarting", masterId)
.detail( "State", "Inactive" )
@@ -673,28 +674,28 @@ struct DDTeamCollection {
}
}
if(!bestOption.present() && self->zeroHealthyTeams.get()) {
if(!bestOption.present() && self->zeroHealthyTeams->get()) {
//Attempt to find the unhealthy source server team and return it
if(!sources.size()) {
for( int i = 0; i < req.sources.size(); i++ ) {
sources.insert( req.sources[i] );
}
std::set<UID> completeSources;
for( int i = 0; i < req.completeSources.size(); i++ ) {
completeSources.insert( req.completeSources[i] );
}
int bestSize = 0;
for( int i = 0; i < req.sources.size(); i++ ) {
if( self->server_info.count( req.sources[i] ) ) {
auto& teamList = self->server_info[ req.sources[i] ]->teams;
for( int j = 0; j < teamList.size(); j++ ) {
bool found = true;
for( int k = 0; k < teamList[j]->serverIDs.size(); k++ ) {
if( !sources.count( teamList[j]->serverIDs[k] ) ) {
if( !completeSources.count( teamList[j]->serverIDs[k] ) ) {
found = false;
break;
}
}
if(found) {
if(found && teamList[j]->serverIDs.size() > bestSize) {
bestOption = teamList[j];
break;
bestSize = teamList[j]->serverIDs.size();
}
}
break;
@@ -1179,7 +1180,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
if(lastHealthy) {
self->healthyTeamCount++;
self->zeroHealthyTeams.set(false);
self->zeroHealthyTeams->set(false);
}
if(lastOptimal) {
@@ -1187,7 +1188,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
self->zeroOptimalTeams.set(false);
}
state bool lastZeroHealthy = self->zeroHealthyTeams.get();
state bool lastZeroHealthy = self->zeroHealthyTeams->get();
TraceEvent("TeamTrackerStarting", self->masterId).detail("Reason", "Initial wait complete (sc)").detail("Team", team->getDesc());
@@ -1217,11 +1218,11 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
if( !self->initialFailureReactionDelay.isReady() )
change.push_back( self->initialFailureReactionDelay );
change.push_back( self->zeroHealthyTeams.onChange() );
change.push_back( self->zeroHealthyTeams->onChange() );
bool recheck = (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams.get())) && (!matchesPolicy || anyUndesired || team->getServerIDs().size() != self->configuration.storageTeamSize);
bool recheck = (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get())) && (!matchesPolicy || anyUndesired || team->getServerIDs().size() != self->configuration.storageTeamSize);
lastReady = self->initialFailureReactionDelay.isReady();
lastZeroHealthy = self->zeroHealthyTeams.get();
lastZeroHealthy = self->zeroHealthyTeams->get();
if( serversLeft != lastServersLeft || anyUndesired != lastAnyUndesired || anyWrongConfiguration != lastWrongConfiguration || wrongSize || recheck ) {
TraceEvent("TeamHealthChanged", self->masterId)
@@ -1248,7 +1249,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
self->healthyTeamCount += healthy ? 1 : -1;
ASSERT( self->healthyTeamCount >= 0 );
self->zeroHealthyTeams.set(self->healthyTeamCount == 0);
self->zeroHealthyTeams->set(self->healthyTeamCount == 0);
if( self->healthyTeamCount == 0 ) {
TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->masterId).detail("SignallingTeam", team->getDesc());
@@ -1285,8 +1286,8 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
team->setPriority( PRIORITY_TEAM_HEALTHY );
TraceEvent("TeamPriorityChange", self->masterId).detail("Priority", team->getPriority());
lastZeroHealthy = self->zeroHealthyTeams.get(); //set this again in case it changed from this teams health changing
if( self->initialFailureReactionDelay.isReady() && !self->zeroHealthyTeams.get() ) {
lastZeroHealthy = self->zeroHealthyTeams->get(); //set this again in case it changed from this teams health changing
if( self->initialFailureReactionDelay.isReady() && !self->zeroHealthyTeams->get() ) {
vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( team->getServerIDs() );
for(int i=0; i<shards.size(); i++) {
@@ -1349,7 +1350,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
if( self->healthyTeamCount == 0 ) {
TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->masterId).detail("SignallingTeam", team->getDesc());
self->zeroHealthyTeams.set(true);
self->zeroHealthyTeams->set(true);
}
}
throw;
@@ -1821,9 +1822,10 @@ ACTOR Future<Void> dataDistributionTeamCollection(
DatabaseConfiguration configuration,
std::vector<Optional<Key>> includedDCs,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
Future<Void> readyToStart )
Future<Void> readyToStart,
Reference<AsyncVar<bool>> zeroHealthyTeams )
{
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, includedDCs, serverChanges, readyToStart );
state DDTeamCollection self( cx, masterId, lock, output, shardsAffectedByTeamFailure, configuration, includedDCs, serverChanges, readyToStart, zeroHealthyTeams );
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
state Future<Void> interfaceChanges;
@@ -1856,8 +1858,8 @@ ACTOR Future<Void> dataDistributionTeamCollection(
self.restartRecruiting.trigger();
}
when( Void _ = wait( self.zeroHealthyTeams.onChange() ) ) {
if(self.zeroHealthyTeams.get()) {
when( Void _ = wait( self.zeroHealthyTeams->onChange() ) ) {
if(self.zeroHealthyTeams->get()) {
self.restartRecruiting.trigger();
self.noHealthyTeams();
}
@@ -2136,21 +2138,33 @@ ACTOR Future<Void> dataDistribution(
state Promise<Void> readyToStart;
vector<TeamCollectionInterface> tcis;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
vector<Reference<AsyncVar<bool>>> zeroHealthyTeams;
tcis.push_back(TeamCollectionInterface());
zeroHealthyTeams.push_back(Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ));
int storageTeamSize = configuration.storageTeamSize;
vector<Future<Void>> actors;
if (configuration.remoteTLogReplicationFactor > 0) {
tcis.push_back(TeamCollectionInterface());
storageTeamSize = 2*configuration.storageTeamSize;
zeroHealthyTeams.push_back( Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ) );
anyZeroHealthyTeams = Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) );
actors.push_back( anyTrue(zeroHealthyTeams, anyZeroHealthyTeams) );
} else {
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
vector<Future<Void>> actors;
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, configuration.storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, serverChanges, readyToStart.getFuture() ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, shardsAffectedByTeamFailure, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0] ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
if (configuration.remoteTLogReplicationFactor > 0) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcId, serverChanges, readyToStart.getFuture() ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcId, serverChanges, readyToStart.getFuture(), zeroHealthyTeams[1] ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
}
Void _ = wait( waitForAll( actors ) );
@@ -2189,7 +2203,8 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
conf,
{},
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>(),
Future<Void>(Void())
Future<Void>(Void()),
Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) )
);
for(int id = 1; id <= processCount; id++) {
@@ -61,6 +61,7 @@ enum {
struct IDataDistributionTeam {
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() = 0;
virtual int size() = 0;
virtual vector<UID> const& getServerIDs() = 0;
virtual void addDataInFlightToTeam( int64_t delta ) = 0;
virtual int64_t getDataInFlightToTeam() = 0;
@@ -97,10 +98,11 @@ struct GetTeamRequest {
bool preferLowerUtilization;
double inflightPenalty;
std::vector<UID> sources;
std::vector<UID> completeSources;
Promise< Optional< Reference<IDataDistributionTeam> > > reply;
GetTeamRequest() {}
GetTeamRequest( bool wantsNewServers, bool wantsTrueBest, bool preferLowerUtilization, double inflightPenalty = 1.0 ) : wantsNewServers( wantsNewServers ), wantsTrueBest( wantsTrueBest ), preferLowerUtilization( preferLowerUtilization ), inflightPenalty(inflightPenalty) {}
GetTeamRequest( bool wantsNewServers, bool wantsTrueBest, bool preferLowerUtilization, double inflightPenalty = 1.0 ) : wantsNewServers( wantsNewServers ), wantsTrueBest( wantsTrueBest ), preferLowerUtilization( preferLowerUtilization ), inflightPenalty( inflightPenalty ) {}
};
struct GetMetricsRequest {
@@ -183,6 +185,7 @@ Future<Void> dataDistributionTracker(
PromiseStream<GetMetricsRequest> const& getShardMetrics,
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
Promise<Void> const& readyToStart,
Reference<AsyncVar<bool>> const& zeroHealthyTeams,
UID const& masterId);
Future<Void> dataDistributionQueue(
@@ -39,6 +39,7 @@ struct RelocateData {
UID randomId;
int workFactor;
std::vector<UID> src;
std::vector<UID> completeSources;
bool wantsNewServers;
TraceInterval interval;
@@ -55,7 +56,7 @@ struct RelocateData {
}
bool operator== (const RelocateData& rhs) const {
return priority == rhs.priority && keys == rhs.keys && startTime == rhs.startTime && workFactor == rhs.workFactor && src == rhs.src && wantsNewServers == rhs.wantsNewServers && randomId == rhs.randomId;
return priority == rhs.priority && keys == rhs.keys && startTime == rhs.startTime && workFactor == rhs.workFactor && src == rhs.src && completeSources == rhs.completeSources && wantsNewServers == rhs.wantsNewServers && randomId == rhs.randomId;
}
bool changesBoundaries() {
@@ -120,6 +121,14 @@ public:
});
}
virtual int size() {
int totalSize = 0;
for (auto it = teams.begin(); it != teams.end(); it++) {
totalSize += (*it)->size();
}
return totalSize;
}
virtual vector<UID> const& getServerIDs() {
tempServerIDs.clear();
for (auto it = teams.begin(); it != teams.end(); it++) {
@@ -336,7 +345,7 @@ struct DDQueueData {
int bytesWritten;
std::map<int, int> priority_relocations;
int teamSize;
int durableStorageQuorum;
int durableStorageQuorumPerTeam;
std::map<UID, Busyness> busymap;
@@ -361,11 +370,11 @@ struct DDQueueData {
DDQueueData( MasterInterface mi, MoveKeysLock lock, Database cx, std::vector<TeamCollectionInterface> teamCollections,
Reference<ShardsAffectedByTeamFailure> sABTF, PromiseStream<Promise<int64_t>> getAverageShardBytes,
int teamSize, int durableStorageQuorum, PromiseStream<RelocateShard> input,
int teamSize, int durableStorageQuorumPerTeam, PromiseStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited ) :
activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ),
shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), mi( mi ), lock( lock ),
cx( cx ), teamSize( teamSize ), durableStorageQuorum( durableStorageQuorum ), input( input ),
cx( cx ), teamSize( teamSize ), durableStorageQuorumPerTeam( durableStorageQuorumPerTeam ), input( input ),
getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited) {}
@@ -496,8 +505,19 @@ struct DDQueueData {
vector<UID> src, dest;
decodeKeyServersValue( keyServersEntries[shard].value, src, dest );
ASSERT( src.size() );
for( int i = 0; i < src.size(); i++ )
for( int i = 0; i < src.size(); i++ ) {
servers.insert( src[i] );
}
if(shard == 0) {
input.completeSources = src;
} else {
for(int i = 0; i < input.completeSources.size(); i++) {
if(std::find(src.begin(), src.end(), input.completeSources[i]) == src.end()) {
std::swap(input.completeSources[i--], input.completeSources.back());
input.completeSources.pop_back();
}
}
}
}
ASSERT(servers.size() > 0);
@@ -815,6 +835,9 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
state bool signalledTransferComplete = false;
state UID masterId = self->mi.id();
state ParallelTCInfo destination;
state ParallelTCInfo healthyDestinations;
state bool anyHealthy = false;
state int durableStorageQuorum = 0;
try {
TraceEvent(relocateShardInterval.begin(), masterId)
@@ -829,8 +852,10 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
loop {
state int tciIndex = 0;
state bool foundTeams = true;
state int healthyCount = 0;
destination.clear();
healthyDestinations.clear();
anyHealthy = false;
durableStorageQuorum = 0;
loop{
if (tciIndex == self->teamCollections.size()) {
break;
@@ -842,12 +867,17 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
req.sources = rd.src;
req.completeSources = rd.completeSources;
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
if (bestTeam.present()) {
if(bestTeam.get()->isHealthy()) {
healthyCount++;
}
destination.addTeam(bestTeam.get());
if(bestTeam.get()->isHealthy()) {
healthyDestinations.addTeam(bestTeam.get());
anyHealthy = true;
durableStorageQuorum += self->durableStorageQuorumPerTeam;
} else {
durableStorageQuorum += bestTeam.get()->size();
}
}
else {
foundTeams = false;
@@ -855,7 +885,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
}
tciIndex++;
}
if (foundTeams && healthyCount > 0) {
if (foundTeams) {
break;
}
TEST(true); //did not find a healthy destination team on the first attempt
@@ -870,7 +900,6 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
Void _ = wait( delay( SERVER_KNOBS->BEST_TEAM_STUCK_DELAY, TaskDataDistributionLaunch ) );
}
ASSERT(destination.isHealthy()); // team failure tracking is edge triggered, so must never put something on an unhealthy team!
self->shardsAffectedByTeamFailure->moveShard(rd.keys, destination.getServerIDs());
//FIXME: do not add data in flight to servers that were already in the src.
@@ -883,12 +912,12 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
state Error error = success();
state Promise<Void> dataMovementComplete;
state Future<Void> doMoveKeys = moveKeys(
self->cx, rd.keys, destination.getServerIDs(), self->lock,
self->durableStorageQuorum, dataMovementComplete,
self->cx, rd.keys, destination.getServerIDs(), healthyDestinations.getServerIDs(), self->lock,
durableStorageQuorum, dataMovementComplete,
&self->startMoveKeysParallelismLock,
&self->finishMoveKeysParallelismLock,
relocateShardInterval.pairID );
state Future<Void> pollHealth = signalledTransferComplete ? Never() : delay( SERVER_KNOBS->HEALTH_POLL_TIME, TaskDataDistributionLaunch );
state Future<Void> pollHealth = (!anyHealthy || signalledTransferComplete) ? Never() : delay( SERVER_KNOBS->HEALTH_POLL_TIME, TaskDataDistributionLaunch );
try {
loop {
choose {
@@ -897,7 +926,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
break;
}
when( Void _ = wait( pollHealth ) ) {
if (!destination.isHealthy()) {
if (!healthyDestinations.isHealthy()) {
if (!signalledTransferComplete) {
signalledTransferComplete = true;
self->dataTransferComplete.send(rd);
@@ -76,11 +76,12 @@ struct DataDistributionTracker {
PromiseStream<RelocateShard> output;
Promise<Void> readyToStart;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output)
DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
: cx(cx), masterId( masterId ), dbSizeEstimate( new AsyncVar<int64_t>() ),
maxShardSize( new AsyncVar<Optional<int64_t>>() ),
sizeChanges(false), readyToStart(readyToStart), output( output ) {}
sizeChanges(false), readyToStart(readyToStart), output( output ), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
~DataDistributionTracker()
{
@@ -286,7 +287,7 @@ struct HasBeenTrueFor : NonCopyable {
Future<Void> set() {
if( !trigger.isValid() ) {
cleared = Promise<Void>();
trigger = delay( enough, TaskDataDistribution - 1 ) || cleared.getFuture();
trigger = delayJittered( enough, TaskDataDistribution - 1 ) || cleared.getFuture();
}
return trigger;
}
@@ -487,14 +488,17 @@ ACTOR Future<Void> shardEvaluator(
getBandwidthStatus( stats ) == BandwidthStatusLow;
// Every invocation must set this or clear it
if (shouldMerge) {
if(shouldMerge && !self->anyZeroHealthyTeams->get()) {
auto whenLongEnough = wantsToMerge->set();
if( !wantsToMerge->hasBeenTrueForLongEnough() ) {
onChange = onChange || whenLongEnough;
}
}
else
} else {
wantsToMerge->clear();
if(shouldMerge) {
onChange = onChange || self->anyZeroHealthyTeams->onChange();
}
}
/*TraceEvent("ShardEvaluator", self->masterId)
.detail("TrackerId", trackerID)
@@ -502,7 +506,7 @@ ACTOR Future<Void> shardEvaluator(
.detail("ShouldMerge", shouldMerge)
.detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough());*/
if(wantsToMerge->hasBeenTrueForLongEnough()) {
if(!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) {
onChange = onChange || shardMerger( self, trackerID, keys, shardSize );
}
if( shouldSplit ) {
@@ -693,9 +697,10 @@ ACTOR Future<Void> dataDistributionTracker(
PromiseStream<GetMetricsRequest> getShardMetrics,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID masterId)
{
state DataDistributionTracker self(cx, masterId, readyToStart, output);
state DataDistributionTracker self(cx, masterId, readyToStart, output, anyZeroHealthyTeams);
state Future<Void> loggingTrigger = Void();
try {
Void _ = wait( trackInitialShards( &self, initData, shardsAffectedByTeamFailure ) );
@@ -35,9 +35,9 @@ void DatabaseConfiguration::resetInternal() {
autoResolverCount = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS;
autoDesiredTLogCount = CLIENT_KNOBS->DEFAULT_AUTO_LOGS;
primaryDcId = remoteDcId = Optional<Standalone<StringRef>>();
tLogPolicy = storagePolicy = remoteTLogPolicy = remoteStoragePolicy = satelliteTLogPolicy = IRepPolicyRef();
tLogPolicy = storagePolicy = remoteTLogPolicy = satelliteTLogPolicy = IRepPolicyRef();
remoteDesiredTLogCount = remoteTLogReplicationFactor = remoteDurableStorageQuorum = remoteStorageTeamSize = satelliteDesiredTLogCount = satelliteTLogReplicationFactor = satelliteTLogWriteAntiQuorum = satelliteTLogUsableDcs = logRouterCount = 0;
remoteDesiredTLogCount = remoteTLogReplicationFactor = satelliteDesiredTLogCount = satelliteTLogReplicationFactor = satelliteTLogWriteAntiQuorum = satelliteTLogUsableDcs = logRouterCount = 0;
primarySatelliteDcIds.clear();
remoteSatelliteDcIds.clear();
}
@@ -65,7 +65,6 @@ void parseReplicationPolicy(IRepPolicyRef* policy, ValueRef const& v) {
void DatabaseConfiguration::setDefaultReplicationPolicy() {
storagePolicy = IRepPolicyRef(new PolicyAcross(storageTeamSize, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteStoragePolicy = IRepPolicyRef(new PolicyAcross(remoteStorageTeamSize, "zoneid", IRepPolicyRef(new PolicyOne())));
tLogPolicy = IRepPolicyRef(new PolicyAcross(tLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(remoteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(satelliteTLogReplicationFactor, "zoneid", IRepPolicyRef(new PolicyOne())));
@@ -90,9 +89,8 @@ bool DatabaseConfiguration::isValid() const {
tLogPolicy &&
remoteDesiredTLogCount >= 0 &&
remoteTLogReplicationFactor >= 0 &&
( remoteTLogReplicationFactor == 0 || ( remoteStoragePolicy && remoteTLogPolicy && primaryDcId.present() && remoteDcId.present() && remoteDurableStorageQuorum >= 1 && logRouterCount >= 1 ) ) &&
( remoteTLogReplicationFactor == 0 || ( remoteTLogPolicy && primaryDcId.present() && remoteDcId.present() && logRouterCount >= 1 && durableStorageQuorum == storageTeamSize ) ) &&
primaryDcId.present() == remoteDcId.present() &&
remoteDurableStorageQuorum <= remoteStorageTeamSize &&
satelliteDesiredTLogCount >= 0 &&
satelliteTLogReplicationFactor >= 0 &&
satelliteTLogWriteAntiQuorum >= 0 &&
@@ -175,16 +173,14 @@ std::map<std::string, std::string> DatabaseConfiguration::toMap() const {
result["satellite_replication"] = format("%d", satelliteTLogReplicationFactor);
}
if( remoteDurableStorageQuorum == remoteStorageTeamSize && remoteDurableStorageQuorum > 0) {
if( remoteTLogReplicationFactor == 1 && remoteDurableStorageQuorum == 1 )
result["remote_redundancy_mode"] = "remote_single";
else if( remoteTLogReplicationFactor == 2 && remoteDurableStorageQuorum == 2 )
result["remote_redundancy_mode"] = "remote_double";
else if( remoteTLogReplicationFactor == 3 && remoteDurableStorageQuorum == 3 )
result["remote_redundancy_mode"] = "remote_triple";
else
result["remote_redundancy_mode"] = "custom";
}
if( remoteTLogReplicationFactor == 1 )
result["remote_redundancy_mode"] = "remote_single";
else if( remoteTLogReplicationFactor == 2 )
result["remote_redundancy_mode"] = "remote_double";
else if( remoteTLogReplicationFactor == 3 )
result["remote_redundancy_mode"] = "remote_triple";
else if(remoteTLogReplicationFactor > 0)
result["remote_redundancy_mode"] = "custom";
if( desiredTLogCount != -1 )
result["logs"] = format("%d", desiredTLogCount);
@@ -239,10 +235,7 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
else if (ck == LiteralStringRef("remote_logs")) parse(&remoteDesiredTLogCount, value);
else if (ck == LiteralStringRef("remote_log_replicas")) parse(&remoteTLogReplicationFactor, value);
else if (ck == LiteralStringRef("remote_log_policy")) parseReplicationPolicy(&remoteTLogPolicy, value);
else if (ck == LiteralStringRef("remote_storage_policy")) parseReplicationPolicy(&remoteStoragePolicy, value);
else if (ck == LiteralStringRef("satellite_log_policy")) parseReplicationPolicy(&satelliteTLogPolicy, value);
else if (ck == LiteralStringRef("remote_storage_quorum")) parse(&remoteDurableStorageQuorum, value);
else if (ck == LiteralStringRef("remote_storage_replicas")) parse(&remoteStorageTeamSize, value);
else if (ck == LiteralStringRef("satellite_logs")) parse(&satelliteDesiredTLogCount, value);
else if (ck == LiteralStringRef("satellite_log_replicas")) parse(&satelliteTLogReplicationFactor, value);
else if (ck == LiteralStringRef("satellite_anti_quorum")) parse(&satelliteTLogWriteAntiQuorum, value);
@@ -87,11 +87,6 @@ struct DatabaseConfiguration {
IRepPolicyRef remoteTLogPolicy;
Optional<Standalone<StringRef>> remoteDcId;
// Remote Storage Servers
IRepPolicyRef remoteStoragePolicy;
int32_t remoteDurableStorageQuorum;
int32_t remoteStorageTeamSize;
// Satellite TLogs
IRepPolicyRef satelliteTLogPolicy;
int32_t satelliteDesiredTLogCount;
@@ -261,9 +261,7 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
src.push_back(uid.get());
}
}
std::sort( src.begin(), src.end() );
src.resize( std::unique( src.begin(), src.end() ) - src.begin() );
uniquify(src);
//Update dest servers for this range to be equal to servers
krmSetPreviouslyEmptyRange( &tr, keyServersPrefix, rangeIntersectKeys, keyServersValue(src, servers), old[i+1].value );
@@ -276,11 +274,8 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
}
//Keep track of src shards so that we can preserve their values when we overwrite serverKeys
std::set<UID> sources;
for(auto s = src.begin(); s != src.end(); ++s)
sources.insert(*s);
for(auto s = sources.begin(); s != sources.end(); ++s) {
shardMap[*s].push_back(old.arena(), rangeIntersectKeys);
for(auto& uid : src) {
shardMap[uid].push_back(old.arena(), rangeIntersectKeys);
/*TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId)
.detail("Server", *s);*/
}
@@ -458,6 +453,7 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
state std::set<UID> allServers;
state std::set<UID> intendedTeam(destinationTeam.begin(), destinationTeam.end());
state vector<UID> src;
vector<UID> completeSrc;
//Iterate through the beginning of keyServers until we find one that hasn't already been processed
int currentIndex;
@@ -465,12 +461,25 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
decodeKeyServersValue( keyServers[currentIndex].value, src, dest );
std::set<UID> srcSet;
for(int s = 0; s < src.size(); s++)
for(int s = 0; s < src.size(); s++) {
srcSet.insert(src[s]);
}
if(currentIndex == 0) {
completeSrc = src;
} else {
for(int i = 0; i < completeSrc.size(); i++) {
if(!srcSet.count(completeSrc[i])) {
std::swap(completeSrc[i--], completeSrc.back());
completeSrc.pop_back();
}
}
}
std::set<UID> destSet;
for(int s = 0; s < dest.size(); s++)
for(int s = 0; s < dest.size(); s++) {
destSet.insert(dest[s]);
}
allServers.insert(srcSet.begin(), srcSet.end());
allServers.insert(destSet.begin(), destSet.end());
@@ -509,6 +518,13 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
for(int s = 0; s < src2.size(); s++)
srcSet.insert(src2[s]);
for(int i = 0; i < completeSrc.size(); i++) {
if(!srcSet.count(completeSrc[i])) {
std::swap(completeSrc[i--], completeSrc.back());
completeSrc.pop_back();
}
}
allServers.insert(srcSet.begin(), srcSet.end());
alreadyMoved = dest2.empty() && srcSet == intendedTeam;
@@ -546,12 +562,19 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
// They must also have at least the transaction read version so they can't "forget" the shard between
// now and when this transaction commits.
state vector< Future<Void> > serverReady; // only for count below
state vector<UID> newDestinations;
std::set<UID> completeSrcSet(completeSrc.begin(), completeSrc.end());
for(auto& it : dest) {
if(!completeSrcSet.count(it)) {
newDestinations.push_back(it);
}
}
// for smartQuorum
state vector<StorageServerInterface> storageServerInterfaces;
vector< Future< Optional<Value> > > serverListEntries;
for(int s=0; s<dest.size(); s++)
serverListEntries.push_back( tr.get( serverListKeyFor(dest[s]) ) );
for(int s=0; s<newDestinations.size(); s++)
serverListEntries.push_back( tr.get( serverListKeyFor(newDestinations[s]) ) );
state vector<Optional<Value>> serverListValues = wait( getAll(serverListEntries) );
releaser.release();
@@ -559,16 +582,16 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
for(int s=0; s<serverListValues.size(); s++) {
ASSERT( serverListValues[s].present() ); // There should always be server list entries for servers in keyServers
auto si = decodeServerListValue(serverListValues[s].get());
ASSERT( si.id() == dest[s] );
ASSERT( si.id() == newDestinations[s] );
storageServerInterfaces.push_back( si );
}
for(int s=0; s<storageServerInterfaces.size(); s++)
serverReady.push_back( waitForShardReady( storageServerInterfaces[s], keys, tr.getReadVersion().get(), GetShardStateRequest::READABLE) );
Void _ = wait( timeout(
smartQuorum( serverReady, durableStorageQuorum, SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, TaskMoveKeys ),
smartQuorum( serverReady, std::max<int>(0, durableStorageQuorum - (dest.size() - newDestinations.size())), SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, TaskMoveKeys ),
SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, Void(), TaskMoveKeys ) );
int count = 0;
int count = dest.size() - newDestinations.size();
for(int s=0; s<serverReady.size(); s++)
count += serverReady[s].isReady() && !serverReady[s].isError();
@@ -589,9 +612,6 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
}
Void _ = wait(waitForAll(actors));
//printf(" fMK: committing\n");
Void _ = wait( tr.commit() );
begin = endKey;
@@ -617,8 +637,6 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
TraceEvent(SevDebug, interval.end(), relocationIntervalId).error(e, true);
throw;
}
//printf("Moved keys: ( '%s'-'%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
return Void();
}
@@ -795,6 +813,7 @@ ACTOR Future<Void> moveKeys(
Database cx,
KeyRange keys,
vector<UID> destinationTeam,
vector<UID> healthyDestinations,
MoveKeysLock lock,
int durableStorageQuorum,
Promise<Void> dataMovementComplete,
@@ -806,7 +825,7 @@ ACTOR Future<Void> moveKeys(
std::sort( destinationTeam.begin(), destinationTeam.end() );
Void _ = wait( startMoveKeys( cx, keys, destinationTeam, lock, durableStorageQuorum, startMoveKeysParallelismLock, relocationIntervalId ) );
state Future<Void> completionSignaller = checkFetchingState( cx, destinationTeam, keys, dataMovementComplete, relocationIntervalId );
state Future<Void> completionSignaller = checkFetchingState( cx, healthyDestinations, keys, dataMovementComplete, relocationIntervalId );
Void _ = wait( finishMoveKeys( cx, keys, destinationTeam, lock, durableStorageQuorum, finishMoveKeysParallelismLock, relocationIntervalId ) );
@@ -54,6 +54,7 @@ Future<Void> moveKeys(
Database const& occ,
KeyRange const& keys,
vector<UID> const& destinationTeam,
vector<UID> const& healthyDestinations,
MoveKeysLock const& lock,
int const& durableStorageQuorum,
Promise<Void> const& dataMovementComplete,
@@ -813,7 +813,6 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
g_simulator.hasRemoteReplication = simconfig.db.remoteTLogReplicationFactor > 0;
g_simulator.remoteTLogPolicy = simconfig.db.remoteTLogPolicy;
g_simulator.remoteDcId = simconfig.db.remoteDcId;
g_simulator.remoteStoragePolicy = simconfig.db.remoteStoragePolicy;
g_simulator.hasSatelliteReplication = simconfig.db.satelliteTLogReplicationFactor > 0;
g_simulator.satelliteTLogPolicy = simconfig.db.satelliteTLogPolicy;
g_simulator.satelliteTLogWriteAntiQuorum = simconfig.db.satelliteTLogWriteAntiQuorum;
@@ -821,7 +820,7 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
g_simulator.remoteSatelliteDcIds = simconfig.db.remoteSatelliteDcIds;
ASSERT(g_simulator.storagePolicy && g_simulator.tLogPolicy);
ASSERT(!g_simulator.hasRemoteReplication || (g_simulator.remoteTLogPolicy && g_simulator.remoteStoragePolicy));
ASSERT(!g_simulator.hasRemoteReplication || g_simulator.remoteTLogPolicy);
ASSERT(!g_simulator.hasSatelliteReplication || g_simulator.satelliteTLogPolicy);
TraceEvent("simulatorConfig").detail("ConfigString", startingConfigString);
@@ -64,6 +64,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Future<Void> remoteRecoveryComplete;
bool recoveryCompleteWrittenToCoreState;
bool remoteLogsWrittenToCoreState;
bool hasRemoteServers;
Optional<Version> epochEndVersion;
std::set< Tag > epochEndTags;
@@ -73,7 +74,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
ActorCollection actors;
std::vector<OldLogData> oldLogData;
TagPartitionedLogSystem( UID dbgid, LocalityData locality ) : dbgid(dbgid), locality(locality), actors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), minRouters(std::numeric_limits<int>::max()), expectedLogSets(0) {}
TagPartitionedLogSystem( UID dbgid, LocalityData locality ) : dbgid(dbgid), locality(locality), actors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), minRouters(std::numeric_limits<int>::max()), expectedLogSets(0), hasRemoteServers(false) {}
virtual void stopRejoins() {
rejoins = Future<Void>();
@@ -294,25 +295,43 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual Future<Void> onError() {
return onError_internal(this);
}
ACTOR static Future<Void> onError_internal( TagPartitionedLogSystem* self ) {
// Never returns normally, but throws an error if the subsystem stops working
// FIXME: Run waitFailureClient on the master instead of these onFailedFor?
vector<Future<Void>> failed;
loop {
vector<Future<Void>> failed;
vector<Future<Void>> changes;
for( auto& it : tLogs ) {
for(auto &t : it->logServers) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
for( auto& it : self->tLogs ) {
for(auto &t : it->logServers) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
} else {
changes.push_back(t->onChange());
}
}
for(auto &t : it->logRouters) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
} else {
changes.push_back(t->onChange());
}
}
}
for(auto &t : it->logRouters) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
}
if(self->hasRemoteServers && !self->remoteRecovery.isReady()) {
changes.push_back(self->remoteRecovery);
}
if(!changes.size()) {
changes.push_back(Never()); //waiting on an empty vector will return immediately
}
ASSERT( failed.size() >= 1 );
Void _ = wait( quorum(changes, 1) || tagError<Void>( quorum( failed, 1 ), master_tlog_failed() ) || self->actors.getResult() );
}
ASSERT( failed.size() >= 1 );
return tagError<Void>( quorum( failed, 1 ), master_tlog_failed() ) || actors.getResult();
}
virtual Future<Void> push( Version prevVersion, Version version, Version knownCommittedVersion, LogPushData& data, Optional<UID> debugID ) {
@@ -1200,8 +1219,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->recoveryComplete = waitForAll(recoveryComplete);
if(configuration.remoteTLogReplicationFactor > 0) {
logSystem->hasRemoteServers = true;
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, minTag, remoteLocality);
} else {
logSystem->hasRemoteServers = false;
logSystem->remoteRecovery = logSystem->recoveryComplete;
logSystem->remoteRecoveryComplete = logSystem->recoveryComplete;
}
@@ -135,7 +135,7 @@ struct MoveKeysWorkload : TestWorkload {
try {
state Promise<Void> signal;
Void _ = wait( moveKeys( cx, keys, destinationTeamIDs, lock,
Void _ = wait( moveKeys( cx, keys, destinationTeamIDs, destinationTeamIDs, lock,
self->configuration.durableStorageQuorum,
signal, &fl1, &fl2, relocateShardInterval.pairID ) );
TraceEvent(relocateShardInterval.end()).detail("Result","Success");
@@ -30,6 +30,19 @@ ACTOR Future<bool> allTrue( std::vector<Future<bool>> all ) {
return true;
}
ACTOR Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> input, Reference<AsyncVar<bool>> output ) {
loop {
bool oneTrue = false;
std::vector<Future<Void>> changes;
for(auto it : input) {
if( it->get() ) oneTrue = true;
changes.push_back( it->onChange() );
}
output->set( oneTrue );
Void _ = wait( waitForAny(changes) );
}
}
ACTOR Future<Void> cancelOnly( std::vector<Future<Void>> futures ) {
// We don't do anything with futures except hold them, we never return, but if we are cancelled we (naturally) drop the futures
Void _ = wait( Never() );
@@ -726,6 +726,7 @@ void forwardVector( Future<V> values, std::vector<Promise<T>> out ) {
}
Future<bool> allTrue( const std::vector<Future<bool>>& all );
Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output );
Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );
Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double const& logDelay, const char* const& context, UID const& id );
Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );