From 2341e5d8ade54ae0529c02e8356d30deced8b06a Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Mon, 13 Aug 2018 19:46:47 -0700 Subject: [PATCH] fix: we must yield when updating shardsAffectedByTeamFailure with the initial shards. A test with 1 million shards caused a 22 second slow task --- fdbserver/DataDistribution.actor.cpp | 37 ++++++++++++++-------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index b4cdee0379..e1a81f922c 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -2215,6 +2215,25 @@ ACTOR Future dataDistribution( state PromiseStream getShardMetrics; state Reference> processingUnhealthy( new AsyncVar(false) ); state Promise readyToStart; + state Reference shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure ); + + state int shard = 0; + for(; shardshards.size() - 1; shard++) { + KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard+1].key); + shardsAffectedByTeamFailure->defineShard(keys); + std::vector teams; + teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].primarySrc, true)); + if(configuration.usableRegions > 1) { + teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false)); + } + shardsAffectedByTeamFailure->moveShard(keys, teams); + if(initData->shards[shard].hasDest) { + // This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in + // DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement. + output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) ); + } + Void _ = wait( yield(TaskDataDistribution) ); + } vector tcis; Reference> anyZeroHealthyTeams; @@ -2235,25 +2254,7 @@ ACTOR Future dataDistribution( anyZeroHealthyTeams = zeroHealthyTeams[0]; } - Reference shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure ); actors.push_back(yieldPromiseStream(output.getFuture(), input)); - - for(int s=0; sshards.size() - 1; s++) { - KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key); - shardsAffectedByTeamFailure->defineShard(keys); - std::vector teams; - teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].primarySrc, true)); - if(configuration.usableRegions > 1) { - teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].remoteSrc, false)); - } - shardsAffectedByTeamFailure->moveShard(keys, teams); - if(initData->shards[s].hasDest) { - // This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in - // DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement. - output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) ); - } - } - actors.push_back( pollMoveKeysLock(cx, lock) ); actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) ); actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );