Data distribution no longer attempts to pick teams which share members of the source unless the team matches exactly
This commit is contained in:
parent
032797ca5c
commit
ab7071932f
|
@ -755,68 +755,45 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
||||||
|
|
||||||
int64_t bestLoadBytes = 0;
|
int64_t bestLoadBytes = 0;
|
||||||
Optional<Reference<IDataDistributionTeam>> bestOption;
|
Optional<Reference<IDataDistributionTeam>> bestOption;
|
||||||
std::vector<std::pair<int, Reference<IDataDistributionTeam>>> randomTeams;
|
std::vector<Reference<IDataDistributionTeam>> randomTeams;
|
||||||
std::set< UID > sources;
|
std::set<UID> completeSources;
|
||||||
|
for( int i = 0; i < req.completeSources.size(); i++ ) {
|
||||||
|
completeSources.insert( req.completeSources[i] );
|
||||||
|
}
|
||||||
if( !req.wantsNewServers ) {
|
if( !req.wantsNewServers ) {
|
||||||
std::vector<Reference<IDataDistributionTeam>> similarTeams;
|
int bestSize = 0;
|
||||||
bool foundExact = false;
|
for( int i = 0; i < req.completeSources.size(); i++ ) {
|
||||||
|
if( self->server_info.count( req.completeSources[i] ) ) {
|
||||||
for( int i = 0; i < req.sources.size(); i++ )
|
auto& teamList = self->server_info[ req.completeSources[i] ]->teams;
|
||||||
sources.insert( req.sources[i] );
|
|
||||||
|
|
||||||
for( int i = 0; i < req.sources.size(); i++ ) {
|
|
||||||
if( self->server_info.count( req.sources[i] ) ) {
|
|
||||||
auto& teamList = self->server_info[ req.sources[i] ]->teams;
|
|
||||||
for( int j = 0; j < teamList.size(); j++ ) {
|
for( int j = 0; j < teamList.size(); j++ ) {
|
||||||
if( teamList[j]->isHealthy() && (!req.preferLowerUtilization || teamList[j]->hasHealthyFreeSpace())) {
|
bool found = true;
|
||||||
int sharedMembers = 0;
|
auto serverIDs = teamList[j]->getServerIDs();
|
||||||
for( const UID& id : teamList[j]->getServerIDs() )
|
for( int k = 0; k < teamList[j]->size(); k++ ) {
|
||||||
if( sources.count( id ) )
|
if( !completeSources.count( serverIDs[k] ) ) {
|
||||||
sharedMembers++;
|
found = false;
|
||||||
|
break;
|
||||||
if( !foundExact && sharedMembers == teamList[j]->size() ) {
|
|
||||||
foundExact = true;
|
|
||||||
bestOption = Optional<Reference<IDataDistributionTeam>>();
|
|
||||||
similarTeams.clear();
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if( (sharedMembers == teamList[j]->size()) || (!foundExact && req.wantsTrueBest) ) {
|
if(found && teamList[j]->isHealthy() && teamList[j]->size() > bestSize) {
|
||||||
int64_t loadBytes = SOME_SHARED * teamList[j]->getLoadBytes(true, req.inflightPenalty);
|
|
||||||
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
|
|
||||||
bestLoadBytes = loadBytes;
|
|
||||||
bestOption = teamList[j];
|
bestOption = teamList[j];
|
||||||
|
bestSize = teamList[j]->size();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if( !req.wantsTrueBest && !foundExact )
|
break;
|
||||||
similarTeams.push_back( teamList[j] );
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if( foundExact || (req.wantsTrueBest && bestOption.present() ) ) {
|
if(bestOption.present()) {
|
||||||
ASSERT( bestOption.present() );
|
|
||||||
// Check the team size: be sure team size is correct
|
|
||||||
ASSERT(bestOption.get()->size() == self->configuration.storageTeamSize);
|
|
||||||
req.reply.send( bestOption );
|
req.reply.send( bestOption );
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
if( !req.wantsTrueBest ) {
|
|
||||||
while( similarTeams.size() && randomTeams.size() < SERVER_KNOBS->BEST_TEAM_OPTION_COUNT ) {
|
|
||||||
int randomTeam = deterministicRandom()->randomInt( 0, similarTeams.size() );
|
|
||||||
randomTeams.push_back( std::make_pair( SOME_SHARED, similarTeams[randomTeam] ) );
|
|
||||||
swapAndPop( &similarTeams, randomTeam );
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if( req.wantsTrueBest ) {
|
if( req.wantsTrueBest ) {
|
||||||
ASSERT( !bestOption.present() );
|
ASSERT( !bestOption.present() );
|
||||||
for( int i = 0; i < self->teams.size(); i++ ) {
|
for( int i = 0; i < self->teams.size(); i++ ) {
|
||||||
if( self->teams[i]->isHealthy() && (!req.preferLowerUtilization || self->teams[i]->hasHealthyFreeSpace()) ) {
|
if( self->teams[i]->isHealthy() && (!req.preferLowerUtilization || self->teams[i]->hasHealthyFreeSpace()) ) {
|
||||||
int64_t loadBytes = NONE_SHARED * self->teams[i]->getLoadBytes(true, req.inflightPenalty);
|
int64_t loadBytes = self->teams[i]->getLoadBytes(true, req.inflightPenalty);
|
||||||
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
|
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
|
||||||
bestLoadBytes = loadBytes;
|
bestLoadBytes = loadBytes;
|
||||||
bestOption = self->teams[i];
|
bestOption = self->teams[i];
|
||||||
|
@ -830,21 +807,24 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
||||||
Reference<IDataDistributionTeam> dest = deterministicRandom()->randomChoice(self->teams);
|
Reference<IDataDistributionTeam> dest = deterministicRandom()->randomChoice(self->teams);
|
||||||
|
|
||||||
bool ok = dest->isHealthy() && (!req.preferLowerUtilization || dest->hasHealthyFreeSpace());
|
bool ok = dest->isHealthy() && (!req.preferLowerUtilization || dest->hasHealthyFreeSpace());
|
||||||
for(int i=0; ok && i<randomTeams.size(); i++)
|
for(int i=0; ok && i<randomTeams.size(); i++) {
|
||||||
if (randomTeams[i].second->getServerIDs() == dest->getServerIDs())
|
if (randomTeams[i]->getServerIDs() == dest->getServerIDs()) {
|
||||||
ok = false;
|
ok = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (ok)
|
if (ok)
|
||||||
randomTeams.push_back( std::make_pair( NONE_SHARED, dest ) );
|
randomTeams.push_back( dest );
|
||||||
else
|
else
|
||||||
nTries++;
|
nTries++;
|
||||||
}
|
}
|
||||||
|
|
||||||
for( int i = 0; i < randomTeams.size(); i++ ) {
|
for( int i = 0; i < randomTeams.size(); i++ ) {
|
||||||
int64_t loadBytes = randomTeams[i].first * randomTeams[i].second->getLoadBytes(true, req.inflightPenalty);
|
int64_t loadBytes = randomTeams[i]->getLoadBytes(true, req.inflightPenalty);
|
||||||
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
|
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
|
||||||
bestLoadBytes = loadBytes;
|
bestLoadBytes = loadBytes;
|
||||||
bestOption = randomTeams[i].second;
|
bestOption = randomTeams[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -853,11 +833,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
||||||
// We will get stuck at this! This only happens when a DC fails. No need to consider it right now.
|
// We will get stuck at this! This only happens when a DC fails. No need to consider it right now.
|
||||||
if(!bestOption.present() && self->zeroHealthyTeams->get()) {
|
if(!bestOption.present() && self->zeroHealthyTeams->get()) {
|
||||||
//Attempt to find the unhealthy source server team and return it
|
//Attempt to find the unhealthy source server team and return it
|
||||||
std::set<UID> completeSources;
|
|
||||||
for( int i = 0; i < req.completeSources.size(); i++ ) {
|
|
||||||
completeSources.insert( req.completeSources[i] );
|
|
||||||
}
|
|
||||||
|
|
||||||
int bestSize = 0;
|
int bestSize = 0;
|
||||||
for( int i = 0; i < req.completeSources.size(); i++ ) {
|
for( int i = 0; i < req.completeSources.size(); i++ ) {
|
||||||
if( self->server_info.count( req.completeSources[i] ) ) {
|
if( self->server_info.count( req.completeSources[i] ) ) {
|
||||||
|
|
|
@ -38,11 +38,6 @@ struct RelocateShard {
|
||||||
RelocateShard( KeyRange const& keys, int priority ) : keys(keys), priority(priority) {}
|
RelocateShard( KeyRange const& keys, int priority ) : keys(keys), priority(priority) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
|
||||||
SOME_SHARED = 2,
|
|
||||||
NONE_SHARED = 3
|
|
||||||
};
|
|
||||||
|
|
||||||
struct IDataDistributionTeam {
|
struct IDataDistributionTeam {
|
||||||
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() = 0;
|
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() = 0;
|
||||||
virtual int size() = 0;
|
virtual int size() = 0;
|
||||||
|
@ -81,7 +76,6 @@ struct GetTeamRequest {
|
||||||
bool wantsTrueBest;
|
bool wantsTrueBest;
|
||||||
bool preferLowerUtilization;
|
bool preferLowerUtilization;
|
||||||
double inflightPenalty;
|
double inflightPenalty;
|
||||||
std::vector<UID> sources;
|
|
||||||
std::vector<UID> completeSources;
|
std::vector<UID> completeSources;
|
||||||
Promise< Optional< Reference<IDataDistributionTeam> > > reply;
|
Promise< Optional< Reference<IDataDistributionTeam> > > reply;
|
||||||
|
|
||||||
|
|
|
@ -54,14 +54,7 @@ struct RelocateData {
|
||||||
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
|
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
|
||||||
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
|
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
|
||||||
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
|
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
|
||||||
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT ||
|
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT), interval("QueuedRelocation") {}
|
||||||
mergeWantsNewServers(rs.keys, rs.priority)), interval("QueuedRelocation") {}
|
|
||||||
|
|
||||||
static bool mergeWantsNewServers(KeyRangeRef keys, int priority) {
|
|
||||||
return priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD &&
|
|
||||||
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 2 ||
|
|
||||||
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 1 && keys.begin.startsWith(LiteralStringRef("\xff"))));
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool isHealthPriority(int priority) {
|
static bool isHealthPriority(int priority) {
|
||||||
return priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
|
return priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
|
||||||
|
@ -937,7 +930,6 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
|
||||||
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
|
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
|
||||||
|
|
||||||
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
|
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
|
||||||
req.sources = rd.src;
|
|
||||||
req.completeSources = rd.completeSources;
|
req.completeSources = rd.completeSources;
|
||||||
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
|
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
|
||||||
// If a DC has no healthy team, we stop checking the other DCs until
|
// If a DC has no healthy team, we stop checking the other DCs until
|
||||||
|
|
|
@ -105,7 +105,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
||||||
init( INFLIGHT_PENALTY_HEALTHY, 1.0 );
|
init( INFLIGHT_PENALTY_HEALTHY, 1.0 );
|
||||||
init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 );
|
init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 );
|
||||||
init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );
|
init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );
|
||||||
init( MERGE_ONTO_NEW_TEAM, 1 ); if( randomize && BUGGIFY ) MERGE_ONTO_NEW_TEAM = deterministicRandom()->coinflip() ? 0 : 2;
|
|
||||||
|
|
||||||
init( PRIORITY_RECOVER_MOVE, 110 );
|
init( PRIORITY_RECOVER_MOVE, 110 );
|
||||||
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
|
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
|
||||||
|
|
|
@ -105,7 +105,6 @@ public:
|
||||||
double INFLIGHT_PENALTY_REDUNDANT;
|
double INFLIGHT_PENALTY_REDUNDANT;
|
||||||
double INFLIGHT_PENALTY_UNHEALTHY;
|
double INFLIGHT_PENALTY_UNHEALTHY;
|
||||||
double INFLIGHT_PENALTY_ONE_LEFT;
|
double INFLIGHT_PENALTY_ONE_LEFT;
|
||||||
int MERGE_ONTO_NEW_TEAM; // Merges will request new servers. 0 for off, 1 for \xff only, 2 for all shards.
|
|
||||||
|
|
||||||
// Higher priorities are executed first
|
// Higher priorities are executed first
|
||||||
// Priority/100 is the "priority group"/"superpriority". Priority inversion
|
// Priority/100 is the "priority group"/"superpriority". Priority inversion
|
||||||
|
|
Loading…
Reference in New Issue