Misc fixes.
parent 3b34423248
commit a8baeb75d0
@@ -34,8 +34,11 @@ private:
 
 public:
     explicit AsyncFileChaos(Reference<IAsyncFile> file) : file(file) {
-        // We onlyl allow chaod events on storage files
-        enabled = StringRef(file->getFilename()).startsWith(LiteralStringRef("storage-"));
+        // We only allow chaos events on storage files
+        enabled = (file->getFilename().find("storage-") != std::string::npos);
+        //enabled = StringRef(file->getFilename()).startsWith(LiteralStringRef("storage-"));
+
+        TraceEvent("AsyncFileChaos").detail("Enabled", enabled).detail("FileName", file->getFilename());
    }
 
     void addref() override { ReferenceCounted<AsyncFileChaos>::addref(); }
@@ -43,11 +43,16 @@ struct DiskFailureInjectionWorkload : TestWorkload {
     double periodicBroadcastInterval;
     std::vector<NetworkAddress> chosenWorkers;
     std::vector<Future<Void>> clients;
+    // Verification Mode: We run the workload indefinitely in this mode.
+    // The idea is to keep going until we get a non-zero chaosMetric to ensure
+    // that we haven't lost the chaos event. testDuration is ignored in this mode
+    bool verificationMode;
 
     DiskFailureInjectionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
         enabled = !clientId; // only do this on the "first" client
         startDelay = getOption(options, LiteralStringRef("startDelay"), 0.0);
         testDuration = getOption(options, LiteralStringRef("testDuration"), 60.0);
+        verificationMode = getOption(options, LiteralStringRef("verificationMode"), false);
         throttleDisk = getOption(options, LiteralStringRef("throttleDisk"), false);
         workersToThrottle = getOption(options, LiteralStringRef("workersToThrottle"), 3);
         stallInterval = getOption(options, LiteralStringRef("stallInterval"), 0.0);
@@ -69,12 +74,18 @@ struct DiskFailureInjectionWorkload : TestWorkload {
     Future<Void> setup(Database const& cx) override { return Void(); }
 
     // Starts the workload by -
-    // 1. Starting the actor to periodically check chaosMetrics, and
+    // 1. Starting the actor to periodically check chaosMetrics and re-broadcast chaos events, and
     // 2. Starting the actor that injects failures on chosen storage servers
     Future<Void> start(Database const& cx) override {
         if (enabled) {
             clients.push_back(diskFailureInjectionClient<WorkerInterface>(cx, this));
             clients.push_back(periodicEventBroadcast(this));
+            // In verification mode, we want to wait until the first actor returns which indicates that
+            // a non-zero chaosMetric was found
+            if (verificationMode) {
+                return waitForAny(clients);
+            }
+            // Else we honor testDuration
             return timeout(waitForAll(clients), testDuration, Void());
         } else
             return Void();
@@ -197,7 +208,8 @@ struct DiskFailureInjectionWorkload : TestWorkload {
     }
 
     // Fetches chaosMetrics and verifies that chaos events are happening for enabled workers
-    ACTOR static Future<Void> chaosGetStatus(DiskFailureInjectionWorkload* self) {
+    ACTOR static Future<int> chaosGetStatus(DiskFailureInjectionWorkload* self) {
+        state int foundChaosMetrics = 0;
         std::vector<WorkerDetails> workers =
             wait(self->dbInfo->get().clusterInterface.getWorkers.getReply(GetWorkersRequest{}));
 
@@ -209,7 +221,6 @@ struct DiskFailureInjectionWorkload : TestWorkload {
 
         // Check if any of the chosen workers for chaos events have non-zero chaosMetrics
         try {
-            int foundChaosMetrics = 0;
             for (auto& workerAddress : self->chosenWorkers) {
                 auto chaosMetrics = cMetrics.find(workerAddress);
                 if (chaosMetrics != cMetrics.end()) {
@@ -230,10 +241,6 @@ struct DiskFailureInjectionWorkload : TestWorkload {
                     }
                 }
             }
-            if (foundChaosMetrics == 0)
-                TraceEvent("DiskFailureInjectionFailed").detail("ChaosMetricCount", foundChaosMetrics);
-            else
-                TraceEvent("ChaosGetStatus").detail("ChaosMetricCount", foundChaosMetrics);
         } catch (Error& e) {
             // it's possible to get an empty event, it's okay to ignore
             if (e.code() != error_code_attribute_not_found) {
@@ -241,7 +248,7 @@ struct DiskFailureInjectionWorkload : TestWorkload {
             }
         }
 
-        return Void();
+        return foundChaosMetrics;
     }
 
     // Periodically re-send the chaos event in case of a process restart
@@ -253,7 +260,11 @@ struct DiskFailureInjectionWorkload : TestWorkload {
             wait(reSendChaos(self));
             elapsed += self->periodicBroadcastInterval;
             wait(delayUntil(start + elapsed));
-            wait(chaosGetStatus(self));
+            int foundChaosMetrics = wait(chaosGetStatus(self));
+            if (foundChaosMetrics > 0) {
+                TraceEvent("FoundChaos").detail("ChaosMetricCount", foundChaosMetrics);
+                return Void();
+            }
         }
     }
 };
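In verification mode the workload keeps polling chaosMetrics until it observes a non-zero count and only then completes, whereas without it the clients simply run until testDuration expires. A minimal sketch of that control flow in plain C++ (not flow/ACTOR code; pollChaosMetricCount is a hypothetical stand-in for chaosGetStatus, and the real workload uses delayUntil rather than sleeping):

#include <chrono>
#include <functional>
#include <thread>

// Returns true once a non-zero chaos metric count is observed, false if maxWait elapses
// first (the analogue of running without verificationMode and honoring testDuration).
bool waitForChaos(const std::function<int()>& pollChaosMetricCount,
                  std::chrono::seconds pollInterval,
                  std::chrono::seconds maxWait,
                  bool verificationMode) {
    auto start = std::chrono::steady_clock::now();
    for (;;) {
        if (pollChaosMetricCount() > 0)
            return true; // chaos event observed; verification succeeds
        if (!verificationMode && std::chrono::steady_clock::now() - start >= maxWait)
            return false; // without verification mode, give up after the test duration
        std::this_thread::sleep_for(pollInterval); // stand-in for delayUntil(start + elapsed)
    }
}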
@@ -33,10 +33,14 @@ struct TargetedKillWorkload : TestWorkload {
     std::string machineToKill;
     bool enabled, killAllMachineProcesses;
     double killAt;
+    bool reboot;
+    double suspendDuration;
 
     TargetedKillWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
         enabled = !clientId; // only do this on the "first" client
         killAt = getOption(options, LiteralStringRef("killAt"), 5.0);
+        reboot = getOption(options, LiteralStringRef("reboot"), false);
+        suspendDuration = getOption(options, LiteralStringRef("suspendDuration"), 1.0);
         machineToKill = getOption(options, LiteralStringRef("machineToKill"), LiteralStringRef("master")).toString();
         killAllMachineProcesses = getOption(options, LiteralStringRef("killWholeMachine"), false);
     }
@@ -61,13 +65,19 @@ struct TargetedKillWorkload : TestWorkload {
         state vector<WorkerDetails> workers = wait(getWorkers(self->dbInfo));
 
         int killed = 0;
+        state RebootRequest rbReq;
+        if (self->reboot) {
+            rbReq.waitForDuration = self->suspendDuration;
+        } else {
+            rbReq.waitForDuration = std::numeric_limits<uint32_t>::max();
+        }
         for (int i = 0; i < workers.size(); i++) {
             if (workers[i].interf.master.getEndpoint().getPrimaryAddress() == address ||
                 (self->killAllMachineProcesses &&
                  workers[i].interf.master.getEndpoint().getPrimaryAddress().ip == address.ip &&
                  workers[i].processClass != ProcessClass::TesterClass)) {
                 TraceEvent("WorkerKill").detail("TargetedMachine", address).detail("Worker", workers[i].interf.id());
-                workers[i].interf.clientInterface.reboot.send(RebootRequest());
+                workers[i].interf.clientInterface.reboot.send(rbReq);
             }
         }
 
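The rbReq.waitForDuration value is what distinguishes a reboot-and-resume from an effectively permanent kill: assuming waitForDuration is interpreted in seconds (as the suspendDuration option above suggests), std::numeric_limits<uint32_t>::max() keeps the process suspended for roughly 136 years. A quick check of that assumption:

#include <cstdint>
#include <iostream>
#include <limits>

int main() {
    // The "kill" branch uses the maximum uint32_t value as the suspend duration.
    uint32_t waitForDuration = std::numeric_limits<uint32_t>::max();
    double years = waitForDuration / (60.0 * 60.0 * 24.0 * 365.0);
    std::cout << waitForDuration << " seconds is about " << years << " years\n"; // ~136.2 years
}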
@@ -715,6 +715,7 @@ struct DiskFailureInjector {
         throttlePeriod = throttleFor;
         throttleUntil = std::max(throttleUntil, g_network->now() + throttleFor);
         TraceEvent("SetDiskFailure")
+            .detail("Now", g_network->now())
             .detail("StallInterval", interval)
             .detail("StallPeriod", stallFor)
             .detail("StallUntil", stallUntil)
@@ -10,12 +10,13 @@ testTitle = 'DiskFailureCycle'
 [[test.workload]]
     testName = 'Cycle'
     transactionsPerSecond = 2500.0
-    testDuration = 60.0
+    testDuration = 600.0
     expectedRate = 0
 
 [[test.workload]]
     testName = 'DiskFailureInjection'
     testDuration = 20.0
+    verificationMode = true
     startDelay = 20.0
     throttleDisk = true
     stallInterval = 10.0
@@ -25,6 +26,7 @@ testTitle = 'DiskFailureCycle'
 [[test.workload]]
     testName = 'DiskFailureInjection'
     testDuration = 20.0
+    verificationMode = true
     startDelay = 40.0
     corruptFile = true
     percentBitFlips = 10