Bunch of changes based on review comments and discussions.

This commit is contained in:
negoyal 2021-07-30 01:32:43 -07:00
parent 4b87716475
commit 9e7197faba
16 changed files with 199 additions and 392 deletions

View File

@ -30,16 +30,22 @@
class AsyncFileChaos final : public IAsyncFile, public ReferenceCounted<AsyncFileChaos> {
private:
Reference<IAsyncFile> file;
Arena arena;
bool enabled;
public:
explicit AsyncFileChaos(Reference<IAsyncFile> file) : file(file) {}
explicit AsyncFileChaos(Reference<IAsyncFile> file) : file(file) {
// We onlyl allow chaod events on storage files
enabled = StringRef(file->getFilename()).startsWith(LiteralStringRef("storage-"));
}
void addref() override { ReferenceCounted<AsyncFileChaos>::addref(); }
void delref() override { ReferenceCounted<AsyncFileChaos>::delref(); }
static double getDelay() {
double getDelay() const {
double delayFor = 0.0;
if (!enabled)
return delayFor;
auto res = g_network->global(INetwork::enDiskFailureInjector);
if (res) {
DiskFailureInjector* delayInjector = static_cast<DiskFailureInjector*>(res);
@ -60,6 +66,9 @@ public:
Future<int> read(void* data, int length, int64_t offset) override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->read(data, length, offset);
// Wait for diskDelay before submitting the I/O
// Template types are being provided explicitly because they can't be automatically deduced for some reason.
return mapAsync<Void, std::function<Future<int>(Void)>, int>(
@ -67,18 +76,19 @@ public:
}
Future<Void> write(void const* data, int length, int64_t offset) override {
Arena arena;
char* pdata = nullptr;
// Check if a bit flip event was injected, if so, copy the buffer contents
// with a random bit flipped in a new buffer and use that for the write
auto res = g_network->global(INetwork::enBitFlipper);
if (res) {
if (enabled && res) {
auto bitFlipPercentage = static_cast<BitFlipper*>(res)->getBitFlipPercentage();
if (bitFlipPercentage > 0.0) {
pdata = (char*)arena.allocate4kAlignedBuffer(length);
memcpy(pdata, data, length);
if (deterministicRandom()->random01() < bitFlipPercentage) {
// copy buffer with a flipped bit
pdata = (char*)arena.allocate4kAlignedBuffer(length);
memcpy(pdata, data, length);
// flip a random bit in the copied buffer
pdata[deterministicRandom()->randomInt(0, length)] ^= (1 << deterministicRandom()->randomInt(0, 8));
// increment the metric for bit flips
@ -92,6 +102,13 @@ public:
}
double diskDelay = getDelay();
if (diskDelay == 0.0) {
if (pdata)
return holdWhile(arena, file->write(pdata, length, offset));
return file->write(data, length, offset);
}
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(delay(diskDelay), [=](Void _) -> Future<Void> {
if (pdata)
@ -103,6 +120,9 @@ public:
Future<Void> truncate(int64_t size) override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->truncate(size);
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
delay(diskDelay), [=](Void _) -> Future<Void> { return file->truncate(size); });
@ -110,6 +130,9 @@ public:
Future<Void> sync() override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->sync();
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
delay(diskDelay), [=](Void _) -> Future<Void> { return file->sync(); });
@ -117,6 +140,9 @@ public:
Future<int64_t> size() const override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->size();
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<int64_t>(Void)>, int64_t>(
delay(diskDelay), [=](Void _) -> Future<int64_t> { return file->size(); });

View File

@ -410,6 +410,7 @@ public:
std::vector<Optional<Standalone<StringRef>>> primarySatelliteDcIds;
std::vector<Optional<Standalone<StringRef>>> remoteSatelliteDcIds;
TSSMode tssMode;
std::map<NetworkAddress, bool> corruptWorkerMap;
// Used by workloads that perform reconfigurations
int testerCount;
@ -440,6 +441,13 @@ public:
static thread_local ProcessInfo* currentProcess;
bool checkInjectedCorruption() {
auto iter = corruptWorkerMap.find(currentProcess->address);
if (iter != corruptWorkerMap.end())
return iter->second;
return false;
}
protected:
Mutex mutex;

View File

@ -151,7 +151,6 @@ set(FDBSERVER_SRCS
workloads/BackupToDBAbort.actor.cpp
workloads/BackupToDBCorrectness.actor.cpp
workloads/BackupToDBUpgrade.actor.cpp
workloads/BitFlipping.actor.cpp
workloads/BlobStoreWorkload.h
workloads/BulkLoad.actor.cpp
workloads/BulkSetup.actor.h
@ -173,7 +172,7 @@ set(FDBSERVER_SRCS
workloads/DDMetricsExclude.actor.cpp
workloads/DiskDurability.actor.cpp
workloads/DiskDurabilityTest.actor.cpp
workloads/DiskThrottling.actor.cpp
workloads/DiskFailureInjection.actor.cpp
workloads/Downgrade.actor.cpp
workloads/DummyWorkload.actor.cpp
workloads/ExternalWorkload.actor.cpp

View File

@ -95,7 +95,6 @@ extern int limitReasonEnd;
extern const char* limitReasonName[];
extern const char* limitReasonDesc[];
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
typedef std::map<std::string, TraceEventFields> EventMap;
ACTOR static Future<Optional<TraceEventFields>> latestEventOnWorker(WorkerInterface worker, std::string eventName) {
@ -115,7 +114,7 @@ ACTOR static Future<Optional<TraceEventFields>> latestEventOnWorker(WorkerInterf
}
}
ACTOR static Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
ACTOR Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
std::vector<WorkerDetails> workers,
std::string eventName) {
try {

View File

@ -46,4 +46,8 @@ Future<StatusReply> clusterGetStatus(
Version const& datacenterVersionDifference,
ConfigBroadcaster const* const& conifgBroadcaster);
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
std::vector<WorkerDetails> const& workers,
std::string const& eventName);
#endif

View File

@ -22,6 +22,7 @@
#include <string>
#include <map>
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/simulator.h"
/*
** When using this VFS, the sqlite3_file* handles that SQLite uses are
@ -71,7 +72,7 @@ struct VFSAsyncFile {
.detail("Found", e)
.detail("ErrorCode", (int64_t)g_network->global(INetwork::enSQLiteInjectedError))
.backtrace();
return e;
return e || (g_network->isSimulated() && g_simulator.checkInjectedCorruption());
}
uint32_t* const pLockCount; // +1 for each SHARED_LOCK, or 1+X_COUNT for lock level X

View File

@ -26,6 +26,7 @@
#include <limits>
#include <random>
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/IPager.h"
#include "fdbclient/Tuple.h"
#include "flow/serialize.h"
@ -2727,6 +2728,8 @@ public:
debug_printf(
"DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str());
Error e = checksum_failed();
if (g_network->isSimulated() && g_simulator.checkInjectedCorruption())
e = e.asInjectedFault();
TraceEvent(SevError, "RedwoodChecksumFailed")
.detail("Filename", self->filename.c_str())
.detail("PageID", pageID)

View File

@ -613,7 +613,6 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<S
}
}
for (const auto& grvProxy : dbi.client.grvProxies) {
if (grvProxy.addresses().contains(address)) {
return true;
@ -687,13 +686,15 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
// Last, tests that proxies included in the ClientDbInfo are considered as local.
NetworkAddress grvProxyAddress(IPAddress(0x26262626), 1);
GrvProxyInterface grvProxyInterf;
grvProxyInterf.getConsistentReadVersion = RequestStream<struct GetReadVersionRequest>(Endpoint({ grvProxyAddress }, UID(1, 2)));
grvProxyInterf.getConsistentReadVersion =
RequestStream<struct GetReadVersionRequest>(Endpoint({ grvProxyAddress }, UID(1, 2)));
testDbInfo.client.grvProxies.push_back(grvProxyInterf);
ASSERT(addressInDbAndPrimaryDc(grvProxyAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
NetworkAddress commitProxyAddress(IPAddress(0x37373737), 1);
CommitProxyInterface commitProxyInterf;
commitProxyInterf.commit = RequestStream<struct CommitTransactionRequest>(Endpoint({ commitProxyAddress }, UID(1, 2)));
commitProxyInterf.commit =
RequestStream<struct CommitTransactionRequest>(Endpoint({ commitProxyAddress }, UID(1, 2)));
testDbInfo.client.commitProxies.push_back(commitProxyInterf);
ASSERT(addressInDbAndPrimaryDc(commitProxyAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
@ -1204,8 +1205,7 @@ ACTOR Future<Void> chaosMetricsLogger() {
wait(delay(FLOW_KNOBS->CHAOS_LOGGING_INTERVAL));
TraceEvent e("ChaosMetrics");
// double elapsed = now() - chaosMetrics->startTime;
double elapsed = timer_monotonic() - chaosMetrics->startTime;
double elapsed = now() - chaosMetrics->startTime;
e.detail("Elapsed", elapsed);
chaosMetrics->getFields(&e);
e.trackLatest("ChaosMetrics");

View File

@ -1,229 +0,0 @@
/*
* BitFlipping.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/QuietDatabase.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct BitFlippingWorkload : TestWorkload {
bool enabled;
double testDuration;
double percentBitFlips;
double periodicCheckInterval;
std::vector<NetworkAddress> chosenWorkers;
std::vector<Future<Void>> clients;
BitFlippingWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
enabled = !clientId; // only do this on the "first" client
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
percentBitFlips = getOption(options, LiteralStringRef("percentBitFlips"), 10.0);
periodicCheckInterval = getOption(options, LiteralStringRef("periodicCheckInterval"), 10.0);
}
std::string description() const override {
if (&g_simulator == g_network)
return "BitFlipping";
else
return "NoSimBitFlipping";
}
Future<Void> setup(Database const& cx) override { return Void(); }
// Starts the workload by -
// 1. Starting the actor to periodically check chaosMetrics, and
// 2. Starting the actor that injects failures on chosen storage servers
Future<Void> start(Database const& cx) override {
if (enabled) {
clients.push_back(periodicMetricCheck(this));
clients.push_back(flipBitsClient<WorkerInterface>(cx, this));
return timeout(waitForAll(clients), testDuration, Void());
} else
return Void();
}
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(vector<PerfMetric>& m) override {}
static void checkBitFlipResult(Future<Void> res, WorkerInterface worker) {
if (res.isError()) {
auto err = res.getError();
if (err.code() == error_code_client_invalid_operation) {
TraceEvent(SevError, "ChaosDisabled")
.detail("OnEndpoint", worker.waitFailure.getEndpoint().addresses.address.toString());
} else {
TraceEvent(SevError, "BitFlippingFailed")
.detail("OnEndpoint", worker.waitFailure.getEndpoint().addresses.address.toString())
.error(err);
}
}
}
ACTOR void doBitFlips(WorkerInterface worker, double percentage, double startDelay = 0.0) {
state Future<Void> res;
wait(::delay(startDelay));
SetFailureInjection::FlipBitsCommand flipBits;
flipBits.percentBitFlips = percentage;
SetFailureInjection req;
req.flipBits = flipBits;
res = worker.clientInterface.setFailureInjection.getReply(req);
wait(ready(res));
checkBitFlipResult(res, worker);
}
ACTOR static Future<Void> getAllStorageWorkers(Database cx,
BitFlippingWorkload* self,
std::vector<WorkerInterface>* result) {
result->clear();
state std::vector<WorkerInterface> res = wait(getStorageWorkers(cx, self->dbInfo, false));
for (auto& worker : res) {
result->emplace_back(worker);
}
return Void();
}
ACTOR template <class W>
Future<Void> flipBitsClient(Database cx, BitFlippingWorkload* self) {
state double lastTime = now();
state double workloadEnd = now() + self->testDuration;
state std::vector<W> machines;
loop {
wait(poisson(&lastTime, 1));
wait(BitFlippingWorkload::getAllStorageWorkers(cx, self, &machines));
auto machine = deterministicRandom()->randomChoice(machines);
// If we have already chosen this worker, then just continue
if (find(self->chosenWorkers.begin(), self->chosenWorkers.end(), machine.address()) !=
self->chosenWorkers.end())
continue;
// Keep track of chosen workers for verification purpose
self->chosenWorkers.emplace_back(machine.address());
self->doBitFlips(machine, self->percentBitFlips);
}
}
// Resend the chaos event to previosuly chosen workers, in case some workers got restarted and lost their chaos
// config
ACTOR static Future<Void> reSendChaos(BitFlippingWorkload* self) {
std::vector<WorkerDetails> workers =
wait(self->dbInfo->get().clusterInterface.getWorkers.getReply(GetWorkersRequest{}));
std::map<NetworkAddress, WorkerInterface> workersMap;
for (auto worker : workers) {
workersMap[worker.interf.address()] = worker.interf;
}
for (auto& workerAddress : self->chosenWorkers) {
auto itr = workersMap.find(workerAddress);
if (itr != workersMap.end())
self->doBitFlips(itr->second, self->percentBitFlips);
}
return Void();
}
// For fetching chaosMetrics to ensure chaos events are happening
// This is borrowed code from Status.actor.cpp
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
ACTOR static Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
std::vector<WorkerDetails> workers,
std::string eventName) {
try {
state vector<Future<ErrorOr<TraceEventFields>>> eventTraces;
for (int c = 0; c < workers.size(); c++) {
EventLogRequest req =
eventName.size() > 0 ? EventLogRequest(Standalone<StringRef>(eventName)) : EventLogRequest();
eventTraces.push_back(errorOr(timeoutError(workers[c].interf.eventLogRequest.getReply(req), 2.0)));
}
wait(waitForAll(eventTraces));
std::set<std::string> failed;
WorkerEvents results;
for (int i = 0; i < eventTraces.size(); i++) {
const ErrorOr<TraceEventFields>& v = eventTraces[i].get();
if (v.isError()) {
failed.insert(workers[i].interf.address().toString());
results[workers[i].interf.address()] = TraceEventFields();
} else {
results[workers[i].interf.address()] = v.get();
}
}
std::pair<WorkerEvents, std::set<std::string>> val;
val.first = results;
val.second = failed;
return val;
} catch (Error& e) {
ASSERT(e.code() ==
error_code_actor_cancelled); // All errors should be filtering through the errorOr actor above
throw;
}
}
// Fetches chaosMetrics and verifies that chaos events are happening for enabled workers
ACTOR static Future<Void> chaosGetStatus(BitFlippingWorkload* self) {
std::vector<WorkerDetails> workers =
wait(self->dbInfo->get().clusterInterface.getWorkers.getReply(GetWorkersRequest{}));
Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventsFuture;
latestEventsFuture = latestEventOnWorkers(workers, "ChaosMetrics");
state Optional<std::pair<WorkerEvents, std::set<std::string>>> workerEvents = wait(latestEventsFuture);
state WorkerEvents cMetrics = workerEvents.present() ? workerEvents.get().first : WorkerEvents();
// Now verify that all chosen workers for chaos events have non-zero chaosMetrics
for (auto& workerAddress : self->chosenWorkers) {
auto chaosMetrics = cMetrics.find(workerAddress);
if (chaosMetrics != cMetrics.end()) {
int bitFlips = chaosMetrics->second.getInt("BitFlips");
// we expect bitFlips to be non-zero for chosenWorkers
if (bitFlips == 0) {
TraceEvent(SevError, "ChaosGetStatus")
.detail("OnEndpoint", workerAddress.toString())
.detail("BitFlips", bitFlips);
}
}
}
return Void();
}
// Periodically fetches chaosMetrics to ensure that chaas events are taking place
ACTOR static Future<Void> periodicMetricCheck(BitFlippingWorkload* self) {
state double start = now();
state double elapsed = 0.0;
loop {
// re-send the chaos event in case of a process restart
wait(reSendChaos(self));
elapsed += self->periodicCheckInterval;
wait(delayUntil(start + elapsed));
wait(chaosGetStatus(self));
}
}
};
WorkloadFactory<BitFlippingWorkload> BitFlippingWorkloadFactory("BitFlipping");

View File

@ -1,5 +1,5 @@
/*
* DiskThrottling.actor.cpp
* DiskFailureInjection.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
@ -25,34 +25,45 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/Status.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct DiskThrottlingWorkload : TestWorkload {
struct DiskFailureInjectionWorkload : TestWorkload {
bool enabled;
double testDuration;
double startDelay;
bool throttleDisk;
int workersToThrottle;
double stallInterval;
double stallPeriod;
double throttlePeriod;
double periodicCheckInterval;
bool corruptFile;
int workersToCorrupt;
double percentBitFlips;
double periodicBroadcastInterval;
std::vector<NetworkAddress> chosenWorkers;
std::vector<Future<Void>> clients;
DiskThrottlingWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
DiskFailureInjectionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
enabled = !clientId; // only do this on the "first" client
startDelay = getOption(options, LiteralStringRef("startDelay"), 0.0);
testDuration = getOption(options, LiteralStringRef("testDuration"), 60.0);
throttleDisk = getOption(options, LiteralStringRef("throttleDisk"), false);
workersToThrottle = getOption(options, LiteralStringRef("workersToThrottle"), 3);
stallInterval = getOption(options, LiteralStringRef("stallInterval"), 0.0);
stallPeriod = getOption(options, LiteralStringRef("stallPeriod"), 60.0);
throttlePeriod = getOption(options, LiteralStringRef("throttlePeriod"), 60.0);
periodicCheckInterval = getOption(options, LiteralStringRef("periodicCheckInterval"), 10.0);
corruptFile = getOption(options, LiteralStringRef("corruptFile"), false);
workersToCorrupt = getOption(options, LiteralStringRef("workersToCorrupt"), 1);
percentBitFlips = getOption(options, LiteralStringRef("percentBitFlips"), 10.0);
periodicBroadcastInterval = getOption(options, LiteralStringRef("periodicBroadcastInterval"), 5.0);
}
std::string description() const override {
if (&g_simulator == g_network)
return "DiskThrottling";
return "DiskFailureInjection";
else
return "NoSimDiskThrolling";
return "NoSimDiskFailureInjection";
}
Future<Void> setup(Database const& cx) override { return Void(); }
@ -62,8 +73,8 @@ struct DiskThrottlingWorkload : TestWorkload {
// 2. Starting the actor that injects failures on chosen storage servers
Future<Void> start(Database const& cx) override {
if (enabled) {
clients.push_back(periodicMetricCheck(this));
clients.push_back(throttleDiskClient<WorkerInterface>(cx, this));
clients.push_back(diskFailureInjectionClient<WorkerInterface>(cx, this));
clients.push_back(periodicEventBroadcast(this));
return timeout(waitForAll(clients), testDuration, Void());
} else
return Void();
@ -73,26 +84,26 @@ struct DiskThrottlingWorkload : TestWorkload {
void getMetrics(vector<PerfMetric>& m) override {}
static void checkDiskThrottleResult(Future<Void> res, WorkerInterface worker) {
static void checkDiskFailureInjectionResult(Future<Void> res, WorkerInterface worker) {
if (res.isError()) {
auto err = res.getError();
if (err.code() == error_code_client_invalid_operation) {
TraceEvent(SevError, "ChaosDisabled")
.detail("OnEndpoint", worker.waitFailure.getEndpoint().addresses.address.toString());
} else {
TraceEvent(SevError, "DiskThrottlingFailed")
TraceEvent(SevError, "DiskFailureInjectionFailed")
.detail("OnEndpoint", worker.waitFailure.getEndpoint().addresses.address.toString())
.error(err);
}
}
}
// Sets the disk failure request
ACTOR void doThrottle(WorkerInterface worker,
double stallInterval,
double stallPeriod,
double throttlePeriod,
double startDelay) {
// Sets the disk delay request
ACTOR void injectDiskDelays(WorkerInterface worker,
double stallInterval,
double stallPeriod,
double throttlePeriod,
double startDelay) {
state Future<Void> res;
wait(::delay(startDelay));
SetFailureInjection::DiskFailureCommand diskFailure;
@ -103,39 +114,34 @@ struct DiskThrottlingWorkload : TestWorkload {
req.diskFailure = diskFailure;
res = worker.clientInterface.setFailureInjection.getReply(req);
wait(ready(res));
checkDiskThrottleResult(res, worker);
checkDiskFailureInjectionResult(res, worker);
}
// Currently unused, because we only inject disk failures on storage servers
ACTOR static Future<Void> getAllWorkers(DiskThrottlingWorkload* self, std::vector<WorkerInterface>* result) {
result->clear();
std::vector<WorkerDetails> res =
wait(self->dbInfo->get().clusterInterface.getWorkers.getReply(GetWorkersRequest{}));
for (auto& worker : res) {
result->emplace_back(worker.interf);
}
return Void();
// Sets the disk corruption request
ACTOR void injectBitFlips(WorkerInterface worker, double percentage, double startDelay = 0.0) {
state Future<Void> res;
wait(::delay(startDelay));
SetFailureInjection::FlipBitsCommand flipBits;
flipBits.percentBitFlips = percentage;
SetFailureInjection req;
req.flipBits = flipBits;
res = worker.clientInterface.setFailureInjection.getReply(req);
wait(ready(res));
checkDiskFailureInjectionResult(res, worker);
}
ACTOR static Future<Void> getAllStorageWorkers(Database cx,
DiskThrottlingWorkload* self,
std::vector<WorkerInterface>* result) {
result->clear();
state std::vector<WorkerInterface> res = wait(getStorageWorkers(cx, self->dbInfo, false));
for (auto& worker : res) {
result->emplace_back(worker);
}
return Void();
}
// Choose random storage servers to inject disk failures
// Choose random storage servers to inject disk failures.
// We currently only inject disk failure on storage servers. Can be expanded to include
// other worker types in future
ACTOR template <class W>
Future<Void> throttleDiskClient(Database cx, DiskThrottlingWorkload* self) {
Future<Void> diskFailureInjectionClient(Database cx, DiskFailureInjectionWorkload* self) {
state double lastTime = now();
state std::vector<W> machines;
state int throttledWorkers = 0;
state int corruptedWorkers = 0;
loop {
wait(poisson(&lastTime, 1));
wait(DiskThrottlingWorkload::getAllStorageWorkers(cx, self, &machines));
wait(store(machines, getStorageWorkers(cx, self->dbInfo, false)));
auto machine = deterministicRandom()->randomChoice(machines);
// If we have already chosen this worker, then just continue
@ -145,13 +151,22 @@ struct DiskThrottlingWorkload : TestWorkload {
// Keep track of chosen workers for verification purpose
self->chosenWorkers.emplace_back(machine.address());
self->doThrottle(machine, self->stallInterval, self->stallPeriod, self->throttlePeriod, self->startDelay);
if (self->throttleDisk && (throttledWorkers++ < self->workersToThrottle))
self->injectDiskDelays(
machine, self->stallInterval, self->stallPeriod, self->throttlePeriod, self->startDelay);
if (self->corruptFile && (corruptedWorkers++ < self->workersToCorrupt)) {
if (&g_simulator == g_network)
g_simulator.corruptWorkerMap[machine.address()] = true;
self->injectBitFlips(machine, self->percentBitFlips);
}
}
}
// Resend the chaos event to previosuly chosen workers, in case some workers got restarted and lost their chaos
// config
ACTOR static Future<Void> reSendChaos(DiskThrottlingWorkload* self) {
ACTOR static Future<Void> reSendChaos(DiskFailureInjectionWorkload* self) {
state int throttledWorkers = 0;
state int corruptedWorkers = 0;
std::vector<WorkerDetails> workers =
wait(self->dbInfo->get().clusterInterface.getWorkers.getReply(GetWorkersRequest{}));
std::map<NetworkAddress, WorkerInterface> workersMap;
@ -160,57 +175,22 @@ struct DiskThrottlingWorkload : TestWorkload {
}
for (auto& workerAddress : self->chosenWorkers) {
auto itr = workersMap.find(workerAddress);
if (itr != workersMap.end())
self->doThrottle(
itr->second, self->stallInterval, self->stallPeriod, self->throttlePeriod, self->startDelay);
if (itr != workersMap.end()) {
if (self->throttleDisk && (throttledWorkers++ < self->workersToThrottle))
self->injectDiskDelays(
itr->second, self->stallInterval, self->stallPeriod, self->throttlePeriod, self->startDelay);
if (self->corruptFile && (corruptedWorkers++ < self->workersToCorrupt)) {
if (&g_simulator == g_network)
g_simulator.corruptWorkerMap[workerAddress] = true;
self->injectBitFlips(itr->second, self->percentBitFlips);
}
}
}
return Void();
}
// For fetching chaosMetrics to ensure chaos events are happening
// This is borrowed code from Status.actor.cpp
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
ACTOR static Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
std::vector<WorkerDetails> workers,
std::string eventName) {
try {
state vector<Future<ErrorOr<TraceEventFields>>> eventTraces;
for (int c = 0; c < workers.size(); c++) {
EventLogRequest req =
eventName.size() > 0 ? EventLogRequest(Standalone<StringRef>(eventName)) : EventLogRequest();
eventTraces.push_back(errorOr(timeoutError(workers[c].interf.eventLogRequest.getReply(req), 2.0)));
}
wait(waitForAll(eventTraces));
std::set<std::string> failed;
WorkerEvents results;
for (int i = 0; i < eventTraces.size(); i++) {
const ErrorOr<TraceEventFields>& v = eventTraces[i].get();
if (v.isError()) {
failed.insert(workers[i].interf.address().toString());
results[workers[i].interf.address()] = TraceEventFields();
} else {
results[workers[i].interf.address()] = v.get();
}
}
std::pair<WorkerEvents, std::set<std::string>> val;
val.first = results;
val.second = failed;
return val;
} catch (Error& e) {
ASSERT(e.code() ==
error_code_actor_cancelled); // All errors should be filtering through the errorOr actor above
throw;
}
}
// Fetches chaosMetrics and verifies that chaos events are happening for enabled workers
ACTOR static Future<Void> chaosGetStatus(DiskThrottlingWorkload* self) {
ACTOR static Future<Void> chaosGetStatus(DiskFailureInjectionWorkload* self) {
std::vector<WorkerDetails> workers =
wait(self->dbInfo->get().clusterInterface.getWorkers.getReply(GetWorkersRequest{}));
@ -220,38 +200,54 @@ struct DiskThrottlingWorkload : TestWorkload {
state WorkerEvents cMetrics = workerEvents.present() ? workerEvents.get().first : WorkerEvents();
// Now verify that all chosen workers for chaos events have non-zero chaosMetrics
std::vector<Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>>> futures;
// Check if any of the chosen workers for chaos events have non-zero chaosMetrics
try {
int foundChaosMetrics = 0;
for (auto& workerAddress : self->chosenWorkers) {
auto chaosMetrics = cMetrics.find(workerAddress);
if (chaosMetrics != cMetrics.end()) {
// we expect diskDelays to be non-zero for chosenWorkers for throttleDisk event
if (self->throttleDisk) {
int diskDelays = chaosMetrics->second.getInt("DiskDelays");
if (diskDelays > 0) {
foundChaosMetrics++;
}
}
for (auto& workerAddress : self->chosenWorkers) {
auto chaosMetrics = cMetrics.find(workerAddress);
if (chaosMetrics != cMetrics.end()) {
int diskDelays = chaosMetrics->second.getInt("DiskDelays");
// we expect diskDelays to be non-zero for chosenWorkers
if (diskDelays == 0) {
TraceEvent(SevError, "ChaosGetStatus")
.detail("OnEndpoint", workerAddress.toString())
.detail("DiskDelays", diskDelays);
// we expect bitFlips to be non-zero for chosenWorkers for corruptFile event
if (self->corruptFile) {
int bitFlips = chaosMetrics->second.getInt("BitFlips");
if (bitFlips > 0) {
foundChaosMetrics++;
}
}
}
}
if (foundChaosMetrics == 0)
TraceEvent("DiskFailureInjectionFailed").detail("ChaosMetricCount", foundChaosMetrics);
else
TraceEvent("ChaosGetStatus").detail("ChaosMetricCount", foundChaosMetrics);
} catch (Error& e) {
// it's possible to get an empty event, it's okay to ignore
if (e.code() != error_code_attribute_not_found) {
throw e;
}
}
return Void();
}
// Periodically fetches chaosMetrics to ensure that chaas events are taking place
ACTOR static Future<Void> periodicMetricCheck(DiskThrottlingWorkload* self) {
// Periodically re-send the chaos event in case of a process restart
ACTOR static Future<Void> periodicEventBroadcast(DiskFailureInjectionWorkload* self) {
state double start = now();
state double elapsed = 0.0;
loop {
// re-send the chaos event in case of a process restart
wait(reSendChaos(self));
elapsed += self->periodicCheckInterval;
elapsed += self->periodicBroadcastInterval;
wait(delayUntil(start + elapsed));
wait(chaosGetStatus(self));
}
}
};
WorkloadFactory<DiskThrottlingWorkload> DiskThrottlingWorkloadFactory("DiskThrottling");
WorkloadFactory<DiskFailureInjectionWorkload> DiskFailureInjectionWorkloadFactory("DiskFailureInjection");

View File

@ -66,9 +66,9 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( HUGE_ARENA_LOGGING_BYTES, 100e6 );
init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
// Chaos testing
init( ENABLE_CHAOS_FEATURES, false );
init( CHAOS_LOGGING_INTERVAL, 5.0 );
// Chaos testing - enabled for simulation by default
init( ENABLE_CHAOS_FEATURES, isSimulated );
init( CHAOS_LOGGING_INTERVAL, 5.0 );
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;

View File

@ -666,7 +666,7 @@ struct ChaosMetrics {
void clear() {
memset(this, 0, sizeof(ChaosMetrics));
startTime = timer_monotonic();
startTime = g_network ? g_network->now() : 0;
}
unsigned int diskDelays;
@ -703,11 +703,11 @@ struct DiskFailureInjector {
void setDiskFailure(double interval, double stallFor, double throttleFor) {
stallInterval = interval;
stallPeriod = stallFor;
stallUntil = std::max(stallUntil, timer_monotonic() + stallFor);
stallUntil = std::max(stallUntil, g_network->now() + stallFor);
// random stall duration in ms (chosen once)
stallDuration = 0.001 * deterministicRandom()->randomInt(1, 5);
throttlePeriod = throttleFor;
throttleUntil = std::max(throttleUntil, timer_monotonic() + throttleFor);
throttleUntil = std::max(throttleUntil, g_network->now() + throttleFor);
TraceEvent("SetDiskFailure")
.detail("StallInterval", interval)
.detail("StallPeriod", stallFor)
@ -719,8 +719,8 @@ struct DiskFailureInjector {
double getStallDelay() {
// If we are in a stall period and a stallInterval was specified, determine the
// delay to be inserted
if (((stallUntil - timer_monotonic()) > 0.0) && stallInterval) {
auto timeElapsed = fmod(timer_monotonic(), stallInterval);
if (((stallUntil - g_network->now()) > 0.0) && stallInterval) {
auto timeElapsed = fmod(g_network->now(), stallInterval);
return std::max(0.0, stallDuration - timeElapsed);
}
return 0.0;
@ -728,7 +728,7 @@ struct DiskFailureInjector {
double getThrottleDelay() {
// If we are in the throttle period, insert a random delay (in ms)
if ((throttleUntil - timer_monotonic()) > 0.0)
if ((throttleUntil - g_network->now()) > 0.0)
return (0.001 * deterministicRandom()->randomInt(1, 3));
return 0.0;

View File

@ -119,14 +119,12 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/BackupCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectness.toml)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/BitFlippedCycle.toml IGNORE)
add_fdb_test(TEST_FILES fast/CacheTest.toml)
add_fdb_test(TEST_FILES fast/CloggedSideband.toml)
add_fdb_test(TEST_FILES fast/ConfigureLocked.toml)
add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.toml)
add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
add_fdb_test(TEST_FILES fast/CycleTest.toml)
add_fdb_test(TEST_FILES fast/DiskThrottledCycle.toml IGNORE)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)
@ -231,6 +229,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/DDBalanceAndRemove.toml)
add_fdb_test(TEST_FILES slow/DDBalanceAndRemoveStatus.toml)
add_fdb_test(TEST_FILES slow/DifferentClustersSameRV.toml)
add_fdb_test(TEST_FILES slow/DiskFailureCycle.toml)
add_fdb_test(TEST_FILES slow/FastTriggeredWatches.toml)
add_fdb_test(TEST_FILES slow/LowLatencyWithFailures.toml)
add_fdb_test(TEST_FILES slow/MoveKeysClean.toml)

View File

@ -1,13 +0,0 @@
[[test]]
testTitle = 'BitFlippedCycle'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 2500.0
testDuration = 60.0
expectedRate = 0
[[test.workload]]
testName = 'BitFlipping'
testDuration = 60.0
percentBitFlips = 20.0

View File

@ -1,16 +0,0 @@
[[test]]
testTitle = 'DiskThrottledCycle'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
[[test.workload]]
testName = 'DiskThrottling'
testDuration = 30.0
stallInterval = 10.0
stallPeriod = 30.0
throttlePeriod = 30.0

View File

@ -0,0 +1,30 @@
[configuration]
buggify = false
minimumReplication = 3
minimumRegions = 3
logAntiQuorum = 0
[[test]]
testTitle = 'DiskFailureCycle'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 2500.0
testDuration = 60.0
expectedRate = 0
[[test.workload]]
testName = 'DiskFailureInjection'
testDuration = 20.0
startDelay = 20.0
throttleDisk = true
stallInterval = 10.0
stallPeriod = 20.0
throttlePeriod = 20.0
[[test.workload]]
testName = 'DiskFailureInjection'
testDuration = 20.0
startDelay = 40.0
corruptFile = true
percentBitFlips = 10