Implement Disk Throttling Chaos workload.

Add a SetFailureInjection request to ClientWorkerInterface so a chaos workload can ask a worker to throttle its disk for a given number of seconds. The worker forwards the request to a new process-wide DiskFailureInjector; AsyncFileEIO and AsyncFileKAIO consult it and delay read/write completion accordingly. In simulation, the throttle is applied in waitUntilDiskReady via a new throttleDiskFor field on ProcessInfo. The feature is gated behind the new ENABLE_CHAOS_FEATURES knob, and a DiskThrottling workload plus a DiskThrottledCycle test are registered with the build.

negoyal 2021-06-30 17:05:04 -07:00
parent 436ab6cb7f
commit df39c5a44e
12 changed files with 141 additions and 11 deletions

View File

@ -31,8 +31,10 @@
// A ClientWorkerInterface is embedded as the first element of a WorkerInterface.
struct ClientWorkerInterface {
constexpr static FileIdentifier file_identifier = 12418152;
RequestStream<struct RebootRequest> reboot;
RequestStream<struct ProfilerRequest> profiler;
RequestStream<struct SetFailureInjection> setFailureInjection;
bool operator==(ClientWorkerInterface const& r) const { return id() == r.id(); }
bool operator!=(ClientWorkerInterface const& r) const { return id() != r.id(); }
@ -43,7 +45,7 @@ struct ClientWorkerInterface {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reboot, profiler);
serializer(ar, reboot, profiler, setFailureInjection);
}
};
@ -88,4 +90,23 @@ struct ProfilerRequest {
}
};
struct SetFailureInjection {
constexpr static FileIdentifier file_identifier = 15439864;
ReplyPromise<Void> reply;
struct ThrottleDiskCommand {
double time;
Optional<NetworkAddress> address; // TODO: NEELAM: how do we identify the machine
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, time, address);
}
};
Optional<ThrottleDiskCommand> throttleDisk;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply, throttleDisk);
}
};
#endif
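The request type above is what a chaos workload sends to individual workers. As a rough illustration only (the DiskThrottling workload that this commit adds to the build is not shown in these hunks), a caller that already has the cluster's ClientWorkerInterfaces might throttle every worker's disk as follows; the function name and overall shape are assumptions, not code from this change:

// Hypothetical sketch: ask each worker to throttle its disk for `seconds`.
// Workers reply with Void, or the reply throws client_invalid_operation()
// when ENABLE_CHAOS_FEATURES is off (see the workerServer hunk below).
ACTOR Future<Void> throttleAllDisks(std::vector<ClientWorkerInterface> workers, double seconds) {
	state int i;
	for (i = 0; i < (int)workers.size(); i++) {
		SetFailureInjection req;
		SetFailureInjection::ThrottleDiskCommand cmd;
		cmd.time = seconds;
		req.throttleDisk = cmd;
		wait(workers[i].setFailureInjection.getReply(req));
	}
	return Void();
}

A real workload would also need a way to discover the ClientWorkerInterfaces and to choose which processes to target; the address field of ThrottleDiskCommand is still marked TODO above.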

View File

@ -162,14 +162,16 @@ public:
Future<int> read(void* data, int length, int64_t offset) override {
++countFileLogicalReads;
++countLogicalReads;
return read_impl(fd, data, length, offset);
double throttleFor = diskFailureInjector->getDiskDelay();
return read_impl(fd, data, length, offset, throttleFor);
}
Future<Void> write(void const* data, int length, int64_t offset) override // Copies data synchronously
{
++countFileLogicalWrites;
++countLogicalWrites;
double throttleFor = diskFailureInjector->getDiskDelay();
// Standalone<StringRef> copy = StringRef((const uint8_t*)data, length);
return write_impl(fd, err, StringRef((const uint8_t*)data, length), offset);
return write_impl(fd, err, StringRef((const uint8_t*)data, length), offset, throttleFor);
}
Future<Void> truncate(int64_t size) override {
++countFileLogicalWrites;
@ -270,6 +272,7 @@ private:
int fd, flags;
Reference<ErrorInfo> err;
std::string filename;
//DiskFailureInjector* diskFailureInjector;
mutable Int64MetricHandle countFileLogicalWrites;
mutable Int64MetricHandle countFileLogicalReads;
@ -277,7 +280,8 @@ private:
mutable Int64MetricHandle countLogicalReads;
AsyncFileEIO(int fd, int flags, std::string const& filename)
: fd(fd), flags(flags), filename(filename), err(new ErrorInfo) {
: fd(fd), flags(flags), filename(filename), err(new ErrorInfo),
diskFailureInjector(DiskFailureInjector::injector()) {
if (!g_network->isSimulated()) {
countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename);
countFileLogicalReads.init(LiteralStringRef("AsyncFile.CountFileLogicalReads"), filename);
@ -329,13 +333,18 @@ private:
TraceEvent("AsyncFileClosed").suppressFor(1.0).detail("Fd", fd);
}
ACTOR static Future<int> read_impl(int fd, void* data, int length, int64_t offset) {
ACTOR static Future<int> read_impl(int fd, void* data, int length, int64_t offset, double throttleFor) {
state TaskPriority taskID = g_network->getCurrentTask();
state Promise<Void> p;
// fprintf(stderr, "eio_read (fd=%d length=%d offset=%lld)\n", fd, length, offset);
state eio_req* r = eio_read(fd, data, length, offset, 0, eio_callback, &p);
try {
wait(p.getFuture());
// throttleDisk if enabled
//double throttleFor = diskFailureInjector->getDiskDelay();
if (throttleFor > 0.0) {
wait(delay(throttleFor));
}
} catch (...) {
g_network->setCurrentTask(taskID);
eio_cancel(r);
@ -358,12 +367,17 @@ private:
}
}
ACTOR static Future<Void> write_impl(int fd, Reference<ErrorInfo> err, StringRef data, int64_t offset) {
ACTOR static Future<Void> write_impl(int fd, Reference<ErrorInfo> err, StringRef data, int64_t offset, double throttleFor) {
state TaskPriority taskID = g_network->getCurrentTask();
state Promise<Void> p;
state eio_req* r = eio_write(fd, (void*)data.begin(), data.size(), offset, 0, eio_callback, &p);
try {
wait(p.getFuture());
// throttleDisk if enabled
//double throttleFor = diskFailureInjector->getDiskDelay();
if (throttleFor > 0.0) {
wait(delay(throttleFor));
}
} catch (...) {
g_network->setCurrentTask(taskID);
eio_cancel(r);
@ -553,6 +567,8 @@ private:
static void apple_fsync(eio_req* req) { req->result = fcntl(req->int1, F_FULLFSYNC, 0); }
static void free_req(eio_req* req) { free(req); }
#endif
public:
DiskFailureInjector* diskFailureInjector;
};
#ifdef FILESYSTEM_IMPL

View File

@ -195,7 +195,10 @@ public:
void addref() override { ReferenceCounted<AsyncFileKAIO>::addref(); }
void delref() override { ReferenceCounted<AsyncFileKAIO>::delref(); }
// Delay delivery of an I/O result by throttleFor seconds, so that disk throttling
// actually slows the caller down rather than firing a detached timer.
ACTOR static Future<int> throttleDisk(Future<int> result, double throttleFor) {
state int r = wait(result);
if (throttleFor > 0.0)
wait(delay(throttleFor));
return r;
}
Future<int> read(void* data, int length, int64_t offset) override {
++countFileLogicalReads;
++countLogicalReads;
@ -213,6 +216,9 @@ public:
enqueue(io, "read", this);
Future<int> result = io->result.getFuture();
// throttleDisk if enabled: delay completion of the read result
result = throttleDisk(result, diskFailureInjector->getDiskDelay());
#if KAIO_LOGGING
// result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
@ -238,6 +244,9 @@ public:
enqueue(io, "write", this);
Future<int> result = io->result.getFuture();
// throttleDisk if enabled: delay completion of the write result
result = throttleDisk(result, diskFailureInjector->getDiskDelay());
#if KAIO_LOGGING
// result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
@ -749,6 +758,9 @@ private:
}
}
}
public:
DiskFailureInjector* diskFailureInjector;
};
#if KAIO_LOGGING

View File

@ -34,6 +34,7 @@
// must complete or cancel, but you should probably look at the file implementations you'll be using.
class IAsyncFile {
public:
//explicit IAsyncFile() : diskFailureInjector(DiskFailureInjector::injector()) {}
virtual ~IAsyncFile();
// Pass these to g_network->open to get an IAsyncFile
enum {
@ -95,6 +96,9 @@ public:
// Used for rate control, at present, only AsyncFileCached supports it
virtual Reference<IRateControl> const& getRateControl() { throw unsupported_operation(); }
virtual void setRateControl(Reference<IRateControl> const& rc) { throw unsupported_operation(); }
//public:
//DiskFailureInjector* diskFailureInjector;
};
typedef void (*runCycleFuncPtr)();

View File

@ -1949,6 +1949,13 @@ public:
void clogPair(const IPAddress& from, const IPAddress& to, double seconds) override {
g_clogging.clogPairFor(from, to, seconds);
}
void throttleDisk(ProcessInfo* machine, double seconds) override {
machine->throttleDiskFor = seconds;
TraceEvent("ThrottleDisk").detail("Delay", seconds).
detail("Roles", getRoles(machine->address)).
detail("Address", machine->address).
detail("StartingClass", machine->startingClass.toString());
}
std::vector<ProcessInfo*> getAllProcesses() const override {
std::vector<ProcessInfo*> processes;
for (auto& c : machines) {
@ -2390,11 +2397,19 @@ Future<Void> waitUntilDiskReady(Reference<DiskParameters> diskParameters, int64_
diskParameters->nextOperation += (1.0 / diskParameters->iops) + (size / diskParameters->bandwidth);
double randomLatency;
if (sync) {
if (g_simulator.getCurrentProcess()->throttleDiskFor) {
randomLatency = g_simulator.getCurrentProcess()->throttleDiskFor;
TraceEvent("WaitUntilDiskReadyThrottling")
.detail("Delay", randomLatency);
} else if (sync) {
randomLatency = .005 + deterministicRandom()->random01() * (BUGGIFY ? 1.0 : .010);
} else
randomLatency = 10 * deterministicRandom()->random01() / diskParameters->iops;
TraceEvent("WaitUntilDiskReady").detail("Delay", randomLatency).
detail("Roles", g_simulator.getRoles(g_simulator.getCurrentProcess()->address)).
detail("Address", g_simulator.getCurrentProcess()->address).
detail("ThrottleDiskFor", g_simulator.getCurrentProcess()->throttleDiskFor);
return delayUntil(diskParameters->nextOperation + randomLatency);
}
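For example, after throttleDisk(machine, 0.1), every simulated disk operation on that process completes with 100 ms of latency in place of the usual randomized latency (roughly 5-15 ms for sync operations).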

View File

@ -87,6 +87,7 @@ public:
uint64_t fault_injection_r;
double fault_injection_p1, fault_injection_p2;
bool failedDisk;
double throttleDiskFor; // when non-zero, simulated disk ops on this process use this latency (seconds) instead of the usual randomized latency; set by throttleDisk()
UID uid;
@ -102,7 +103,7 @@ public:
: name(name), locality(locality), startingClass(startingClass), addresses(addresses),
address(addresses.address), dataFolder(dataFolder), network(net), coordinationFolder(coordinationFolder),
failed(false), excluded(false), rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), cleared(false), failedDisk(false) {
fault_injection_r(0), machine(0), cleared(false), failedDisk(false), throttleDiskFor(0) {
uid = deterministicRandom()->randomUniqueID();
}
@ -374,6 +375,7 @@ public:
virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) = 0;
virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) = 0;
virtual void throttleDisk(ProcessInfo* machine, double seconds) = 0;
virtual std::vector<ProcessInfo*> getAllProcesses() const = 0;
virtual ProcessInfo* getProcessByAddress(NetworkAddress const& address) = 0;
virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) = 0;
@ -462,8 +464,9 @@ struct DiskParameters : ReferenceCounted<DiskParameters> {
double nextOperation;
int64_t iops;
int64_t bandwidth;
double throttleFor;
DiskParameters(int64_t iops, int64_t bandwidth) : nextOperation(0), iops(iops), bandwidth(bandwidth) {}
DiskParameters(int64_t iops, int64_t bandwidth) : nextOperation(0), iops(iops), bandwidth(bandwidth), throttleFor(0) {}
};
// Simulates delays for performing operations on disk

View File

@ -158,6 +158,7 @@ set(FDBSERVER_SRCS
workloads/ChangeConfig.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/TriggerRecovery.actor.cpp
workloads/DiskThrottling.actor.cpp
workloads/SuspendProcesses.actor.cpp
workloads/CommitBugCheck.actor.cpp
workloads/ConfigureDatabase.actor.cpp

View File

@ -1209,6 +1209,10 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
TraceEvent(SevWarnAlways, "ChaosFeaturesEnabled");
}
folder = abspath(folder);
if (metricsPrefix.size() > 0) {
@ -1509,6 +1513,16 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
flushAndExit(0);
}
}
when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) {
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
if (req.throttleDisk.present()) {
DiskFailureInjector::injector()->throttleFor(req.throttleDisk.get().time);
}
req.reply.send(Void());
} else {
req.reply.sendError(client_invalid_operation());
}
}
when(ProfilerRequest req = waitNext(interf.clientInterface.profiler.getFuture())) {
state ProfilerRequest profilerReq = req;
// There really isn't a great "filepath sanitizer" or "filepath escape" function available,

View File

@ -64,6 +64,10 @@ void FlowKnobs::initialize(Randomize _randomize, IsSimulated _isSimulated) {
init( HUGE_ARENA_LOGGING_BYTES, 100e6 );
init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
// Chaos testing
init( ENABLE_CHAOS_FEATURES, false );
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;
init( TRACING_UDP_LISTENER_PORT, 8889 ); // Only applicable if TracerType is set to a network option.

View File

@ -98,6 +98,9 @@ public:
double HUGE_ARENA_LOGGING_BYTES;
double HUGE_ARENA_LOGGING_INTERVAL;
// Chaos testing
bool ENABLE_CHAOS_FEATURES;
bool WRITE_TRACING_ENABLED;
int TRACING_UDP_LISTENER_PORT;

View File

@ -486,7 +486,8 @@ public:
enNetworkAddressesFunc = 11,
enClientFailureMonitor = 12,
enSQLiteInjectedError = 13,
enGlobalConfig = 14
enGlobalConfig = 14,
enFailureInjector = 15
};
virtual void longTaskCheck(const char* name) {}
@ -646,4 +647,39 @@ public:
// Returns the interface that should be used to make and accept socket connections
};
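// Process-wide helper for injecting disk delays during chaos testing. A single instance is
// shared through g_network's enFailureInjector global slot; the worker's SetFailureInjection
// handler calls throttleFor(), and the file implementations consult getDiskDelay() when
// issuing reads and writes. getDiskDelay() returns 0 unless ENABLE_CHAOS_FEATURES is set.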
struct DiskFailureInjector : FastAllocated<DiskFailureInjector> {
static DiskFailureInjector* injector() {
auto res = g_network->global(INetwork::enFailureInjector);
if (!res) {
res = new DiskFailureInjector();
g_network->setGlobal(INetwork::enFailureInjector, res);
}
return static_cast<DiskFailureInjector*>(res);
}
//double getSendDelay(NetworkAddress const& peer);
//double getReceiveDelay(NetworkAddress const& peer);
//virtual void throttleFor(double time) = 0;
//virtual double getDiskDelay() = 0;
void throttleFor(double time) {
throttleUntil = std::max(throttleUntil, timer_monotonic() + time);
}
double getDiskDelay() {
if (!FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
return 0.0;
}
// throttleUntil is an absolute timer_monotonic() timestamp; return the remaining
// delay so that callers can simply wait(delay(getDiskDelay())).
return std::max(0.0, throttleUntil - timer_monotonic());
}
private: // members
double throttleUntil = 0.0;
private: // construction
DiskFailureInjector() = default;
DiskFailureInjector(DiskFailureInjector const&) = delete;
};
#endif

View File

@ -124,6 +124,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.toml)
add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
add_fdb_test(TEST_FILES fast/CycleTest.toml)
add_fdb_test(TEST_FILES fast/DiskThrottledCycle.toml IGNORE)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)