ratekeeper does not control on remote storage servers
This commit is contained in:
parent
b79feaddd3
commit
0bdd25df23
|
@ -484,7 +484,7 @@ Future<Void> storageServerTracker(
|
|||
MoveKeysLock const& lock,
|
||||
UID const& masterId,
|
||||
std::map<UID, Reference<TCServerInfo>>* const& other_servers,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& changes,
|
||||
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> const& changes,
|
||||
Promise<Void> const& errorOut,
|
||||
Version const& addedVersion);
|
||||
|
||||
|
@ -513,7 +513,7 @@ struct DDTeamCollection {
|
|||
PromiseStream<UID> removedServers;
|
||||
std::set<UID> recruitingIds; // The IDs of the SS which are being recruited
|
||||
std::set<NetworkAddress> recruitingLocalities;
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
|
||||
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> serverChanges;
|
||||
Future<Void> initialFailureReactionDelay;
|
||||
Future<Void> initializationDoneActor;
|
||||
Promise<Void> serverTrackerErrorOut;
|
||||
|
@ -544,7 +544,7 @@ struct DDTeamCollection {
|
|||
DatabaseConfiguration configuration,
|
||||
std::vector<Optional<Key>> includedDCs,
|
||||
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges,
|
||||
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> const& serverChanges,
|
||||
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
|
||||
Reference<AsyncVar<bool>> processingUnhealthy)
|
||||
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
|
||||
|
@ -1578,7 +1578,7 @@ ACTOR Future<Void> storageServerTracker(
|
|||
MoveKeysLock lock,
|
||||
UID masterId,
|
||||
std::map<UID, Reference<TCServerInfo>>* other_servers,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > changes,
|
||||
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> changes,
|
||||
Promise<Void> errorOut,
|
||||
Version addedVersion)
|
||||
{
|
||||
|
@ -1593,7 +1593,9 @@ ACTOR Future<Void> storageServerTracker(
|
|||
state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
|
||||
state bool hasWrongStoreTypeOrDC = false;
|
||||
|
||||
changes.send( std::make_pair(server->id, server->lastKnownInterface) );
|
||||
if(changes.present()) {
|
||||
changes.get().send( std::make_pair(server->id, server->lastKnownInterface) );
|
||||
}
|
||||
|
||||
try {
|
||||
loop {
|
||||
|
@ -1680,7 +1682,9 @@ ACTOR Future<Void> storageServerTracker(
|
|||
when( Void _ = wait( failureTracker ) ) {
|
||||
// The server is failed AND all data has been removed from it, so permanently remove it.
|
||||
TraceEvent("StatusMapChange", masterId).detail("ServerID", server->id).detail("Status", "Removing");
|
||||
changes.send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
|
||||
if(changes.present()) {
|
||||
changes.get().send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
|
||||
}
|
||||
|
||||
// Remove server from FF/serverList
|
||||
Void _ = wait( removeStorageServer( cx, server->id, lock ) );
|
||||
|
@ -1699,7 +1703,9 @@ ACTOR Future<Void> storageServerTracker(
|
|||
server->lastKnownInterface = newInterface.first;
|
||||
server->lastKnownClass = newInterface.second;
|
||||
interfaceChanged = server->onInterfaceChanged;
|
||||
changes.send( std::make_pair(server->id, server->lastKnownInterface) );
|
||||
if(changes.present()) {
|
||||
changes.get().send( std::make_pair(server->id, server->lastKnownInterface) );
|
||||
}
|
||||
// We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location
|
||||
status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality );
|
||||
|
||||
|
@ -1918,7 +1924,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(
|
|||
DatabaseConfiguration configuration,
|
||||
std::vector<Optional<Key>> includedDCs,
|
||||
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
|
||||
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> serverChanges,
|
||||
Future<Void> readyToStart,
|
||||
Reference<AsyncVar<bool>> zeroHealthyTeams,
|
||||
bool primary,
|
||||
|
@ -2208,7 +2214,7 @@ ACTOR Future<Void> dataDistribution(
|
|||
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
|
||||
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
|
||||
if (configuration.usableRegions > 1) {
|
||||
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
|
||||
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
|
||||
}
|
||||
|
||||
Void _ = wait( waitForAll( actors ) );
|
||||
|
|
Loading…
Reference in New Issue