`exclude failed` shuts down data distribution while the server is being removed, to avoid two processes making changes to the key servers at the same time

Evan Tschannen 2021-03-15 10:43:06 -07:00
parent 6bed45fc6d
commit 4e4149b070
1 changed file with 19 additions and 9 deletions


@@ -658,6 +658,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
    AsyncTrigger printDetailedTeamsInfo;
    PromiseStream<GetMetricsRequest> getShardMetrics;
    Promise<UID> removeFailedServer;

    void resetLocalitySet() {
        storageServerSet = Reference<LocalitySet>(new LocalityMap<UID>());
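
The first hunk adds a `Promise<UID>` member to `DDTeamCollection`. In Flow, a `Promise<T>` is a one-shot channel: `send()` fulfills it exactly once, `canBeSet()` reports whether it is still unfulfilled, and `getFuture()` exposes the consumer side. A minimal standard-C++ analogy of those semantics, built on `std::promise` (`UID` and `OneShot` here are hypothetical stand-ins, not FoundationDB code):

```cpp
#include <chrono>
#include <cstdint>
#include <future>
#include <iostream>

// Hypothetical stand-in for FDB's server identifier type.
using UID = std::uint64_t;

// Sketch of Flow's one-shot Promise<T> semantics on top of std::promise.
class OneShot {
    std::promise<UID> promise;
    std::future<UID> future = promise.get_future();
    bool fulfilled = false;

public:
    // Analogous to Promise::canBeSet(): true until a value has been sent.
    bool canBeSet() const { return !fulfilled; }
    // Analogous to Promise::send(): fulfill the promise exactly once.
    void send(UID id) {
        fulfilled = true;
        promise.set_value(id);
    }
    // Analogous to Future::isReady(): poll without blocking.
    bool isReady() const {
        return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    }
    // Analogous to Future::get(): read the delivered value.
    UID get() { return future.get(); }
};

int main() {
    OneShot removeFailedServer;
    // Producer side (the server tracker): record the failed server once.
    if (removeFailedServer.canBeSet())
        removeFailedServer.send(42);
    // Consumer side (the top-level actor): act only if a value was delivered.
    if (removeFailedServer.isReady())
        std::cout << "remove keys for server " << removeFailedServer.get() << "\n";
}
```
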
@@ -695,7 +696,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
                     Reference<AsyncVar<bool>> zeroHealthyTeams,
                     bool primary,
                     Reference<AsyncVar<bool>> processingUnhealthy,
                     PromiseStream<GetMetricsRequest> getShardMetrics)
                     PromiseStream<GetMetricsRequest> getShardMetrics,
                     Promise<UID> removeFailedServer)
      : cx(cx), distributorId(distributorId), lock(lock), output(output),
        shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
        teamBuilder(Void()), badTeamRemover(Void()), checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()),
@@ -710,7 +712,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
        zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary),
        medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO), lastMedianAvailableSpaceUpdate(0),
        processingUnhealthy(processingUnhealthy), lowestUtilizationTeam(0), highestUtilizationTeam(0),
        getShardMetrics(getShardMetrics) {
        getShardMetrics(getShardMetrics), removeFailedServer(removeFailedServer) {
        if (!primary || configuration.usableRegions == 1) {
            TraceEvent("DDTrackerStarting", distributorId).detail("State", "Inactive").trackLatest("DDTrackerStarting");
        }
@@ -4142,13 +4144,16 @@ ACTOR Future<Void> storageServerTracker(
            status.isUndesired = true;
            status.isWrongConfiguration = true;
            if (worstStatus == DDTeamCollection::Status::FAILED) {
                while (!ddEnabledState->isDDEnabled()) {
                    wait(delay(1.0));
                }
                TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
                    .detail("Server", server->id)
                    .detail("Excluded", worstAddr.toString());
                wait(removeKeysFromFailedServer(cx, server->id, self->lock, ddEnabledState));
                if (BUGGIFY)
                    wait(delay(5.0));
                self->shardsAffectedByTeamFailure->eraseServer(server->id);
                if (self->removeFailedServer.canBeSet()) {
                    self->removeFailedServer.send(server->id);
                }
                throw movekeys_conflict();
            }
        }
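
This hunk is the heart of the change. Previously the tracker called `removeKeysFromFailedServer` inline while the rest of data distribution kept running; now it records the failed server's ID in the promise and throws `movekeys_conflict()`, which tears down every data-distribution actor. The key removal then happens in `dataDistribution`'s error handler (last hunk below), where nothing else can be moving keys concurrently. A simplified sketch of that handoff, using plain C++ exceptions and `std::optional` in place of Flow actors and `Promise<UID>` (names are illustrative, not FoundationDB code):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>

// Stand-in for the Flow error of the same name.
struct MovekeysConflict : std::runtime_error {
    MovekeysConflict() : std::runtime_error("movekeys_conflict") {}
};

// One-shot slot playing the role of Promise<UID> removeFailedServer.
std::optional<std::uint64_t> removeFailedServer;

void storageServerTracker(std::uint64_t serverId) {
    // The tracker no longer removes the failed server's keys itself; it
    // records the ID and tears down data distribution by throwing.
    if (!removeFailedServer)           // like removeFailedServer.canBeSet()
        removeFailedServer = serverId; // like removeFailedServer.send(serverId)
    throw MovekeysConflict();
}

void dataDistribution() {
    try {
        storageServerTracker(42); // pretend this server was marked FAILED
    } catch (const MovekeysConflict&) {
        // Every data-distribution actor has been torn down at this point, so
        // removing the failed server's keys cannot race with another process
        // that is also changing the key servers.
        if (removeFailedServer)
            std::cout << "removeKeysFromFailedServer(" << *removeFailedServer << ")\n";
        // The real code then decides whether to restart data distribution.
    }
}

int main() { dataDistribution(); }
```
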
@@ -4973,7 +4978,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
    // Stored outside of data distribution tracker to avoid slow tasks
    // when tracker is cancelled
    state KeyRangeMap<ShardTrackedData> shards;
    state Promise<UID> removeFailedServer;
    try {
        loop {
            TraceEvent("DDInitTakingMoveKeysLock", self->ddId);
@@ -5204,7 +5209,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
                    zeroHealthyTeams[0],
                    true,
                    processingUnhealthy,
                    getShardMetrics);
                    getShardMetrics,
                    removeFailedServer);
            teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
            if (configuration.usableRegions > 1) {
                remoteTeamCollection =
@@ -5220,7 +5226,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
                        zeroHealthyTeams[1],
                        false,
                        processingUnhealthy,
                        getShardMetrics);
                        getShardMetrics,
                        removeFailedServer);
                teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
                remoteTeamCollection->teamCollections = teamCollectionsPtrs;
                actors.push_back(
@@ -5252,6 +5259,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
        primaryTeamCollection = Reference<DDTeamCollection>();
        remoteTeamCollection = Reference<DDTeamCollection>();
        wait(shards.clearAsync());
        if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
            wait(removeKeysFromFailedServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
        }
        if (err.code() != error_code_movekeys_conflict)
            throw err;
        bool ddEnabled = wait(isDataDistributionEnabled(cx, ddEnabledState));
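
One detail worth noting in the catch block: `isReady()` alone would not be a safe guard, because a `Promise` that is dropped unfulfilled (for example, when data distribution shuts down for an unrelated error) still makes its future ready, but with a `broken_promise` error; hence the `!isError()` check before `get()`. `std::promise` behaves analogously, as this sketch shows (plain C++, not FoundationDB code):

```cpp
#include <chrono>
#include <cstdint>
#include <future>
#include <iostream>

int main() {
    std::promise<std::uint64_t> p;
    std::future<std::uint64_t> f = p.get_future();

    // Destroy the producer without fulfilling it: the shared state becomes
    // ready, but holds a broken_promise error instead of a value.
    { std::promise<std::uint64_t> dropped = std::move(p); }

    if (f.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
        try {
            std::cout << "removeKeysFromFailedServer(" << f.get() << ")\n";
        } catch (const std::future_error&) {
            // The isError() analogue: ready, but with no value to act on.
            std::cout << "promise was broken; nothing to remove\n";
        }
    }
}
```
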