TeamCollection: Apply clang-format
parent 214a72fba3
commit 3ae8767ee8
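For reference, the wrapping width, brace placement, and constructor-initializer layout visible in the reformatted lines below are consistent with a clang-format configuration roughly like the following. This is only a sketch inferred from the diff; the option values are assumptions and the repository's actual .clang-format may differ.

# Hypothetical .clang-format, inferred from this diff; not the repository's actual file.
BasedOnStyle: LLVM
IndentWidth: 4
TabWidth: 4
UseTab: ForIndentation
ColumnLimit: 110                              # assumed; long calls and initializer lists wrap near this width (could equally be 120)
BreakConstructorInitializers: BeforeColon     # matches the ": cx(cx), ..." list starting on its own line
AllowShortFunctionsOnASingleLine: Inline
AlignAfterOpenBracket: Align
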
@@ -559,7 +559,6 @@ Future<Void> storageServerTracker(

Future<Void> teamTracker( struct DDTeamCollection* const& self, Reference<TCTeamInfo> const& team, bool const& badTeam );

struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
enum { REQUESTING_WORKER = 0, GETTING_WORKER = 1, GETTING_STORAGE = 2 };

@@ -647,24 +646,27 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return result && resultEntries.size() == 0;
}

DDTeamCollection(
Database const& cx,
UID masterId,
MoveKeysLock const& lock,
DDTeamCollection(Database const& cx, UID masterId, MoveKeysLock const& lock,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
DatabaseConfiguration configuration,
std::vector<Optional<Key>> includedDCs,
DatabaseConfiguration configuration, std::vector<Optional<Key>> includedDCs,
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
Optional<PromiseStream<std::pair<UID, Optional<StorageServerInterface>>>> const& serverChanges,
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
Reference<AsyncVar<bool>> processingUnhealthy)
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), teamBuilder( Void() ), badTeamRemover( Void() ), redundantTeamRemover( Void() ),
configuration(configuration), serverChanges(serverChanges), readyToStart(readyToStart), checkTeamDelay( delay( SERVER_KNOBS->CHECK_TEAM_DELAY, TaskDataDistribution) ),
initialFailureReactionDelay( delayed( readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ), storageServerSet(new LocalityMap<UID>()),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs), zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary), processingUnhealthy(processingUnhealthy)
{
: cx(cx), masterId(masterId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), teamBuilder(Void()),
badTeamRemover(Void()), redundantTeamRemover(Void()), configuration(configuration),
serverChanges(serverChanges), readyToStart(readyToStart),
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskDataDistribution)),
initialFailureReactionDelay(
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution)),
healthyTeamCount(0), storageServerSet(new LocalityMap<UID>()),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)),
optimalTeamCount(0), recruitingStream(0), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs),
zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary),
processingUnhealthy(processingUnhealthy) {
if(!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", masterId)
.detail( "State", "Inactive" )

@@ -967,14 +969,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
serverIds.push_back(*tempMap->getObject(it));
}
self->addTeam(serverIds.begin(), serverIds.end(), true);
// if ( !self->redundantTeamRemover.isReady() ) {
// wait( self->redundantTeamRemover );
// }
// if ( self->redundantTeamRemover.isReady() ) {
// self->redundantTeamRemover = teamRemover(self);
// self->addActor.send(self->redundantTeamRemover);
// }
self->traceTeamCollectionInfo();
}
} else {
serverIds.clear();

@@ -994,6 +988,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
self->redundantTeamRemover = teamRemover(self);
self->addActor.send(self->redundantTeamRemover);
}
// Trace and record the current number of teams for correctness test
self->traceTeamCollectionInfo();

return Void();
}

@@ -1118,12 +1114,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
addTeam(newTeamServers, isInitialTeam);
}

void addTeam(const vector<Reference<TCServerInfo>>& newTeamServers, bool isInitialTeam, bool redundantTeam = false) {
void addTeam(const vector<Reference<TCServerInfo>>& newTeamServers, bool isInitialTeam,
bool redundantTeam = false) {
Reference<TCTeamInfo> teamInfo(new TCTeamInfo(newTeamServers));

bool badTeam = !satisfiesPolicy(teamInfo->servers) || teamInfo->servers.size() != configuration.storageTeamSize || redundantTeam;
bool badTeam = !satisfiesPolicy(teamInfo->servers) ||
teamInfo->servers.size() != configuration.storageTeamSize || redundantTeam;

//TODO: MT upgrade: add a bool to force it to be a badTeam
teamInfo->tracker = teamTracker(this, teamInfo, badTeam);
// ASSERT( teamInfo->serverIDs.size() > 0 ); //team can be empty at DB initialization
if (badTeam) {

@@ -1213,7 +1210,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}

void traceConfigInfo() {
TraceEvent("DDConfig").detail("StorageTeamSize", configuration.storageTeamSize)
TraceEvent("DDConfig")
.detail("StorageTeamSize", configuration.storageTeamSize)
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER)
.detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER);
}

@@ -1273,7 +1271,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {

TraceEvent("MachineTeamInfo").detail("Size", machineTeams.size());
for (auto& team : machineTeams) {
TraceEvent("MachineTeamInfo").detail("TeamIndex", i++)
TraceEvent("MachineTeamInfo")
.detail("TeamIndex", i++)
.detail("MachineIDs", team->getMachineIDsStr())
.detail("ServerTeamNumber", team->serverTeams.size());
}

@@ -1634,13 +1633,14 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
if (ret == false) {
TraceEvent(SevError, "ServerTeamNumberOnMachineIncorrect").detail("MachineTeam", mt->getMachineIDsStr())
.detail("ServerTeamsSize", mt->serverTeams.size()).detail("CountedServerTeamNumber", num);
TraceEvent(SevError, "ServerTeamNumberOnMachineIncorrect")
.detail("MachineTeam", mt->getMachineIDsStr())
.detail("ServerTeamsSize", mt->serverTeams.size())
.detail("CountedServerTeamNumber", num);
}
return ret;
}

// Find the machine team with the least number of server teams
int getMachineTeamWithLeastProcessTeams(Reference<TCMachineTeamInfo>& ret) {
int minNumProcessTeams = std::numeric_limits<int>::max();

@@ -1808,7 +1808,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("TotalHealthyMachine", totalHealthyMachineCount)
.trackLatest("TeamCollectionInfo");

return addedTeams;
}

@@ -2241,7 +2240,9 @@ ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
while (self->zeroHealthyTeams->get() || self->processingUnhealthy->get()) {
wait(self->zeroHealthyTeams->onChange() || self->processingUnhealthy->onChange());
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskLowPriority)); //After the team trackers wait on the initial failure reaction delay, they yield. We want to make sure every tracker has had the opportunity to send their relocations to the queue.
// After the team trackers wait on the initial failure reaction delay, they yield.
// We want to make sure every tracker has had the opportunity to send their relocations to the queue.
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskLowPriority));
if (!self->zeroHealthyTeams->get() && !self->processingUnhealthy->get()) {
break;
}

@@ -2263,7 +2264,9 @@ ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
while (self->zeroHealthyTeams->get() || self->processingUnhealthy->get()) {
wait(self->zeroHealthyTeams->onChange() || self->processingUnhealthy->onChange());
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskLowPriority)); //After the team trackers wait on the initial failure reaction delay, they yield. We want to make sure every tracker has had the opportunity to send their relocations to the queue.
// After the team trackers wait on the initial failure reaction delay, they yield.
// We want to make sure every tracker has had the opportunity to send their relocations to the queue.
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskLowPriority));
if (!self->zeroHealthyTeams->get() && !self->processingUnhealthy->get()) {
break;
}

@@ -2285,7 +2288,8 @@ ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
// Sanity check all machine teams are healthy
int currentHealthyMTCount = self->getHealthyMachineTeamCount();
if (currentHealthyMTCount != self->machineTeams.size()) {
TraceEvent(SevError, "InvalidAssumption").detail("TotalHealthyMachineCount", totalHealthyMachineCount)
TraceEvent(SevError, "InvalidAssumption")
.detail("TotalHealthyMachineCount", totalHealthyMachineCount)
.detail("MachineNumber", self->machine_info.size())
.detail("CurrentHealthyMTCount", currentHealthyMTCount)
.detail("MachineTeamNumber", self->machineTeams.size());

@@ -2312,7 +2316,8 @@ ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
// The team will be marked as a bad team
bool foundTeam = self->removeTeam(team);
ASSERT(foundTeam == true);
//removeTeam() has side effect of swapping the last element to the current pos in the serverTeams vector in the machine team.
// removeTeam() has side effect of swapping the last element to the current pos
// in the serverTeams vector in the machine team.
--teamIndex;
self->addTeam(team->servers, true, true);
TEST(true);

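Aside (not part of the commit): the comment and the --teamIndex line above refer to the usual swap-and-pop idiom, where erasing an element from a vector moves the last element into the vacated slot, so the loop has to re-examine the same index. A minimal, self-contained sketch of that pattern with plain std::vector and hypothetical names (it does not use the FoundationDB types):

#include <cstddef>
#include <cstdio>
#include <utility>
#include <vector>

// Remove every occurrence of 'bad' using swap-and-pop. When an element is
// removed, the index is not advanced, because the element swapped in from the
// back still has to be checked -- the same reason the diff above decrements
// teamIndex after removeTeam().
static void removeAll(std::vector<int>& v, int bad) {
	for (std::size_t i = 0; i < v.size();) {
		if (v[i] == bad) {
			std::swap(v[i], v.back());
			v.pop_back();
		} else {
			++i;
		}
	}
}

int main() {
	std::vector<int> v = { 1, 2, 2, 3, 2 };
	removeAll(v, 2);
	for (int x : v) std::printf("%d ", x); // prints "1 3 " here; relative order is generally not preserved
	return 0;
}
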
@@ -2323,7 +2328,8 @@ ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
self->addActor.send(self->badTeamRemover);
}

TraceEvent("TeamRemover").detail("MachineTeamToRemove", mt->getMachineIDsStr())
TraceEvent("TeamRemover")
.detail("MachineTeamToRemove", mt->getMachineIDsStr())
.detail("NumProcessTeamsOnTheMachineTeam", minNumProcessTeams)
.detail("CurrentMachineTeamNumber", self->machineTeams.size())
.detail("DesiredMachineTeam", desiredMachineTeams);

@@ -120,7 +120,7 @@ public:
double FREE_SPACE_RATIO_CUTOFF;
double FREE_SPACE_RATIO_DD_CUTOFF;
int DESIRED_TEAMS_PER_SERVER;
int MAX_TEAMS_PER_SERVER;;
int MAX_TEAMS_PER_SERVER;
int64_t DD_SHARD_SIZE_GRANULARITY;
int64_t DD_SHARD_SIZE_GRANULARITY_SIM;
int DD_MOVE_KEYS_PARALLELISM;

@@ -259,8 +259,8 @@ ACTOR Future<bool> getTeamCollectionValid( Database cx, WorkerInterface masterWo
try {
TraceEvent("GetTeamCollectionValid").detail("Stage", "ContactingMaster");

TraceEventFields teamCollectionInfoMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( LiteralStringRef("TeamCollectionInfo") ) ), 1.0 ) );
TraceEventFields teamCollectionInfoMessage = wait(timeoutError(
masterWorker.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TeamCollectionInfo"))), 1.0));

TraceEvent("GetTeamCollectionValid").detail("Stage", "GotString");

@@ -274,15 +274,15 @@ ACTOR Future<bool> getTeamCollectionValid( Database cx, WorkerInterface masterWo
sscanf(teamCollectionInfoMessage.getValue("CurrentTeamNumber").c_str(), "%lld", &currentTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("DesiredTeamNumber").c_str(), "%lld", &desiredTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("MaxTeamNumber").c_str(), "%lld", &maxTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("CurrentMachineTeamNumber").c_str(), "%lld", &currentMachineTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("CurrentHealthyMachineTeamNumber").c_str(), "%lld", &healthyMachineTeamCount);
sscanf(teamCollectionInfoMessage.getValue("DesiredMachineTeams").c_str(), "%lld", &desiredMachineTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("CurrentMachineTeamNumber").c_str(), "%lld",
&currentMachineTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("CurrentHealthyMachineTeamNumber").c_str(), "%lld",
&healthyMachineTeamCount);
sscanf(teamCollectionInfoMessage.getValue("DesiredMachineTeams").c_str(), "%lld",
&desiredMachineTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("MaxMachineTeams").c_str(), "%lld", &maxMachineTeamNumber);

//if (currentTeamNumber > desiredTeamNumber || currentMachineTeamNumber > desiredMachineTeamNumber) {
if (currentMachineTeamNumber > maxMachineTeamNumber || healthyMachineTeamCount > desiredMachineTeamNumber) {
// printf("getTeamCollectionValid: currentTeamNumber:%ld, desiredTeamNumber:%ld, maxTeamNumber:%ld currentMachineTeamNumber:%ld, desiredMachineTeamNumber:%ld, maxMachineTeamNumber:%ld\n",
// currentTeamNumber, desiredTeamNumber, maxTeamNumber, currentMachineTeamNumber, desiredMachineTeamNumber, maxMachineTeamNumber);
TraceEvent("GetTeamCollectionValid")
.detail("CurrentTeamNumber", currentTeamNumber)
.detail("DesiredTeamNumber", desiredTeamNumber)

@@ -297,17 +297,18 @@ ACTOR Future<bool> getTeamCollectionValid( Database cx, WorkerInterface masterWo
}

} catch (Error& e) {
TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract GetTeamCollectionValid information");
TraceEvent("QuietDatabaseFailure", masterWorker.id())
.detail("Reason", "Failed to extract GetTeamCollectionValid information");
attempts++;
if (attempts > 10) {
TraceEvent("QuietDatabaseNoTeamCollectionInfo", masterWorker.id()).detail("Reason", "Had never called build team to build any team");
TraceEvent("QuietDatabaseNoTeamCollectionInfo", masterWorker.id())
.detail("Reason", "Had never called build team to build any team");
return true;
}
// throw;
wait(delay(10.0));
}
};

}

// Gets if the number of process and machine teams does not exceed the maximum allowed number of teams

@@ -401,9 +402,9 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
state Future<bool> dataDistributionActive = getDataDistributionActive( cx, masterWorker );
state Future<bool> storageServersRecruiting = getStorageServersRecruiting ( cx, dbInfo, masterWorker );

wait( success( dataInFlight ) && success( tLogQueueSize ) && success( dataDistributionQueueSize )
&& success( teamCollectionValid ) && success( storageQueueSize )
&& success( dataDistributionActive ) && success( storageServersRecruiting ) );
wait(success(dataInFlight) && success(tLogQueueSize) && success(dataDistributionQueueSize) &&
success(teamCollectionValid) && success(storageQueueSize) && success(dataDistributionActive) &&
success(storageServersRecruiting));
TraceEvent(("QuietDatabase" + phase).c_str())
.detail("DataInFlight", dataInFlight.get())
.detail("MaxTLogQueueSize", tLogQueueSize.get())

@@ -413,10 +414,10 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
.detail("DataDistributionActive", dataDistributionActive.get())
.detail("StorageServersRecruiting", storageServersRecruiting.get());

if ( dataInFlight.get() > dataInFlightGate || tLogQueueSize.get() > maxTLogQueueGate
|| dataDistributionQueueSize.get() > maxDataDistributionQueueSize || storageQueueSize.get() > maxStorageServerQueueGate
|| dataDistributionActive.get() == false || storageServersRecruiting.get() == true
|| teamCollectionValid.get() == false) {
if (dataInFlight.get() > dataInFlightGate || tLogQueueSize.get() > maxTLogQueueGate ||
dataDistributionQueueSize.get() > maxDataDistributionQueueSize ||
storageQueueSize.get() > maxStorageServerQueueGate || dataDistributionActive.get() == false ||
storageServersRecruiting.get() == true || teamCollectionValid.get() == false) {

wait( delay( 1.0 ) );
numSuccesses = 0;

@@ -206,10 +206,10 @@ struct ConsistencyCheckWorkload : TestWorkload
self->testFailure("Non-zero data distribution queue/in-flight size");
}

//Check that the number of process (and machine) teams is no larger than the allowed maximum number of teams
// Check that the number of process (and machine) teams is no larger than
// the allowed maximum number of teams
bool teamCollectionValid = wait(getTeamCollectionValid(cx, self->dbInfo));
if (!teamCollectionValid)
{
if (!teamCollectionValid) {
TraceEvent(SevError, "ConsistencyCheck_TooManyTeams");
self->testFailure("The number of process or machine teams is larger than the allowed maximum number of teams");
}