Adding BlobFailureInjection workload

This commit is contained in:
Josh Slocum 2023-03-28 12:55:05 -05:00 committed by Xiaoge Su
parent fe6a342388
commit ff0c61aaf0
20 changed files with 288 additions and 3 deletions

View File

@ -21,6 +21,7 @@
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/IAsyncFile.h"
#include "flow/FaultInjection.h"
#include "flow/Platform.actor.h"
#include "flow/Platform.h"
#include "fdbrpc/simulator.h"
@ -67,6 +68,8 @@ public:
Future<Void> r = uncancellable(holdWhile(old, m_file->write(old.begin(), size, m_writeOffset)));
m_writeOffset += size;
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::flush");
return r;
}
@ -77,6 +80,9 @@ public:
std::string name = f->m_file->getFilename();
f->m_file.clear();
wait(IAsyncFileSystem::filesystem()->renameFile(name, f->m_finalFullPath));
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::finish");
return Void();
}
@ -116,6 +122,8 @@ ACTOR static Future<BackupContainerFileSystem::FilesAndSizesT> listFiles_impl(st
results.push_back({ f.substr(m_path.size() + 1), ::fileSize(f) });
}
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::listFiles");
return results;
}
@ -217,6 +225,7 @@ Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std:
if (usesEncryption()) {
flags |= IAsyncFile::OPEN_ENCRYPTED;
}
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::readFile");
// Simulation does not properly handle opening the same file from multiple machines using a shared filesystem,
// so create a symbolic link to make each file opening appear to be unique. This could also work in production
// but only if the source directory is writeable which shouldn't be required for a restore.
@ -268,6 +277,7 @@ Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std:
}
Future<Reference<IBackupFile>> BackupContainerLocalDirectory::writeFile(const std::string& path) {
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::writeFile");
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_CREATE |
IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE;
if (usesEncryption()) {
@ -286,6 +296,7 @@ Future<Void> BackupContainerLocalDirectory::writeEntireFile(const std::string& p
Future<Void> BackupContainerLocalDirectory::deleteFile(const std::string& path) {
::deleteFile(joinPath(m_path, path));
INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::deleteFile");
return Void();
}

View File

@ -62,7 +62,7 @@ struct ProcessInfo : NonCopyable {
INetworkConnections* network;
uint64_t fault_injection_r;
double fault_injection_p1, fault_injection_p2;
double fault_injection_p1, fault_injection_p2, blob_inject_failure_rate;
bool failedDisk;
UID uid;
@ -82,7 +82,8 @@ struct ProcessInfo : NonCopyable {
: name(name), coordinationFolder(coordinationFolder), dataFolder(dataFolder), machine(nullptr),
addresses(addresses), address(addresses.address), locality(locality), startingClass(startingClass),
failed(false), excluded(false), cleared(false), rebooting(false), drProcess(false), network(net),
fault_injection_r(0), fault_injection_p1(0), fault_injection_p2(0), failedDisk(false) {
fault_injection_r(0), fault_injection_p1(0), fault_injection_p2(0), blob_inject_failure_rate(0),
failedDisk(false) {
uid = deterministicRandom()->randomUniqueID();
}

View File

@ -129,6 +129,8 @@ public:
KillType* ktFinal = nullptr) = 0;
virtual bool killAll(KillType kt, bool forceKill = false, KillType* ktFinal = nullptr) = 0;
// virtual KillType getMachineKillState( UID zoneID ) = 0;
virtual void processInjectBlobFault(ProcessInfo* machine, double failureRate) = 0;
virtual void processStopInjectBlobFault(ProcessInfo* machine) = 0;
virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses,
std::vector<ProcessInfo*> const& deadProcesses,
KillType kt,

View File

@ -116,6 +116,29 @@ bool simulator_should_inject_fault(const char* context, const char* file, int li
return false;
}
bool simulator_should_inject_blob_fault(const char* context, const char* file, int line, int error_code) {
if (!g_network->isSimulated() || !faultInjectionActivated)
return false;
auto p = g_simulator->getCurrentProcess();
if (!g_simulator->speedUpSimulation && deterministicRandom()->random01() < p->blob_inject_failure_rate) {
CODE_PROBE(true, "A blob fault was injected", probe::assert::simOnly, probe::context::sim2);
CODE_PROBE(error_code == error_code_http_request_failed,
"A failed http request was injected",
probe::assert::simOnly,
probe::context::sim2);
TraceEvent("BlobFaultInjected")
.detail("Context", context)
.detail("File", file)
.detail("Line", line)
.detail("ErrorCode", error_code);
return true;
}
return false;
}
void ISimulator::disableFor(const std::string& desc, double time) {
disabledMap[desc] = time;
}
@ -1501,6 +1524,7 @@ public:
// The following function will determine if a machine can be remove in case when it has a blob worker
bool canKillMachineWithBlobWorkers(Optional<Standalone<StringRef>> machineId, KillType kt, KillType* ktFinal) {
// Allow if no blob workers, or it's a reboot(without removing the machine)
// FIXME: this should be ||
if (!blobGranulesEnabled && kt >= KillType::RebootAndDelete) {
return true;
}
@ -2348,6 +2372,18 @@ public:
g_clogging.unclogPair(from, to);
}
void processInjectBlobFault(ProcessInfo* machine, double failureRate) override {
CODE_PROBE(true, "Simulated process beginning blob fault", probe::context::sim2, probe::assert::simOnly);
should_inject_blob_fault = simulator_should_inject_blob_fault;
ASSERT(machine->blob_inject_failure_rate == 0.0);
machine->blob_inject_failure_rate = failureRate;
}
void processStopInjectBlobFault(ProcessInfo* machine) override {
CODE_PROBE(true, "Simulated process stopping blob fault", probe::context::sim2, probe::assert::simOnly);
machine->blob_inject_failure_rate = 0.0;
}
std::vector<ProcessInfo*> getAllProcesses() const override {
std::vector<ProcessInfo*> processes;
for (auto& c : machines) {

View File

@ -5330,7 +5330,8 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
} catch (Error& e) {
// These should not get an error that then causes a transaction retry loop. All error handling
// should be done in the purge calls
if (e.code() == error_code_operation_cancelled ||
// FIXME: retry purging if it gets blobstore errors instead of killing blob manager
if (e.code() == error_code_operation_cancelled || e.code() == error_code_http_request_failed ||
e.code() == error_code_blob_manager_replaced || e.code() == error_code_platform_error) {
throw e;
}

View File

@ -0,0 +1,173 @@
/*
* BlobFailureInjection.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/simulator.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/FaultInjection.h"
#include "flow/DeterministicRandom.h"
#include "fdbrpc/SimulatorProcessInfo.h"
#include "flow/actorcompiler.h" // This must be the last #include.
/*
* The BlobFailureInjection workload is designed to simulate blob storage becoming temporarily flaky or unavailable,
* from a single host to the whole cluster.
* TODO: add blob storage becoming permanently flaky or unavailable on a single host, to ensure the system moves work
* away accordingly. Could also handle that through attrition workload maybe?
* FIXME: make this work outside simulation. Talk to workers like DiskFailureInjection does and add S3BlobStore and
* AzureBlobStore fault injection points.
*/
struct BlobFailureInjectionWorkload : FailureInjectionWorkload {
static constexpr auto NAME = "BlobFailureInjection";
bool enabled;
double enableProbability = 0.5;
double testDuration = 10.0;
std::vector<ISimulator::ProcessInfo*> currentlyAffected;
BlobFailureInjectionWorkload(WorkloadContext const& wcx, NoOptions) : FailureInjectionWorkload(wcx) {
enabled = !clientId && g_network->isSimulated() && faultInjectionActivated;
}
BlobFailureInjectionWorkload(WorkloadContext const& wcx) : FailureInjectionWorkload(wcx) {
// only do this on the "first" client, and only when in simulation and only when fault injection is enabled
enabled = !clientId && g_network->isSimulated() && faultInjectionActivated;
enableProbability = getOption(options, "enableProbability"_sr, enableProbability);
testDuration = getOption(options, "testDuration"_sr, testDuration);
enabled = (enabled && deterministicRandom()->random01() < enableProbability);
}
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return _start(cx, this); }
bool shouldInject(DeterministicRandom& random,
const WorkloadRequest& work,
const unsigned alreadyAdded) const override {
return alreadyAdded < 1 && work.useDatabase && 0.1 / (1 + alreadyAdded) > random.random01();
}
void undoFaultInjection() {
if (!currentlyAffected.empty()) {
TraceEvent("BlobFailureInjectionUnFailing").detail("Count", currentlyAffected.size());
}
for (auto& it : currentlyAffected) {
TraceEvent("BlobFailureInjectionUnFailingProcess").detail("Addr", it->address);
g_simulator->processStopInjectBlobFault(it);
}
currentlyAffected.clear();
}
ACTOR Future<Void> _start(Database cx, BlobFailureInjectionWorkload* self) {
if (!self->enabled) {
return Void();
}
CODE_PROBE(true, "Running workload with blob failure injection");
TraceEvent("BlobFailureInjectionBegin").log();
auto processes = getServers();
deterministicRandom()->randomShuffle(processes);
wait(timeout(
reportErrors(self->worker(cx, self, processes), "BlobFailureInjectionWorkerError"), self->testDuration, Void()));
// Undo all fault injection before exiting, if worker didn't
self->undoFaultInjection();
TraceEvent("BlobFailureInjectionEnd").log();
return Void();
}
// TODO: share code with machine attrition
static std::vector<ISimulator::ProcessInfo*> getServers() {
std::vector<ISimulator::ProcessInfo*> machines;
std::vector<ISimulator::ProcessInfo*> all = g_simulator->getAllProcesses();
for (int i = 0; i < all.size(); i++)
if (!all[i]->failed && all[i]->name == std::string("Server") &&
all[i]->startingClass != ProcessClass::TesterClass)
machines.push_back(all[i]);
return machines;
}
ACTOR Future<Void> worker(Database cx, BlobFailureInjectionWorkload* self, std::vector<ISimulator::ProcessInfo*> processes) {
int minFailureDuration = 5;
int maxFailureDuration = std::max(10, (int)(self->testDuration / 2));
state double failureDuration =
deterministicRandom()->randomSkewedUInt32(minFailureDuration, maxFailureDuration);
// add a random amount between 0 and 1, otherwise it's a whole number
failureDuration += deterministicRandom()->random01();
state double delayBefore = deterministicRandom()->random01() * (std::max<double>(0.0, self->testDuration - failureDuration));
wait(delay(delayBefore));
// TODO: pick one random worker, a subset of workers, or entire cluster randomly
int amountToFail = 1;
if (deterministicRandom()->coinflip()) {
if (deterministicRandom()->coinflip()) {
// fail all processes
amountToFail = processes.size();
} else if (processes.size() > 3) {
// fail a random amount of processes up to half
amountToFail = deterministicRandom()->randomInt(2, std::max<int>(3, processes.size() / 2));
}
} // fail 1 process 50% of the time
ASSERT(amountToFail <= processes.size());
ASSERT(amountToFail > 0);
double failureRate;
if (deterministicRandom()->coinflip()) {
// fail all requests - blob store is completely unreachable
failureRate = 1.0;
} else {
// fail a random percentage of requests, biasing towards low percentages.
// This is based on the intuition that failing 98% of requests is not very different than failing 99%, but
// failing 0.1% vs 1% is different
failureRate = deterministicRandom()->randomSkewedUInt32(1, 1000) / 1000.0;
}
CODE_PROBE(true, "blob failure injection killing processes");
TraceEvent("BlobFailureInjectionFailing")
.detail("Count", amountToFail)
.detail("Duration", failureDuration)
.detail("FailureRate", failureRate)
.log();
for (int i = 0; i < amountToFail; i++) {
TraceEvent("BlobFailureInjectionFailingProcess").detail("Addr", processes[i]->address);
self->currentlyAffected.push_back(processes[i]);
g_simulator->processInjectBlobFault(processes[i], failureRate);
}
wait(delay(failureDuration));
self->undoFaultInjection();
return Void();
}
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<BlobFailureInjectionWorkload> BlobFailureInjectionWorkloadFactory;
// TODO enable once bugs fixed!
// FailureInjectorFactory<BlobFailureInjectionWorkload> BlobFailureInjectionFailureWorkloadFactory;

View File

@ -21,6 +21,7 @@
#include "flow/FaultInjection.h"
bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code) = 0;
bool (*should_inject_blob_fault)(const char* context, const char* file, int line, int error_code) = 0;
bool faultInjectionActivated = true;
void enableFaultInjection(bool enabled) {

View File

@ -31,11 +31,23 @@
#define SHOULD_INJECT_FAULT(context) (should_inject_fault && should_inject_fault(context, __FILE__, __LINE__, 0))
#define INJECT_BLOB_FAULT(error_type, context) \
do { \
if (should_inject_blob_fault && \
should_inject_blob_fault(context, __FILE__, __LINE__, error_code_##error_type)) \
throw error_type().asInjectedFault(); \
} while (0)
#define SHOULD_INJECT_BLOB_FAULT(context) \
(should_inject_blob_fault && should_inject_blob_fault(context, __FILE__, __LINE__, 0))
extern bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code);
extern bool (*should_inject_blob_fault)(const char* context, const char* file, int line, int error_code);
extern bool faultInjectionActivated;
extern void enableFaultInjection(bool enabled); // Enable fault injection called from fdbserver actor main function
#else
#define INJECT_FAULT(error_type, context)
#define INJECT_BLOB_FAULT(error_type, context)
#endif
#endif

View File

@ -44,3 +44,7 @@ testTitle = 'BlobGranuleMoveVerifyCycle'
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0

View File

@ -39,3 +39,7 @@ testTitle = 'BlobGranuleVerifyAtomicOps'
machinesToLeave = 3
reboot = true
testDuration = 30.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 30.0

View File

@ -42,3 +42,7 @@ testTitle = 'BlobGranuleVerifyCycle'
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0

View File

@ -41,3 +41,7 @@ testTitle = 'BlobGranuleVerifySmall'
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0

View File

@ -34,3 +34,7 @@ testTitle = 'BlobGranuleRanges'
reboot = true
testDuration = 30.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 30.0

View File

@ -54,6 +54,10 @@ clearAfterTest=false
reboot = true
testDuration = 30.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 30.0
[[test.workload]]
testName='SaveAndKill'
restartInfoLocation='simfdb/restartInfo.ini'

View File

@ -44,6 +44,10 @@ runSetup=false
reboot = true
testDuration = 30.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 30.0
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 30.0

View File

@ -62,6 +62,10 @@ clearAfterTest=false
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0
[[test.workload]]
testName='SaveAndKill'
restartInfoLocation='simfdb/restartInfo.ini'

View File

@ -52,6 +52,10 @@ runSetup=false
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 60.0
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 60.0

View File

@ -40,3 +40,7 @@ testTitle = 'BlobGranuleCorrectness'
machinesToLeave = 3
reboot = true
testDuration = 120.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 120.0

View File

@ -52,3 +52,7 @@ testTitle = 'BlobGranuleVerifyBalance'
maxDelay = 100
kill1Timeout = 30
kill2Timeout = 6000
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 120.0

View File

@ -49,3 +49,7 @@ testTitle = 'BlobGranuleVerifyLarge'
reboot = true
testDuration = 120.0
[[test.workload]]
testName = 'BlobFailureInjection'
testDuration = 120.0