Remember disk corruptions and downgrade trace severity if a corruption was injected
This commit is contained in:
parent
76175be123
commit
d1c80659b5
|
@ -1010,7 +1010,9 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
|
|||
|
||||
choose {
|
||||
when(wait(waitForAll(leaderServers))) {}
|
||||
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
|
||||
when(wait(delay(5.0))) {
|
||||
return CoordinatorsResult::COORDINATOR_UNREACHABLE;
|
||||
}
|
||||
}
|
||||
TraceEvent("ChangeQuorumCheckerSetCoordinatorsKey")
|
||||
.detail("CurrentCoordinators", old.toString())
|
||||
|
@ -1112,7 +1114,9 @@ ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChan
|
|||
TaskPriority::CoordinationReply));
|
||||
choose {
|
||||
when(wait(waitForAll(leaderServers))) {}
|
||||
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
|
||||
when(wait(delay(5.0))) {
|
||||
return CoordinatorsResult::COORDINATOR_UNREACHABLE;
|
||||
}
|
||||
}
|
||||
|
||||
tr.set(coordinatorsKey, newClusterConnectionString.toString());
|
||||
|
@ -2145,6 +2149,9 @@ ACTOR Future<Void> lockDatabase(Reference<ReadYourWritesTransaction> tr, UID id)
|
|||
|
||||
ACTOR Future<Void> lockDatabase(Database cx, UID id) {
|
||||
state Transaction tr(cx);
|
||||
UID debugID = deterministicRandom()->randomUniqueID();
|
||||
TraceEvent("LockDatabaseTransaction", debugID).log();
|
||||
tr.debugTransaction(debugID);
|
||||
loop {
|
||||
try {
|
||||
wait(lockDatabase(&tr, id));
|
||||
|
|
|
@ -25,15 +25,18 @@
|
|||
#include "flow/IAsyncFile.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
|
||||
// template <class AsyncFileType>
|
||||
class AsyncFileChaos final : public IAsyncFile, public ReferenceCounted<AsyncFileChaos> {
|
||||
private:
|
||||
Reference<IAsyncFile> file;
|
||||
// since we have to read this often, we cache the filename here
|
||||
std::string filename;
|
||||
bool enabled;
|
||||
|
||||
public:
|
||||
explicit AsyncFileChaos(Reference<IAsyncFile> file) : file(file) {
|
||||
explicit AsyncFileChaos(Reference<IAsyncFile> file) : file(file), filename(file->getFilename()) {
|
||||
// We only allow chaos events on storage files
|
||||
enabled = (file->getFilename().find("storage-") != std::string::npos);
|
||||
}
|
||||
|
@ -78,6 +81,7 @@ public:
|
|||
Future<Void> write(void const* data, int length, int64_t offset) override {
|
||||
Arena arena;
|
||||
char* pdata = nullptr;
|
||||
unsigned corruptedBlock = 0;
|
||||
|
||||
// Check if a bit flip event was injected, if so, copy the buffer contents
|
||||
// with a random bit flipped in a new buffer and use that for the write
|
||||
|
@ -90,7 +94,10 @@ public:
|
|||
pdata = (char*)arena.allocate4kAlignedBuffer(length);
|
||||
memcpy(pdata, data, length);
|
||||
// flip a random bit in the copied buffer
|
||||
pdata[deterministicRandom()->randomInt(0, length)] ^= (1 << deterministicRandom()->randomInt(0, 8));
|
||||
auto corruptedPos = deterministicRandom()->randomInt(0, length);
|
||||
pdata[corruptedPos] ^= (1 << deterministicRandom()->randomInt(0, 8));
|
||||
// mark the block as corrupted
|
||||
// Convert the absolute byte position of the flipped bit into a 4 KiB block index.
// The sum must be parenthesized: without it only corruptedPos is divided, and the
// stored value would be a byte offset instead of a block number.
corruptedBlock = (offset + corruptedPos) / (4 * 1024);
|
||||
|
||||
// increment the metric for bit flips
|
||||
auto res = g_network->global(INetwork::enChaosMetrics);
|
||||
|
@ -102,20 +109,27 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
double diskDelay = getDelay();
|
||||
if (diskDelay == 0.0) {
|
||||
if (pdata)
|
||||
return holdWhile(arena, file->write(pdata, length, offset));
|
||||
|
||||
return file->write(data, length, offset);
|
||||
}
|
||||
|
||||
// Wait for diskDelay before submitting the I/O
|
||||
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(delay(diskDelay), [=](Void _) -> Future<Void> {
|
||||
if (pdata)
|
||||
return holdWhile(arena, file->write(pdata, length, offset));
|
||||
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(delay(getDelay()), [=](Void _) -> Future<Void> {
|
||||
if (pdata) {
|
||||
// if (g_network->isSimulated())
|
||||
return map(holdWhile(arena, file->write(pdata, length, offset)), [corruptedBlock, this](auto res) {
|
||||
if (g_network->isSimulated()) {
|
||||
g_simulator->corruptedBlocks.template emplace(filename, corruptedBlock);
|
||||
}
|
||||
return res;
|
||||
});
|
||||
}
|
||||
|
||||
return file->write(data, length, offset);
|
||||
return map(file->write(data, length, offset), [this, pdata, offset, length](auto res) {
|
||||
if (pdata != nullptr || !g_network->isSimulated()) {
|
||||
return res;
|
||||
}
|
||||
g_simulator->corruptedBlocks.erase(
|
||||
g_simulator->corruptedBlocks.lower_bound(std::make_pair(filename, offset / 4096)),
|
||||
g_simulator->corruptedBlocks.upper_bound(std::make_pair(filename, (offset + length) / 4096)));
|
||||
return res;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@
|
|||
#include <random>
|
||||
#include <limits>
|
||||
|
||||
#include <boost/unordered_set.hpp>
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "flow/Histogram.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
|
@ -508,6 +510,8 @@ public:
|
|||
|
||||
std::unordered_map<Standalone<StringRef>, PrivateKey> authKeys;
|
||||
|
||||
std::set<std::pair<std::string, unsigned>> corruptedBlocks;
|
||||
|
||||
flowGlobalType global(int id) const final { return getCurrentProcess()->global(id); };
|
||||
void setGlobal(size_t id, flowGlobalType v) final { getCurrentProcess()->setGlobal(id, v); };
|
||||
|
||||
|
|
|
@ -149,6 +149,13 @@ struct PageChecksumCodec {
|
|||
}
|
||||
|
||||
if (!silent) {
|
||||
auto severity = SevError;
|
||||
if (g_network->isSimulated()) {
|
||||
if (g_simulator->corruptedBlocks.count(std::make_pair(filename, pageNumber - 1))) {
|
||||
// this corruption was caused by failure injection
|
||||
severity = SevWarnAlways;
|
||||
}
|
||||
}
|
||||
// Emit with the severity chosen above; hard-coding SevError here would discard
// the downgrade to SevWarnAlways for corruptions caused by failure injection.
TraceEvent trEvent(severity, "SQLitePageChecksumFailure");
|
||||
trEvent.error(checksum_failed())
|
||||
.detail("CodecPageSize", pageSize)
|
||||
|
@ -706,7 +713,7 @@ struct IntKeyCursor {
|
|||
db.checkError("BtreeCloseCursor", sqlite3BtreeCloseCursor(cursor));
|
||||
} catch (...) {
|
||||
}
|
||||
delete[](char*) cursor;
|
||||
delete[] (char*)cursor;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -744,7 +751,7 @@ struct RawCursor {
|
|||
} catch (...) {
|
||||
TraceEvent(SevError, "RawCursorDestructionError").log();
|
||||
}
|
||||
delete[](char*) cursor;
|
||||
delete[] (char*)cursor;
|
||||
}
|
||||
}
|
||||
void moveFirst() {
|
||||
|
|
|
@ -116,6 +116,10 @@ struct MachineAttritionWorkload : FailureInjectionWorkload {
|
|||
// Decides whether this failure-injection workload should be added to the test.
// Returns false whenever the simulator has extra databases configured, and false
// when the request does not use a database. Otherwise returns true only if a
// fresh random.random01() draw falls below 1 / (2 + alreadyAdded), so the chance
// shrinks as more workloads have already been added.
bool shouldInject(DeterministicRandom& random,
                  const WorkloadRequest& work,
                  const unsigned alreadyAdded) const override {
	const bool simulatedWithExtraDatabases = g_network->isSimulated() && !g_simulator->extraDatabases.empty();
	if (simulatedWithExtraDatabases) {
		// Remove this as soon as we track extra databases properly
		return false;
	}
	if (!work.useDatabase) {
		// No random draw is consumed for requests that do not use a database.
		return false;
	}
	const double injectionThreshold = 1.0 / (2.0 + alreadyAdded);
	return random.random01() < injectionThreshold;
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue