TeamCollection: Remove redundant teams
When the total number of teams is larger than the desired number, we should gracefully remove the redundant teams so that the number of teams is kept to a low number and the possibility of losing data is guaranteed to be extremely low even when multiple racks fail at the same time.
This commit is contained in:
parent
455024b3fe
commit
76d022f71c
|
@ -39,6 +39,8 @@ class TCTeamInfo;
|
|||
struct TCMachineInfo;
|
||||
class TCMachineTeamInfo;
|
||||
|
||||
//Future<Void> removeBadTeams(DDTeamCollection const *self);
|
||||
|
||||
struct TeamBuildInfo {
|
||||
int currentServerTeamNum;
|
||||
int desiredServerTeamNum;
|
||||
|
@ -147,6 +149,7 @@ class TCMachineTeamInfo : public ReferenceCounted<TCMachineTeamInfo> {
|
|||
public:
|
||||
vector<Reference<TCMachineInfo>> machines;
|
||||
vector<Standalone<StringRef>> machineIDs;
|
||||
bool redundant; // True if the server team is marked as the redundant team that makes the total number of teams larger than needed
|
||||
|
||||
explicit TCMachineTeamInfo(vector<Reference<TCMachineInfo>> const& machines) : machines(machines) {
|
||||
machineIDs.reserve(machines.size());
|
||||
|
@ -195,10 +198,11 @@ public:
|
|||
Future<Void> tracker;
|
||||
bool healthy;
|
||||
bool wrongConfiguration; //True if any of the servers in the team have the wrong configuration
|
||||
bool redundant; // True if the server team is marked as the redundant team that makes the total number of teams larger than needed
|
||||
int priority;
|
||||
|
||||
explicit TCTeamInfo(vector<Reference<TCServerInfo>> const& servers)
|
||||
: servers(servers), healthy(true), priority(PRIORITY_TEAM_HEALTHY), wrongConfiguration(false) {
|
||||
: servers(servers), healthy(true), priority(PRIORITY_TEAM_HEALTHY), wrongConfiguration(false), redundant(false) {
|
||||
if (servers.empty()) {
|
||||
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers");
|
||||
}
|
||||
|
@ -568,9 +572,12 @@ Future<Void> storageServerTracker(
|
|||
|
||||
Future<Void> teamTracker( struct DDTeamCollection* const& self, Reference<TCTeamInfo> const& team, bool const& badTeam );
|
||||
|
||||
|
||||
struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
||||
enum { REQUESTING_WORKER = 0, GETTING_WORKER = 1, GETTING_STORAGE = 2 };
|
||||
|
||||
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
|
||||
// addActor is used to create the actorCollection when the dataDistributionTeamCollection is created
|
||||
PromiseStream<Future<Void>> addActor;
|
||||
Database cx;
|
||||
UID masterId;
|
||||
|
@ -622,6 +629,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
Future<Void> checkTeamDelay;
|
||||
Promise<Void> addSubsetComplete;
|
||||
Future<Void> badTeamRemover;
|
||||
Future<Void> redundantTeamRemover;
|
||||
|
||||
Reference<LocalitySet> storageServerSet;
|
||||
std::vector<LocalityEntry> forcedEntries, resultEntries;
|
||||
|
@ -664,7 +672,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> const& serverChanges,
|
||||
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
|
||||
Reference<AsyncVar<bool>> processingUnhealthy)
|
||||
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), teamBuilder( Void() ), badTeamRemover( Void() ),
|
||||
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), teamBuilder( Void() ), badTeamRemover( Void() ), redundantTeamRemover( Void() ),
|
||||
configuration(configuration), serverChanges(serverChanges), readyToStart(readyToStart), checkTeamDelay( delay( SERVER_KNOBS->CHECK_TEAM_DELAY, TaskDataDistribution) ),
|
||||
initialFailureReactionDelay( delayed( readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ), storageServerSet(new LocalityMap<UID>()),
|
||||
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
|
||||
|
@ -926,7 +934,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
auto& serverTeams = servers[j]->teams;
|
||||
for( int k = 0; k < serverTeams.size(); k++ ) {
|
||||
auto &testTeam = serverTeams[k]->getServerIDs();
|
||||
bool allInTeam = true;
|
||||
bool allInTeam = true; // Does all servers in testTeam belong to the healthy servers, extracted from current badTeam
|
||||
for( int l = 0; l < testTeam.size(); l++ ) {
|
||||
bool foundServer = false;
|
||||
for( auto it : servers ) {
|
||||
|
@ -951,6 +959,17 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
if(servers.size() == self->configuration.storageTeamSize || self->satisfiesPolicy(servers, self->configuration.storageTeamSize)) {
|
||||
servers.resize(self->configuration.storageTeamSize);
|
||||
self->addTeam(servers, true);
|
||||
// if ( !self->redundantTeamRemover.isReady() ) {
|
||||
// wait( self->redundantTeamRemover );
|
||||
// }
|
||||
// NOTE: This code path adds a team in an emergency situation.
|
||||
// We should not allow the number of teams exceed the desired one.
|
||||
// Otherwise, the caller may keep trying to add emergency team, which causes infinite loop
|
||||
// if ( self->redundantTeamRemover.isReady() ) {
|
||||
// self->redundantTeamRemover = teamRemover(self);
|
||||
// self->addActor.send(self->redundantTeamRemover);
|
||||
// }
|
||||
self->traceTeamCollectionInfo();
|
||||
} else {
|
||||
tempSet->clear();
|
||||
for( auto it : servers ) {
|
||||
|
@ -967,6 +986,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
serverIds.push_back(*tempMap->getObject(it));
|
||||
}
|
||||
self->addTeam(serverIds.begin(), serverIds.end(), true);
|
||||
// if ( !self->redundantTeamRemover.isReady() ) {
|
||||
// wait( self->redundantTeamRemover );
|
||||
// }
|
||||
// if ( self->redundantTeamRemover.isReady() ) {
|
||||
// self->redundantTeamRemover = teamRemover(self);
|
||||
// self->addActor.send(self->redundantTeamRemover);
|
||||
// }
|
||||
self->traceTeamCollectionInfo();
|
||||
}
|
||||
} else {
|
||||
|
@ -996,6 +1022,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
state std::set<std::vector<UID>>::iterator teamIterEnd = self->primary ? initTeams->primaryTeams.end() : initTeams->remoteTeams.end();
|
||||
for(; teamIter != teamIterEnd; ++teamIter) {
|
||||
self->addTeam(teamIter->begin(), teamIter->end(), true);
|
||||
// if ( !self->redundantTeamRemover.isReady() ) {
|
||||
// wait( self->redundantTeamRemover );
|
||||
// }
|
||||
if ( self->redundantTeamRemover.isReady() ) {
|
||||
self->redundantTeamRemover = teamRemover(self);
|
||||
self->addActor.send(self->redundantTeamRemover);
|
||||
}
|
||||
self->traceTeamCollectionInfo();
|
||||
wait( yield() );
|
||||
}
|
||||
|
@ -1099,9 +1132,11 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
addTeam(newTeamServers, isInitialTeam);
|
||||
}
|
||||
|
||||
void addTeam(const vector<Reference<TCServerInfo>>& newTeamServers, bool isInitialTeam) {
|
||||
void addTeam(const vector<Reference<TCServerInfo>>& newTeamServers, bool isInitialTeam, bool redundantTeam = false) {
|
||||
Reference<TCTeamInfo> teamInfo(new TCTeamInfo(newTeamServers));
|
||||
bool badTeam = !satisfiesPolicy(teamInfo->servers) || teamInfo->servers.size() != configuration.storageTeamSize;
|
||||
teamInfo->redundant = redundantTeam;
|
||||
|
||||
bool badTeam = !satisfiesPolicy(teamInfo->servers) || teamInfo->servers.size() != configuration.storageTeamSize || redundantTeam;
|
||||
|
||||
//TODO: MT upgrade: add a bool to force it to be a badTeam
|
||||
teamInfo->tracker = teamTracker(this, teamInfo, badTeam);
|
||||
|
@ -1593,6 +1628,69 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
return totalHealthyMachineCount;
|
||||
}
|
||||
|
||||
// compare function for map with key type as Reference<TCMachineTeamInfo>
|
||||
struct TCMachineTeamInfoReferenceCompare
|
||||
{
|
||||
bool operator() (const Reference<TCMachineTeamInfo>& lhs, const Reference<TCMachineTeamInfo>& rhs) const
|
||||
{
|
||||
return lhs->machineIDs < rhs->machineIDs;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Find the machine team with the least number of process teams
|
||||
int getMachineTeamWithLeastProcessTeams(Reference<TCMachineTeamInfo> &mt) {
	// Finds the machine team that is used by the fewest server (process) teams.
	//
	// mt (out): reset to an empty reference first, then set to the selected machine team.
	// Returns:  the number of server teams in `teams` that are built on the selected machine team.
	//
	// Only machine teams referenced by at least one entry of `teams` are counted, so a
	// machine team with no server team on it is never selected. Ties are resolved in
	// favour of the first candidate in TCMachineTeamInfoReferenceCompare order.
	// If `teams` is empty no candidate exists and ASSERT(mt.isValid()) fires.
	std::map<Reference<TCMachineTeamInfo>, int, TCMachineTeamInfoReferenceCompare> machineTeamProcessTeamCounts;
	int numProcessTeams = std::numeric_limits<int>::max();
	int maxNumProcessTeams = std::numeric_limits<int>::min();

	mt = Reference<TCMachineTeamInfo>();

	// Count how many server teams sit on each machine team.
	for (auto &team : teams) {
		auto counted = machineTeamProcessTeamCounts.find(team->machineTeam);
		if (counted == machineTeamProcessTeamCounts.end()) {
			machineTeamProcessTeamCounts.insert(std::make_pair(team->machineTeam, 1));
		} else {
			printf("[DEBUG] more than 1 team for machine team:%s\n", team->machineTeam->getMachineIDsStr().c_str());
			counted->second += 1;
		}
	}

	// Pick the machine team with the smallest count and remember the largest count for the sanity check below.
	for (auto &mtc : machineTeamProcessTeamCounts) {
		if (mtc.second < numProcessTeams) {
			numProcessTeams = mtc.second;
			mt = mtc.first;
		}
		if (mtc.second > maxNumProcessTeams) {
			maxNumProcessTeams = mtc.second;
		}
	}

	// Sanity check: the busiest machine team times the number of machine teams must cover all server teams.
	const bool countsConsistent = maxNumProcessTeams * machineTeams.size() >= teams.size();
	if (!countsConsistent) {
		// size() yields size_t; convert explicitly because the format string uses %d.
		printf("[DEBUG] number of teams per machine team. Total number of teams:%d, maxNumProcessTeams:%d, machineTeams:%d\n",
		       static_cast<int>(teams.size()), maxNumProcessTeams, static_cast<int>(machineTeams.size()));
		for (auto &teamCount : machineTeamProcessTeamCounts) {
			printf("\tTeamCount:%d MachineTeamInfo:%s\n", teamCount.second, teamCount.first->getMachineIDsStr().c_str());
		}
	}
	ASSERT(countsConsistent);
	ASSERT(mt.isValid());

	return numProcessTeams;
}
|
||||
|
||||
int getHealthyMachineTeamCount() {
|
||||
int totalMachineTeamCount = 0;
|
||||
for (auto mt = machineTeams.begin(); mt != machineTeams.end(); ++mt) {
|
||||
ASSERT((*mt)->machines.size() == configuration.storageTeamSize);
|
||||
|
||||
if (isMachineTeamHealthy(*mt)) {
|
||||
++totalMachineTeamCount;
|
||||
}
|
||||
++totalMachineTeamCount;
|
||||
}
|
||||
|
||||
return totalMachineTeamCount;
|
||||
}
|
||||
|
||||
// Create server teams based on machine teams
|
||||
// Before the number of machine teams reaches the threshold, build a machine team for each server team
|
||||
// When it reaches the threshold, first try to build a server team with existing machine teams; if failed,
|
||||
|
@ -1730,6 +1828,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
.detail("MaxTeamNumber", maxTeamNumber)
|
||||
.detail("StorageTeamSize", configuration.storageTeamSize)
|
||||
.detail("CurrentMachineTeamNumber", machineTeams.size())
|
||||
.detail("CurrentHealthyMachineTeamNumber", healthyMachineTeamCount)
|
||||
.detail("DesiredMachineTeams", desiredMachineTeams)
|
||||
.detail("MaxMachineTeams", maxMachineTeams)
|
||||
.detail("TotalHealthyMachine", totalHealthyMachineCount)
|
||||
|
@ -1748,6 +1847,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
int totalHealthyMachineCount = calculateHealthyMachineCount();
|
||||
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_MACHINE * totalHealthyMachineCount;
|
||||
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_MACHINE * totalHealthyMachineCount;
|
||||
int healthyMachineTeamCount = getHealthyMachineTeamCount();
|
||||
|
||||
TraceEvent("TeamCollectionInfo", masterId)
|
||||
.detail("Primary", primary)
|
||||
|
@ -1758,6 +1858,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
.detail("MaxTeamNumber", maxServerTeams)
|
||||
.detail("StorageTeamSize", configuration.storageTeamSize)
|
||||
.detail("CurrentMachineTeamNumber", machineTeams.size())
|
||||
.detail("CurrentHealthyMachineTeamNumber", healthyMachineTeamCount)
|
||||
.detail("DesiredMachineTeams", desiredMachineTeams)
|
||||
.detail("MaxMachineTeams", maxMachineTeams)
|
||||
.detail("TotalHealthyMachine", totalHealthyMachineCount)
|
||||
|
@ -1842,11 +1943,21 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
// Debug: set true for traceAllInfo() to print out more information
|
||||
self->traceAllInfo();
|
||||
}
|
||||
if (addedTeams <= 0) {
|
||||
// if (!self->redundantTeamRemover.isReady()) {
|
||||
// wait(self->redundantTeamRemover);
|
||||
// }
|
||||
if ( self->redundantTeamRemover.isReady() ) {
|
||||
self->redundantTeamRemover = teamRemover(self);
|
||||
self->addActor.send(self->redundantTeamRemover);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int totalHealthyMachineCount = self->calculateHealthyMachineCount();
|
||||
|
||||
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_MACHINE * totalHealthyMachineCount;
|
||||
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_MACHINE * totalHealthyMachineCount;
|
||||
int healthyMachineTeamCount = self->getHealthyMachineTeamCount();
|
||||
|
||||
TraceEvent("TeamCollectionInfo", self->masterId)
|
||||
.detail("Primary", self->primary)
|
||||
|
@ -1857,6 +1968,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
.detail("MaxTeamNumber", maxTeams)
|
||||
.detail("StorageTeamSize", self->configuration.storageTeamSize)
|
||||
.detail("CurrentMachineTeamNumber", self->machineTeams.size())
|
||||
.detail("CurrentHealthyMachineTeamNumber", healthyMachineTeamCount)
|
||||
.detail("DesiredMachineTeams", desiredMachineTeams)
|
||||
.detail("MaxMachineTeams", maxMachineTeams)
|
||||
.detail("TotalHealthyMachine", totalHealthyMachineCount)
|
||||
|
@ -2023,6 +2135,29 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
// rebuildMachineLocalityMap();
|
||||
}
|
||||
|
||||
bool removeMachineTeam(Reference<TCMachineTeamInfo> targetMT) {
	// Removes the machine team whose machineIDs equal targetMT's machineIDs.
	//
	// Step 1: drop the first matching entry from machineTeams (swap with the last element, then pop).
	// Step 2: drop every matching entry from the machineTeams list of each machine in machine_info.
	//
	// Returns true iff a matching entry was found in machineTeams. Step 2 runs regardless.
	// Element order of the affected vectors is not preserved.
	bool removedFromCollection = false;
	for (int idx = 0; idx < machineTeams.size() && !removedFromCollection; ++idx) {
		if (machineTeams[idx]->machineIDs == targetMT->machineIDs) {
			machineTeams[idx] = machineTeams.back();
			machineTeams.pop_back();
			removedFromCollection = true;
		}
	}

	for (auto &machineEntry : machine_info) {
		auto &teamsOnMachine = machineEntry.second->machineTeams;
		int pos = 0;
		while (pos < teamsOnMachine.size()) {
			if (teamsOnMachine[pos]->machineIDs == targetMT->machineIDs) {
				// The former last element now sits at pos, so examine this slot again.
				teamsOnMachine[pos] = teamsOnMachine.back();
				teamsOnMachine.pop_back();
			} else {
				++pos;
			}
		}
	}

	return removedFromCollection;
}
|
||||
|
||||
void removeServer( UID removedServer ) {
|
||||
TraceEvent("RemovedStorageServer", masterId).detail("ServerID", removedServer);
|
||||
// ASSERT( !shardsAffectedByTeamFailure->getServersForTeam( t ) for all t in teams that contain removedServer )
|
||||
|
@ -2119,6 +2254,92 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
	// Cancels the tracker of every team in self->badTeams and then empties that list.
	// Before doing so it waits for, in order: the initial failure reaction delay, a moment at
	// which neither zeroHealthyTeams nor processingUnhealthy is set, and addSubsetComplete.
	wait(self->initialFailureReactionDelay);

	loop {
		// Block while either flag is raised.
		while (self->zeroHealthyTeams->get() || self->processingUnhealthy->get()) {
			wait(self->zeroHealthyTeams->onChange() || self->processingUnhealthy->onChange());
		}

		// After the team trackers wait on the initial failure reaction delay, they yield.
		// We want to make sure every tracker has had the opportunity to send their relocations to the queue.
		wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskLowPriority));

		// Leave only if both flags are still clear after the delay; otherwise start over.
		bool stillBusy = self->zeroHealthyTeams->get() || self->processingUnhealthy->get();
		if (!stillBusy) {
			break;
		}
	}

	wait(self->addSubsetComplete.getFuture());

	TraceEvent("DDRemovingBadTeams", self->masterId).detail("Primary", self->primary);
	for (auto badTeam : self->badTeams) {
		badTeam->tracker.cancel();
	}
	self->badTeams.clear();

	return Void();
}
|
||||
|
||||
ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
	// Repeatedly removes one machine team (together with the server teams built on it) until
	// self->machineTeams.size() no longer exceeds
	// DESIRED_TEAMS_PER_MACHINE * (healthy machine count).
	// Each removed server team is re-added through addTeam() with redundantTeam = true, and
	// removeBadTeams() is started if no bad-team remover is currently running.
	loop {
		int healthyMachines = self->calculateHealthyMachineCount();
		int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_MACHINE * healthyMachines;

		// Nothing left to remove: report and stop.
		if (self->machineTeams.size() <= desiredMachineTeams) {
			TraceEvent("TeamRemoverDone").detail("CurrentMachineTeamNumber", self->machineTeams.size()).detail("DesiredMachineTeam", desiredMachineTeams);
			self->traceTeamCollectionInfo();
			break;
		}

		// Pick the machine team with the least number of process teams and mark it as redundant.
		state Reference<TCMachineTeamInfo> mt;
		state int minNumProcessTeams = self->getMachineTeamWithLeastProcessTeams(mt);
		ASSERT(mt.isValid());
		mt->redundant = true; // placeholder flag for the machine team status

		// Remove the machine team by removing its process teams one by one,
		// turning each of them into a bad team.
		state bool addedNewBadTeam = false;
		state Reference<TCTeamInfo> team;
		state int teamIndex = 0;
		state int numServerTeamRemoved = 0;
		for (teamIndex = 0; teamIndex < self->teams.size(); ++teamIndex) {
			team = self->teams[teamIndex];
			if (team->machineTeam->machineIDs == mt->machineIDs) {
				team->redundant = true; // placeholder flag for the server team status
				++numServerTeamRemoved;

				if (self->removeTeam(team)) {
					// Original author's note: removeTeam() has the side effect of swapping the
					// last element into the current position, so revisit this index.
					--teamIndex;
					self->addTeam(team->servers, true, true);
					addedNewBadTeam = true;
				}

				if (addedNewBadTeam) {
					TEST(true);
					// Start a bad-team remover unless one is still in flight.
					if (self->badTeamRemover.isReady()) {
						self->badTeamRemover = removeBadTeams(self);
						self->addActor.send(self->badTeamRemover);
					}
				}
			}
		}

		TraceEvent("TeamRemover").detail("MachineTeamToRemove", mt->getMachineIDsStr()).detail("NumServerTeamRemoved", numServerTeamRemoved)
			.detail("NumProcessTeamsOnTheMachineTeam", minNumProcessTeams)
			.detail("CurrentMachineTeamNumber", self->machineTeams.size())
			.detail("DesiredMachineTeam", desiredMachineTeams);
		ASSERT(numServerTeamRemoved == minNumProcessTeams);

		// Finally drop the machine team itself.
		bool foundRemovedTeam = self->removeMachineTeam(mt);
		ASSERT(foundRemovedTeam);
	}

	return Void();
}
|
||||
|
||||
// Track a team and issue RelocateShards when the level of degradation changes
|
||||
ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> team, bool badTeam ) {
|
||||
state int lastServersLeft = team->getServerIDs().size();
|
||||
|
@ -2491,25 +2712,6 @@ ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self,
|
|||
return type;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
|
||||
wait(self->initialFailureReactionDelay);
|
||||
loop {
|
||||
while(self->zeroHealthyTeams->get() || self->processingUnhealthy->get()) {
|
||||
wait(self->zeroHealthyTeams->onChange() || self->processingUnhealthy->onChange());
|
||||
}
|
||||
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskLowPriority)); //After the team trackers wait on the initial failure reaction delay, they yield. We want to make sure every tracker has had the opportunity to send their relocations to the queue.
|
||||
if(!self->zeroHealthyTeams->get() && !self->processingUnhealthy->get()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
wait(self->addSubsetComplete.getFuture());
|
||||
TraceEvent("DDRemovingBadTeams", self->masterId).detail("Primary", self->primary);
|
||||
for(auto it : self->badTeams) {
|
||||
it->tracker.cancel();
|
||||
}
|
||||
self->badTeams.clear();
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> storageServerFailureTracker(
|
||||
DDTeamCollection* self,
|
||||
|
@ -2749,7 +2951,7 @@ ACTOR Future<Void> storageServerTracker(
|
|||
// Get the newBadTeams due to the locality change
|
||||
vector<Reference<TCTeamInfo>> newBadTeams;
|
||||
for (auto& serverTeam : server->teams) {
|
||||
if (!self->satisfiesPolicy(serverTeam->servers)) {
|
||||
if (!self->satisfiesPolicy(serverTeam->servers) || serverTeam->redundant) {
|
||||
newBadTeams.push_back(serverTeam);
|
||||
continue;
|
||||
}
|
||||
|
@ -2770,7 +2972,6 @@ ACTOR Future<Void> storageServerTracker(
|
|||
for(auto it : newBadTeams) {
|
||||
if( self->removeTeam(it) ) {
|
||||
self->addTeam(it->servers, true);
|
||||
self->traceTeamCollectionInfo();
|
||||
addedNewBadTeam = true;
|
||||
}
|
||||
}
|
||||
|
@ -2779,6 +2980,7 @@ ACTOR Future<Void> storageServerTracker(
|
|||
self->doBuildTeams = true;
|
||||
self->badTeamRemover = removeBadTeams(self);
|
||||
self->addActor.send(self->badTeamRemover);
|
||||
self->traceTeamCollectionInfo();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -253,3 +253,6 @@ ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
|
|||
|
||||
//Determines the maximum shard size based on the size of the database
|
||||
int64_t getMaxShardSize( double dbSizeEstimate );
|
||||
|
||||
class DDTeamCollection;
|
||||
Future<Void> teamRemover(DDTeamCollection* const& self);
|
|
@ -268,21 +268,27 @@ ACTOR Future<bool> getTeamCollectionValid( Database cx, WorkerInterface masterWo
|
|||
int64_t desiredTeamNumber;
|
||||
int64_t maxTeamNumber;
|
||||
int64_t currentMachineTeamNumber;
|
||||
int64_t healthyMachineTeamCount;
|
||||
int64_t desiredMachineTeamNumber;
|
||||
int64_t maxMachineTeamNumber;
|
||||
sscanf(teamCollectionInfoMessage.getValue("CurrentTeamNumber").c_str(), "%lld", &currentTeamNumber);
|
||||
sscanf(teamCollectionInfoMessage.getValue("DesiredTeamNumber").c_str(), "%lld", &desiredTeamNumber);
|
||||
sscanf(teamCollectionInfoMessage.getValue("MaxTeamNumber").c_str(), "%lld", &maxTeamNumber);
|
||||
sscanf(teamCollectionInfoMessage.getValue("CurrentMachineTeamNumber").c_str(), "%lld", &currentMachineTeamNumber);
|
||||
sscanf(teamCollectionInfoMessage.getValue("CurrentHealthyMachineTeamNumber").c_str(), "%lld", &healthyMachineTeamCount);
|
||||
sscanf(teamCollectionInfoMessage.getValue("DesiredMachineTeams").c_str(), "%lld", &desiredMachineTeamNumber);
|
||||
sscanf(teamCollectionInfoMessage.getValue("MaxMachineTeams").c_str(), "%lld", &maxMachineTeamNumber);
|
||||
|
||||
if (currentTeamNumber > desiredTeamNumber || currentMachineTeamNumber > desiredMachineTeamNumber) {
|
||||
//if (currentTeamNumber > desiredTeamNumber || currentMachineTeamNumber > desiredMachineTeamNumber) {
|
||||
if (currentMachineTeamNumber > maxMachineTeamNumber || healthyMachineTeamCount > desiredMachineTeamNumber) {
|
||||
// printf("getTeamCollectionValid: currentTeamNumber:%ld, desiredTeamNumber:%ld, maxTeamNumber:%ld currentMachineTeamNumber:%ld, desiredMachineTeamNumber:%ld, maxMachineTeamNumber:%ld\n",
|
||||
// currentTeamNumber, desiredTeamNumber, maxTeamNumber, currentMachineTeamNumber, desiredMachineTeamNumber, maxMachineTeamNumber);
|
||||
TraceEvent("GetTeamCollectionValid").detail("CurrentTeamNumber", currentTeamNumber)
|
||||
.detail("DesiredTeamNumber", desiredTeamNumber).detail("MaxTeamNumber", maxTeamNumber)
|
||||
.detail("CurrentMachineTeamNumber", currentMachineTeamNumber).detail("DesiredMachineTeams", desiredMachineTeamNumber)
|
||||
.detail("DesiredTeamNumber", desiredTeamNumber)
|
||||
.detail("CurrentHealthyMachineTeamNumber", healthyMachineTeamCount)
|
||||
.detail("MaxTeamNumber", maxTeamNumber)
|
||||
.detail("CurrentMachineTeamNumber", currentMachineTeamNumber)
|
||||
.detail("DesiredMachineTeams", desiredMachineTeamNumber)
|
||||
.detail("MaxMachineTeams", maxMachineTeamNumber);
|
||||
return false;
|
||||
} else {
|
||||
|
@ -406,7 +412,7 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
|
|||
.detail("DataDistributionActive", dataDistributionActive.get())
|
||||
.detail("StorageServersRecruiting", storageServersRecruiting.get());
|
||||
|
||||
if( dataInFlight.get() > dataInFlightGate || tLogQueueSize.get() > maxTLogQueueGate
|
||||
if ( dataInFlight.get() > dataInFlightGate || tLogQueueSize.get() > maxTLogQueueGate
|
||||
|| dataDistributionQueueSize.get() > maxDataDistributionQueueSize || storageQueueSize.get() > maxStorageServerQueueGate
|
||||
|| dataDistributionActive.get() == false || storageServersRecruiting.get() == true
|
||||
|| teamCollectionValid.get() == false) {
|
||||
|
|
Loading…
Reference in New Issue