Merge pull request #4495 from sfc-gh-etschannen/feature-fix-exlude-failed

Execute "exclude failed" commands after shutting down the rest of data distribution
Markus Pilman authored on 2021-03-23 13:59:00 -06:00; committed by GitHub
commit 35700f919f
3 changed files with 37 additions and 22 deletions
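In short: before this change, storageServerTracker called removeKeysFromFailedServer directly while the rest of data distribution was still running. With this change the tracker only records the failed server's UID in the new Promise<UID> removeFailedServer member of DDTeamCollection and then throws movekeys_conflict; dataDistribution catches that error, tears down the team collections and the shard tracker, and only then runs removeKeysFromFailedServer and removeStorageServer for the recorded server. The sketch below is a minimal, standalone illustration of that "signal, unwind, then act" pattern in plain C++ (std::promise/std::future standing in for Flow's Promise/Future); all names in it are illustrative stand-ins, not FoundationDB code.

// Minimal sketch of the pattern this commit introduces (plain C++, not Flow).
// Names such as FailedServerSignal, trackerDetectsFailedServer and
// runDataDistribution are hypothetical, chosen only for illustration.
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>

struct MoveKeysConflict : std::runtime_error {
    MoveKeysConflict() : std::runtime_error("movekeys_conflict") {}
};

// Plays the role of the new Promise<UID> removeFailedServer member.
struct FailedServerSignal {
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future();
    bool sent = false;

    void send(const std::string& serverId) {
        if (!sent) {  // rough analogue of canBeSet()
            promise.set_value(serverId);
            sent = true;
        }
    }
};

// Plays the role of storageServerTracker: it no longer removes keys itself,
// it only records which server failed and unwinds data distribution.
void trackerDetectsFailedServer(FailedServerSignal& signal, const std::string& failedId) {
    signal.send(failedId);
    throw MoveKeysConflict();
}

// Plays the role of dataDistribution's error handler.
void runDataDistribution() {
    FailedServerSignal removeFailedServer;
    try {
        trackerDetectsFailedServer(removeFailedServer, "ss-42");
    } catch (const MoveKeysConflict&) {
        // ... the real code destroys the team collections and clears the
        // shard tracker here, before doing anything else ...
        if (removeFailedServer.sent) {
            // Only now, with the rest of DD shut down, remove the failed server.
            std::cout << "removing keys and storage server "
                      << removeFailedServer.future.get() << std::endl;
        }
    }
}

int main() {
    runDataDistribution();
    return 0;
}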

Changed file 1 of 3

@@ -658,6 +658,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
AsyncTrigger printDetailedTeamsInfo;
PromiseStream<GetMetricsRequest> getShardMetrics;
Promise<UID> removeFailedServer;
void resetLocalitySet() {
storageServerSet = Reference<LocalitySet>(new LocalityMap<UID>());
@@ -695,7 +696,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<AsyncVar<bool>> zeroHealthyTeams,
bool primary,
Reference<AsyncVar<bool>> processingUnhealthy,
PromiseStream<GetMetricsRequest> getShardMetrics)
PromiseStream<GetMetricsRequest> getShardMetrics,
Promise<UID> removeFailedServer)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
teamBuilder(Void()), badTeamRemover(Void()), checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()),
@@ -710,7 +712,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary),
medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO), lastMedianAvailableSpaceUpdate(0),
processingUnhealthy(processingUnhealthy), lowestUtilizationTeam(0), highestUtilizationTeam(0),
getShardMetrics(getShardMetrics) {
getShardMetrics(getShardMetrics), removeFailedServer(removeFailedServer) {
if (!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", distributorId).detail("State", "Inactive").trackLatest("DDTrackerStarting");
}
@@ -4145,10 +4147,14 @@ ACTOR Future<Void> storageServerTracker(
TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
.detail("Server", server->id)
.detail("Excluded", worstAddr.toString());
wait(removeKeysFromFailedServer(cx, server->id, self->lock, ddEnabledState));
if (BUGGIFY)
wait(delay(5.0));
self->shardsAffectedByTeamFailure->eraseServer(server->id);
wait(delay(0.0)); //Do not throw an error while still inside trackExcludedServers
while (!ddEnabledState->isDDEnabled()) {
wait(delay(1.0));
}
if (self->removeFailedServer.canBeSet()) {
self->removeFailedServer.send(server->id);
}
throw movekeys_conflict();
}
}
@@ -4944,6 +4950,7 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db,
}
}
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
const DDEnabledState* ddEnabledState) {
@@ -4973,7 +4980,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
state KeyRangeMap<ShardTrackedData> shards;
state Promise<UID> removeFailedServer;
try {
loop {
TraceEvent("DDInitTakingMoveKeysLock", self->ddId);
@@ -5204,7 +5211,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
zeroHealthyTeams[0],
true,
processingUnhealthy,
getShardMetrics);
getShardMetrics,
removeFailedServer);
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
if (configuration.usableRegions > 1) {
remoteTeamCollection =
@@ -5220,7 +5228,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
zeroHealthyTeams[1],
false,
processingUnhealthy,
getShardMetrics);
getShardMetrics,
removeFailedServer);
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back(
@@ -5252,14 +5261,23 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
primaryTeamCollection = Reference<DDTeamCollection>();
remoteTeamCollection = Reference<DDTeamCollection>();
wait(shards.clearAsync());
if (err.code() != error_code_movekeys_conflict)
TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err);
if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
wait(removeKeysFromFailedServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
} else {
if (err.code() != error_code_movekeys_conflict) {
throw err;
}
bool ddEnabled = wait(isDataDistributionEnabled(cx, ddEnabledState));
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if (ddEnabled)
if (ddEnabled) {
throw err;
}
}
}
}
}
static std::set<int> const& normalDataDistributorErrors() {
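Note on the error handler above: the removal path runs only when the removeFailedServer promise was actually fulfilled, which is checked without blocking via getFuture().isReady() and !isError(); otherwise the pre-existing movekeys_conflict handling is left unchanged. For readers more familiar with the standard library than with Flow, a rough plain-C++ analogue of that non-blocking readiness check is sketched below; the isReady helper is hypothetical, since std::future has no isReady() and must be polled with a zero-length wait instead.

#include <chrono>
#include <future>
#include <iostream>

// Hypothetical helper, not part of FDB or the standard library: poll the
// shared state without blocking, ready only if a value (or exception) has
// already been stored.
template <class T>
bool isReady(const std::future<T>& f) {
    return f.valid() &&
           f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

int main() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::cout << isReady(f) << "\n";  // 0: nothing sent yet
    p.set_value(7);
    std::cout << isReady(f) << "\n";  // 1: value is available
}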
@@ -5682,7 +5700,8 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
makeReference<AsyncVar<bool>>(true),
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>()));
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
for (int id = 1; id <= processCount; ++id) {
UID uid(id, 0);
@@ -5723,7 +5742,8 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
makeReference<AsyncVar<bool>>(true),
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>()));
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
for (int id = 1; id <= processCount; id++) {
UID uid(id, 0);

Changed file 2 of 3

@@ -178,7 +178,6 @@ public:
void moveShard(KeyRangeRef keys, std::vector<Team> destinationTeam);
void finishMove(KeyRangeRef keys);
void check();
void eraseServer(UID ssID);
private:
struct OrderByTeamKey {

Changed file 3 of 3

@@ -999,10 +999,6 @@ void ShardsAffectedByTeamFailure::erase(Team team, KeyRange const& range) {
}
}
void ShardsAffectedByTeamFailure::eraseServer(UID ssID) {
storageServerShards[ssID] = 0;
}
void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
if (team_shards.insert(std::pair<Team, KeyRange>(team, range)).second) {
for (auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)