TeamCollection: Cleanup code and add checks

Remove unnecessary sanity checks and remove the dead code.
Add some necessary sanity checks.

Signed-off-by: Meng Xu <meng_xu@apple.com>
This commit is contained in:
Meng Xu 2018-11-30 17:40:18 -08:00
parent ea3bd1502d
commit f311455c45
1 changed files with 50 additions and 135 deletions

View File

@ -1117,7 +1117,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Note an initial team may be added at init() even though the team size is not storageTeamSize
if (!machineTeamInfo.isValid() && !machineIDs.empty()) {
machineTeamInfo = addMachineTeam(machineIDs.begin(), machineIDs.end());
teamInfo->machineTeam = machineTeamInfo;
}
if (!machineTeamInfo.isValid()) {
@ -1307,8 +1306,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
for (auto& team : machineTeams) {
TraceEvent("MachineTeamInfo")
.detail("TeamIndex", i++)
.detail("MachineIDs", team->getMachineIDsStr())
.detail("MachineTeamScore", calculateMachineTeamScore(team));
.detail("MachineIDs", team->getMachineIDsStr());
}
}
@ -1427,6 +1425,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
LocalityEntry process = tcMachineInfo->localityEntry;
forcedAttributes.push_back(process);
}
// TODO: If leastUsedServers is empty, you can simply return because you will never succeed.
// Step 4: Reuse Policy's selectReplicas() to create team for the representative process.
std::vector<UID*> bestTeam;
@ -1460,10 +1459,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Standalone<StringRef> machine_id = server->lastKnownInterface.locality.zoneId().get();
machineIDs.push_back(machine_id);
}
if (!isMachineIDValid(machineIDs)) {
maxAttempts += 1;
continue;
}
// Only choose healthy machines into machine team
ASSERT(isMachineTeamHealthy(machineIDs));
@ -1554,19 +1550,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return false;
}
bool isMachineIDValid(vector<Standalone<StringRef>> const& machineIDs) {
for (auto& id : machineIDs) {
if (machine_info.find(id) == machine_info.end() || machine_info[id]->serversOnMachine.size() == 0) {
TraceEvent(SevError, "InvalidMachineID")
.detail("MachineID", id.contents().printable())
.detail("ServerNumber", machine_info[id]->serversOnMachine.size());
return false;
}
}
return true;
}
// Return the healthy server with the least number of correct-size server teams
Reference<TCServerInfo> findOneLeastUsedServer() {
vector<Reference<TCServerInfo>> leastUsedServers;
@ -1575,7 +1558,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Only pick healthy server, which is not failed or excluded.
if (server_status.get(server.first).isUnhealthy()) continue;
int numTeams = countCorrectSizeTeam(server.second, configuration.storageTeamSize);
int numTeams = server.second->teams.size();
if (numTeams < minTeamNumber) {
minTeamNumber = numTeams;
leastUsedServers.clear();
@ -1610,38 +1593,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return Reference<TCMachineTeamInfo>();
}
// TODO: Remove this function if ASSERT() is never triggered
// A server's team may have incorrect size. We do NOT want to count those teams because they will be deleted any way
int countCorrectSizeTeam(Reference<TCServerInfo>& server, int expectedSize) {
int count = 0;
for (auto& team : server->teams) {
if (team->size() == expectedSize) {
++count;
}
}
ASSERT(count == server->teams.size());
return count;
}
// Machine team score is the total number of server teams owned by the servers on the machine team
// plus the machine score.
// Machine score is the max number of server teams of servers on the machine.
// Adding machine core to penalize the machine that includes a server that owns a large number of teams.
int calculateMachineTeamScore(Reference<TCMachineTeamInfo> machineTeam) {
int score = 0;
for (auto& machine : machineTeam->machines) {
int machineScore = 0;
for (auto& server : machine->serversOnMachine) {
score += server->teams.size();
if (server->teams.size() > machineScore) machineScore = server->teams.size();
}
score += machineScore;
}
return score;
}
// Check if machines on which server team is on belong to a machine team.
// A server team should always come from servers on a machine team
// Check if it is true
bool isOnSameMachineTeam(Reference<TCTeamInfo>& team) {
std::vector<Standalone<StringRef>> machineIDs;
for (auto& server : team->servers) {
@ -1666,20 +1619,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Sanity check the property of teams in unit test
// Return true if all server teams belong to machine teams
bool sanityCheckTeams() {
int teamIndex = 0;
int alwaysOnSameMachineTeam = true;
for (auto& team : teams) {
bool onSameMachineTeam = isOnSameMachineTeam(team);
if (onSameMachineTeam == false) alwaysOnSameMachineTeam = false;
int memberIndex = 0;
for (auto& server : team->servers) {
memberIndex++;
if (isOnSameMachineTeam(team) == false) {
return false;
}
teamIndex++;
}
return alwaysOnSameMachineTeam;
return true;
}
// A machine may have some invalid machine teams, which will eventually be removed
@ -1710,24 +1656,32 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int addedMachineTeams = 0;
int addedTeams = 0;
int loopCount = 0;
bool ignoreMachineTeamThreshhold = false;
// Exclude machine teams who have members in the wrong configuration.
// When we change configuration, we may have machine teams with storageTeamSize in the old configuration.
int healthyMachineTeamCount = 0;
int totalMachineTeamCount = 0;
for (auto mt= machineTeams.begin(); mt != machineTeams.end(); ++mt) {
if ((*mt)->machines.size() == configuration.storageTeamSize) {
if ((*mt)->machines.size() == configuration.storageTeamSize) { // TODO: Remove this if if SevError never happens
if (isMachineTeamHealthy(*mt)) {
++healthyMachineTeamCount;
}
++totalMachineTeamCount;
} else {
TraceEvent(SevError, "MachineTeamHasIncorrectSize")
.detail("Observation", "WeMustCheckMachineTeamSize");
}
}
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * machine_info.size();
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * machine_info.size();
int totalHealthyMachineCount = 0;
for (auto m : machine_info) {
if (isMachineHealthy(m.second)) {
++totalHealthyMachineCount;
}
}
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * totalHealthyMachineCount;
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * totalHealthyMachineCount;
// machineTeamsToBuild mimics how the teamsToBuild is calculated in buildTeams()
int machineTeamsToBuild = std::min(desiredMachineTeams - healthyMachineTeamCount, maxMachineTeams - totalMachineTeamCount);
@ -1738,33 +1692,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
while (addedTeams < teamsToBuild) {
// Step 1: Create 1 best machine team
if (machineTeams.size() > maxMachineTeams &&
ignoreMachineTeamThreshhold ) { // SOMEDAY: only count valid machine teams
TEST(true);
machineTeamsToBuild = 1; // Ignore the machine team limit and build a new one, hoping to find a server team
addedMachineTeams = addBestMachineTeams(machineTeamsToBuild);
// Record the exception that we build more machine teams than the threshold
TraceEvent(SevWarn, "MachineTeamNumReachThreshold")
.detail("IgnoreThreshold", ignoreMachineTeamThreshhold)
.detail("Primary", primary)
.detail("AddedMachineTeamsNumber", addedMachineTeams)
.detail("AimToBuildMachineNumber", machineTeamsToBuild)
.detail("MachineTeamsNumber", machineTeams.size())
.detail("CurrentUniqueMachineTeamNum", countMachineTeams())
.detail("StorageTeamSize", configuration.storageTeamSize)
.detail("TeamsToBuild", teamsToBuild)
.detail("CurrentTeamNumber", teams.size());
}
// Log the situation when we build too many machine teams
if (machineTeams.size() > 2 * maxMachineTeams) {
TraceEvent(SevWarnAlways, "TooManyMachineTeams")
.suppressFor(1.0)
.detail("NumberOfMachines", machineTeams.size())
.detail("MachineTeamThreshold", maxMachineTeams)
.detail("CurrentNumberOfMachineTeams", machineTeams.size());
}
std::vector<UID> bestServerTeam;
int bestScore = std::numeric_limits<int>::max();
int maxAttempts = SERVER_KNOBS->BEST_OF_AMT; // BEST_OF_AMT = 4
@ -1778,9 +1705,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<TCMachineTeamInfo> chosenMachineTeam = findOneRandomMachineTeam(chosenServer);
if (!chosenMachineTeam.isValid()) {
// TODO: MX: Debug: may change SevWarn to SevError to trigger error in correctness test.
// TODO: MX: Ask Evan: We may face the situation that temporarily we have no healthy machine. What
// should we do?
// We may face the situation that temporarily we have no healthy machine.
TraceEvent(SevWarn, "MachineTeamNotFound")
.detail("Primary", primary)
.detail("MachineTeamNumber", machineTeams.size());
@ -1790,24 +1715,25 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// From here, chosenMachineTeam must have a healthy server team
// Step 3: Randomly pick 1 server from each machine in the chosen machine team to form a server team
vector<UID> serverTeam;
int tmpIndex = 0;
int chosenServerCount = 0;
for (auto& machine : chosenMachineTeam->machines) {
std::vector<Reference<TCServerInfo>> healthyProcesses;
for(auto it : machine->serversOnMachine) {
if(!server_status.get(it->id).isUnhealthy()) {
healthyProcesses.push_back(it);
}
}
UID serverID;
if ( machine == chosenServer->machine ) {
if (machine == chosenServer->machine) {
serverID = chosenServer->id;
++chosenServerCount;
} else {
std::vector<Reference<TCServerInfo>> healthyProcesses;
for(auto it : machine->serversOnMachine) {
if(!server_status.get(it->id).isUnhealthy()) {
healthyProcesses.push_back(it);
}
}
serverID = g_random->randomChoice(healthyProcesses)->id;
}
serverTeam.push_back(serverID);
}
ASSERT(chosenServerCount == 1); // chosenServer should be used exactly once
ASSERT(serverTeam.size() == configuration.storageTeamSize);
std::sort(serverTeam.begin(), serverTeam.end());
@ -1828,13 +1754,9 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
if (bestServerTeam.size() != configuration.storageTeamSize) { // Not find any team
if (ignoreMachineTeamThreshhold) { // Has tried to build one more machine team but still failed
break;
} else {
ignoreMachineTeamThreshhold = true; // Try to find a server team by building more machine teams
continue; // while( addedTeams < teamsToBuild )
}
if (bestServerTeam.size() != configuration.storageTeamSize) {
// Not find any team and will unlikely find a team
break;
}
// Step 4: Add the server team
@ -1845,6 +1767,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
break;
}
}
TraceEvent("AddTeamsBestOf")
.detail("Primary", primary)
.detail("AddedTeamNumber", addedTeams)
@ -1883,10 +1806,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("ServerNumber", self->server_info.size())
.detail("UniqueMachines", uniqueMachines)
.detail("StorageTeamSize", self->configuration.storageTeamSize);
if (self->teams.size() == 0) {
TraceEvent("BuildTeams").detail("ZeroTeams", "Check the server and machine info below");
self->traceAllInfo(true);
}
// If there are too few machines to even build teams or there are too few represented datacenters, build no new teams
if( uniqueMachines >= self->configuration.storageTeamSize ) {
@ -1930,7 +1849,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
TraceEvent(SevWarn, "NoTeamAfterBuildTeam")
.detail("TeamNum", self->teams.size())
.detail("Debug", "Check information below");
self->traceAllInfo(true);
// Debug: set true for traceAllInfo() to print out more information
self->traceAllInfo();
}
}
}
@ -1962,19 +1882,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("NonFailedServerCount", desiredServerVector.size());
}
void countHealthyTeams() {
int healthy = 0;
for (auto it = teams.begin(); it != teams.end(); it++) {
if ((*it)->isHealthy()) {
healthy++;
}
}
TraceEvent(healthy == healthyTeamCount ? SevInfo : SevWarnAlways, "HealthyTeamCheck", masterId)
.detail("ValidatedCount", healthy)
.detail("ProvidedCount", healthyTeamCount)
.detail("Primary", primary);
}
bool shouldHandleServer(const StorageServerInterface &newServer) {
return (includedDCs.empty() ||
std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end() ||
@ -2056,11 +1963,15 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Step: Remove machine info related to removedServer
// Remove the server from its machine
Reference<TCMachineInfo> removedMachineInfo = removedServerInfo->machine;
bool representServerRemoved = false;
for (int i = 0; i < removedMachineInfo->serversOnMachine.size(); ++i) {
if (removedMachineInfo->serversOnMachine[i] == removedServerInfo) {
// Safe even when removedServerInfo is the last one
removedMachineInfo->serversOnMachine[i--] = removedMachineInfo->serversOnMachine.back();
removedMachineInfo->serversOnMachine.pop_back();
if ( i == 0 ) {
representServerRemoved = true;
}
break;
}
}
@ -2100,7 +2011,11 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
TraceEvent("MachineLocalityMapUpdate")
.detail("MachineUIDRemoved", removedMachineInfo->machineID.toString())
.detail("ServerID", removedServerInfo->id.toString());
} else { // Update machine's locality if machine uses removed server's locality and machine still has servers
// Update macineLocalityMap when a machine is removed
// FIXME: add remove support to localitySet so we do not have to recreate it
rebuildMachineLocalityMap();
} else if (representServerRemoved) { // Update machine's locality if machine uses removed server's locality and machine still has servers
auto& representativeServer = removedMachineInfo->serversOnMachine[0];
auto& locality = representativeServer->lastKnownInterface.locality;
ASSERT(server_info.find(representativeServer->id) != server_info.end());
@ -2110,10 +2025,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("ServerIDOld", removedServerInfo->id.toString())
.detail("ServerIDNew", representativeServer->id.toString())
.detail("IsNewServerIDValid", server_info.find(representativeServer->id) != server_info.end());
// Update macineLocalityMap when a machine locality is changed
rebuildMachineLocalityMap();
}
// Update macineLocalityMap by rebuilding the map
// FIXME: add remove support to localitySet so we do not have to recreate it
rebuildMachineLocalityMap();
// Step: Remove removedServer from server's global data
for (int s = 0; s < allServers.size(); s++) {