fix: we must yield when updating shardsAffectedByTeamFailure with the initial shards. A test with 1 million shards caused a 22-second slow task
parent 8fc8aa0493
commit 2341e5d8ad
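The fix applies Flow's standard cooperative-scheduling pattern: a long loop inside an ACTOR must periodically wait(yield(taskID)) so the single-threaded run loop can service other tasks; otherwise the whole loop executes as one task and is reported as a slow task. Below is a minimal sketch of that pattern (illustrative only; populateShardMap is a hypothetical name, and the snippet assumes FDB's Flow actor compiler plus the types already used in this file):

// Illustrative sketch only -- the same cooperative-yield pattern the commit applies.
ACTOR Future<Void> populateShardMap( Reference<InitialDataDistribution> initData,
                                     Reference<ShardsAffectedByTeamFailure> shardMap ) {
	state int shard = 0;  // must be `state`: it has to survive the wait() below
	for(; shard < initData->shards.size() - 1; shard++) {
		KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard+1].key);
		shardMap->defineShard(keys);  // cheap per shard, but ~1M iterations without yielding
		                              // would block the run loop for the full duration
		Void _ = wait( yield(TaskDataDistribution) );  // release the run loop every iteration
	}
	return Void();
}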
@@ -2215,6 +2215,25 @@ ACTOR Future<Void> dataDistribution(
 	state PromiseStream<GetMetricsRequest> getShardMetrics;
 	state Reference<AsyncVar<bool>> processingUnhealthy( new AsyncVar<bool>(false) );
 	state Promise<Void> readyToStart;
+	state Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
+
+	state int shard = 0;
+	for(; shard<initData->shards.size() - 1; shard++) {
+		KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard+1].key);
+		shardsAffectedByTeamFailure->defineShard(keys);
+		std::vector<ShardsAffectedByTeamFailure::Team> teams;
+		teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].primarySrc, true));
+		if(configuration.usableRegions > 1) {
+			teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
+		}
+		shardsAffectedByTeamFailure->moveShard(keys, teams);
+		if(initData->shards[shard].hasDest) {
+			// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
+			// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
+			output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) );
+		}
+		Void _ = wait( yield(TaskDataDistribution) );
+	}
+
 	vector<TeamCollectionInterface> tcis;
 	Reference<AsyncVar<bool>> anyZeroHealthyTeams;
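The added block also shows why the counter changed from a plain `int s` to `state int shard`: the loop body now waits (on yield), and in Flow any variable that must survive a wait() has to be a `state` variable of the actor. Presumably that is also why the initialization was hoisted up next to the other `state` declarations rather than patched in place.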
@@ -2235,25 +2254,7 @@ ACTOR Future<Void> dataDistribution(
 		anyZeroHealthyTeams = zeroHealthyTeams[0];
 	}
 
-	Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
 	actors.push_back(yieldPromiseStream(output.getFuture(), input));
-
-	for(int s=0; s<initData->shards.size() - 1; s++) {
-		KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key);
-		shardsAffectedByTeamFailure->defineShard(keys);
-		std::vector<ShardsAffectedByTeamFailure::Team> teams;
-		teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].primarySrc, true));
-		if(configuration.usableRegions > 1) {
-			teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].remoteSrc, false));
-		}
-		shardsAffectedByTeamFailure->moveShard(keys, teams);
-		if(initData->shards[s].hasDest) {
-			// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
-			// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
-			output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) );
-		}
-	}
-
 	actors.push_back( pollMoveKeysLock(cx, lock) );
 	actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
 	actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
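For context, yield(taskID) is cheap in the common case: it returns a Future<Void> that is already ready when the run loop does not need to be released, and only reschedules the actor as a fresh task when the current task has run long enough or higher-priority work is pending. Roughly (a conceptual sketch, not the real flow/flow.h implementation; the check_yield/delay combination is an assumption about its behavior):

// Conceptual sketch only -- not Flow's actual yield() implementation.
Future<Void> yieldSketch( int taskID ) {
	if( !g_network->check_yield(taskID) )  // assumed: scheduler sees no reason to preempt
		return Void();                     // already-ready future, the caller continues immediately
	return delay( 0, taskID );             // otherwise requeue this actor as a new task at taskID
}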