diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index b566a6e78f..ee3e4a064e 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -356,6 +356,7 @@ struct DDQueueData { FlowLock startMoveKeysParallelismLock; FlowLock finishMoveKeysParallelismLock; + Reference fetchSourceLock; int activeRelocations; int queuedRelocations; @@ -422,7 +423,7 @@ struct DDQueueData { activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ), shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), distributorId( mid ), lock( lock ), cx( cx ), teamSize( teamSize ), singleRegionTeamSize( singleRegionTeamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), - finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited), + finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), fetchSourceLock( new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM) ), lastLimited(lastLimited), suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar(false) ) {} void validate() { @@ -528,7 +529,7 @@ struct DDQueueData { } } - ACTOR Future getSourceServersForRange( Database cx, RelocateData input, PromiseStream output ) { + ACTOR Future getSourceServersForRange( Database cx, RelocateData input, PromiseStream output, Reference fetchLock ) { state std::set servers; state Transaction tr(cx); @@ -539,6 +540,9 @@ struct DDQueueData { wait( delay( 0.0001, TaskPriority::DataDistributionLaunch ) ); } + wait( fetchLock->take( TaskPriority::DataDistributionLaunch ) ); + state FlowLock::Releaser releaser( *fetchLock ); + loop { servers.clear(); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); @@ -678,7 +682,7 @@ struct DDQueueData { startRelocation(rrs.priority, rrs.healthPriority); fetchingSourcesQueue.insert( rrs ); - getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete ) ); + getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete, fetchSourceLock ) ); } else { RelocateData newData( rrs ); newData.keys = affectedQueuedItems[r]; diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 4c4e3e7246..7fa900e3f6 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -195,6 +195,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( DD_SHARD_SIZE_GRANULARITY, 5000000 ); init( DD_SHARD_SIZE_GRANULARITY_SIM, 500000 ); if( randomize && BUGGIFY ) DD_SHARD_SIZE_GRANULARITY_SIM = 0; init( DD_MOVE_KEYS_PARALLELISM, 15 ); if( randomize && BUGGIFY ) DD_MOVE_KEYS_PARALLELISM = 1; + init( DD_FETCH_SOURCE_PARALLELISM, 1000 ); if( randomize && BUGGIFY ) DD_FETCH_SOURCE_PARALLELISM = 1; init( DD_MERGE_LIMIT, 2000 ); if( randomize && BUGGIFY ) DD_MERGE_LIMIT = 2; init( DD_SHARD_METRICS_TIMEOUT, 60.0 ); if( randomize && BUGGIFY ) DD_SHARD_METRICS_TIMEOUT = 0.1; init( DD_LOCATION_CACHE_SIZE, 2000000 ); if( randomize && BUGGIFY ) DD_LOCATION_CACHE_SIZE = 3; diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index f006621612..c33255b8b0 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -160,6 +160,7 @@ public: int64_t DD_SHARD_SIZE_GRANULARITY; int64_t DD_SHARD_SIZE_GRANULARITY_SIM; int DD_MOVE_KEYS_PARALLELISM; + int DD_FETCH_SOURCE_PARALLELISM; int DD_MERGE_LIMIT; double DD_SHARD_METRICS_TIMEOUT; int64_t DD_LOCATION_CACHE_SIZE;