Merge pull request #1908 from etschannen/feature-better-dd
A few data distribution improvements
This commit is contained in:
commit
a78a97f186
|
@ -1022,23 +1022,71 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
.detail("MachineMaxTeams", maxMachineTeams);
|
||||
}
|
||||
|
||||
bool teamExists( vector<UID> &team ) {
|
||||
int overlappingMembers( vector<UID> &team ) {
|
||||
if (team.empty()) {
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int maxMatchingServers = 0;
|
||||
UID& serverID = team[0];
|
||||
for (auto& usedTeam : server_info[serverID]->teams) {
|
||||
if (team == usedTeam->getServerIDs()) {
|
||||
return true;
|
||||
auto used = usedTeam->getServerIDs();
|
||||
int teamIdx = 0;
|
||||
int usedIdx = 0;
|
||||
int matchingServers = 0;
|
||||
while(teamIdx < team.size() && usedIdx < used.size()) {
|
||||
if(team[teamIdx] == used[usedIdx]) {
|
||||
matchingServers++;
|
||||
teamIdx++;
|
||||
usedIdx++;
|
||||
} else if(team[teamIdx] < used[usedIdx]) {
|
||||
teamIdx++;
|
||||
} else {
|
||||
usedIdx++;
|
||||
}
|
||||
}
|
||||
ASSERT(matchingServers > 0);
|
||||
maxMatchingServers = std::max(maxMatchingServers, matchingServers);
|
||||
if(maxMatchingServers == team.size()) {
|
||||
return maxMatchingServers;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
return maxMatchingServers;
|
||||
}
|
||||
|
||||
// SOMEDAY: when machineTeams is changed from vector to set, we may check the existence faster
|
||||
// True when findMachineTeam(machineIDs) hands back a valid reference,
// false otherwise.
bool machineTeamExists(vector<Standalone<StringRef>>& machineIDs) {
    Reference<TCMachineTeamInfo> existingTeam = findMachineTeam(machineIDs);
    return existingTeam.isValid();
}
|
||||
// Returns the largest number of machine IDs that `team` shares with any single
// machine team registered under its first machine (team[0]) in machine_info.
// Returns 0 when `team` is empty or when that machine has no machine teams.
//
// The comparison is a linear merge-style walk, so it only counts correctly when
// both `team` and each existing team's machineIDs are sorted ascending.
// NOTE(review): the visible caller sorts its IDs before calling; that stored
// machineIDs are kept sorted is assumed here -- TODO confirm.
int overlappingMachineMembers( vector<Standalone<StringRef>>& team ) {
    if (team.empty()) {
        return 0;
    }

    const size_t teamSize = team.size();
    int maxMatchingMachines = 0;
    const Standalone<StringRef>& firstMachineID = team[0];
    // NOTE(review): operator[] is kept from the original; if machine_info is a
    // std::map this inserts a default entry for an unknown ID -- verify that
    // team[0] is always registered.
    for (auto& usedTeam : machine_info[firstMachineID]->machineTeams) {
        // Bind by reference: the IDs are only read, so copying the vector for
        // every candidate team is wasted work.
        const auto& used = usedTeam->machineIDs;
        const size_t usedSize = used.size();

        size_t teamIdx = 0;
        size_t usedIdx = 0;
        int matchingMachines = 0;
        // Advance whichever side holds the smaller ID; equal IDs are a match.
        while (teamIdx < teamSize && usedIdx < usedSize) {
            if (team[teamIdx] == used[usedIdx]) {
                matchingMachines++;
                teamIdx++;
                usedIdx++;
            } else if (team[teamIdx] < used[usedIdx]) {
                teamIdx++;
            } else {
                usedIdx++;
            }
        }

        // Every team listed under team[0] is expected to contain at least that
        // machine, so a zero overlap indicates inconsistent bookkeeping.
        ASSERT(matchingMachines > 0);
        maxMatchingMachines = std::max(maxMatchingMachines, matchingMachines);

        // A full overlap cannot be beaten; stop scanning.
        if (static_cast<size_t>(maxMatchingMachines) == teamSize) {
            return maxMatchingMachines;
        }
    }

    return maxMatchingMachines;
}
|
||||
|
||||
Reference<TCMachineTeamInfo> findMachineTeam(vector<Standalone<StringRef>>& machineIDs) {
|
||||
if (machineIDs.empty()) {
|
||||
|
@ -1421,10 +1469,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
ASSERT_WE_THINK(isMachineTeamHealthy(machineIDs));
|
||||
|
||||
std::sort(machineIDs.begin(), machineIDs.end());
|
||||
if (machineTeamExists(machineIDs)) {
|
||||
int overlap = overlappingMachineMembers(machineIDs);
|
||||
if (overlap == machineIDs.size()) {
|
||||
maxAttempts += 1;
|
||||
continue;
|
||||
}
|
||||
score += SERVER_KNOBS->DD_OVERLAP_PENALTY*overlap;
|
||||
|
||||
// SOMEDAY: randomly pick one from teams with the lowest score
|
||||
if (score < bestScore) {
|
||||
|
@ -1853,7 +1903,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
ASSERT(serverTeam.size() == configuration.storageTeamSize);
|
||||
|
||||
std::sort(serverTeam.begin(), serverTeam.end());
|
||||
if (teamExists(serverTeam)) {
|
||||
int overlap = overlappingMembers(serverTeam);
|
||||
if (overlap == serverTeam.size()) {
|
||||
maxAttempts += 1;
|
||||
continue;
|
||||
}
|
||||
|
@ -1861,7 +1912,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
// Pick the server team with smallest score in all attempts
|
||||
// If we use different metric here, DD may oscillate infinitely in creating and removing teams.
|
||||
// SOMEDAY: Improve the code efficiency by using reservoir algorithm
|
||||
int score = 0;
|
||||
int score = SERVER_KNOBS->DD_OVERLAP_PENALTY*overlap;
|
||||
for (auto& server : serverTeam) {
|
||||
score += server_info[server]->teams.size();
|
||||
}
|
||||
|
@ -2023,7 +2074,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
.detail("MachineTeamCount", self->machineTeams.size())
|
||||
.detail("MachineCount", self->machine_info.size())
|
||||
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER);
|
||||
|
||||
|
||||
self->lastBuildTeamsFailed = false;
|
||||
if (teamsToBuild > 0 || self->notEnoughTeamsForAServer()) {
|
||||
state vector<std::vector<UID>> builtTeams;
|
||||
|
|
|
@ -1111,13 +1111,30 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
|
|||
return false;
|
||||
}
|
||||
|
||||
std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
Promise<int64_t> req;
|
||||
self->getAverageShardBytes.send( req );
|
||||
|
||||
state int64_t averageShardBytes = wait(req.getFuture());
|
||||
state std::vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
|
||||
|
||||
if( !shards.size() )
|
||||
return false;
|
||||
|
||||
state KeyRange moveShard = deterministicRandom()->randomChoice( shards );
|
||||
StorageMetrics metrics = wait( brokenPromiseToNever( self->getShardMetrics.getReply(GetMetricsRequest(moveShard)) ) );
|
||||
state KeyRange moveShard;
|
||||
state StorageMetrics metrics;
|
||||
state int retries = 0;
|
||||
while(retries < SERVER_KNOBS->REBALANCE_MAX_RETRIES) {
|
||||
state KeyRange testShard = deterministicRandom()->randomChoice( shards );
|
||||
StorageMetrics testMetrics = wait( brokenPromiseToNever( self->getShardMetrics.getReply(GetMetricsRequest(testShard)) ) );
|
||||
if(testMetrics.bytes > metrics.bytes) {
|
||||
moveShard = testShard;
|
||||
metrics = testMetrics;
|
||||
if(metrics.bytes > averageShardBytes) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
retries++;
|
||||
}
|
||||
|
||||
int64_t sourceBytes = sourceTeam->getLoadBytes(false);
|
||||
int64_t destBytes = destTeam->getLoadBytes();
|
||||
|
@ -1133,6 +1150,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
|
|||
.detail("SourceBytes", sourceBytes)
|
||||
.detail("DestBytes", destBytes)
|
||||
.detail("ShardBytes", metrics.bytes)
|
||||
.detail("AverageShardBytes", averageShardBytes)
|
||||
.detail("SourceTeam", sourceTeam->getDesc())
|
||||
.detail("DestTeam", destTeam->getDesc());
|
||||
|
||||
|
|
|
@ -182,6 +182,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( DEBOUNCE_RECRUITING_DELAY, 5.0 );
|
||||
init( DD_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) DD_FAILURE_TIME = 10.0;
|
||||
init( DD_ZERO_HEALTHY_TEAM_DELAY, 1.0 );
|
||||
init( REBALANCE_MAX_RETRIES, 100 );
|
||||
init( DD_OVERLAP_PENALTY, 10000 );
|
||||
|
||||
// TeamRemover
|
||||
TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = false; if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
|
||||
|
|
|
@ -141,6 +141,8 @@ public:
|
|||
int64_t DD_LOCATION_CACHE_SIZE;
|
||||
double MOVEKEYS_LOCK_POLLING_DELAY;
|
||||
double DEBOUNCE_RECRUITING_DELAY;
|
||||
int REBALANCE_MAX_RETRIES;
|
||||
int DD_OVERLAP_PENALTY;
|
||||
|
||||
// TeamRemover to remove redundant teams
|
||||
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor
|
||||
|
|
Loading…
Reference in New Issue