TeamCollection: Bug fix in handle server locality change

Make sure the link between server and machine is updated
in both server and machine.
Rename function name to better reflect its functionality.

Signed-off-by: Meng Xu <meng_xu@apple.com>
This commit is contained in:
Meng Xu 2018-12-12 11:44:05 -08:00
parent e069b5c31c
commit ad7040efcd
1 changed files with 53 additions and 41 deletions

View File

@ -1156,29 +1156,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return addMachineTeam(machines);
}
// Construct a machine for the server uid (which is the input)
int constructMachineFor1Server(UID const& uid) {
ASSERT(server_info.find(uid) != server_info.end());
auto& server = server_info[uid];
auto& locality = server->lastKnownInterface.locality;
Standalone<StringRef> machine_id = locality.zoneId().get(); // locality to machine_id with std::string type
if (machine_info.find(machine_id) ==
machine_info.end()) { // uid is the first storage server process on the machine
// For each machine, store the first server's localityEntry into machineInfo for later use.
LocalityEntry localityEntry = machineLocalityMap.add(locality, &uid);
Reference<TCMachineInfo> machineInfo = Reference<TCMachineInfo>(new TCMachineInfo(server, localityEntry));
server->machine = machineInfo;
machine_info.insert(std::make_pair(machine_id, machineInfo));
} else {
Reference<TCMachineInfo> machineInfo = machine_info.find(machine_id)->second;
machineInfo->serversOnMachine.push_back(server);
server->machine = machineInfo;
}
return 0;
}
// Group storage servers (process) based on their machineId in LocalityData
// All created machines are healthy
// Return The number of healthy servers we grouped into machines
@ -1186,7 +1163,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int totalServerIndex = 0;
for(auto i = server_info.begin(); i != server_info.end(); ++i) {
if (!server_status.get(i->first).isUnhealthy()) {
constructMachineFor1Server(i->first);
checkAndCreateMachine(i->second);
totalServerIndex++;
}
}
@ -1839,7 +1816,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
auto &r = server_info[newServer.id()] = Reference<TCServerInfo>( new TCServerInfo( newServer, processClass, includedDCs.empty() || std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end(), storageServerSet ) );
// Establish the relation between server and machine
constructMachineFor1Server(newServer.id());
checkAndCreateMachine(r);
r->tracker = storageServerTracker( this, cx, r.getPtr(), &server_status, lock, masterId, &server_info, serverChanges, errorOut, addedVersion );
doBuildTeams = true; // Adding a new server triggers to build new teams
@ -1873,8 +1850,38 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return found;
}
// Check if the server belongs to a machine; if not, create the machine.
// Establish the two-direction link between server and machine
Reference<TCMachineInfo> checkAndCreateMachine(Reference<TCServerInfo> server) {
ASSERT(server.isValid() && server_info.find(server->id) != server_info.end());
auto& locality = server->lastKnownInterface.locality;
Standalone<StringRef> machine_id = locality.zoneId().get(); // locality to machine_id with std::string type
Reference<TCMachineInfo> machineInfo;
if (machine_info.find(machine_id) ==
machine_info.end()) { // uid is the first storage server process on the machine
TEST(true);
// For each machine, store the first server's localityEntry into machineInfo for later use.
LocalityEntry localityEntry = machineLocalityMap.add(locality, &server->id);
machineInfo = Reference<TCMachineInfo>(new TCMachineInfo(server, localityEntry));
machine_info.insert(std::make_pair(machine_id, machineInfo));
} else {
machineInfo = machine_info.find(machine_id)->second;
machineInfo->serversOnMachine.push_back(server);
}
server->machine = machineInfo;
return machineInfo;
}
Reference<TCMachineInfo> checkAndCreateMachineByServerID(UID& id) {
ASSERT(server_info.find(id) != server_info.end());
Reference<TCServerInfo> server = server_info[id];
return checkAndCreateMachine(server);
}
// Check if the serverTeam belongs to a machine team; If not, create the machine team
void checkAndCreateMachineTeam(Reference<TCTeamInfo> serverTeam) {
Reference<TCMachineTeamInfo> checkAndCreateMachineTeam(Reference<TCTeamInfo> serverTeam) {
std::vector<Standalone<StringRef>> machineIDs;
for (auto& server : serverTeam->servers) {
Reference<TCMachineInfo> machine = server->machine;
@ -1882,9 +1889,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
std::sort(machineIDs.begin(), machineIDs.end());
if (!machineTeamExists(machineIDs)) { // Create the machine team if it does not exist
addMachineTeam(machineIDs.begin(), machineIDs.end());
Reference<TCMachineTeamInfo> machineTeam = findMachineTeam(machineIDs);
if (!machineTeam.isValid()) { // Create the machine team if it does not exist
machineTeam = addMachineTeam(machineIDs.begin(), machineIDs.end());
}
return machineTeam;
}
void updateMachineRepresentativeServer(Reference<TCMachineInfo> machine) {
@ -2639,26 +2649,26 @@ ACTOR Future<Void> storageServerTracker(
} else {
// we remove the server from the machine, and
// update locality entry for the machine and the global machineLocalityMap
bool foundServer = false;
int serverIndex = -1;
for (int i = 0; i < machine->serversOnMachine.size(); ++i) {
if (machine->serversOnMachine[i].getPtr() == server) {
serverIndex = i;
machine->serversOnMachine[i--] = machine->serversOnMachine.back();
machine->serversOnMachine.pop_back();
foundServer = true;
}
}
ASSERT(foundServer);
self->updateMachineRepresentativeServer(machine);
ASSERT(serverIndex != -1);
if (serverIndex == 0) {
self->updateMachineRepresentativeServer(machine);
}
}
// Second handle the impact on the destination machine where the server's new locality is;
// destMachineID is the unique ID of the machine where the server's new locality is
Standalone<StringRef> destMachineID = server->lastKnownInterface.locality.zoneId().get();
if (self->machine_info.find(destMachineID) == self->machine_info.end()) {
// When server is moved to a new machine, we construct the new machine
// constructMachineFor1Server() must be called after server locality is updated
self->constructMachineFor1Server(server->id);
}
// If the destination machine is new, create one; otherwise, add server to an existing one
// Update server's machine reference to the destination machine
Reference<TCMachineInfo> destMachine = self->checkAndCreateMachineByServerID(server->id);
ASSERT(destMachine.isValid());
// Ensure the server's server team belong to a machine team, and
// Get the newBadTeams due to the locality change
vector<Reference<TCTeamInfo>> newBadTeams;
@ -2667,7 +2677,9 @@ ACTOR Future<Void> storageServerTracker(
newBadTeams.push_back(serverTeam);
continue;
}
self->checkAndCreateMachineTeam(serverTeam);
Reference<TCMachineTeamInfo> machineTeam = self->checkAndCreateMachineTeam(serverTeam);
ASSERT(machineTeam.isValid());
serverTeam->machineTeam = machineTeam;
}
// The END of handling the impact on the machine and machine teams due to the locality change
@ -3310,7 +3322,7 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3)));
collection->server_info[uid] = Reference<TCServerInfo>(new TCServerInfo(interface, ProcessClass(), true, collection->storageServerSet));
collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
collection->constructMachineFor1Server(uid);
collection->checkAndCreateMachine(collection->server_info[uid]);
}
return collection;