Prevented a slow task when too many shards were sent to the data distribution queue after switching to a fearless deployment

Evan Tschannen 2018-08-09 12:37:46 -07:00
parent 7f7755165c
commit 6f02ea843a
4 changed files with 49 additions and 36 deletions
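
What changed, in outline: the shard trackers and team collections used to send RelocateShard requests directly into the data distribution queue's PromiseStream, so a large burst of shards (for example, every shard needing a remote team after a fearless/multi-region configuration is enabled) could be drained inside a single long-running task. This commit splits the stream in two: producers write to output, the queue reads from input, and a yieldPromiseStream actor relays between them, yielding as it goes. A minimal sketch of that relay pattern, assuming the Flow actor syntax of this era (the real implementation lives elsewhere in the codebase and may differ):

ACTOR template <class T>
Future<Void> yieldPromiseStream( FutureStream<T> in, PromiseStream<T> out ) {
	loop {
		T next = waitNext( in );
		out.send( next );
		// Yield between elements so a large backlog becomes many short
		// tasks instead of one slow task.
		Void _ = wait( yield() );
	}
}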

View File

@@ -1379,6 +1379,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
 for(int i=0; i<shards.size(); i++) {
     int maxPriority = team->getPriority();
+    if(maxPriority < PRIORITY_TEAM_0_LEFT) {
     auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
     for( int t=0; t<teams.size(); t++) {
         if( teams[t].servers.size() && self->server_info.count( teams[t].servers[0] ) ) {
@@ -1398,6 +1399,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
             TEST(teams[t].servers.size()); // A removed server is still associated with a team in SABTF
         }
     }
+    }
     if( maxPriority == team->getPriority() || lastPriority > maxPriority ) {
         RelocateShard rs;
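
Condensed, the teamTracker change above is a short-circuit around the per-shard team scan (names as in the diff; the assumption, implied by the early exit, is that PRIORITY_TEAM_0_LEFT is the highest team priority, so once maxPriority reaches it no other team can raise it further):

int maxPriority = team->getPriority();
if( maxPriority < PRIORITY_TEAM_0_LEFT ) {
	// Only worth scanning other teams while they could still raise maxPriority.
	auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
	// ... inspect teams[t] and possibly raise maxPriority ...
}

This skips a shardsAffectedByTeamFailure lookup for every shard of a team that is already at top priority, which is precisely the flood case this commit targets.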
@@ -2193,6 +2195,7 @@ ACTOR Future<Void> dataDistribution(
 ASSERT(configuration.storageTeamSize > 0);
 state PromiseStream<RelocateShard> output;
+state PromiseStream<RelocateShard> input;
 state PromiseStream<Promise<int64_t>> getAverageShardBytes;
 state PromiseStream<GetMetricsRequest> getShardMetrics;
 state Reference<AsyncVar<bool>> processingUnhealthy( new AsyncVar<bool>(false) );
@@ -2218,6 +2221,7 @@ ACTOR Future<Void> dataDistribution(
 }
 Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
+actors.push_back(yieldPromiseStream(output.getFuture(), input));
 for(int s=0; s<initData->shards.size() - 1; s++) {
     KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key);
@@ -2236,8 +2240,8 @@ ACTOR Future<Void> dataDistribution(
 }
 actors.push_back( pollMoveKeysLock(cx, lock) );
-actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
+actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
-actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
+actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
 actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
 if (configuration.usableRegions > 1) {
     actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
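
Condensed from the hunks above, the new stream wiring in dataDistribution() (only the relocation-stream plumbing shown):

state PromiseStream<RelocateShard> output; // written by the tracker, team collections, and rebalancers
state PromiseStream<RelocateShard> input;  // read by the data distribution queue

actors.push_back( yieldPromiseStream( output.getFuture(), input ) ); // yielding relay

Producers are otherwise unchanged and keep sending to output; only dataDistributionQueue switches to consuming input.getFuture().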

View File

@@ -210,6 +210,7 @@ Future<Void> dataDistributionTracker(
     Reference<InitialDataDistribution> const& initData,
     Database const& cx,
     PromiseStream<RelocateShard> const& output,
+    Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
     PromiseStream<GetMetricsRequest> const& getShardMetrics,
     FutureStream<Promise<int64_t>> const& getAverageShardBytes,
     Promise<Void> const& readyToStart,
@@ -218,7 +219,8 @@ Future<Void> dataDistributionTracker(
 Future<Void> dataDistributionQueue(
     Database const& cx,
-    PromiseStream<RelocateShard> const& input,
+    PromiseStream<RelocateShard> const& output,
+    FutureStream<RelocateShard> const& input,
     PromiseStream<GetMetricsRequest> const& getShardMetrics,
     Reference<AsyncVar<bool>> const& processingUnhealthy,
     vector<TeamCollectionInterface> const& teamCollection,

View File

@@ -362,7 +362,8 @@ struct DDQueueData {
     PromiseStream<RelocateData> relocationComplete;
     PromiseStream<RelocateData> fetchSourceServersComplete;
-    PromiseStream<RelocateShard> input;
+    PromiseStream<RelocateShard> output;
+    FutureStream<RelocateShard> input;
     PromiseStream<GetMetricsRequest> getShardMetrics;
     double* lastLimited;
@@ -393,10 +394,10 @@ struct DDQueueData {
     DDQueueData( MasterInterface mi, MoveKeysLock lock, Database cx, std::vector<TeamCollectionInterface> teamCollections,
         Reference<ShardsAffectedByTeamFailure> sABTF, PromiseStream<Promise<int64_t>> getAverageShardBytes,
-        int teamSize, PromiseStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited, Version recoveryVersion ) :
+        int teamSize, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited, Version recoveryVersion ) :
         activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ),
         shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), mi( mi ), lock( lock ),
-        cx( cx ), teamSize( teamSize ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
+        cx( cx ), teamSize( teamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
         finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited), recoveryVersion(recoveryVersion),
         suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar<bool>(false) ) {}
@@ -569,10 +570,6 @@ struct DDQueueData {
     //This function cannot handle relocation requests which split a shard into three pieces
     void queueRelocation( RelocateData rd, std::set<UID> &serversToLaunchFrom ) {
-        // Update sabtf for changes from DDTracker
-        if( rd.changesBoundaries() )
-            shardsAffectedByTeamFailure->defineShard( rd.keys );
         //TraceEvent("QueueRelocationBegin").detail("Begin", printable(rd.keys.begin)).detail("End", printable(rd.keys.end));
         // remove all items from both queues that are fully contained in the new relocation (i.e. will be overwritten)
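
The lines deleted above take the boundary bookkeeping out of the queue: queueRelocation() no longer calls defineShard() for boundary-changing requests. Presumably this is because relocations now sit in the yielding relay between send and dequeue, so the sender must register new shard boundaries synchronously at send time to keep shardsAffectedByTeamFailure current in the interim. The sender-side pattern, as it appears in the DataDistributionTracker hunks below:

self->shardsAffectedByTeamFailure->defineShard( mergeRange );
self->output.send( RelocateShard( mergeRange, PRIORITY_MERGE_SHARD ) );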
@@ -1086,7 +1083,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
         .detail("SourceTeam", sourceTeam->getDesc())
         .detail("DestTeam", destTeam->getDesc());
-    self->input.send( RelocateShard( moveShard, priority ) );
+    self->output.send( RelocateShard( moveShard, priority ) );
     return true;
     }
 }
@@ -1166,7 +1163,8 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
 ACTOR Future<Void> dataDistributionQueue(
     Database cx,
-    PromiseStream<RelocateShard> input,
+    PromiseStream<RelocateShard> output,
+    FutureStream<RelocateShard> input,
     PromiseStream<GetMetricsRequest> getShardMetrics,
     Reference<AsyncVar<bool>> processingUnhealthy,
     std::vector<TeamCollectionInterface> teamCollections,
@@ -1178,7 +1176,7 @@ ACTOR Future<Void> dataDistributionQueue(
     double* lastLimited,
     Version recoveryVersion)
 {
-    state DDQueueData self( mi, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, input, getShardMetrics, lastLimited, recoveryVersion );
+    state DDQueueData self( mi, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, lastLimited, recoveryVersion );
     state std::set<UID> serversToLaunchFrom;
     state KeyRange keysToLaunchFrom;
     state RelocateData launchData;
@@ -1213,7 +1211,7 @@ ACTOR Future<Void> dataDistributionQueue(
     ASSERT( launchData.startTime == -1 && keysToLaunchFrom.empty() );
     choose {
-        when ( RelocateShard rs = waitNext( self.input.getFuture() ) ) {
+        when ( RelocateShard rs = waitNext( self.input ) ) {
             bool wasEmpty = serversToLaunchFrom.empty();
             self.queueRelocation( rs, serversToLaunchFrom );
             if(wasEmpty && !serversToLaunchFrom.empty())

View File

@@ -74,14 +74,15 @@ struct DataDistributionTracker {
     // CapacityTracker
     PromiseStream<RelocateShard> output;
+    Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
     Promise<Void> readyToStart;
     Reference<AsyncVar<bool>> anyZeroHealthyTeams;
-    DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
+    DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
         : cx(cx), masterId( masterId ), dbSizeEstimate( new AsyncVar<int64_t>() ),
         maxShardSize( new AsyncVar<Optional<int64_t>>() ),
-        sizeChanges(false), readyToStart(readyToStart), output( output ), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
+        sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
     ~DataDistributionTracker()
     {
{ {
@@ -357,10 +358,16 @@ ACTOR Future<Void> shardSplitter(
     for( int i = numShards-1; i > skipRange; i-- )
         restartShardTrackers( self, KeyRangeRef(splitKeys[i], splitKeys[i+1]) );
-    for( int i = 0; i < skipRange; i++ )
-        self->output.send( RelocateShard( KeyRangeRef(splitKeys[i], splitKeys[i+1]), PRIORITY_SPLIT_SHARD) );
-    for( int i = numShards-1; i > skipRange; i-- )
-        self->output.send( RelocateShard( KeyRangeRef(splitKeys[i], splitKeys[i+1]), PRIORITY_SPLIT_SHARD) );
+    for( int i = 0; i < skipRange; i++ ) {
+        KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
+        self->shardsAffectedByTeamFailure->defineShard( r );
+        self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
+    }
+    for( int i = numShards-1; i > skipRange; i-- ) {
+        KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
+        self->shardsAffectedByTeamFailure->defineShard( r );
+        self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
+    }
     self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().bytes ) );
 } else {
} else { } else {
@@ -461,6 +468,7 @@ Future<Void> shardMerger(
         .detail("TrackerID", trackerId);
     restartShardTrackers( self, mergeRange, endingStats );
+    self->shardsAffectedByTeamFailure->defineShard( mergeRange );
     self->output.send( RelocateShard( mergeRange, PRIORITY_MERGE_SHARD ) );
     // We are about to be cancelled by the call to restartShardTrackers
@@ -661,13 +669,14 @@ ACTOR Future<Void> dataDistributionTracker(
     Reference<InitialDataDistribution> initData,
     Database cx,
     PromiseStream<RelocateShard> output,
+    Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
     PromiseStream<GetMetricsRequest> getShardMetrics,
     FutureStream<Promise<int64_t>> getAverageShardBytes,
     Promise<Void> readyToStart,
     Reference<AsyncVar<bool>> anyZeroHealthyTeams,
     UID masterId)
 {
-    state DataDistributionTracker self(cx, masterId, readyToStart, output, anyZeroHealthyTeams);
+    state DataDistributionTracker self(cx, masterId, readyToStart, output, shardsAffectedByTeamFailure, anyZeroHealthyTeams);
     state Future<Void> loggingTrigger = Void();
     try {
         Void _ = wait( trackInitialShards( &self, initData ) );