Added a flow lock to prevent too many source server fetches from happening at the same time and running the data distributor out of memory
commit 0e2f5e8bb5
parent 44975be5b2
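For context, the throttle added here is FoundationDB's FlowLock: a counting lock that admits a bounded number of concurrent holders, so at most DD_FETCH_SOURCE_PARALLELISM source-server fetches are in flight at once. Below is a minimal sketch of the take/Releaser pattern the diff uses, with a hypothetical throttledWork() actor and doFetch() helper standing in for the real fetch logic:

ACTOR Future<Void> throttledWork( Reference<FlowLock> lock ) {
	// Block until one of the lock's permits is available.
	wait( lock->take( TaskPriority::DataDistributionLaunch ) );
	// The Releaser returns the permit when this actor completes or is cancelled.
	state FlowLock::Releaser releaser( *lock );
	wait( doFetch() ); // hypothetical work guarded by the lock
	return Void();
}

Because the Releaser is held in a state variable, the permit is returned even if the actor is cancelled mid-fetch, so the number of concurrent fetches stays bounded by the knob (1000 by default, dropped to 1 when BUGGIFY randomization is active in simulation).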
@@ -356,6 +356,7 @@ struct DDQueueData {
 	FlowLock startMoveKeysParallelismLock;
 	FlowLock finishMoveKeysParallelismLock;
+	Reference<FlowLock> fetchSourceLock;

 	int activeRelocations;
 	int queuedRelocations;
@@ -422,7 +423,7 @@ struct DDQueueData {
 		activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ),
 		shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), distributorId( mid ), lock( lock ),
 		cx( cx ), teamSize( teamSize ), singleRegionTeamSize( singleRegionTeamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
-		finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited),
+		finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), fetchSourceLock( new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM) ), lastLimited(lastLimited),
 		suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar<bool>(false) ) {}

 	void validate() {
@@ -528,7 +529,7 @@ struct DDQueueData {
 		}
 	}

-	ACTOR Future<Void> getSourceServersForRange( Database cx, RelocateData input, PromiseStream<RelocateData> output ) {
+	ACTOR Future<Void> getSourceServersForRange( Database cx, RelocateData input, PromiseStream<RelocateData> output, Reference<FlowLock> fetchLock ) {
 		state std::set<UID> servers;
 		state Transaction tr(cx);

@@ -539,6 +540,9 @@ struct DDQueueData {
 			wait( delay( 0.0001, TaskPriority::DataDistributionLaunch ) );
 		}

+		wait( fetchLock->take( TaskPriority::DataDistributionLaunch ) );
+		state FlowLock::Releaser releaser( *fetchLock );
+
 		loop {
 			servers.clear();
 			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
@@ -678,7 +682,7 @@ struct DDQueueData {
 				startRelocation(rrs.priority, rrs.healthPriority);

 				fetchingSourcesQueue.insert( rrs );
-				getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete ) );
+				getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete, fetchSourceLock ) );
 			} else {
 				RelocateData newData( rrs );
 				newData.keys = affectedQueuedItems[r];
@@ -195,6 +195,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
 	init( DD_SHARD_SIZE_GRANULARITY, 5000000 );
 	init( DD_SHARD_SIZE_GRANULARITY_SIM, 500000 ); if( randomize && BUGGIFY ) DD_SHARD_SIZE_GRANULARITY_SIM = 0;
 	init( DD_MOVE_KEYS_PARALLELISM, 15 ); if( randomize && BUGGIFY ) DD_MOVE_KEYS_PARALLELISM = 1;
+	init( DD_FETCH_SOURCE_PARALLELISM, 1000 ); if( randomize && BUGGIFY ) DD_FETCH_SOURCE_PARALLELISM = 1;
 	init( DD_MERGE_LIMIT, 2000 ); if( randomize && BUGGIFY ) DD_MERGE_LIMIT = 2;
 	init( DD_SHARD_METRICS_TIMEOUT, 60.0 ); if( randomize && BUGGIFY ) DD_SHARD_METRICS_TIMEOUT = 0.1;
 	init( DD_LOCATION_CACHE_SIZE, 2000000 ); if( randomize && BUGGIFY ) DD_LOCATION_CACHE_SIZE = 3;
@@ -160,6 +160,7 @@ public:
 	int64_t DD_SHARD_SIZE_GRANULARITY;
 	int64_t DD_SHARD_SIZE_GRANULARITY_SIM;
 	int DD_MOVE_KEYS_PARALLELISM;
+	int DD_FETCH_SOURCE_PARALLELISM;
 	int DD_MERGE_LIMIT;
 	double DD_SHARD_METRICS_TIMEOUT;
 	int64_t DD_LOCATION_CACHE_SIZE;