Added support for tracking roles for simulation workers.
Fixed the exclusion and inclusion address simulation API and its integration within workloads. Added more information to trace events for simulation.
This commit is contained in: parent 581bd6c8ed, commit 44e0df78c5
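In short: the simulator gains a per-address, reference-counted role registry (addRole / removeRole / getRoles), address exclusion becomes reference-counted as well, and many simulation trace events gain correlating ids and extra detail fields. The following standalone C++ sketch is illustrative only (NetworkAddress is reduced to a plain string and TraceEvent logging is dropped); it mirrors the role bookkeeping introduced in the simulator header hunks further down.

#include <cstdio>
#include <map>
#include <string>

// Mirrors the new ISimulator member:
//   std::map<NetworkAddress, std::map<std::string, int>> roleAddresses;
static std::map<std::string, std::map<std::string, int>> roleAddresses;

void addRole(const std::string& address, const std::string& role) {
	roleAddresses[address][role]++;  // the same role may be added repeatedly
}

void removeRole(const std::string& address, const std::string& role) {
	auto addressIt = roleAddresses.find(address);
	if (addressIt == roleAddresses.end()) return;  // address missing
	auto roleIt = addressIt->second.find(role);
	if (roleIt == addressIt->second.end()) return;  // role missing
	if (--roleIt->second == 0) {
		addressIt->second.erase(roleIt);            // last instance of this role
		if (addressIt->second.empty())
			roleAddresses.erase(addressIt);         // last role on this address
	}
}

// "Worker" is skipped by default, as in the real getRoles(..., skipWorkers = true)
std::string getRoles(const std::string& address, bool skipWorkers = true) {
	std::string text;
	auto addressIt = roleAddresses.find(address);
	if (addressIt != roleAddresses.end())
		for (auto& r : addressIt->second)
			if (!skipWorkers || r.first != "Worker")
				text += r.first + (r.second > 1 ? "-" + std::to_string(r.second) + " " : " ");
	return text.empty() ? "[unset]" : text;
}

int main() {
	addRole("1.2.3.4:4500", "Worker");
	addRole("1.2.3.4:4500", "TLog");
	addRole("1.2.3.4:4500", "TLog");
	printf("%s\n", getRoles("1.2.3.4:4500").c_str());  // prints "TLog-2 "
	removeRole("1.2.3.4:4500", "TLog");
	printf("%s\n", getRoles("1.2.3.4:4500").c_str());  // prints "TLog "
}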
@@ -77,12 +77,12 @@ void ISimulator::displayWorkers() const
 	}

 	printf("DataHall ZoneId\n");
-	printf(" Address Name Class Excluded Failed Rebooting DataFolder\n");
+	printf(" Address Name Class Excluded Failed Rebooting Role DataFolder\n");
 	for (auto& zoneRecord : zoneMap) {
 		printf("\n%s\n", zoneRecord.first.c_str());
 		for (auto& processInfo : zoneRecord.second) {
-			printf(" %9s %-10s %-7s %-8s %-6s %-9s %-40s\n",
-				processInfo->address.toString().c_str(), processInfo->name, processInfo->startingClass.toString().c_str(), (processInfo->excluded ? "True" : "False"), (processInfo->failed ? "True" : "False"), (processInfo->rebooting ? "True" : "False"), processInfo->dataFolder);
+			printf(" %9s %-10s%-13s%-8s %-6s %-9s %-48s %-40s\n",
+				processInfo->address.toString().c_str(), processInfo->name, processInfo->startingClass.toString().c_str(), (processInfo->excluded ? "True" : "False"), (processInfo->failed ? "True" : "False"), (processInfo->rebooting ? "True" : "False"), getRoles(processInfo->address).c_str(), processInfo->dataFolder);
 		}
 	}
@@ -1001,8 +1001,11 @@ public:
 	for (auto processInfo : getAllProcesses()) {
 		// Add non-test processes (ie. datahall is not be set for test processes)
 		if (processInfo->isAvailableClass()) {
+			// Ignore excluded machines
+			if (processInfo->excluded)
+				;
 			// Mark all of the unavailable as dead
-			if (!processInfo->isAvailable())
+			else if (!processInfo->isAvailable())
 				processesDead.push_back(processInfo);
 			else if (protectedAddresses.count(processInfo->address))
 				processesLeft.push_back(processInfo);
@@ -1056,22 +1059,22 @@ public:
 		}
 		// Reboot and Delete if remaining machines do NOT fulfill policies
 		else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (!processesLeft.validate(tLogPolicy))) {
-			auto newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot;
+			newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot;
 			canSurvive = false;
 			TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("tLogPolicy", tLogPolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Reason", "tLogPolicy does not validates against remaining processes.");
 		}
 		else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (!processesLeft.validate(storagePolicy))) {
-			auto newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot;
+			newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot;
 			canSurvive = false;
 			TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Reason", "storagePolicy does not validates against remaining processes.");
 		}
 		else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (nQuorum > uniqueMachines.size())) {
-			auto newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot;
+			newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot;
 			canSurvive = false;
 			TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("Reason", "Not enough unique machines to perform auto configuration of coordinators.");
 		}
 		else {
-			TraceEvent("CanSurviveKills").detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("ZonesLeft", ::describeZones(localitiesLeft)).detail("ValidateRemaining", processesLeft.validate(tLogPolicy));
+			TraceEvent("CanSurviveKills").detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("ZonesLeft", ::describeZones(localitiesLeft)).detail("DataHallsLeft", ::describeDataHalls(localitiesLeft)).detail("ValidateRemaining", processesLeft.validate(tLogPolicy));
 		}
 	}
 	if (newKillType) *newKillType = newKt;
@@ -1095,12 +1098,12 @@ public:
 	TEST( kt == InjectFaults ); // Simulated machine was killed with faults

 	if (kt == KillInstantly) {
-		TraceEvent(SevWarn, "FailMachine").detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).backtrace();
+		TraceEvent(SevWarn, "FailMachine", machine->locality.zoneId()).detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace();
 		// This will remove all the "tracked" messages that came from the machine being killed
 		latestEventCache.clear();
 		machine->failed = true;
 	} else if (kt == InjectFaults) {
-		TraceEvent(SevWarn, "FaultMachine").detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).backtrace();
+		TraceEvent(SevWarn, "FaultMachine", machine->locality.zoneId()).detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace();
 		should_inject_fault = simulator_should_inject_fault;
 		machine->fault_injection_r = g_random->randomUniqueID().first();
 		machine->fault_injection_p1 = 0.1;
@@ -1111,8 +1114,10 @@ public:
 		ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting);
 	}
 	virtual void rebootProcess( ProcessInfo* process, KillType kt ) {
-		if( kt == RebootProcessAndDelete && protectedAddresses.count(process->address) )
+		if( kt == RebootProcessAndDelete && protectedAddresses.count(process->address) ) {
+			TraceEvent("RebootChanged").detail("ZoneId", process->locality.describeZone()).detail("KillType", RebootProcess).detail("OrigKillType", kt).detail("Reason", "Protected process");
 			kt = RebootProcess;
+		}
 		doReboot( process, kt );
 	}
 	virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) {
@@ -1157,6 +1162,7 @@ public:
 	TEST(kt == InjectFaults); // Trying to kill by injecting faults

 	if(speedUpSimulation && !forceKill) {
+		TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "Unforced kill within speedy simulation.").backtrace();
 		return false;
 	}

@@ -1181,15 +1187,25 @@ public:
 	if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete))
 	{
 		std::vector<ProcessInfo*> processesLeft, processesDead;
+		int protectedWorker = 0, unavailable = 0, excluded = 0;

 		for (auto machineRec : machines) {
 			for (auto processInfo : machineRec.second.processes) {
 				// Add non-test processes (ie. datahall is not be set for test processes)
 				if (processInfo->isAvailableClass()) {
-					if (!processInfo->isAvailable())
+					// Do not include any excluded machines
+					if (processInfo->excluded) {
 						processesDead.push_back(processInfo);
-					else if (protectedAddresses.count(processInfo->address))
+						excluded ++;
+					}
+					else if (!processInfo->isAvailable()) {
+						processesDead.push_back(processInfo);
+						unavailable ++;
+					}
+					else if (protectedAddresses.count(processInfo->address)) {
 						processesLeft.push_back(processInfo);
+						protectedWorker ++;
+					}
 					else if (machineRec.second.zoneId != zoneId)
 						processesLeft.push_back(processInfo);
 					// Add processes from dead machines and datacenter machines to dead group
@@ -1202,7 +1218,7 @@ public:
 		if ((kt != Reboot) && (!killIsSafe)) {
 			kt = Reboot;
 		}
-		TraceEvent("ChangedKillMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("OrigKillType", ktOrig).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
+		TraceEvent("ChangedKillMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("OrigKillType", ktOrig).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("Protected", protectedWorker).detail("Unavailable", unavailable).detail("Excluded", excluded).detail("ProtectedTotal", protectedAddresses.size()).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
 	}
 	else if ((kt == KillInstantly) || (kt == InjectFaults)) {
 		TraceEvent("DeadMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
@@ -1229,31 +1245,30 @@ public:
 	// Check if any processes on machine are rebooting
 	if( processesOnMachine != processesPerMachine && kt >= RebootAndDelete ) {
 		TEST(true); //Attempted reboot, but the target did not have all of its processes running
-		TraceEvent(SevWarn, "AbortedReboot", zoneId).detailext("ZoneId", zoneId).detail("Reason", "The target did not have all of its processes running.").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace();
+		TraceEvent(SevWarn, "AbortedKill", zoneId).detail("KillType", kt).detailext("ZoneId", zoneId).detail("Reason", "Machine processes does not match number of processes per machine").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace();
 		return false;
 	}

 	// Check if any processes on machine are rebooting
 	if ( processesOnMachine != processesPerMachine) {
 		TEST(true); //Attempted reboot, but the target did not have all of its processes running
-		TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "The target did not have all of its processes running.").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace();
+		TraceEvent(SevWarn, "AbortedKill", zoneId).detail("KillType", kt).detailext("ZoneId", zoneId).detail("Reason", "Machine processes does not match number of processes per machine").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace();
 		return false;
 	}

 	TraceEvent("KillMachine", zoneId).detailext("ZoneId", zoneId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KilledMachines", killedMachines).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt!=ktOrig).detail("killIsSafe", killIsSafe);
 	if (kt < RebootAndDelete ) {
 		if(kt == InjectFaults && machines[zoneId].machineProcess != nullptr)
 			killProcess_internal( machines[zoneId].machineProcess, kt );
 		for (auto& process : machines[zoneId].processes) {
-			TraceEvent("KillMachineProcess", zoneId).detail("KillType", kt).detail("Process", process->toString()).detail("startingClass", process->startingClass.toString());
+			TraceEvent("KillMachineProcess", zoneId).detail("KillType", kt).detail("Process", process->toString()).detail("startingClass", process->startingClass.toString()).detail("failed", process->failed).detail("excluded", process->excluded).detail("rebooting", process->rebooting);
 			if (process->startingClass != ProcessClass::TesterClass)
 				killProcess_internal( process, kt );
 		}
 	}
 	else if ( kt == Reboot || killIsSafe) {
 		for (auto& process : machines[zoneId].processes) {
-			TraceEvent("KillMachineProcess", zoneId).detail("KillType", kt).detail("Process", process->toString()).detail("startingClass", process->startingClass.toString());
+			TraceEvent("KillMachineProcess", zoneId).detail("KillType", kt).detail("Process", process->toString()).detail("startingClass", process->startingClass.toString()).detail("failed", process->failed).detail("excluded", process->excluded).detail("rebooting", process->rebooting);
 			if (process->startingClass != ProcessClass::TesterClass)
 				doReboot(process, kt );
 		}
@@ -1269,13 +1284,16 @@ public:
 	int dcProcesses = 0;

 	// Switch to a reboot, if anything protected on machine
-	for (auto& process : processes) {
-		auto processDcId = process->locality.dcId();
-		auto processZoneId = process->locality.zoneId();
+	for (auto& procRecord : processes) {
+		auto processDcId = procRecord->locality.dcId();
+		auto processZoneId = procRecord->locality.zoneId();
 		ASSERT(processZoneId.present());
 		if (processDcId.present() && (processDcId == dcId)) {
-			if (protectedAddresses.count(process->address))
+			if ((kt != Reboot) && (protectedAddresses.count(procRecord->address))) {
 				kt = Reboot;
+				TraceEvent(SevWarn, "DcKillChanged").detailext("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig)
+					.detail("Reason", "Datacenter has protected process").detail("ProcessAddress", procRecord->address).detail("failed", procRecord->failed).detail("rebooting", procRecord->rebooting).detail("excluded", procRecord->excluded).detail("Process", describe(*procRecord));
+			}
 			datacenterZones[processZoneId.get()] ++;
 			dcProcesses ++;
 		}
@@ -1290,7 +1308,9 @@ public:
 			// Add non-test processes (ie. datahall is not be set for test processes)
 			if (processInfo->isAvailableClass()) {
 				// Mark all of the unavailable as dead
-				if (!processInfo->isAvailable())
+				if (processInfo->excluded)
+					processesDead.push_back(processInfo);
+				else if (!processInfo->isAvailable())
 					processesDead.push_back(processInfo);
 				else if (protectedAddresses.count(processInfo->address))
 					processesLeft.push_back(processInfo);
@@ -1304,7 +1324,7 @@ public:
 	}

 	if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
-		TraceEvent(SevWarn, "DcKillChanged").detailext("DataCenter", dcId).detail("KillType", ktOrig).detail("NewKillType", kt);
+		TraceEvent(SevWarn, "DcKillChanged").detailext("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig);
 	}
 	else {
 		TraceEvent("DeadDataCenter").detailext("DataCenter", dcId).detail("KillType", kt).detail("DcZones", datacenterZones.size()).detail("DcProcesses", dcProcesses).detail("ProcessesDead", processesDead.size()).detail("ProcessesLeft", processesLeft.size()).detail("tLogPolicy", storagePolicy->info()).detail("storagePolicy", storagePolicy->info());
@@ -1319,10 +1339,13 @@ public:
 		.detail("DcZones", datacenterZones.size())
 		.detail("DcProcesses", dcProcesses)
 		.detailext("DCID", dcId)
-		.detail("KillType", kt);
+		.detail("KillType", kt)
+		.detail("OrigKillType", ktOrig);

 	for (auto& datacenterZone : datacenterZones)
 		killMachine( datacenterZone.first, kt, (kt == RebootAndDelete), true);
+		// ahm If above doesn't work, go conservative
+		// killMachine( datacenterZone.first, kt, false, true);
 }
 virtual void clogInterface( uint32_t ip, double seconds, ClogMode mode = ClogDefault ) {
 	if (mode == ClogDefault) {
@@ -1500,6 +1523,9 @@ static double networkLatency() {
 }

 ACTOR void doReboot( ISimulator::ProcessInfo *p, ISimulator::KillType kt ) {
+	TraceEvent("RebootingProcessAttempt").detailext("ZoneId", p->locality.zoneId()).detail("KillType", kt).detail("Process", p->toString()).detail("startingClass", p->startingClass.toString()).detail("failed", p->failed).detail("excluded", p->excluded).detail("rebooting", p->rebooting).detail("TaskDefaultDelay", TaskDefaultDelay);
+	// ASSERT(p->failed); //ahm
+
 	Void _ = wait( g_sim2.delay( 0, TaskDefaultDelay, p ) ); // Switch to the machine in question

 	try {
@@ -1512,7 +1538,7 @@ ACTOR void doReboot( ISimulator::ProcessInfo *p, ISimulator::KillType kt ) {

 		if( p->rebooting )
 			return;
-		TraceEvent("RebootingMachine").detail("KillType", kt).detail("Address", p->address).detailext("ZoneId", p->locality.zoneId()).detailext("DataHall", p->locality.dataHallId()).detail("Locality", p->locality.toString());
+		TraceEvent("RebootingProcess").detail("KillType", kt).detail("Address", p->address).detailext("ZoneId", p->locality.zoneId()).detailext("DataHall", p->locality.dataHallId()).detail("Locality", p->locality.toString()).detail("failed", p->failed).detail("excluded", p->excluded).backtrace();
 		p->rebooting = true;
 		p->shutdownSignal.send( kt );
 	} catch (Error& e) {
@@ -151,17 +151,81 @@ public:
 	virtual bool isAvailable() const = 0;
 	virtual void displayWorkers() const;

-	virtual void excludeAddress(NetworkAddress const& address) {
-		excludedAddresses.insert(address);
+	virtual void addRole(NetworkAddress const& address, std::string const& role) {
+		roleAddresses[address][role] ++;
+		TraceEvent("RoleAdd").detail("Address", address).detail("Role", role).detail("Roles", roleAddresses[address].size()).detail("Value", roleAddresses[address][role]);
 	}
+
+	virtual void removeRole(NetworkAddress const& address, std::string const& role) {
+		auto addressIt = roleAddresses.find(address);
+		if (addressIt != roleAddresses.end()) {
+			auto rolesIt = addressIt->second.find(role);
+			if (rolesIt != addressIt->second.end()) {
+				if (rolesIt->second > 1) {
+					rolesIt->second --;
+					TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("Roles", addressIt->second.size()).detail("Value", rolesIt->second).detail("Result", "Decremented Role");
+				}
+				else {
+					addressIt->second.erase(rolesIt);
+					if (addressIt->second.size()) {
+						TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("Roles", addressIt->second.size()).detail("Value", 0).detail("Result", "Removed Role");
+					}
+					else {
+						roleAddresses.erase(addressIt);
+						TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("Roles", 0).detail("Value", 0).detail("Result", "Removed Address");
+					}
+				}
+			}
+			else {
+				TraceEvent(SevWarn,"RoleRemove").detail("Address", address).detail("Role", role).detail("Result", "Role Missing");
+			}
+		}
+		else {
+			TraceEvent(SevWarn,"RoleRemove").detail("Address", address).detail("Role", role).detail("Result", "Address Missing");
+		}
+	}
+
+	virtual std::string getRoles(NetworkAddress const& address, bool skipWorkers = true) const {
+		auto addressIt = roleAddresses.find(address);
+		std::string roleText;
+		if (addressIt != roleAddresses.end()) {
+			for (auto& roleIt : addressIt->second) {
+				if ((!skipWorkers) || (roleIt.first != "Worker"))
+					roleText += roleIt.first + ((roleIt.second > 1) ? format("-%d ", roleIt.second) : " ");
+			}
+		}
+		if (roleText.empty())
+			roleText = "[unset]";
+		return roleText;
+	}
+
+	virtual void excludeAddress(NetworkAddress const& address) {
+		excludedAddresses[address]++;
+		TraceEvent("ExcludeAddress").detail("Address", address).detail("Value", excludedAddresses[address]);
+	}

 	virtual void includeAddress(NetworkAddress const& address) {
-		excludedAddresses.erase(address);
+		auto addressIt = excludedAddresses.find(address);
+		if (addressIt != excludedAddresses.end()) {
+			if (addressIt->second > 1) {
+				addressIt->second --;
+				TraceEvent("IncludeAddress").detail("Address", address).detail("Value", addressIt->second).detail("Result", "Decremented");
+			}
+			else {
+				excludedAddresses.erase(addressIt);
+				TraceEvent("IncludeAddress").detail("Address", address).detail("Value", 0).detail("Result", "Removed");
+			}
+		}
+		else {
+			TraceEvent(SevWarn,"IncludeAddress").detail("Address", address).detail("Result", "Missing");
+		}
 	}
 	virtual void includeAllAddresses() {
 		TraceEvent("IncludeAddressAll").detail("AddressTotal", excludedAddresses.size());
 		excludedAddresses.clear();
 	}
 	virtual bool isExcluded(NetworkAddress const& address) const {
-		return excludedAddresses.count(address) == 0;
+		return excludedAddresses.find(address) != excludedAddresses.end();
 	}

 	virtual void disableSwapToMachine(Optional<Standalone<StringRef>> zoneId ) {
@@ -230,7 +294,8 @@ protected:

 private:
 	std::set<Optional<Standalone<StringRef>>> swapsDisabled;
-	std::set<NetworkAddress> excludedAddresses;
+	std::map<NetworkAddress, int> excludedAddresses;
+	std::map<NetworkAddress, std::map<std::string, int>> roleAddresses;
 	bool allSwapsDisabled;
 };
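Two things to note in the simulator header hunks above: excludedAddresses changes from a std::set to a counted std::map<NetworkAddress, int>, so repeated excludes must be balanced by the same number of includes, and the old isExcluded returned excludedAddresses.count(address) == 0, the inverse of what its name promises. A minimal self-contained sketch of the corrected, counted semantics (names simplified, NetworkAddress reduced to a string):

#include <cassert>
#include <map>
#include <string>

// Mirrors the new member std::map<NetworkAddress, int> excludedAddresses.
static std::map<std::string, int> excludedAddresses;

void excludeAddress(const std::string& a) { excludedAddresses[a]++; }

void includeAddress(const std::string& a) {
	auto it = excludedAddresses.find(a);
	if (it == excludedAddresses.end()) return;   // not excluded; the real code traces a warning
	if (--it->second == 0) excludedAddresses.erase(it);
}

bool isExcluded(const std::string& a) {
	// Old (buggy) form: return excludedAddresses.count(a) == 0;
	return excludedAddresses.find(a) != excludedAddresses.end();
}

int main() {
	excludeAddress("1.2.3.4:4500");
	excludeAddress("1.2.3.4:4500");   // excluded twice...
	includeAddress("1.2.3.4:4500");   // ...so one include is not enough
	assert(isExcluded("1.2.3.4:4500"));
	includeAddress("1.2.3.4:4500");
	assert(!isExcluded("1.2.3.4:4500"));
}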
@@ -218,6 +218,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 	std::vector<LocalityData> unavailableLocals;
 	LocalitySetRef logServerSet;
 	LocalityMap<std::pair<WorkerInterface, ProcessClass>>* logServerMap;
+	UID functionId = g_nondeterministic_random->randomUniqueID();
 	bool bCompleted = false;

 	logServerSet = Reference<LocalitySet>(new LocalityMap<std::pair<WorkerInterface, ProcessClass>>());
@@ -230,7 +231,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 	}
 	else {
 		if (it.second.interf.locality.dataHallId().present())
-			TraceEvent(SevWarn,"GWFTADNotAvailable", id)
+			TraceEvent(SevWarn,"GWFTADNotAvailable", functionId)
 				.detail("Fitness", fitness)
 				.detailext("Zone", it.second.interf.locality.zoneId())
 				.detailext("DataHall", it.second.interf.locality.dataHallId())
@@ -243,7 +244,8 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 				.detail("Locality", it.second.interf.locality.toString())
 				.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
 				.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
-				.detail("DesiredLogs", conf.getDesiredLogs());
+				.detail("DesiredLogs", conf.getDesiredLogs())
+				.detail("InterfaceId", id);
 			unavailableLocals.push_back(it.second.interf.locality);
 		}
 	}
@@ -258,12 +260,13 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 		logServerMap->add(worker.first.locality, &worker);
 	}
 	if (logServerSet->size() < conf.tLogReplicationFactor) {
-		TraceEvent(SevWarn,"GWFTADTooFew", id)
+		TraceEvent(SevWarn,"GWFTADTooFew", functionId)
 			.detail("Fitness", fitness)
 			.detail("Processes", logServerSet->size())
 			.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
 			.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
-			.detail("DesiredLogs", conf.getDesiredLogs());
+			.detail("DesiredLogs", conf.getDesiredLogs())
+			.detail("InterfaceId", id);
 	}
 	else if (logServerSet->size() <= conf.getDesiredLogs()) {
 		ASSERT(conf.tLogPolicy);
@@ -275,12 +278,13 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 			break;
 		}
 		else {
-			TraceEvent(SevWarn,"GWFTADNotAcceptable", id)
+			TraceEvent(SevWarn,"GWFTADNotAcceptable", functionId)
 				.detail("Fitness", fitness)
 				.detail("Processes", logServerSet->size())
 				.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
 				.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
-				.detail("DesiredLogs", conf.getDesiredLogs());
+				.detail("DesiredLogs", conf.getDesiredLogs())
+				.detail("InterfaceId", id);
 		}
 	}
 	// Try to select the desired size, if larger
@@ -300,7 +304,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 				results.push_back(*object);
 				tLocalities.push_back(object->first.locality);
 			}
-			TraceEvent("GWFTADBestResults", id)
+			TraceEvent("GWFTADBestResults", functionId)
 				.detail("Fitness", fitness)
 				.detail("Processes", logServerSet->size())
 				.detail("BestCount", bestSet.size())
@@ -308,17 +312,19 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 				.detail("BestDataHalls", ::describeDataHalls(tLocalities))
 				.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
 				.detail("TotalResults", results.size())
-				.detail("DesiredLogs", conf.getDesiredLogs());
+				.detail("DesiredLogs", conf.getDesiredLogs())
+				.detail("InterfaceId", id);
 			bCompleted = true;
 			break;
 		}
 		else {
-			TraceEvent(SevWarn,"GWFTADNoBest", id)
+			TraceEvent(SevWarn,"GWFTADNoBest", functionId)
 				.detail("Fitness", fitness)
 				.detail("Processes", logServerSet->size())
 				.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
 				.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
-				.detail("DesiredLogs", conf.getDesiredLogs());
+				.detail("DesiredLogs", conf.getDesiredLogs())
+				.detail("InterfaceId", id);
 		}
 	}
 }
@@ -331,7 +337,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 		tLocalities.push_back(object->first.locality);
 	}

-	TraceEvent(SevWarn, "GetTLogTeamFailed")
+	TraceEvent(SevWarn, "GetTLogTeamFailed", functionId)
 		.detail("Policy", conf.tLogPolicy->info())
 		.detail("Processes", logServerSet->size())
 		.detail("Workers", id_worker.size())
@@ -344,7 +350,8 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 		.detail("DesiredLogs", conf.getDesiredLogs())
 		.detail("RatingTests",SERVER_KNOBS->POLICY_RATING_TESTS)
 		.detail("checkStable", checkStable)
-		.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS).backtrace();
+		.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS)
+		.detail("InterfaceId", id).backtrace();

 	// Free the set
 	logServerSet->clear();
@@ -356,14 +363,25 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 		id_used[result.first.locality.processId()]++;
 	}

-	TraceEvent("GetTLogTeamDone")
+	TraceEvent("GetTLogTeamDone", functionId)
 		.detail("Completed", bCompleted).detail("Policy", conf.tLogPolicy->info())
 		.detail("Results", results.size()).detail("Processes", logServerSet->size())
 		.detail("Workers", id_worker.size())
 		.detail("Replication", conf.tLogReplicationFactor)
 		.detail("Desired", conf.getDesiredLogs())
 		.detail("RatingTests",SERVER_KNOBS->POLICY_RATING_TESTS)
-		.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS);
+		.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS)
+		.detail("InterfaceId", id);
+
+	for (auto& result : results) {
+		TraceEvent("GetTLogTeamWorker", functionId)
+			.detail("Class", result.second.toString())
+			.detail("Address", result.first.address())
+			.detailext("Zone", result.first.locality.zoneId())
+			.detailext("DataHall", result.first.locality.dataHallId())
+			.detail("isExcludedServer", conf.isExcludedServer(result.first.address()))
+			.detail("isAvailable", IFailureMonitor::failureMonitor().getState(result.first.storage.getEndpoint()).isAvailable());
+	}

 	// Free the set
 	logServerSet->clear();
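The recurring change in the recruitment hunks above is that trace events inside getWorkersForTlogsAcrossDatacenters are now keyed by a functionId generated once at entry (the worker interface id is demoted to an InterfaceId detail), so all events from one recruitment attempt can be pulled out of an interleaved log. A small sketch of the pattern with hypothetical stand-in helpers, not the real TraceEvent/UID API:

#include <cstdio>
#include <random>
#include <string>

// Stand-in for g_nondeterministic_random->randomUniqueID(); the real UID is a
// 128-bit value, this hypothetical helper just produces a hex string.
std::string randomUniqueID() {
	static std::mt19937_64 rng{std::random_device{}()};
	char buf[17];
	snprintf(buf, sizeof(buf), "%016llx", (unsigned long long)rng());
	return buf;
}

// Stand-in for TraceEvent(name, id): every event carries the correlating id.
void traceEvent(const std::string& name, const std::string& functionId) {
	printf("Event=%s FunctionId=%s\n", name.c_str(), functionId.c_str());
}

int main() {
	// One id per invocation, stamped on every event it emits.
	std::string functionId = randomUniqueID();
	traceEvent("GWFTADNotAvailable", functionId);
	traceEvent("GetTLogTeamDone", functionId);
}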
@@ -204,8 +204,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 	loop {
 		auto waitTime = SERVER_KNOBS->MIN_REBOOT_TIME + (SERVER_KNOBS->MAX_REBOOT_TIME - SERVER_KNOBS->MIN_REBOOT_TIME) * g_random->random01();
 		cycles ++;
-		TraceEvent("SimulatedFDBDWait").detail("Cycles", cycles).detail("RandomId", randomId)
-			.detail("ProcessAddress", NetworkAddress(ip, port, true, false))
+		TraceEvent("SimulatedFDBDPreWait").detail("Cycles", cycles).detail("RandomId", randomId)
+			.detail("Address", NetworkAddress(ip, port, true, false))
 			.detailext("ZoneId", localities.zoneId())
 			.detail("waitTime", waitTime).detail("Port", port);

@@ -219,10 +219,10 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 		TraceEvent("SimulatedRebooterStarting", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
 			.detailext("ZoneId", localities.zoneId())
 			.detailext("DataHall", localities.dataHallId())
-			.detail("ProcessAddress", process->address.toString())
-			.detail("ProcessExcluded", process->excluded)
+			.detail("Address", process->address.toString())
+			.detail("Excluded", process->excluded)
 			.detail("UsingSSL", useSSL);
-		TraceEvent("ProgramStart").detail("Cycles", cycles)
+		TraceEvent("ProgramStart").detail("Cycles", cycles).detail("RandomId", randomId)
 			.detail("SourceVersion", getHGVersion())
 			.detail("Version", FDB_VT_VERSION)
 			.detail("PackageName", FDB_VT_PACKAGE_NAME)
@@ -248,7 +248,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 	} catch (Error& e) {
 		// If in simulation, if we make it here with an error other than io_timeout but enASIOTimedOut is set then somewhere an io_timeout was converted to a different error.
 		if(g_network->isSimulated() && e.code() != error_code_io_timeout && (bool)g_network->global(INetwork::enASIOTimedOut))
-			TraceEvent(SevError, "IOTimeoutErrorSuppressed").detail("ErrorCode", e.code()).backtrace();
+			TraceEvent(SevError, "IOTimeoutErrorSuppressed").detail("ErrorCode", e.code()).detail("RandomId", randomId).backtrace();

 		if (onShutdown.isReady() && onShutdown.isError()) throw onShutdown.getError();
 		if(e.code() != error_code_actor_cancelled)
@@ -258,15 +258,15 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 	}

 	TraceEvent("SimulatedFDBDDone", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
-		.detail("ProcessAddress", process->address)
-		.detail("ProcessExcluded", process->excluded)
+		.detail("Address", process->address)
+		.detail("Excluded", process->excluded)
 		.detailext("ZoneId", localities.zoneId())
 		.detail("KillType", onShutdown.isReady() ? onShutdown.get() : ISimulator::None);

 	if (!onShutdown.isReady())
 		onShutdown = ISimulator::InjectFaults;
 } catch (Error& e) {
-	TraceEvent(destructed ? SevInfo : SevError, "SimulatedFDBDRebooterError", localities.zoneId()).error(e, true);
+	TraceEvent(destructed ? SevInfo : SevError, "SimulatedFDBDRebooterError", localities.zoneId()).detail("RandomId", randomId).error(e, true);
 	onShutdown = e;
 }

@@ -276,6 +276,11 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 		process->rebooting = true;
 		process->shutdownSignal.send(ISimulator::None);
 	}
+	TraceEvent("SimulatedFDBDWait", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
+		.detail("Address", process->address)
+		.detail("Excluded", process->excluded)
+		.detail("Rebooting", process->rebooting)
+		.detailext("ZoneId", localities.zoneId());
 	Void _ = wait( g_simulator.onProcess( simProcess ) );

 	Void _ = wait(delay(0.00001 + FLOW_KNOBS->MAX_BUGGIFIED_DELAY)); // One last chance for the process to clean up?
@@ -284,15 +289,15 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(

 	auto shutdownResult = onShutdown.get();
 	TraceEvent("SimulatedFDBDShutdown", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
-		.detail("ProcessAddress", process->address)
-		.detail("ProcessExcluded", process->excluded)
+		.detail("Address", process->address)
+		.detail("Excluded", process->excluded)
 		.detailext("ZoneId", localities.zoneId())
 		.detail("KillType", shutdownResult);

 	if( shutdownResult < ISimulator::RebootProcessAndDelete ) {
 		TraceEvent("SimulatedFDBDLowerReboot", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
-			.detail("ProcessAddress", process->address)
-			.detail("ProcessExcluded", process->excluded)
+			.detail("Address", process->address)
+			.detail("Excluded", process->excluded)
 			.detailext("ZoneId", localities.zoneId())
 			.detail("KillType", shutdownResult);
 		return onShutdown.get();
@@ -300,7 +305,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(

 	if( onShutdown.get() == ISimulator::RebootProcessAndDelete ) {
 		TraceEvent("SimulatedFDBDRebootAndDelete", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
-			.detail("ProcessAddress", process->address)
+			.detail("Address", process->address)
 			.detailext("ZoneId", localities.zoneId())
 			.detail("KillType", shutdownResult);
 		*coordFolder = joinPath(baseFolder, g_random->randomUniqueID().toString());
@@ -317,7 +322,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 	}
 	else {
 		TraceEvent("SimulatedFDBDJustRepeat", localities.zoneId()).detail("Cycles", cycles).detail("RandomId", randomId)
-			.detail("ProcessAddress", process->address)
+			.detail("Address", process->address)
 			.detailext("ZoneId", localities.zoneId())
 			.detail("KillType", shutdownResult);
 	}
@@ -351,6 +356,7 @@ ACTOR Future<Void> simulatedMachine(
 	state int bootCount = 0;
 	state std::vector<std::string> myFolders;
 	state std::vector<std::string> coordFolders;
+	state UID randomId = g_nondeterministic_random->randomUniqueID();

 	try {
 		CSimpleIni ini;
@@ -387,6 +393,7 @@ ACTOR Future<Void> simulatedMachine(
 			std::string path = joinPath(myFolders[i], "fdb.cluster");
 			Reference<ClusterConnectionFile> clusterFile(useSeedFile ? new ClusterConnectionFile(path, connStr.toString()) : new ClusterConnectionFile(path));
 			processes.push_back(simulatedFDBDRebooter(clusterFile, ips[i], sslEnabled, i + 1, localities, processClass, &myFolders[i], &coordFolders[i], baseFolder, connStr, useSeedFile, runBackupAgents));
+			TraceEvent("SimulatedMachineProcess", randomId).detail("Address", NetworkAddress(ips[i], i+1, true, false)).detailext("ZoneId", localities.zoneId()).detailext("DataHall", localities.dataHallId()).detail("Folder", myFolders[i]);
 		}

 		TEST( bootCount >= 1 ); // Simulated machine rebooted
@@ -394,7 +401,7 @@ ACTOR Future<Void> simulatedMachine(
 		TEST( bootCount >= 3 ); // Simulated machine rebooted three times
 		++bootCount;

-		TraceEvent("SimulatedMachineStart")
+		TraceEvent("SimulatedMachineStart", randomId)
 			.detail("Folder0", myFolders[0])
 			.detail("CFolder0", coordFolders[0])
 			.detail("MachineIPs", toIPVectorString(ips))
@@ -410,7 +417,7 @@ ACTOR Future<Void> simulatedMachine(

 		Void _ = wait( waitForAll( processes ) );

-		TraceEvent("SimulatedMachineRebootStart")
+		TraceEvent("SimulatedMachineRebootStart", randomId)
 			.detail("Folder0", myFolders[0])
 			.detail("CFolder0", coordFolders[0])
 			.detail("MachineIPs", toIPVectorString(ips))
@@ -447,7 +454,7 @@ ACTOR Future<Void> simulatedMachine(
 			closingStr += it + ", ";
 		}

-		TraceEvent("SimulatedMachineRebootAfterKills")
+		TraceEvent("SimulatedMachineRebootAfterKills", randomId)
 			.detail("Folder0", myFolders[0])
 			.detail("CFolder0", coordFolders[0])
 			.detail("MachineIPs", toIPVectorString(ips))
@@ -476,12 +483,12 @@ ACTOR Future<Void> simulatedMachine(
 				openFiles += *it + ", ";
 				i++;
 			}
-			TraceEvent("MachineFilesOpen").detail("PAddr", toIPVectorString(ips)).detail("OpenFiles", openFiles);
+			TraceEvent("MachineFilesOpen", randomId).detail("PAddr", toIPVectorString(ips)).detail("OpenFiles", openFiles);
 		} else
 			break;

 		if( shutdownDelayCount++ >= 50 ) { // Worker doesn't shut down instantly on reboot
-			TraceEvent(SevError, "SimulatedFDBDFilesCheck")
+			TraceEvent(SevError, "SimulatedFDBDFilesCheck", randomId)
 				.detail("PAddrs", toIPVectorString(ips))
 				.detailext("ZoneId", localities.zoneId())
 				.detailext("DataHall", localities.dataHallId());
@@ -492,8 +499,8 @@ ACTOR Future<Void> simulatedMachine(
 			backoff = std::min( backoff + 1.0, 6.0 );
 		}

-		TraceEvent("SimulatedFDBDFilesClosed")
-			.detail("ProcessAddress", toIPVectorString(ips))
+		TraceEvent("SimulatedFDBDFilesClosed", randomId)
+			.detail("Address", toIPVectorString(ips))
 			.detailext("ZoneId", localities.zoneId())
 			.detailext("DataHall", localities.dataHallId());

@@ -515,7 +522,7 @@ ACTOR Future<Void> simulatedMachine(

 		auto rebootTime = g_random->random01() * MACHINE_REBOOT_TIME;

-		TraceEvent("SimulatedMachineShutdown")
+		TraceEvent("SimulatedMachineShutdown", randomId)
 			.detail("Swap", swap)
 			.detail("KillType", killType)
 			.detail("RebootTime", rebootTime)
@@ -535,7 +542,7 @@ ACTOR Future<Void> simulatedMachine(

 			if( myFolders != toRebootFrom ) {
 				TEST( true ); // Simulated machine swapped data folders
-				TraceEvent("SimulatedMachineFolderSwap")
+				TraceEvent("SimulatedMachineFolderSwap", randomId)
 					.detail("OldFolder0", myFolders[0]).detail("NewFolder0", toRebootFrom[0])
 					.detail("MachineIPs", toIPVectorString(ips));
 			}
@@ -184,7 +184,7 @@ std::string filenameFromSample( KeyValueStoreType storeType, std::string folder,
 	if( storeType == KeyValueStoreType::SSD_BTREE_V1 )
 		return joinPath( folder, sample_filename );
 	else if ( storeType == KeyValueStoreType::SSD_BTREE_V2 )
-		return joinPath(folder, sample_filename);
+		return joinPath(folder, sample_filename);
 	else if( storeType == KeyValueStoreType::MEMORY )
 		return joinPath( folder, sample_filename.substr(0, sample_filename.size() - 5) );

@@ -195,7 +195,7 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std
 	if( storeType == KeyValueStoreType::SSD_BTREE_V1)
 		return joinPath( folder, prefix + id.toString() + ".fdb" );
 	else if (storeType == KeyValueStoreType::SSD_BTREE_V2)
-		return joinPath(folder, prefix + id.toString() + ".sqlite");
+		return joinPath(folder, prefix + id.toString() + ".sqlite");
 	else if( storeType == KeyValueStoreType::MEMORY )
 		return joinPath( folder, prefix + id.toString() + "-" );

@@ -355,6 +355,7 @@ void startRole(UID roleId, UID workerId, std::string as, std::map<std::string, s
 	g_roles.insert({as, roleId.shortString()});
 	StringMetricHandle(LiteralStringRef("Roles")) = roleString(g_roles, false);
 	StringMetricHandle(LiteralStringRef("RolesWithIDs")) = roleString(g_roles, true);
+	if (g_network->isSimulated()) g_simulator.addRole(g_network->getLocalAddress(), as);
 }

 void endRole(UID id, std::string as, std::string reason, bool ok, Error e) {
@@ -386,6 +387,7 @@ void endRole(UID id, std::string as, std::string reason, bool ok, Error e) {
 	g_roles.erase({as, id.shortString()});
 	StringMetricHandle(LiteralStringRef("Roles")) = roleString(g_roles, false);
 	StringMetricHandle(LiteralStringRef("RolesWithIDs")) = roleString(g_roles, true);
+	if (g_network->isSimulated()) g_simulator.removeRole(g_network->getLocalAddress(), as);
 }

 ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface, Reference<ClusterConnectionFile> connFile, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
@@ -621,7 +623,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 	Reference<IAsyncFile> checkFile = wait( IAsyncFileSystem::filesystem()->open( joinPath(folder, validationFilename), IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, 0600 ) );
 	Void _ = wait( checkFile->sync() );
 }

 if(g_network->isSimulated()) {
 	TraceEvent("SimulatedReboot").detail("Deletion", rebootReq.deleteData );
 	if( rebootReq.deleteData ) {
@@ -660,7 +662,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 	std::map<std::string, std::string> details;
 	details["ForMaster"] = req.recruitmentID.shortString();
 	details["StorageEngine"] = req.storeType.toString();

 	//FIXME: start role for every tlog instance, rather that just for the shared actor, also use a different role type for the shared actor
 	startRole( logId, interf.id(), "SharedTLog", details );
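The startRole/endRole hooks added above are the integration point for the role registry: every role a process starts or ends is mirrored into the simulator when running under simulation, which is what makes displayWorkers and the kill-decision traces role-aware. A hypothetical, self-contained sketch of that pairing (the real calls are g_simulator.addRole/removeRole with g_network->getLocalAddress()):

#include <cstdio>
#include <string>

// Hypothetical stand-ins for g_simulator.addRole/removeRole (see the
// simulator header hunk earlier); here they only log.
void simAddRole(const std::string& addr, const std::string& role)    { printf("RoleAdd    %s %s\n", addr.c_str(), role.c_str()); }
void simRemoveRole(const std::string& addr, const std::string& role) { printf("RoleRemove %s %s\n", addr.c_str(), role.c_str()); }

bool isSimulated = true;                    // g_network->isSimulated() in the real code
std::string localAddress = "1.2.3.4:4500";  // g_network->getLocalAddress()

// Mirrors the two one-line hooks the commit adds to the worker code:
void startRole(const std::string& as) {
	// ... existing metric bookkeeping ...
	if (isSimulated) simAddRole(localAddress, as);
}

void endRole(const std::string& as) {
	// ... existing metric bookkeeping ...
	if (isSimulated) simRemoveRole(localAddress, as);
}

int main() {
	startRole("SharedTLog");  // e.g. the SharedTLog start in the hunk above
	endRole("SharedTLog");
}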
@@ -155,6 +155,7 @@ struct MachineAttritionWorkload : TestWorkload {
 	LocalityData targetMachine = self->machines.back();

 	TraceEvent("Assassination").detail("TargetMachine", targetMachine.toString())
 		.detailext("zoneId", targetMachine.zoneId())
 		.detail("Reboot", self->reboot).detail("killedMachines", killedMachines)
 		.detail("machinesToKill", self->machinesToKill).detail("machinesToLeave", self->machinesToLeave)
 		.detail("machines", self->machines.size()).detail("Replace", self->replacement);
@@ -166,8 +167,9 @@ struct MachineAttritionWorkload : TestWorkload {
 				g_simulator.killMachine( targetMachine.zoneId(), ISimulator::Reboot );
 			}
 		} else {
-			TraceEvent("WorkerKill").detail("MachineCount", self->machines.size());
-			if( g_random->random01() < 0.33 ) {
+			auto randomDouble = g_random->random01();
+			TraceEvent("WorkerKill").detail("MachineCount", self->machines.size()).detail("RandomValue", randomDouble);
+			if (randomDouble < 0.33 ) {
 				TraceEvent("RebootAndDelete").detail("TargetMachine", targetMachine.toString());
 				g_simulator.killMachine( targetMachine.zoneId(), ISimulator::RebootAndDelete );
 			} else {
@@ -26,9 +26,6 @@
 #include "fdbrpc/simulator.h"
 #include "fdbclient/ManagementAPI.h"

-const char* removeClearEnv = getenv("REMOVE_CLEAR");
-int removeClear = removeClearEnv ? atoi(removeClearEnv) : 1;
-
 template <>
 std::string describe( uint32_t const& item ) {
 	return format("%d", item);
@@ -154,6 +151,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 	{
 		std::vector<ISimulator::ProcessInfo*> processes;
 		std::set<AddressExclusion> processAddrs;
+		UID functionId = g_nondeterministic_random->randomUniqueID();

 		// Get the list of process network addresses
 		for (auto& netAddr : netAddrs) {
@@ -170,24 +168,64 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 		// Get the list of processes matching network address
 		for (auto processInfo : g_simulator.getAllProcesses()) {
 			auto processNet = AddressExclusion(processInfo->address.ip, processInfo->address.port);
-			if (processAddrs.find(processNet) != processAddrs.end())
+			if (processAddrs.find(processNet) != processAddrs.end()) {
 				processes.push_back(processInfo);
+				TraceEvent("RemoveAndKill", functionId).detail("Step", "getProcessItem").detail("ProcessAddress", processInfo->address).detail("Process", describe(*processInfo)).detail("failed", processInfo->failed).detail("excluded", processInfo->excluded).detail("rebooting", processInfo->rebooting).detail("Protected", g_simulator.protectedAddresses.count(processInfo->address));
+			}
+			else {
+				TraceEvent("RemoveAndKill", functionId).detail("Step", "getProcessNoItem").detail("ProcessAddress", processInfo->address).detail("Process", describe(*processInfo)).detail("failed", processInfo->failed).detail("excluded", processInfo->excluded).detail("rebooting", processInfo->rebooting).detail("Protected", g_simulator.protectedAddresses.count(processInfo->address));
+			}
 		}
-		TraceEvent("RemoveAndKill").detail("Step", "getProcesses")
+		TraceEvent("RemoveAndKill", functionId).detail("Step", "getProcesses")
 			.detail("netAddrSize",netAddrs.size()).detail("processAddrSize",processAddrs.size())
 			.detail("netAddrs",describe(netAddrs)).detail("processAddrs",describe(processAddrs))
 			.detail("Proceses", processes.size()).detail("MachineProcesses", machineProcesses.size());

 		// Processes may have been destroyed causing
 		// ASSERT(processAddrs.size() == processes.size());
 		return processes;
 	}

+	virtual std::vector<ISimulator::ProcessInfo*> excludeAddresses(std::set<AddressExclusion> const& procAddrs)
+	{
+		// Get the updated list of processes which may have changed due to reboots, deletes, etc
+		std::vector<ISimulator::ProcessInfo*> procArray = getProcesses(procAddrs);
+
+		// Include all of the excluded machines because the first command of the next section is includeall
+		TraceEvent("RemoveAndKill").detail("Step", "exclude addresses").detail("AddrTotal", procAddrs.size()).detail("ProcTotal", procArray.size()).detail("Addresses", describe(procAddrs)).detail("ClusterAvailable", g_simulator.isAvailable());
+		for (auto& procAddr : procAddrs) {
+			g_simulator.excludeAddress(NetworkAddress(procAddr.ip, procAddr.port, true, false));
+		}
+		for (auto& procRecord : procArray) {
+			procRecord->excluded = true;
+			TraceEvent("RemoveAndKill").detail("Step", "ExcludeAddress").detail("ProcessAddress", procRecord->address).detail("Process", describe(*procRecord)).detail("failed", procRecord->failed).detail("rebooting", procRecord->rebooting).detail("ClusterAvailable", g_simulator.isAvailable());
+		}
+		return procArray;
+	}
+
+	virtual std::vector<ISimulator::ProcessInfo*> includeAddresses(std::set<AddressExclusion> const& procAddrs)
+	{
+		// Get the updated list of processes which may have changed due to reboots, deletes, etc
+		std::vector<ISimulator::ProcessInfo*> procArray = getProcesses(procAddrs);
+
+		// Include all of the excluded machines because the first command of the next section is includeall
+		TraceEvent("RemoveAndKill").detail("Step", "include addresses").detail("AddrTotal", procAddrs.size()).detail("ProcTotal", procArray.size()).detail("Addresses", describe(procAddrs)).detail("ClusterAvailable", g_simulator.isAvailable());
+		for (auto& procAddr : procAddrs) {
+			g_simulator.includeAddress(NetworkAddress(procAddr.ip, procAddr.port, true, false));
+		}
+		for (auto& procRecord : procArray) {
+			// Only change the exclusion member, if not failed since it will require a reboot to revive it
+			if (!procRecord->failed)
+				procRecord->excluded = false;
+			TraceEvent("RemoveAndKill").detail("Step", "IncludeAddress").detail("ProcessAddress", procRecord->address).detail("Process", describe(*procRecord)).detail("failed", procRecord->failed).detail("rebooting", procRecord->rebooting).detail("ClusterAvailable", g_simulator.isAvailable());
+		}
+		return procArray;
+	}
+
 	virtual std::vector<ISimulator::ProcessInfo*> protectServers(std::set<AddressExclusion> const& killAddrs)
 	{
 		std::vector<ISimulator::ProcessInfo*> processes;
 		std::set<AddressExclusion> processAddrs;
 		std::vector<AddressExclusion> killableAddrs;
-		std::vector<ISimulator::ProcessInfo*> killProcesses, killableProcesses, processesLeft, processesDead;
+		std::vector<ISimulator::ProcessInfo*> killProcArray, killableProcesses, processesLeft, processesDead;

 		// Get the list of processes matching network address
 		for (auto processInfo : getServers()) {
@@ -199,7 +237,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 			else if (killAddrs.find(processNet) == killAddrs.end())
 				processesLeft.push_back(processInfo);
 			else
-				killProcesses.push_back(processInfo);
+				killProcArray.push_back(processInfo);
 		}

 		// Identify the largest set of processes which can be killed
@@ -207,22 +245,22 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 		bool bCanKillProcess;
 		ISimulator::ProcessInfo* randomProcess;
 		auto deadProcess = processesDead.back();
-		for (int killsLeft = killProcesses.size(); killsLeft > 0; killsLeft --)
+		for (int killsLeft = killProcArray.size(); killsLeft > 0; killsLeft --)
 		{
 			// Select a random kill process
 			randomIndex = g_random->randomInt(0, killsLeft);
-			randomProcess = killProcesses[randomIndex];
+			randomProcess = killProcArray[randomIndex];
 			processesDead.push_back(randomProcess);
-			killProcesses[randomIndex] = killProcesses.back();
-			killProcesses.pop_back();
+			killProcArray[randomIndex] = killProcArray.back();
+			killProcArray.pop_back();
 			// Add all of the remaining processes the leftover array
-			processesLeft.insert(processesLeft.end(), killProcesses.begin(), killProcesses.end());
+			processesLeft.insert(processesLeft.end(), killProcArray.begin(), killProcArray.end());

 			// Check if we can kill the added process
 			bCanKillProcess = g_simulator.canKillProcesses(processesLeft, processesDead, ISimulator::KillInstantly, NULL);

 			// Remove the added processes
-			processesLeft.resize(processesLeft.size() - killProcesses.size());
+			processesLeft.resize(processesLeft.size() - killProcArray.size());

 			if (bCanKillProcess) {
 				killableProcesses.push_back(randomProcess);
@ -247,94 +285,133 @@ struct RemoveServersSafelyWorkload : TestWorkload {
|
|||
|
||||
// Removing the first set of machines might legitimately bring the database down, so a timeout is not an error
|
||||
state std::vector<NetworkAddress> firstCoordinators;
|
||||
state std::vector<ISimulator::ProcessInfo*> killProcesses;
|
||||
state std::vector<ISimulator::ProcessInfo*> killProcArray;
|
||||
state bool bClearedFirst;
|
||||
|
||||
TraceEvent("RemoveAndKill").detail("Step", "exclude first list").detail("toKill1", describe(toKill1)).detail("KillTotal", toKill1.size())
|
||||
.detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
TraceEvent("RemoveAndKill").detail("Step", "exclude list first").detail("toKill", describe(toKill1)).detail("KillTotal", toKill1.size()).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
self->excludeAddresses(toKill1);
|
||||
|
||||
killProcesses = self->getProcesses(toKill1);
|
||||
TraceEvent("RemoveAndKill").detail("Step", "mark first processes excluded").detail("Addresses", describe(toKill1))
|
||||
.detail("AddressTotal", toKill1.size()).detail("Processes", killProcesses.size())
|
||||
.detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
for (auto& killProcess : killProcesses) {
|
||||
killProcess->excluded = true;
|
||||
g_simulator.excludeAddress(killProcess->address);
|
||||
TraceEvent("RemoveAndKill").detail("Step", "MarkProcessFirst").detail("Process", describe(*killProcess));
|
||||
}
|
||||
Optional<Void> result = wait( timeout( removeAndKill( self, cx, toKill1, NULL), self->kill1Timeout ) );
|
||||
|
||||
Optional<Void> result = wait( timeout( removeAndKill( self, cx, toKill1), self->kill1Timeout ) );
|
||||
bClearedFirst = result.present();
|
||||
|
||||
TraceEvent("RemoveAndKill").detail("Step", "first exclusion result").detail("result", result.present() ? "succeeded" : "failed");
|
||||
killProcesses = self->getProcesses(toKill1);
|
||||
TraceEvent("RemoveAndKill").detail("Step", "include first processes").detail("toKill1", describe(toKill1))
|
||||
.detail("KillTotal", toKill1.size()).detail("Processes", killProcesses.size());
|
||||
for (auto& killProcess : killProcesses) {
|
||||
g_simulator.includeAddress(killProcess->address);
|
||||
killProcess->excluded = false;
|
||||
TraceEvent("RemoveAndKill").detail("Step", "excluded list first").detail("excluderesult", bClearedFirst ? "succeeded" : "failed").detail("KillTotal", toKill1.size()).detail("Processes", killProcArray.size()).detail("toKill1", describe(toKill1)).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
|
||||
bClearedFirst=false;
|
||||
// Include the servers, if unable to exclude
|
||||
if (!bClearedFirst) {
|
||||
// Get the updated list of processes which may have changed due to reboots, deletes, etc
|
||||
TraceEvent("RemoveAndKill").detail("Step", "include all first").detail("KillTotal", toKill1.size()).detail("toKill", describe(toKill1)).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) );
|
||||
self->includeAddresses(toKill1);
|
||||
TraceEvent("RemoveAndKill").detail("Step", "included all first").detail("KillTotal", toKill1.size()).detail("toKill", describe(toKill1)).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
}
|
||||
|
||||
killProcesses = self->protectServers(toKill2);
|
||||
// Get the list of protected servers
|
||||
killProcArray = self->protectServers(toKill2);
|
||||
|
||||
// Update the kill networks to the killable processes
|
||||
toKill2 = self->getNetworks(killProcesses);
|
||||
toKill2 = self->getNetworks(killProcArray);
|
||||
|
||||
TraceEvent("RemoveAndKill").detail("Step", "Mark second processes excluded").detail("toKill2", describe(toKill2))
|
||||
.detail("KillTotal", toKill2.size()).detail("Processes", killProcesses.size());
|
||||
for (auto& killProcess : killProcesses) {
|
||||
killProcess->excluded = true;
|
||||
g_simulator.excludeAddress(killProcess->address);
|
||||
TraceEvent("RemoveAndKill").detail("Step", "MarkProcessSecond").detail("Processes", killProcesses.size()).detail("Process", describe(*killProcess));
|
||||
}
|
||||
TraceEvent("RemoveAndKill").detail("Step", "exclude list second").detail("KillTotal", toKill2.size()).detail("toKill", describe(toKill2)).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
self->excludeAddresses(toKill2);
|
||||
|
||||
// The second set of machines is selected so that we can always make progress without it, even after the permitted number of other permanent failures
|
||||
// so we expect to succeed after a finite amount of time
|
||||
state Future<Void> disabler = disableConnectionFailuresAfter( self->kill2Timeout/2, "RemoveServersSafely" );
|
||||
TraceEvent("RemoveAndKill").detail("Step", "exclude second list").detail("toKill2", describe(toKill2)).detail("KillTotal", toKill2.size())
|
||||
.detail("Processes", killProcesses.size()).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
Void _ = wait( reportErrors( timeoutError( removeAndKill( self, cx, toKill2), self->kill2Timeout ), "RemoveServersSafelyError", UID() ) );
|
||||
.detail("Processes", killProcArray.size()).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
Void _ = wait( reportErrors( timeoutError( removeAndKill( self, cx, toKill2, bClearedFirst ? &toKill1 : NULL), self->kill2Timeout ), "RemoveServersSafelyError", UID() ) );
|
||||
|
||||
|
||||
TraceEvent("RemoveAndKill").detail("Step", "excluded second list").detail("KillTotal", toKill2.size()).detail("Excluded", killProcesses.size())
|
||||
.detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
TraceEvent("RemoveAndKill").detail("Step", "excluded second list").detail("KillTotal", toKill1.size()).detail("toKill", describe(toKill2)).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
|
||||
// Reinclude all of the machines, if buggified
if (BUGGIFY) {
TraceEvent("RemoveAndKill").detail("Step", "final include all").detail("ClusterAvailable", g_simulator.isAvailable());
// Get the updated list of processes which may have changed due to reboots, deletes, etc
TraceEvent("RemoveAndKill").detail("Step", "include all second").detail("KillTotal", toKill1.size()).detail("toKill", describe(toKill2)).detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) );
for (auto& killProcess : killProcesses) {
g_simulator.includeAddress(killProcess->address);
killProcess->excluded = false;
}
TraceEvent("RemoveAndKill").detail("Step", "final included all").detail("ClusterAvailable", g_simulator.isAvailable());
self->includeAddresses(toKill2);
TraceEvent("RemoveAndKill").detail("Step", "included all second").detail("KillTotal", toKill1.size()).detail("toKill", describe(toKill2)).detail("ClusterAvailable", g_simulator.isAvailable());
}

return Void();
}
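
Note on the kill passes above: the workload follows a mark / act / unmark pattern. Each targeted process is flagged in two places before any kill is attempted (ProcessInfo::excluded on the process record, plus g_simulator.excludeAddress for the simulator's global exclusion list), and both flags are cleared when the pass ends, whether or not the exclusion succeeded. A minimal sketch of that symmetry, using only the simulator calls that appear in this diff (the helper name is illustrative, not part of the workload):

	// Hypothetical helper showing the mark/unmark symmetry; the workload
	// inlines these loops rather than defining such a function.
	void setProcessesExcluded(std::vector<ISimulator::ProcessInfo*> const& procs, bool excluded) {
		for (auto& proc : procs) {
			proc->excluded = excluded; // keep the per-process flag...
			if (excluded)
				g_simulator.excludeAddress(proc->address); // ...in sync with the global list
			else
				g_simulator.includeAddress(proc->address);
		}
	}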

ACTOR static Future<Void> removeAndKill( RemoveServersSafelyWorkload* self, Database cx, std::set<AddressExclusion> toKill)
virtual std::vector<ISimulator::ProcessInfo*> killAddresses(std::set<AddressExclusion> const& killAddrs)
{
// First clear the exclusion list and exclude the given list
TraceEvent("RemoveAndKill").detail("Step", "include all").detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) );
TraceEvent("RemoveAndKill").detail("Step", "included all").detail("ClusterAvailable", g_simulator.isAvailable());
UID functionId = g_nondeterministic_random->randomUniqueID();
bool removeViaClear = !BUGGIFY;
std::vector<ISimulator::ProcessInfo*> killProcArray;
std::vector<AddressExclusion> toKillArray;

state std::vector<ISimulator::ProcessInfo*> killProcesses;
std::copy(killAddrs.begin(), killAddrs.end(), std::back_inserter(toKillArray));
killProcArray = getProcesses(killAddrs);

// Reboot and delete or kill the servers
if( killProcesses ) {
TraceEvent("RemoveAndKill", functionId).detail("Step", removeViaClear ? "ClearProcesses" : "IgnoreProcesses").detail("Addresses", describe(killAddrs))
.detail("Processes", killProcArray.size()).detail("ClusterAvailable", g_simulator.isAvailable()).detail("RemoveViaClear", removeViaClear);
for (auto& killProcess : killProcArray) {
if (g_simulator.protectedAddresses.count(killProcess->address))
TraceEvent("RemoveAndKill", functionId).detail("Step", "NoKill Process").detail("Process", describe(*killProcess)).detail("failed", killProcess->failed).detail("rebooting", killProcess->rebooting).detail("ClusterAvailable", g_simulator.isAvailable()).detail("Protected", g_simulator.protectedAddresses.count(killProcess->address));
else if (removeViaClear) {
g_simulator.rebootProcess( killProcess, ISimulator::RebootProcessAndDelete);
TraceEvent("RemoveAndKill", functionId).detail("Step", "Clear Process").detail("Process", describe(*killProcess)).detail("failed", killProcess->failed).detail("rebooting", killProcess->rebooting).detail("ClusterAvailable", g_simulator.isAvailable()).detail("Protected", g_simulator.protectedAddresses.count(killProcess->address));
}
/*
else {
g_simulator.killProcess( killProcess, ISimulator::KillInstantly );
TraceEvent("RemoveAndKill", functionId).detail("Step", "Kill Process").detail("Process", describe(*killProcess)).detail("failed", killProcess->failed).detail("rebooting", killProcess->rebooting).detail("ClusterAvailable", g_simulator.isAvailable()).detail("Protected", g_simulator.protectedAddresses.count(killProcess->address));
}
*/
}
}
else {
std::set<Optional<Standalone<StringRef>>> zoneIds;
bool killedMachine;
for (auto& killProcess : killProcArray) {
zoneIds.insert(killProcess->locality.zoneId());
}
TraceEvent("RemoveAndKill", functionId).detail("Step", removeViaClear ? "ClearMachines" : "KillMachines").detail("Addresses", describe(killAddrs)).detail("Processes", killProcArray.size()).detail("Zones", zoneIds.size()).detail("ClusterAvailable", g_simulator.isAvailable());
for (auto& zoneId : zoneIds) {
killedMachine = g_simulator.killMachine( zoneId, removeViaClear ? ISimulator::RebootAndDelete : ISimulator::KillInstantly, removeViaClear);
TraceEvent(killedMachine ? SevInfo : SevWarn, "RemoveAndKill").detail("Step", removeViaClear ? "Clear Machine" : "Kill Machine").detailext("ZoneId", zoneId).detail(removeViaClear ? "Cleared" : "Killed", killedMachine).detail("ClusterAvailable", g_simulator.isAvailable());
}
}

return killProcArray;
}
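
The new killAddresses helper above keys its behavior off removeViaClear = !BUGGIFY: ordinarily it clears processes (RebootProcessAndDelete), while buggified runs skip the per-process clear and fall through to the per-zone killMachine path; processes in g_simulator.protectedAddresses are never touched, and the direct KillInstantly branch is commented out. A condensed sketch of the per-process decision, assuming the same ISimulator interface used throughout this diff:

	// Sketch of the per-process branch in killAddresses; mirrors the diff,
	// with trace events and the commented-out KillInstantly path left out.
	bool removeViaClear = !BUGGIFY; // buggified runs skip the per-process clear
	for (auto& killProcess : killProcArray) {
		if (g_simulator.protectedAddresses.count(killProcess->address))
			continue; // protected (e.g. a coordinator): the real code only logs here
		if (removeViaClear)
			g_simulator.rebootProcess(killProcess, ISimulator::RebootProcessAndDelete);
	}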

ACTOR static Future<Void> removeAndKill( RemoveServersSafelyWorkload* self, Database cx, std::set<AddressExclusion> toKill, std::set<AddressExclusion>* pIncAddrs)
{
state UID functionId = g_nondeterministic_random->randomUniqueID();

// First clear the exclusion list and exclude the given list
TraceEvent("RemoveAndKill", functionId).detail("Step", "include all").detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) );
TraceEvent("RemoveAndKill", functionId).detail("Step", "included all").detail("ClusterAvailable", g_simulator.isAvailable());
// Reinclude the addresses that were excluded, if present
if (pIncAddrs) {
self->includeAddresses(*pIncAddrs);
}

state std::vector<ISimulator::ProcessInfo*> killProcArray;
state std::vector<AddressExclusion> toKillArray;

std::copy(toKill.begin(), toKill.end(), std::back_inserter(toKillArray));
killProcesses = self->getProcesses(toKill);
killProcArray = self->getProcesses(toKill);

TraceEvent("RemoveAndKill").detail("Step", "Activate Server Exclusion").detail("toKill", describe(toKill)).detail("Addresses", describe(toKillArray)).detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill", functionId).detail("Step", "Activate Server Exclusion").detail("KillAddrs", toKill.size()).detail("KillProcs", killProcArray.size()).detail("MissingProcs", toKill.size()!=killProcArray.size()).detail("toKill", describe(toKill)).detail("Addresses", describe(toKillArray)).detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( excludeServers( cx, toKillArray ) );

// We need to skip at least the quorum change if there's nothing to kill, because there might not be enough servers left
// alive to do a coordinators auto (?)
if (toKill.size()) {
// Wait for removal to be safe
TraceEvent("RemoveAndKill").detail("Step", "Wait For Server Exclusion").detail("Addresses", describe(toKill)).detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill", functionId).detail("Step", "Wait For Server Exclusion").detail("Addresses", describe(toKill)).detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( waitForExcludedServers( cx, toKillArray ) );

TraceEvent("RemoveAndKill").detail("Step", "coordinators auto").detail("desiredCoordinators", g_simulator.desiredCoordinators).detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill", functionId).detail("Step", "coordinators auto").detail("desiredCoordinators", g_simulator.desiredCoordinators).detail("ClusterAvailable", g_simulator.isAvailable());

// Setup the coordinators BEFORE the exclusion
// Otherwise, we may end up with NotEnoughMachinesForCoordinators
@@ -349,38 +426,14 @@ struct RemoveServersSafelyWorkload : TestWorkload {
break;
}

// Reboot and delete or kill the servers
if( self->killProcesses ) {
TraceEvent("RemoveAndKill").detail("Step", removeClear ? "ClearProcesses" : "KillProcesses").detail("Addresses", describe(toKill))
.detail("Processes", killProcesses.size()).detail("ClusterAvailable", g_simulator.isAvailable());
for (auto& killProcess : killProcesses) {
TraceEvent("RemoveAndKill").detail("Step", removeClear ? "Clear Process" : "Kill Process").detail("Process", describe(*killProcess)).detail("ClusterAvailable", g_simulator.isAvailable()).detail("Protected", g_simulator.protectedAddresses.count(killProcess->address));
// ASSERT(g_simulator.protectedAddresses.count(killProcess->address) == 0);
if (removeClear)
g_simulator.rebootProcess( killProcess, ISimulator::RebootProcessAndDelete);
else
g_simulator.killProcess( killProcess, ISimulator::KillInstantly );
}
}
else {
std::set<Optional<Standalone<StringRef>>> zoneIds;
bool killedMachine;
for (auto& killProcess : killProcesses) {
zoneIds.insert(killProcess->locality.zoneId());
}
TraceEvent("RemoveAndKill").detail("Step", removeClear ? "ClearMachines" : "KillMachines").detail("Addresses", describe(toKill)).detail("Processes", killProcesses.size()).detail("Zones", zoneIds.size()).detail("ClusterAvailable", g_simulator.isAvailable());
for (auto& zoneId : zoneIds) {
killedMachine = g_simulator.killMachine( zoneId, removeClear ? ISimulator::RebootAndDelete : ISimulator::KillInstantly, removeClear ? true : false );
TraceEvent(killedMachine ? SevInfo : SevWarn, "RemoveAndKill").detail("Step", removeClear ? "Clear Machine" : "Kill Machine").detailext("ZoneId", zoneId).detail(removeClear ? "Cleared" : "Killed", killedMachine).detail("ClusterAvailable", g_simulator.isAvailable());
}
}
self->killAddresses(toKill);
}
else
{
TraceEvent("RemoveAndKill").detail("Step", "nothing to clear").detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill", functionId).detail("Step", "nothing to clear").detail("ClusterAvailable", g_simulator.isAvailable());
}

TraceEvent("RemoveAndKill").detail("Step", "done").detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill", functionId).detail("Step", "done").detail("ClusterAvailable", g_simulator.isAvailable());

return Void();
}
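
Taken together, the reworked removeAndKill runs a fixed sequence, and the new pIncAddrs parameter lets the second pass undo a first pass that timed out before starting its own exclusion. A compressed sketch of that ordering, assuming the helper actors shown in the diff (includeServers, excludeServers, waitForExcludedServers) and omitting the trace events and timeout/reportErrors wrappers; the sketch is not the literal actor body:

	// Ordering sketch only, under the assumptions stated above.
	ACTOR static Future<Void> removeAndKillSketch( RemoveServersSafelyWorkload* self, Database cx, std::set<AddressExclusion> toKill, std::set<AddressExclusion>* pIncAddrs ) {
		Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) ); // 1. reset the cluster's exclusion list
		if (pIncAddrs)
			self->includeAddresses(*pIncAddrs); // 2. undo a prior, partially-completed pass
		state std::vector<AddressExclusion> toKillArray( toKill.begin(), toKill.end() );
		Void _ = wait( excludeServers( cx, toKillArray ) ); // 3. activate the exclusion
		if (toKill.size()) {
			Void _ = wait( waitForExcludedServers( cx, toKillArray ) ); // 4. block until removal is safe
			// 5. change coordinators BEFORE killing, else NotEnoughMachinesForCoordinators
		}
		self->killAddresses(toKill); // 6. reboot-and-delete or kill the excluded processes
		return Void();
	}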