TeamCollection: Test the number of teams

Call the traceTeamCollectionInfo function to record the team numbers
when we add a team directly from the shard information, instead of
using addTeamsBestOf logic.
This commit is contained in:
Meng Xu 2019-02-05 14:18:25 -08:00
parent f5171d1b57
commit 2b73c89e98
4 changed files with 136 additions and 36 deletions

View File

@ -39,6 +39,16 @@ class TCTeamInfo;
struct TCMachineInfo;
class TCMachineTeamInfo;
// Plain bundle of team-count figures for one team-building pass: the current, desired and
// maximum number of server teams, and the same three figures for machine teams.
// Every field defaults to 0 so that a default-constructed instance never carries
// indeterminate values before the builder fills it in.
struct TeamBuildInfo {
	// Server (process) team counts.
	int currentServerTeamNum = 0;
	int desiredServerTeamNum = 0;
	int maxServerTeamNum = 0;

	// Machine team counts.
	int currentMachineTeamNum = 0;
	int desiredMachineTeamNum = 0;
	int maxMachineTeamNum = 0;
};
struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
UID id;
StorageServerInterface lastKnownInterface;
@ -957,6 +967,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
serverIds.push_back(*tempMap->getObject(it));
}
self->addTeam(serverIds.begin(), serverIds.end(), true);
self->traceTeamCollectionInfo();
}
} else {
serverIds.clear();
@ -985,6 +996,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
state std::set<std::vector<UID>>::iterator teamIterEnd = self->primary ? initTeams->primaryTeams.end() : initTeams->remoteTeams.end();
for(; teamIter != teamIterEnd; ++teamIter) {
self->addTeam(teamIter->begin(), teamIter->end(), true);
self->traceTeamCollectionInfo();
wait( yield() );
}
@ -1560,6 +1572,27 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return true;
}
// Counts the entries of server_info whose server_status is not reported as unhealthy.
// Returns that count.
int calculateHealthyServerCount() {
	int healthyServers = 0;
	for (const auto& server : server_info) {
		// Skip servers whose status says they are unhealthy.
		if (server_status.get(server.first).isUnhealthy()) {
			continue;
		}
		++healthyServers;
	}
	return healthyServers;
}
// Counts the entries of machine_info for which isMachineHealthy() returns true.
// Returns that count.
int calculateHealthyMachineCount() {
	int totalHealthyMachineCount = 0;
	// Bind each entry by reference: iterating by value copies every machine_info entry
	// once per iteration for no benefit.
	for (auto& m : machine_info) {
		if (isMachineHealthy(m.second)) {
			++totalHealthyMachineCount;
		}
	}
	return totalHealthyMachineCount;
}
// Create server teams based on machine teams
// Before the number of machine teams reaches the threshold, build a machine team for each server team
// When it reaches the threshold, first try to build a server team with existing machine teams; if failed,
@ -1585,12 +1618,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
++totalMachineTeamCount;
}
int totalHealthyMachineCount = 0;
for (auto m : machine_info) {
if (isMachineHealthy(m.second)) {
++totalHealthyMachineCount;
}
}
int totalHealthyMachineCount = calculateHealthyMachineCount();
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * totalHealthyMachineCount;
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * totalHealthyMachineCount;
@ -1687,7 +1715,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
TraceEvent("AddTeamsBestOf", masterId)
// teamBuildInfo.currentServerTeamNum = teams.size();
// teamBuildInfo.currentMachineTeamNum = machineTeams.size();
// teamBuildInfo.desiredMachineTeamNum = desiredMachineTeams;
// teamBuildInfo.maxMachineTeamNum = maxMachineTeams;
TraceEvent("TeamCollectionInfo", masterId)
.detail("Primary", primary)
.detail("AddedTeamNumber", addedTeams)
.detail("AimToBuildTeamNumber", teamsToBuild)
@ -1699,12 +1733,38 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("DesiredMachineTeams", desiredMachineTeams)
.detail("MaxMachineTeams", maxMachineTeams)
.detail("TotalHealthyMachine", totalHealthyMachineCount)
.trackLatest( "AddTeamsBestOf" );
.trackLatest( "TeamCollectionInfo" );
return addedTeams;
}
// Check if the number of server (and machine teams) is larger than the maximum allowed number
void traceTeamCollectionInfo() {
int totalHealthyServerCount = calculateHealthyServerCount();
int desiredServerTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * totalHealthyServerCount;
int maxServerTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * totalHealthyServerCount;
int totalHealthyMachineCount = calculateHealthyMachineCount();
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * totalHealthyMachineCount;
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * totalHealthyMachineCount;
TraceEvent("TeamCollectionInfo", masterId)
.detail("Primary", primary)
.detail("AddedTeamNumber", 0)
.detail("AimToBuildTeamNumber", 0)
.detail("CurrentTeamNumber", teams.size())
.detail("DesiredTeamNumber", desiredServerTeams)
.detail("MaxTeamNumber", maxServerTeams)
.detail("StorageTeamSize", configuration.storageTeamSize)
.detail("CurrentMachineTeamNumber", machineTeams.size())
.detail("DesiredMachineTeams", desiredMachineTeams)
.detail("MaxMachineTeams", maxMachineTeams)
.detail("TotalHealthyMachine", totalHealthyMachineCount)
.trackLatest( "TeamCollectionInfo" );
}
// Use the current set of known processes (from server_info) to compute an optimized set of storage server teams.
// The following are guarantees of the process:
// - Each newly-built team will meet the replication policy
@ -1770,6 +1830,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
state vector<std::vector<UID>> builtTeams;
// state struct TeamBuildInfo teamBuildInfo;
// teamBuildInfo.desiredServerTeamNum = desiredTeams;
// teamBuildInfo.maxServerTeamNum = maxTeams;
int addedTeams = self->addTeamsBestOf(teamsToBuild, desiredTeams, maxTeams);
if (addedTeams <= 0 && self->teams.size() == 0) {
TraceEvent(SevWarn, "NoTeamAfterBuildTeam")
@ -1778,6 +1842,25 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Debug: set true for traceAllInfo() to print out more information
self->traceAllInfo();
}
} else {
int totalHealthyMachineCount = self->calculateHealthyMachineCount();
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * totalHealthyMachineCount;
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * totalHealthyMachineCount;
TraceEvent("TeamCollectionInfo", self->masterId)
.detail("Primary", self->primary)
.detail("AddedTeamNumber", 0)
.detail("AimToBuildTeamNumber", teamsToBuild)
.detail("CurrentTeamNumber", self->teams.size())
.detail("DesiredTeamNumber", desiredTeams)
.detail("MaxTeamNumber", maxTeams)
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("CurrentMachineTeamNumber", self->machineTeams.size())
.detail("DesiredMachineTeams", desiredMachineTeams)
.detail("MaxMachineTeams", maxMachineTeams)
.detail("TotalHealthyMachine", totalHealthyMachineCount)
.trackLatest( "TeamCollectionInfo" );
}
}
@ -2687,6 +2770,7 @@ ACTOR Future<Void> storageServerTracker(
for(auto it : newBadTeams) {
if( self->removeTeam(it) ) {
self->addTeam(it->servers, true);
self->traceTeamCollectionInfo();
addedNewBadTeam = true;
}
}

View File

@ -254,39 +254,53 @@ ACTOR Future<int64_t> getDataDistributionQueueSize( Database cx, Reference<Async
// Returns whether the team counts reported by the master worker are within their limits.
// Requests the latest tracked team-info event from masterWorker and yields false when
// CurrentTeamNumber > MaxTeamNumber or CurrentMachineTeamNumber > MaxMachineTeams, true otherwise.
// NOTE(review): this listing appears to interleave the pre-change and post-change lines of the
// hunk (the event request, declarations, sscanf calls and comparison each appear twice, once
// against "AddTeamsBestOf" and once against "TeamCollectionInfo") -- confirm against the real file.
ACTOR Future<bool> getTeamCollectionValid( Database cx, WorkerInterface masterWorker) {
try {
TraceEvent("GetTeamCollectionValid").detail("Stage", "ContactingMaster");
// Number of failed attempts so far; the retrying catch block below gives up once it exceeds 10.
state int attempts = 0;
loop {
try {
TraceEvent("GetTeamCollectionValid").detail("Stage", "ContactingMaster");
// Request for the event tracked under "AddTeamsBestOf", with a 1.0 timeout.
TraceEventFields addTeamsBestOfMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( LiteralStringRef("AddTeamsBestOf") ) ), 1.0 ) );
// Request for the event tracked under "TeamCollectionInfo", with a 1.0 timeout.
TraceEventFields teamCollectionInfoMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( LiteralStringRef("TeamCollectionInfo") ) ), 1.0 ) );
TraceEvent("GetTeamCollectionValid").detail("Stage", "GotString");
TraceEvent("GetTeamCollectionValid").detail("Stage", "GotString");
int64_t currentTeamNumber;
int64_t desiredTeamNumber;
int64_t maxTeamNumber;
int64_t currentMachineTeamNumber;
int64_t desiredMachineTeamNumber;
int64_t maxMachineTeamNumber;
sscanf(addTeamsBestOfMessage.getValue("CurrentTeamNumber").c_str(), "%lld", &currentTeamNumber);
sscanf(addTeamsBestOfMessage.getValue("DesiredTeamNumber").c_str(), "%lld", &desiredTeamNumber);
sscanf(addTeamsBestOfMessage.getValue("MaxTeamNumber").c_str(), "%lld", &maxTeamNumber);
sscanf(addTeamsBestOfMessage.getValue("CurrentMachineTeamNumber").c_str(), "%lld", &currentMachineTeamNumber);
sscanf(addTeamsBestOfMessage.getValue("DesiredMachineTeams").c_str(), "%lld", &desiredMachineTeamNumber);
sscanf(addTeamsBestOfMessage.getValue("MaxMachineTeams").c_str(), "%lld", &maxMachineTeamNumber);
// The six values are declared without initializers and filled in by the sscanf calls below.
int64_t currentTeamNumber;
int64_t desiredTeamNumber;
int64_t maxTeamNumber;
int64_t currentMachineTeamNumber;
int64_t desiredMachineTeamNumber;
int64_t maxMachineTeamNumber;
// Parse the numeric fields out of the event's string values.
// NOTE(review): "%lld" expects a long long*, but the targets are int64_t; the sscanf return
// values are ignored, so a field that fails to parse leaves its variable uninitialized.
sscanf(teamCollectionInfoMessage.getValue("CurrentTeamNumber").c_str(), "%lld", &currentTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("DesiredTeamNumber").c_str(), "%lld", &desiredTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("MaxTeamNumber").c_str(), "%lld", &maxTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("CurrentMachineTeamNumber").c_str(), "%lld", &currentMachineTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("DesiredMachineTeams").c_str(), "%lld", &desiredMachineTeamNumber);
sscanf(teamCollectionInfoMessage.getValue("MaxMachineTeams").c_str(), "%lld", &maxMachineTeamNumber);
if (currentTeamNumber > maxTeamNumber || currentMachineTeamNumber > maxMachineTeamNumber) {
printf("getTeamCollectionValid: currentTeamNumber:%ld, desiredTeamNumber:%ld, maxTeamNumber:%ld currentMachineTeamNumber:%ld, desiredMachineTeamNumber:%ld, maxMachineTeamNumber:%ld.",
currentTeamNumber, desiredTeamNumber, maxTeamNumber, currentMachineTeamNumber, desiredMachineTeamNumber, maxMachineTeamNumber);
return false;
} else {
return true;
// Invalid when either current count exceeds its maximum; the desired counts are only reported.
if (currentTeamNumber > maxTeamNumber || currentMachineTeamNumber > maxMachineTeamNumber) {
// NOTE(review): "%ld" is paired with int64_t arguments and the message has no trailing newline.
printf("getTeamCollectionValid: currentTeamNumber:%ld, desiredTeamNumber:%ld, maxTeamNumber:%ld currentMachineTeamNumber:%ld, desiredMachineTeamNumber:%ld, maxMachineTeamNumber:%ld.",
currentTeamNumber, desiredTeamNumber, maxTeamNumber, currentMachineTeamNumber, desiredMachineTeamNumber, maxMachineTeamNumber);
// Record the same six numbers in the trace log.
TraceEvent("GetTeamCollectionValid").detail("CurrentTeamNumber", currentTeamNumber)
.detail("DesiredTeamNumber", desiredTeamNumber).detail("MaxTeamNumber", maxTeamNumber)
.detail("CurrentMachineTeamNumber", currentMachineTeamNumber).detail("DesiredMachineTeams", desiredMachineTeamNumber)
.detail("MaxMachineTeams", maxMachineTeamNumber);
return false;
} else {
return true;
}
} catch( Error &e ) {
// Retrying handler: log the failure, count it, and try again after a delay.
// NOTE(review): this catches every Error, so a cancellation would presumably be retried
// as well -- confirm whether it should be rethrown instead.
TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract GetTeamCollectionValid information");
attempts++;
if ( attempts > 10 ) {
// After more than 10 failures the team collection is reported as valid (returns true).
TraceEvent("QuietDatabaseNoTeamCollectionInfo", masterWorker.id()).detail("Reason", "Had never called build team to build any team");
return true;
}
//throw;
// delay(5.0) before the next loop iteration.
wait( delay(5.0) );
}
};
} catch( Error &e ) {
// Non-retrying handler: log the failure and rethrow to the caller.
TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract GetTeamCollectionValid information");
throw;
}
}
//Gets if the number of process and machine teams does not exceed the maximum allowed number of teams

View File

@ -690,6 +690,8 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
}
}
printf("MX: success:%d failure:%d\n", success, failure);
if( spec.phases & TestWorkload::METRICS ) {
state std::vector< Future<vector<PerfMetric>> > metricTasks;
printf("fetching metrics (%s)...\n", printable(spec.title).c_str());

View File

@ -210,7 +210,7 @@ struct ConsistencyCheckWorkload : TestWorkload
bool teamCollectionValid = wait(getTeamCollectionValid(cx, self->dbInfo));
if (!teamCollectionValid)
{
TraceEvent("ConsistencyCheck_TooManyTeams");
TraceEvent(SevError, "ConsistencyCheck_TooManyTeams");
self->testFailure("The number of process or machine teams is larger than the allowed maximum number of teams");
}