When doing data movement where one region has the data and the other doesn’t, first move a single replica to the other region to save WAN bandwidth

Evan Tschannen 2018-06-19 23:15:30 -07:00
parent e7999e7a3e
commit e951df95b7
1 changed file with 65 additions and 27 deletions

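In outline, the new destination-selection rule in the diff below works like this: once a candidate team has been fetched from every team collection, the relocator checks whether every chosen team is healthy (allHealthy), whether at least one team already holds a replica of the shard (anyWithSource), and whether a given team holds none of the source servers (bestTeams[i].second is false, meaning that team sits in a region without the data). When all three hold, only one randomly chosen member of that team joins the initial destination set (destIds), and the remaining members are parked in extraIds until the first replica has crossed the WAN. The following is a minimal standalone sketch of that rule, not the actual FoundationDB code; TeamChoice, planMove, and the plain-int UID are invented stand-ins for Reference<IDataDistributionTeam> and FoundationDB's UID type:

#include <cstdlib>
#include <vector>

using UID = int; // stand-in for FoundationDB's 128-bit UID

struct TeamChoice {
    std::vector<UID> servers; // members of one candidate destination team
    bool hasSource;           // true if this team already holds a replica
};

// Mirrors the allHealthy && anyWithSource && !bestTeams[i].second branch:
// a team in a region without the data receives one replica now (destIds)
// and the rest later (extraIds); every other team receives the data at once.
void planMove( const std::vector<TeamChoice>& teams, bool allHealthy, bool anyWithSource,
               std::vector<UID>& destIds, std::vector<UID>& extraIds ) {
    for( const TeamChoice& team : teams ) {
        if( allHealthy && anyWithSource && !team.hasSource ) {
            int idx = std::rand() % (int)team.servers.size(); // the diff uses g_random->randomInt
            destIds.push_back( team.servers[idx] );
            for( int j = 0; j < (int)team.servers.size(); j++ )
                if( j != idx ) extraIds.push_back( team.servers[j] );
        } else {
            destIds.insert( destIds.end(), team.servers.begin(), team.servers.end() );
        }
    }
}

The point of sending a single replica first is that once one copy exists in the remote region, the rest of that team can fill in from it over the local network instead of pulling the same data across the WAN again.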

@@ -856,10 +856,12 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
state PromiseStream<RelocateData> relocationComplete( self->relocationComplete );
state bool signalledTransferComplete = false;
state UID masterId = self->mi.id();
state ParallelTCInfo destination;
state std::vector<ShardsAffectedByTeamFailure::Team> destinationTeams;
state ParallelTCInfo healthyDestinations;
state bool anyHealthy = false;
state bool allHealthy = true;
state bool anyWithSource = false;
state std::vector<std::pair<Reference<IDataDistributionTeam>,bool>> bestTeams;
try {
if(now() - self->lastInterval < 1.0) {
@@ -884,15 +886,11 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
loop {
state int tciIndex = 0;
state bool foundTeams = true;
destination.clear();
destinationTeams.clear();
healthyDestinations.clear();
anyHealthy = false;
loop{
if (tciIndex == self->teamCollections.size()) {
break;
}
allHealthy = true;
anyWithSource = false;
bestTeams.clear();
while( tciIndex < self->teamCollections.size() ) {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if(rd.priority >= PRIORITY_TEAM_UNHEALTHY) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
if(rd.priority >= PRIORITY_TEAM_1_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
@@ -901,18 +899,26 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
req.sources = rd.src;
req.completeSources = rd.completeSources;
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
if (bestTeam.present()) {
destination.addTeam(bestTeam.get());
destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(bestTeam.get()->getServerIDs(), tciIndex == 0));
if(bestTeam.get()->isHealthy()) {
healthyDestinations.addTeam(bestTeam.get());
anyHealthy = true;
}
}
else {
if(!bestTeam.present()) {
foundTeams = false;
break;
}
if(!bestTeam.get()->isHealthy()) {
allHealthy = false;
} else {
anyHealthy = true;
}
bool foundSource = false;
if(!rd.wantsNewServers && self->teamCollections.size() > 1) {
for(auto& it : bestTeam.get()->getServerIDs()) {
if(std::find(rd.src.begin(), rd.src.end(), it) != rd.src.end()) {
foundSource = true;
anyWithSource = true;
break;
}
}
}
bestTeams.push_back(std::make_pair(bestTeam.get(), foundSource));
tciIndex++;
}
if (foundTeams) {
@@ -930,6 +936,33 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
Void _ = wait( delay( SERVER_KNOBS->BEST_TEAM_STUCK_DELAY, TaskDataDistributionLaunch ) );
}
state std::vector<UID> destIds;
state std::vector<UID> healthyIds;
state std::vector<UID> extraIds;
state std::vector<ShardsAffectedByTeamFailure::Team> destinationTeams;
for(int i = 0; i < bestTeams.size(); i++) {
auto& serverIds = bestTeams[i].first->getServerIDs();
destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(serverIds, i == 0));
if(allHealthy && anyWithSource && !bestTeams[i].second) {
int idx = g_random->randomInt(0, serverIds.size());
destIds.push_back(serverIds[idx]);
healthyIds.push_back(serverIds[idx]);
for(int j = 0; j < serverIds.size(); j++) {
if(j != idx) {
extraIds.push_back(serverIds[j]);
}
}
healthyDestinations.addTeam(bestTeams[i].first);
} else {
destIds.insert(destIds.end(), serverIds.begin(), serverIds.end());
if(bestTeams[i].first->isHealthy()) {
healthyIds.insert(healthyIds.end(), serverIds.begin(), serverIds.end());
healthyDestinations.addTeam(bestTeams[i].first);
}
}
}
self->shardsAffectedByTeamFailure->moveShard(rd.keys, destinationTeams);
//FIXME: do not add data in flight to servers that were already in the src.
@@ -937,21 +970,26 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
TraceEvent(relocateShardInterval.severity, "RelocateShardHasDestination", masterId)
.detail("PairId", relocateShardInterval.pairID)
.detail("DestinationTeam", destination.getDesc());
.detail("DestinationTeam", describe(destIds))
.detail("ExtraIds", describe(extraIds));
state Error error = success();
state Promise<Void> dataMovementComplete;
state Future<Void> doMoveKeys = moveKeys(
self->cx, rd.keys, destination.getServerIDs(), healthyDestinations.getServerIDs(), self->lock,
dataMovementComplete, &self->startMoveKeysParallelismLock, &self->finishMoveKeysParallelismLock,
self->recoveryVersion,self->teamCollections.size() > 1, relocateShardInterval.pairID );
state Future<Void> doMoveKeys = moveKeys(self->cx, rd.keys, destIds, healthyIds, self->lock, dataMovementComplete, &self->startMoveKeysParallelismLock, &self->finishMoveKeysParallelismLock, self->recoveryVersion, self->teamCollections.size() > 1, relocateShardInterval.pairID );
state Future<Void> pollHealth = (!anyHealthy || signalledTransferComplete) ? Never() : delay( SERVER_KNOBS->HEALTH_POLL_TIME, TaskDataDistributionLaunch );
try {
loop {
choose {
when( Void _ = wait( doMoveKeys ) ) {
self->fetchKeysComplete.insert( rd );
break;
if(extraIds.size()) {
destIds.insert(destIds.end(), extraIds.begin(), extraIds.end());
healthyIds.insert(healthyIds.end(), extraIds.begin(), extraIds.end());
extraIds.clear();
doMoveKeys = moveKeys(self->cx, rd.keys, destIds, healthyIds, self->lock, Promise<Void>(), &self->startMoveKeysParallelismLock, &self->finishMoveKeysParallelismLock, self->recoveryVersion, self->teamCollections.size() > 1, relocateShardInterval.pairID );
} else {
self->fetchKeysComplete.insert( rd );
break;
}
}
when( Void _ = wait( pollHealth ) ) {
if (!healthyDestinations.isHealthy()) {
@@ -992,7 +1030,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
if( !error.code() ) {
TraceEvent(relocateShardInterval.end(), masterId).detail("Result","Success");
if(rd.keys.begin == keyServersPrefix) {
TraceEvent("MovedKeyServerKeys").detail("Dest", destination.getDesc()).trackLatest("MovedKeyServers");
TraceEvent("MovedKeyServerKeys").detail("Dest", describe(destIds)).trackLatest("MovedKeyServers");
}
if( !signalledTransferComplete ) {
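Read together with the moveKeys changes above, the relocation now proceeds in two phases: the first moveKeys call copies a single replica per remote team, and when it completes with extraIds still pending, destIds and healthyIds are extended with the deferred servers and moveKeys is reissued, so the remaining replicas are filled from the copy that is now in-region. Schematically, under invented helper names (startMove and waitForMove stand in for a fresh moveKeys() call and wait(doMoveKeys); this is not the real flow/actor API):

#include <vector>

using UID = int; // stand-in for FoundationDB's UID

// Invented stand-ins for moveKeys() and wait(doMoveKeys).
void startMove( const std::vector<UID>& dest, const std::vector<UID>& healthy ) {}
void waitForMove() {}

// Two-phase relocation, schematically: the first wave carries one replica
// per remote team across the WAN; once it lands, the deferred servers are
// appended and the move is reissued so the remaining replicas fill in-region.
void relocate( std::vector<UID> destIds, std::vector<UID> healthyIds, std::vector<UID> extraIds ) {
    startMove( destIds, healthyIds );
    while( true ) {
        waitForMove();
        if( !extraIds.empty() ) {
            destIds.insert( destIds.end(), extraIds.begin(), extraIds.end() );
            healthyIds.insert( healthyIds.end(), extraIds.begin(), extraIds.end() );
            extraIds.clear();
            startMove( destIds, healthyIds ); // second wave: in-region fan-out
        } else {
            break; // every replica placed; the relocation is complete
        }
    }
}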