Wrapper class to avoid adding overhead to all async disk calls

This commit is contained in:
negoyal 2021-07-12 17:51:01 -07:00
parent 2b5a96f745
commit 1b8b22decc
12 changed files with 182 additions and 139 deletions

View File

@ -94,12 +94,16 @@ struct SetFailureInjection {
constexpr static FileIdentifier file_identifier = 15439864;
ReplyPromise<Void> reply;
struct ThrottleDiskCommand {
double time;
Optional<NetworkAddress> address; // TODO: NEELAM: how do we identify the machine
// how often should the delay be inserted (0 meaning once, 10 meaning every 10 secs)
double delayFrequency;
// min delay to be inserted
double delayMin;
//max delay to be inserted
double delayMax;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, time, address);
serializer(ar, delayFrequency, delayMin, delayMax);
}
};
Optional<ThrottleDiskCommand> throttleDisk;

View File

@ -0,0 +1,89 @@
/*
* VersionedBTree.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/flow.h"
#include "flow/serialize.h"
#include "flow/genericactors.actor.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/network.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h"
//template <class AsyncFileType>
class AsyncFileDelayed final : public IAsyncFile, public ReferenceCounted<AsyncFileDelayed> {
private:
Reference<IAsyncFile> file;
public:
explicit AsyncFileDelayed(Reference<IAsyncFile> file) : file(file) {}
void addref() override { ReferenceCounted<AsyncFileDelayed>::addref(); }
void delref() override { ReferenceCounted<AsyncFileDelayed>::delref(); }
Future<int> read(void* data, int length, int64_t offset) override {
double delay = 0.0;
auto res = g_network->global(INetwork::enFailureInjector);
if (res)
delay = static_cast<DiskFailureInjector*>(res)->getDiskDelay();
TraceEvent("AsyncFileDelayedRead").detail("ThrottleDelay", delay);
return delayed(file->read(data, length, offset), delay);
}
Future<Void> write(void const* data, int length, int64_t offset) override {
double delay = 0.0;
auto res = g_network->global(INetwork::enFailureInjector);
if (res)
delay = static_cast<DiskFailureInjector*>(res)->getDiskDelay();
TraceEvent("AsyncFileDelayedWrite").detail("ThrottleDelay", delay);
return delayed(file->write(data, length, offset), delay);
}
Future<Void> truncate(int64_t size) override {
double delay = 0.0;
auto res = g_network->global(INetwork::enFailureInjector);
if (res)
delay = static_cast<DiskFailureInjector*>(res)->getDiskDelay();
return delayed(file->truncate(size), delay);
}
Future<Void> sync() override {
double delay = 0.0;
auto res = g_network->global(INetwork::enFailureInjector);
if (res)
delay = static_cast<DiskFailureInjector*>(res)->getDiskDelay();
return delayed(file->sync(), delay);
}
Future<int64_t> size() const override {
double delay = 0.0;
auto res = g_network->global(INetwork::enFailureInjector);
if (res)
delay = static_cast<DiskFailureInjector*>(res)->getDiskDelay();
return delayed(file->size(), delay);
}
int64_t debugFD() const override {
return file->debugFD();
}
std::string getFilename() const override {
return file->getFilename();
}
};

View File

@ -162,16 +162,14 @@ public:
Future<int> read(void* data, int length, int64_t offset) override {
++countFileLogicalReads;
++countLogicalReads;
double throttleFor = diskFailureInjector->getDiskDelay();
return read_impl(fd, data, length, offset, throttleFor);
return read_impl(fd, data, length, offset);
}
Future<Void> write(void const* data, int length, int64_t offset) override // Copies data synchronously
{
++countFileLogicalWrites;
++countLogicalWrites;
double throttleFor = diskFailureInjector->getDiskDelay();
// Standalone<StringRef> copy = StringRef((const uint8_t*)data, length);
return write_impl(fd, err, StringRef((const uint8_t*)data, length), offset, throttleFor);
return write_impl(fd, err, StringRef((const uint8_t*)data, length), offset);
}
Future<Void> truncate(int64_t size) override {
++countFileLogicalWrites;
@ -272,7 +270,6 @@ private:
int fd, flags;
Reference<ErrorInfo> err;
std::string filename;
//DiskFailureInjector* diskFailureInjector;
mutable Int64MetricHandle countFileLogicalWrites;
mutable Int64MetricHandle countFileLogicalReads;
@ -280,8 +277,7 @@ private:
mutable Int64MetricHandle countLogicalReads;
AsyncFileEIO(int fd, int flags, std::string const& filename)
: fd(fd), flags(flags), filename(filename), err(new ErrorInfo),
diskFailureInjector(DiskFailureInjector::injector()) {
: fd(fd), flags(flags), filename(filename), err(new ErrorInfo) {
if (!g_network->isSimulated()) {
countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename);
countFileLogicalReads.init(LiteralStringRef("AsyncFile.CountFileLogicalReads"), filename);
@ -333,18 +329,13 @@ private:
TraceEvent("AsyncFileClosed").suppressFor(1.0).detail("Fd", fd);
}
ACTOR static Future<int> read_impl(int fd, void* data, int length, int64_t offset, double throttleFor) {
ACTOR static Future<int> read_impl(int fd, void* data, int length, int64_t offset) {
state TaskPriority taskID = g_network->getCurrentTask();
state Promise<Void> p;
// fprintf(stderr, "eio_read (fd=%d length=%d offset=%lld)\n", fd, length, offset);
state eio_req* r = eio_read(fd, data, length, offset, 0, eio_callback, &p);
try {
wait(p.getFuture());
// throttleDisk if enabled
//double throttleFor = diskFailureInjector->getDiskDelay();
if (throttleFor > 0.0) {
wait(delay(throttleFor));
}
} catch (...) {
g_network->setCurrentTask(taskID);
eio_cancel(r);
@ -367,17 +358,12 @@ private:
}
}
ACTOR static Future<Void> write_impl(int fd, Reference<ErrorInfo> err, StringRef data, int64_t offset, double throttleFor) {
ACTOR static Future<Void> write_impl(int fd, Reference<ErrorInfo> err, StringRef data, int64_t offset) {
state TaskPriority taskID = g_network->getCurrentTask();
state Promise<Void> p;
state eio_req* r = eio_write(fd, (void*)data.begin(), data.size(), offset, 0, eio_callback, &p);
try {
wait(p.getFuture());
// throttleDisk if enabled
//double throttleFor = diskFailureInjector->getDiskDelay();
if (throttleFor > 0.0) {
wait(delay(throttleFor));
}
} catch (...) {
g_network->setCurrentTask(taskID);
eio_cancel(r);
@ -567,8 +553,6 @@ private:
static void apple_fsync(eio_req* req) { req->result = fcntl(req->int1, F_FULLFSYNC, 0); }
static void free_req(eio_req* req) { free(req); }
#endif
public:
DiskFailureInjector* diskFailureInjector;
};
#ifdef FILESYSTEM_IMPL

View File

@ -195,10 +195,6 @@ public:
void addref() override { ReferenceCounted<AsyncFileKAIO>::addref(); }
void delref() override { ReferenceCounted<AsyncFileKAIO>::delref(); }
ACTOR static void throttleDisk(double throttleFor) {
if (throttleFor > 0.0)
wait(delay(throttleFor));
}
Future<int> read(void* data, int length, int64_t offset) override {
++countFileLogicalReads;
++countLogicalReads;
@ -216,9 +212,6 @@ public:
enqueue(io, "read", this);
Future<int> result = io->result.getFuture();
// throttleDisk if enabled
throttleDisk(diskFailureInjector->getDiskDelay());
#if KAIO_LOGGING
// result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
@ -244,9 +237,6 @@ public:
enqueue(io, "write", this);
Future<int> result = io->result.getFuture();
// throttleDisk if enabled
throttleDisk(diskFailureInjector->getDiskDelay());
#if KAIO_LOGGING
// result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
@ -758,9 +748,6 @@ private:
}
}
}
public:
DiskFailureInjector* diskFailureInjector;
};
#if KAIO_LOGGING

View File

@ -62,7 +62,7 @@ private:
Future<Void> shutdown;
public:
explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file), diskFailureInjector(DiskFailureInjector::injector()) { shutdown = doShutdown(this); }
explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file) { shutdown = doShutdown(this); }
ACTOR Future<Void> doShutdown(AsyncFileDetachable* self) {
wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()));
@ -85,20 +85,12 @@ public:
Future<int> read(void* data, int length, int64_t offset) override {
if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady())
return io_error().asInjectedFault();
// throttleDisk if enabled
auto throttleFor = diskFailureInjector->getDiskDelay();
if (throttleFor > 0.0) {
TraceEvent("AsyncFileDetachable_Read").detail("ThrottleDelay", throttleFor);
//wait(delay(throttleFor));
}
return sendErrorOnShutdown(file->read(data, length, offset));
}
Future<Void> write(void const* data, int length, int64_t offset) override {
if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady())
return io_error().asInjectedFault();
if (diskFailureInjector->getDiskDelay() > 0.0)
TraceEvent("AsyncFileDetachable_Write").detail("ThrottleDelay", diskFailureInjector->getDiskDelay());
return sendErrorOnShutdown(file->write(data, length, offset));
}
@ -130,8 +122,6 @@ public:
throw io_error().asInjectedFault();
return file->getFilename();
}
public:
DiskFailureInjector* diskFailureInjector;
};
// An async file implementation which wraps another async file and will randomly destroy sectors that it is writing when
@ -203,7 +193,7 @@ private:
bool aio)
: filename(filename), initialFilename(initialFilename), file(file), diskParameters(diskParameters),
openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false),
aio(aio), diskFailureInjector(DiskFailureInjector::injector())
aio(aio)
{
// This is only designed to work in simulation
@ -321,7 +311,7 @@ public:
// Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the
// results
Future<int> read(void* data, int length, int64_t offset) override { return read(this, data, length, offset, diskFailureInjector->getDiskDelay()); }
Future<int> read(void* data, int length, int64_t offset) override { return read(this, data, length, offset); }
// Writes data to the file. Writes are delayed a random amount of time before being
// passed to the underlying file
@ -336,7 +326,7 @@ public:
Promise<Void> writeStarted;
Promise<Future<Void>> writeEnded;
writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset, diskFailureInjector->getDiskDelay()));
writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset));
return writeStarted.getFuture();
}
@ -444,7 +434,7 @@ private:
return readFuture.get();
}
ACTOR Future<int> read(AsyncFileNonDurable* self, void* data, int length, int64_t offset, double throttleFor = 0.0) {
ACTOR Future<int> read(AsyncFileNonDurable* self, void* data, int length, int64_t offset) {
state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
state TaskPriority currentTaskID = g_network->getCurrentTask();
wait(g_simulator.onMachine(currentProcess));
@ -452,12 +442,6 @@ private:
try {
state int rep = wait(self->onRead(self, data, length, offset));
wait(g_simulator.onProcess(currentProcess, currentTaskID));
// throttleDisk if enabled
if (throttleFor > 0.0) {
TraceEvent("AsyncFileNonDurable_ReadDone", self->id).detail("ThrottleDelay", throttleFor).detail("Filename", self->filename).detail("ReadLength", length).detail("Offset", offset);
wait(delay(throttleFor));
}
return rep;
} catch (Error& e) {
state Error err = e;
@ -474,8 +458,7 @@ private:
Future<Future<Void>> ownFuture,
void const* data,
int length,
int64_t offset,
double throttleFor = 0.0) {
int64_t offset) {
state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
state TaskPriority currentTaskID = g_network->getCurrentTask();
wait(g_simulator.onMachine(currentProcess));
@ -639,11 +622,6 @@ private:
}
wait(waitForAll(writeFutures));
// throttleDisk if enabled
if (throttleFor > 0.0) {
TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("ThrottleDelay", throttleFor).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
wait(delay(throttleFor));
}
//TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
return Void();
}
@ -889,8 +867,6 @@ private:
throw err;
}
}
public:
DiskFailureInjector* diskFailureInjector;
};
#include "flow/unactorcompiler.h"

View File

@ -31,6 +31,7 @@
#define FILESYSTEM_IMPL 1
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileDelayed.actor.h"
#include "fdbrpc/AsyncFileEIO.actor.h"
#include "fdbrpc/AsyncFileWinASIO.actor.h"
#include "fdbrpc/AsyncFileKAIO.actor.h"
@ -76,6 +77,8 @@ Future<Reference<class IAsyncFile>> Net2FileSystem::open(const std::string& file
static_cast<boost::asio::io_service*>((void*)g_network->global(INetwork::enASIOService)));
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileDelayed(r)); });
return f;
}

View File

@ -34,6 +34,7 @@
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileNonDurable.actor.h"
#include "fdbrpc/AsyncFileDelayed.actor.h"
#include "flow/crc32c.h"
#include "fdbrpc/TraceFileIO.h"
#include "flow/FaultInjection.h"
@ -1949,13 +1950,6 @@ public:
void clogPair(const IPAddress& from, const IPAddress& to, double seconds) override {
g_clogging.clogPairFor(from, to, seconds);
}
void throttleDisk(ProcessInfo* machine, double seconds) override {
machine->throttleDiskFor = seconds;
TraceEvent("ThrottleDisk").detail("Delay", seconds).
detail("Roles", getRoles(machine->address)).
detail("Address", machine->address).
detail("StartingClass", machine->startingClass.toString());
}
std::vector<ProcessInfo*> getAllProcesses() const override {
std::vector<ProcessInfo*> processes;
for (auto& c : machines) {
@ -2397,19 +2391,11 @@ Future<Void> waitUntilDiskReady(Reference<DiskParameters> diskParameters, int64_
diskParameters->nextOperation += (1.0 / diskParameters->iops) + (size / diskParameters->bandwidth);
double randomLatency;
if (g_simulator.getCurrentProcess()->throttleDiskFor) {
randomLatency = g_simulator.getCurrentProcess()->throttleDiskFor;
TraceEvent("WaitUntilDiskReadyThrottling")
.detail("Delay", randomLatency);
} else if (sync) {
if (sync) {
randomLatency = .005 + deterministicRandom()->random01() * (BUGGIFY ? 1.0 : .010);
} else
randomLatency = 10 * deterministicRandom()->random01() / diskParameters->iops;
TraceEvent("WaitUntilDiskReady").detail("Delay", randomLatency).
detail("Roles", g_simulator.getRoles(g_simulator.getCurrentProcess()->address)).
detail("Address", g_simulator.getCurrentProcess()->address).
detail("ThrottleDiskFor", g_simulator.getCurrentProcess()->throttleDiskFor);
return delayUntil(diskParameters->nextOperation + randomLatency);
}
@ -2488,6 +2474,8 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
f = AsyncFileDetachable::open(f);
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileDelayed(r)); });
return f;
} else
return AsyncFileCached::open(filename, flags, mode);

View File

@ -87,7 +87,6 @@ public:
uint64_t fault_injection_r;
double fault_injection_p1, fault_injection_p2;
bool failedDisk;
double throttleDiskFor;
UID uid;
@ -103,7 +102,7 @@ public:
: name(name), locality(locality), startingClass(startingClass), addresses(addresses),
address(addresses.address), dataFolder(dataFolder), network(net), coordinationFolder(coordinationFolder),
failed(false), excluded(false), rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), cleared(false), failedDisk(false), throttleDiskFor(0) {
fault_injection_r(0), machine(0), cleared(false), failedDisk(false) {
uid = deterministicRandom()->randomUniqueID();
}
@ -375,7 +374,6 @@ public:
virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) = 0;
virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) = 0;
virtual void throttleDisk(ProcessInfo* machine, double seconds) = 0;
virtual std::vector<ProcessInfo*> getAllProcesses() const = 0;
virtual ProcessInfo* getProcessByAddress(NetworkAddress const& address) = 0;
virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) = 0;
@ -464,9 +462,8 @@ struct DiskParameters : ReferenceCounted<DiskParameters> {
double nextOperation;
int64_t iops;
int64_t bandwidth;
double throttleFor;
DiskParameters(int64_t iops, int64_t bandwidth) : nextOperation(0), iops(iops), bandwidth(bandwidth), throttleFor(0) {}
DiskParameters(int64_t iops, int64_t bandwidth) : nextOperation(0), iops(iops), bandwidth(bandwidth) {}
};
// Simulates delays for performing operations on disk

View File

@ -1516,8 +1516,14 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) {
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
if (req.throttleDisk.present()) {
TraceEvent("DiskThrottleRequest").detail("Delay", req.throttleDisk.get().time);
DiskFailureInjector::injector()->throttleFor(req.throttleDisk.get().time);
TraceEvent("DiskThrottleRequest").detail("DelayFrequency",req.throttleDisk.get().delayFrequency).
detail("DelayMin", req.throttleDisk.get().delayMin).
detail("DelayMax", req.throttleDisk.get().delayMax);
auto diskFailureInjector = DiskFailureInjector::injector();
//DiskFailureInjector::injector()->throttleFor(req.throttleDisk.get());
diskFailureInjector->throttleFor(req.throttleDisk.get().delayFrequency,
req.throttleDisk.get().delayMin,
req.throttleDisk.get().delayMax);
}
req.reply.send(Void());
} else {

View File

@ -10,12 +10,18 @@
struct DiskThrottlingWorkload : TestWorkload {
bool enabled;
double testDuration;
double throttleFor;
double throttleFrequency;
double throttleMin;
double throttleMax;
DiskThrottlingWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
enabled = !clientId; // only do this on the "first" client
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
throttleFor = getOption(options, LiteralStringRef("throttleDelay"), 2.0);
TraceEvent("DiskThrottlingWorkload").detail("TestDuration", testDuration).detail("For", throttleFor);
throttleFrequency = getOption(options, LiteralStringRef("throttleFrequency"), 0.0);
throttleMin = getOption(options, LiteralStringRef("throttleMin"), 2.0);
throttleMax = getOption(options, LiteralStringRef("throttleMax"), 2.0);
TraceEvent("DiskThrottlingWorkload")
.detail("TestDuration", testDuration).detail("Frequency", throttleFrequency)
.detail("Min", throttleMin).detail("Max", throttleMax);
}
std::string description() const override {
@ -28,12 +34,6 @@ struct DiskThrottlingWorkload : TestWorkload {
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
//if (&g_simulator == g_network && enabled) {
// TraceEvent("DiskThrottlingStart").detail("For", throttleFor);
// return timeout(reportErrors(throttleDiskClient<ISimulator::ProcessInfo*>(cx, this), "DiskThrottlingError"),
// testDuration,
// Void());
//} else
if (enabled) {
return timeout(reportErrors(throttleDiskClient<WorkerInterface>(cx, this), "DiskThrottlingError"),
testDuration,
@ -46,13 +46,6 @@ struct DiskThrottlingWorkload : TestWorkload {
void getMetrics(vector<PerfMetric>& m) override {}
ACTOR void doThrottle_unused(ISimulator::ProcessInfo* machine, double t, double delay = 0.0) {
wait(::delay(delay));
TraceEvent("ThrottleDisk").detail("For", t);
g_simulator.throttleDisk(machine, t);
TraceEvent("ThrottleDiskSet").detail("For", t);
}
static void checkDiskThrottleResult(Future<Void> res, WorkerInterface worker) {
if (res.isError()) {
auto err = res.getError();
@ -67,36 +60,20 @@ struct DiskThrottlingWorkload : TestWorkload {
}
}
ACTOR void doThrottle(WorkerInterface worker, double t, double delay = 0.0) {
ACTOR void doThrottle(WorkerInterface worker, double frequency, double minDelay, double maxDelay, double startDelay = 0.0) {
state Future<Void> res;
wait(::delay(delay));
wait(::delay(startDelay));
SetFailureInjection::ThrottleDiskCommand throttleDisk;
throttleDisk.time = t;
throttleDisk.delayFrequency = frequency;
throttleDisk.delayMin = minDelay;
throttleDisk.delayMax = maxDelay;
SetFailureInjection req;
req.throttleDisk = throttleDisk;
TraceEvent("ThrottleDisk").detail("For", t);
res = worker.clientInterface.setFailureInjection.getReply(req);
wait(ready(res));
checkDiskThrottleResult(res, worker);
}
static Future<Void> getAllWorkers_unused(DiskThrottlingWorkload* self, std::vector<ISimulator::ProcessInfo*>* result) {
result->clear();
*result = g_simulator.getAllProcesses();
return Void();
}
static Future<Void> getAllStorageWorkers_unused(Database cx, DiskThrottlingWorkload* self, std::vector<ISimulator::ProcessInfo*>* result) {
vector<ISimulator::ProcessInfo*> all = g_simulator.getAllProcesses();
for (int i = 0; i < all.size(); i++)
if (!all[i]->failed &&
all[i]->name == std::string("Server") &&
((all[i]->startingClass == ProcessClass::StorageClass) ||
(all[i]->startingClass == ProcessClass::UnsetClass)))
result->emplace_back(all[i]);
return Void();
}
ACTOR static Future<Void> getAllWorkers(DiskThrottlingWorkload* self, std::vector<WorkerInterface>* result) {
result->clear();
std::vector<WorkerDetails> res =
@ -125,8 +102,7 @@ struct DiskThrottlingWorkload : TestWorkload {
wait(poisson(&lastTime, 1));
wait(DiskThrottlingWorkload::getAllStorageWorkers(cx, self, &machines));
auto machine = deterministicRandom()->randomChoice(machines);
TraceEvent("DoThrottleDisk").detail("For", self->throttleFor);
self->doThrottle(machine, self->throttleFor);
self->doThrottle(machine, self->throttleFrequency, self->throttleMin, self->throttleMax);
}
}
};

View File

@ -647,6 +647,44 @@ public:
// Returns the interface that should be used to make and accept socket connections
};
struct DelayGenerator : FastAllocated<DelayGenerator> {
void setDelay(double frequency, double min, double max) {
delayFrequency = frequency;
delayMin = min;
delayMax = max;
delayFor = (delayMin == delayMax) ? delayMin : deterministicRandom()->randomInt(delayMin, delayMax);
delayUntil = std::max(delayUntil, timer_monotonic() + delayFor);
TraceEvent("DelayGeneratorSetDelay").detail("DelayFrequency", frequency).detail("DelayMin", min).
detail("DelayMax", max).detail("DelayFor", delayFor).detail("DelayUntil", delayUntil);
}
double getDelay() {
// If a delayFrequency was specified, this logic determins the delay to be inserted at any point in time
if (delayFrequency) {
auto timeElapsed = fmod(timer_monotonic(), delayFrequency);
TraceEvent("DelayGeneratorGetDelay").detail("DelayFrequency", delayFrequency).
detail("TimeElapsed", timeElapsed).detail("DelayFor", delayFor);
return std::max(0.0, delayFor - timeElapsed);
}
TraceEvent("DelayGeneratorGetDelay").detail("DelayFrequency", delayFrequency).
detail("CurTime", timer_monotonic()).detail("DelayUntil", delayUntil);
return std::max(0.0, delayUntil - timer_monotonic());
}
private: //members
double delayFrequency = 0.0; // how often should the delay be inserted (0 meaning once, 10 meaning every 10 secs)
double delayMin; // min delay to be inserted
double delayMax; // max delay to be inserted
double delayFor = 0.0; // randomly chosen delay between min and max
double delayUntil = 0.0; // used when the delayFrequency is 0
public: // construction
DelayGenerator() = default;
DelayGenerator(DelayGenerator const&) = delete;
};
struct DiskFailureInjector : FastAllocated<DiskFailureInjector> {
static DiskFailureInjector* injector() {
auto res = g_network->global(INetwork::enFailureInjector);
@ -657,25 +695,19 @@ struct DiskFailureInjector : FastAllocated<DiskFailureInjector> {
return static_cast<DiskFailureInjector*>(res);
}
//virtual void throttleFor(double time) = 0;
//virtual double getDiskDelay() = 0;
void throttleFor(double time) {
TraceEvent("DiskFailureInjectorBefore").detail("ThrottleUntil", throttleUntil);
throttleUntil = std::max(throttleUntil, timer_monotonic() + time);
TraceEvent("DiskFailureInjectorAfter").detail("ThrottleUntil", throttleUntil);
void throttleFor(double frequency, double delayMin, double delayMax) {
delayGenerator.setDelay(frequency, delayMin, delayMax);
}
double getDiskDelay() {
if (!FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
return 0.0;
}
return std::max(0.0, throttleUntil - timer_monotonic());
return delayGenerator.getDelay();
}
private: // members
double throttleUntil = 0.0;
std::unordered_map<NetworkAddress, double> throttleDisk;
DelayGenerator delayGenerator;
private: // construction
DiskFailureInjector() = default;

View File

@ -10,4 +10,5 @@ testTitle = 'DiskThrottledCycle'
[[test.workload]]
testName = 'DiskThrottling'
testDuration = 30.0
throttleFrequency = 10.0