The mountainChopper and valleyFiller only move larger than average shards, to avoid moving high bandwidth shards which are generally smaller.
This commit is contained in:
parent
cc4481b71a
commit
6b5e683de5
|
@ -1101,13 +1101,33 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
|
|||
return false;
|
||||
}
|
||||
|
||||
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
Promise<int64_t> req;
|
||||
self->getAverageShardBytes.send( req );
|
||||
|
||||
state int64_t averageShardBytes = wait(req.getFuture());
|
||||
state std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
|
||||
if( !shards.size() )
|
||||
return false;
|
||||
|
||||
state KeyRange moveShard = deterministicRandom()->randomChoice( shards );
|
||||
StorageMetrics metrics = wait( brokenPromiseToNever( self->getShardMetrics.getReply(GetMetricsRequest(moveShard)) ) );
|
||||
state KeyRange moveShard;
|
||||
state StorageMetrics metrics;
|
||||
state int retries = 0;
|
||||
while(retries < 100) {
|
||||
state KeyRange testShard = deterministicRandom()->randomChoice( shards );
|
||||
StorageMetrics testMetrics = wait( brokenPromiseToNever( self->getShardMetrics.getReply(GetMetricsRequest(testShard)) ) );
|
||||
if(metrics.bytes >= averageShardBytes) {
|
||||
moveShard = testShard;
|
||||
metrics = testMetrics;
|
||||
break;
|
||||
}
|
||||
retries++;
|
||||
}
|
||||
|
||||
if(retries == 100) {
|
||||
TraceEvent(SevWarn, "CannotFindSmallShard", self->distributorId).detail("Src", sourceTeam->getDesc()).detail("AverageShardBytes", averageShardBytes).detail("Shards", shards.size());
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t sourceBytes = sourceTeam->getLoadBytes(false);
|
||||
int64_t destBytes = destTeam->getLoadBytes();
|
||||
|
@ -1123,6 +1143,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
|
|||
.detail("SourceBytes", sourceBytes)
|
||||
.detail("DestBytes", destBytes)
|
||||
.detail("ShardBytes", metrics.bytes)
|
||||
.detail("AverageShardBytes", averageShardBytes)
|
||||
.detail("SourceTeam", sourceTeam->getDesc())
|
||||
.detail("DestTeam", destTeam->getDesc());
|
||||
|
||||
|
|
Loading…
Reference in New Issue