Merge pull request #9634 from sfc-gh-mpilman/features/negative-simulation

Framework to write negative tests
This commit is contained in:
Markus Pilman 2023-03-16 12:47:02 -07:00 committed by GitHub
commit df5b15e56c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 882 additions and 73 deletions

View File

@ -304,6 +304,10 @@ def is_restarting_test(test_file: Path):
return False
def is_negative(test_file: Path):
    """Return True when ``test_file`` lives directly inside a directory named ``negative``.

    Negative tests are expected to produce errors. Only the immediate parent
    directory name is inspected. A path without a parent component (for example
    a bare file name) is simply not a negative test instead of raising IndexError.
    """
    return test_file.parent.name == "negative"
def is_no_sim(test_file: Path):
    """Return True when ``test_file`` lives directly inside a directory named ``noSim``.

    Only the immediate parent directory name is inspected. A path without a
    parent component (for example a bare file name) yields False instead of
    raising IndexError.
    """
    return test_file.parent.name == "noSim"
@ -449,7 +453,7 @@ class TestRun:
command.append("--restarting")
if self.buggify_enabled:
command += ["-b", "on"]
if config.crash_on_error:
if config.crash_on_error and not is_negative(self.test_file):
command.append("--crash")
if config.long_running:
# disable simulation speedup
@ -488,6 +492,7 @@ class TestRun:
resources.join()
# we're rounding times up, otherwise we will prefer running very short tests (<1s)
self.run_time = math.ceil(resources.time())
self.summary.is_negative_test = is_negative(self.test_file)
self.summary.runtime = resources.time()
self.summary.max_rss = resources.max_rss
self.summary.was_killed = did_kill
@ -495,7 +500,7 @@ class TestRun:
self.summary.error_out = err_out
self.summary.summarize(self.temp_path, " ".join(command))
if not self.summary.ok():
if not self.summary.is_negative_test and not self.summary.ok():
self._run_rocksdb_logtool()
return self.summary.ok()

View File

@ -316,6 +316,8 @@ class Summary:
self.stderr_severity: str = '40'
self.will_restart: bool = will_restart
self.test_dir: Path | None = None
self.is_negative_test = False
self.negative_test_success = False
if uid is not None:
self.out.attributes['TestUID'] = str(uid)
@ -323,6 +325,7 @@ class Summary:
self.out.attributes['Statistics'] = stats
self.out.attributes['JoshuaSeed'] = str(config.joshua_seed)
self.out.attributes['WillRestart'] = '1' if self.will_restart else '0'
self.out.attributes['NegativeTest'] = '1' if self.is_negative_test else '0'
self.handler = ParseHandler(self.out)
self.register_handlers()
@ -371,7 +374,8 @@ class Summary:
return res
def ok(self):
return not self.error
# logical xor -- a test is successful if there was either no error or we expected errors (negative test)
return (not self.error) != self.is_negative_test
def done(self):
if config.print_coverage:
@ -529,6 +533,17 @@ class Summary:
self.handler.add_handler(('Type', 'ProgramStart'), program_start)
def negative_test_success(attrs: Dict[str, str]):
    # Handler for 'NegativeTestSuccess' trace events: remember that the
    # negative test reported success and copy the event into the summary.
    self.negative_test_success = True
    child = SummaryTree(attrs['Type'])
    # Iterate over key/value pairs; iterating the dict itself yields only
    # keys, which cannot be unpacked into (k, v).
    for k, v in attrs.items():
        if k != 'Type':
            child.attributes[k] = v
    self.out.append(child)
self.handler.add_handler(('Type', 'NegativeTestSuccess'), negative_test_success)
def config_string(attrs: Dict[str, str]):
self.out.attributes['ConfigString'] = attrs['ConfigString']

View File

@ -793,8 +793,8 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
}
struct QuietDatabaseChecker {
ProcessEvents::Callback timeoutCallback = [this](StringRef name, StringRef msg, Error const& e) {
logFailure(name, msg, e);
ProcessEvents::Callback timeoutCallback = [this](StringRef name, std::any const& msg, Error const& e) {
logFailure(name, std::any_cast<StringRef>(msg), e);
};
double start = now();
double maxDDRunTime;

View File

@ -216,6 +216,31 @@ struct Resolver : ReferenceCounted<Resolver> {
};
} // namespace
// Completes once self->version has reached at least prevVersion.
// While waiting, the actor re-runs its bookkeeping every time self->checkNeededVersion triggers:
// it may raise self->neededVersion and it samples the number of waiters into self->queueDepthDist.
ACTOR Future<Void> versionReady(Resolver* self, ProxyRequestsInfo* proxyInfo, Version prevVersion) {
loop {
// When recent state transactions exist and the proxy's lastVersion is not past the first of them,
// make sure neededVersion is at least prevVersion (it is never lowered).
if (self->recentStateTransactionsInfo.size() &&
proxyInfo->lastVersion <= self->recentStateTransactionsInfo.firstVersion()) {
self->neededVersion.set(std::max(self->neededVersion.get(), prevVersion));
}
// Update queue depth metric before waiting. Check if we're going to be one of the waiters or not.
int waiters = self->version.numWaiting();
if (self->version.get() < prevVersion) {
waiters++;
}
self->queueDepthDist->sampleRecordCounter(waiters);
choose {
when(wait(self->version.whenAtLeast(prevVersion))) {
// Update queue depth metric after waiting.
self->queueDepthDist->sampleRecordCounter(self->version.numWaiting());
return Void();
}
// checkNeededVersion fired before the version was reached: go around the loop and re-evaluate.
when(wait(self->checkNeededVersion.onTrigger())) {}
}
}
}
ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
ResolveTransactionBatchRequest req,
Reference<AsyncVar<ServerDBInfo> const> db) {
@ -266,28 +291,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterQueueSizeCheck");
}
loop {
if (self->recentStateTransactionsInfo.size() &&
proxyInfo.lastVersion <= self->recentStateTransactionsInfo.firstVersion()) {
self->neededVersion.set(std::max(self->neededVersion.get(), req.prevVersion));
}
// Update queue depth metric before waiting. Check if we're going to be one of the waiters or not.
int waiters = self->version.numWaiting();
if (self->version.get() < req.prevVersion) {
waiters++;
}
self->queueDepthDist->sampleRecordCounter(waiters);
choose {
when(wait(self->version.whenAtLeast(req.prevVersion))) {
// Update queue depth metric after waiting.
self->queueDepthDist->sampleRecordCounter(self->version.numWaiting());
break;
}
when(wait(self->checkNeededVersion.onTrigger())) {}
}
}
wait(versionReady(self.getPtr(), &proxyInfo, req.prevVersion));
if (check_yield(TaskPriority::DefaultEndpoint)) {
wait(delay(0, TaskPriority::Low) || delay(SERVER_KNOBS->COMMIT_SLEEP_TIME)); // FIXME: Is this still right?

24
fdbserver/ResolverBug.cpp Normal file
View File

@ -0,0 +1,24 @@
/*
* ResolverBug.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/ResolverBug.h"
// Factory hook: produces a fresh ResolverBug with default-initialized settings.
std::shared_ptr<ISimBug> ResolverBugID::create() const {
	std::shared_ptr<ISimBug> created = std::make_shared<ResolverBug>();
	return created;
}

View File

@ -816,21 +816,46 @@ struct TransactionInfo {
bool reportConflictingKeys;
};
// Bug injection: decides whether a "too old" verdict should be dropped for the current transaction.
// Always false when no ResolverBug is attached; no random number is drawn in that case.
bool ConflictBatch::ignoreTooOld() const {
	if (!bugs) {
		return false;
	}
	const double roll = deterministicRandom()->random01();
	return roll < bugs->ignoreTooOldProbability;
}
// Bug injection: decides whether the read conflict set of the current transaction should be ignored.
// Always false when no ResolverBug is attached; no random number is drawn in that case.
bool ConflictBatch::ignoreReadSet() const {
	if (!bugs) {
		return false;
	}
	const double roll = deterministicRandom()->random01();
	return roll < bugs->ignoreReadSetProbability;
}
// Bug injection: decides whether the write conflict set of the current transaction should be ignored.
// Always false when no ResolverBug is attached; no random number is drawn in that case.
bool ConflictBatch::ignoreWriteSet() const {
	if (!bugs) {
		return false;
	}
	const double roll = deterministicRandom()->random01();
	return roll < bugs->ignoreWriteSetProbability;
}
void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOldestVersion) {
const int t = transactionCount++;
Arena& arena = transactionInfo.arena();
TransactionInfo* info = new (arena) TransactionInfo;
info->reportConflictingKeys = tr.report_conflicting_keys;
bool tooOld = tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size();
if (tooOld && ignoreTooOld()) {
bugs->hit();
tooOld = false;
}
if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size()) {
if (tooOld) {
info->tooOld = true;
} else {
info->tooOld = false;
info->readRanges.resize(arena, tr.read_conflict_ranges.size());
info->writeRanges.resize(arena, tr.write_conflict_ranges.size());
if (!ignoreReadSet()) {
info->readRanges.resize(arena, tr.read_conflict_ranges.size());
} else {
bugs->hit();
}
if (!ignoreWriteSet()) {
info->writeRanges.resize(arena, tr.write_conflict_ranges.size());
} else {
bugs->hit();
}
for (int r = 0; r < tr.read_conflict_ranges.size(); r++) {
for (int r = 0; r < info->readRanges.size(); r++) {
const KeyRangeRef& range = tr.read_conflict_ranges[r];
points.emplace_back(range.begin, true, false, t, &info->readRanges[r].first);
points.emplace_back(range.end, false, false, t, &info->readRanges[r].second);
@ -843,7 +868,7 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl
: nullptr,
tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr);
}
for (int r = 0; r < tr.write_conflict_ranges.size(); r++) {
for (int r = 0; r < info->writeRanges.size(); r++) {
const KeyRangeRef& range = tr.write_conflict_ranges[r];
points.emplace_back(range.begin, true, true, t, &info->writeRanges[r].first);
points.emplace_back(range.end, false, true, t, &info->writeRanges[r].second);

View File

@ -26,6 +26,7 @@
#include <vector>
#include "fdbclient/CommitTransaction.h"
#include "fdbserver/ResolverBug.h"
struct ConflictSet;
ConflictSet* newConflictSet();
@ -63,6 +64,12 @@ private:
// Stores the map: a transaction -> conflicted transactions' indices
std::map<int, VectorRef<int>>* conflictingKeyRangeMap;
Arena* resolveBatchReplyArena;
std::shared_ptr<ResolverBug> bugs = SimBugInjector().get<ResolverBug>(ResolverBugID());
// bug injection
bool ignoreTooOld() const;
bool ignoreWriteSet() const;
bool ignoreReadSet() const;
void checkIntraBatchConflicts();
void combineWriteConflictRanges();

View File

@ -0,0 +1,44 @@
/*
* ResolverBug.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_RESOLVER_BUG_H
#define FDBSERVER_RESOLVER_BUG_H
#pragma once
#include "flow/SimBugInjector.h"
#include <vector>
// Settings and shared state for bugs injected into the resolver's conflict checking.
struct ResolverBug : public ISimBug {
	// Per-decision probabilities of the individual injected bugs; 0.0 means "never".
	double ignoreTooOldProbability{ 0.0 };
	double ignoreWriteSetProbability{ 0.0 };
	double ignoreReadSetProbability{ 0.0 };

	// data used to control lifetime of cycle clients
	bool bugFound{ false };
	unsigned currentPhase{ 0 };
	std::vector<unsigned> cycleState;
};
// Identifier type for the resolver bug; create() (defined out of line) builds the bug object it identifies.
class ResolverBugID : public IBugIdentifier {
public:
std::shared_ptr<ISimBug> create() const override;
};
#endif // FDBSERVER_RESOLVER_BUG_H

View File

@ -0,0 +1,34 @@
/*
* StorageCorruptionBug.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef FDBSERVER_STORAGE_CORRUPTION_BUG_H
#define FDBSERVER_STORAGE_CORRUPTION_BUG_H
#include "flow/SimBugInjector.h"
// Settings for the injected storage corruption bug.
class StorageCorruptionBug : public ISimBug {
public:
	// Probability with which the bug fires for an individual decision.
	double corruptionProbability{ 0.001 };
};
// Identifier type for StorageCorruptionBug.
class StorageCorruptionBugID : public IBugIdentifier {
public:
	// Builds a StorageCorruptionBug with its default corruption probability.
	std::shared_ptr<ISimBug> create() const override {
		std::shared_ptr<ISimBug> created = std::make_shared<StorageCorruptionBug>();
		return created;
	}
};
#endif // FDBSERVER_STORAGE_CORRUPTION_BUG_H

View File

@ -92,6 +92,7 @@
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/StorageCorruptionBug.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/Error.h"
@ -458,6 +459,7 @@ private:
struct StorageServer* data;
IKeyValueStore* storage;
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
void writeMutationsBuggy(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, Optional<ReadOptions> options) {
RangeResult r = wait(storage->readRange(range, 1, 1 << 30, options));
@ -10367,6 +10369,29 @@ void StorageServerDisk::writeMutation(MutationRef mutation) {
ASSERT(false);
}
// Bug-injecting variant of writeMutations(): writes `mutations`, but randomly drops individual
// mutations with probability StorageCorruptionBug::corruptionProbability. Every dropped mutation
// is recorded through bug->hit(). When no StorageCorruptionBug is available, all mutations are
// written unchanged.
void StorageServerDisk::writeMutationsBuggy(const VectorRef<MutationRef>& mutations,
                                            Version debugVersion,
                                            const char* debugContext) {
	auto bug = SimBugInjector().get<StorageCorruptionBug>(StorageCorruptionBugID());
	if (!bug) {
		writeMutations(mutations, debugVersion, debugContext);
		// Nothing to inject. Returning here is required: falling through would dereference the
		// null bug pointer below and write the mutations a second time.
		return;
	}
	int begin = 0;
	while (begin < mutations.size()) {
		// Find the next mutation to drop; i == mutations.size() if none is chosen.
		int i;
		for (i = begin; i < mutations.size(); ++i) {
			if (deterministicRandom()->random01() < bug->corruptionProbability) {
				bug->hit();
				break;
			}
		}
		// Write the run of mutations preceding the dropped one (possibly empty).
		writeMutations(mutations.slice(begin, i), debugVersion, debugContext);
		// we want to drop the mutation at i (unless i == mutations.size(), in which case this will just finish the
		// loop)
		begin = i + 1;
	}
}
void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
Version debugVersion,
const char* debugContext) {
@ -10399,7 +10424,11 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion,
ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion);
// TODO(alexmiller): Update to version tracking.
// DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
writeMutations(v.mutations, v.version, "makeVersionDurable");
if (!SimBugInjector().isEnabled()) {
writeMutations(v.mutations, v.version, "makeVersionDurable");
} else {
writeMutationsBuggy(v.mutations, v.version, "makeVersionDurable");
}
for (const auto& m : v.mutations)
bytesLeft -= mvccStorageBytes(m);
prevStorageVersion = v.version;

View File

@ -49,11 +49,11 @@ struct ConsistencyCheckWorkload : TestWorkload {
struct OnTimeout {
ConsistencyCheckWorkload& self;
explicit OnTimeout(ConsistencyCheckWorkload& self) : self(self) {}
void operator()(StringRef name, StringRef msg, Error const& e) {
void operator()(StringRef name, std::any const& msg, Error const& e) {
TraceEvent(SevError, "ConsistencyCheckFailure")
.error(e)
.detail("EventName", name)
.detail("EventMessage", msg)
.detail("EventMessage", std::any_cast<StringRef>(msg))
.log();
}
};

View File

@ -0,0 +1,171 @@
/*
* ResolverBug.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/ProcessEvents.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/ResolverBug.h"
#include "fdbserver/ServerDBInfo.actor.h"
#include "flow/actorcompiler.h" // has to be last include
namespace {
// Negative-test workload for the resolver bug injector.
// It repeatedly runs a nested "Cycle" workload in three phases (setup, start, check) on every client.
// Client 0 drives the phases and switches SimBugInjector on for phase 2 only. As soon as a
// "TestFailure" trace event with severity SevError is observed, a "NegativeTestSuccess" event is logged
// and the workload finishes.
struct ResolverBugWorkload : TestWorkload {
constexpr static auto NAME = "ResolverBug";
// When true (the default), disableFailureInjectionWorkloads() adds "all" to the disabled set.
bool disableFailureInjections;
// Bug probabilities parsed from the test options; client 0 copies this into the registered bug object.
ResolverBug resolverBug;
// Options whose key starts with "cycle_", stored with that prefix removed; passed to the nested Cycle workload.
Standalone<VectorRef<KeyValueRef>> cycleOptions;
// NOTE(review): controlKey and the bugFound promise are not referenced anywhere else in this struct.
KeyRef controlKey = "workload_control"_sr;
Promise<Void> bugFound;
ResolverBugWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
disableFailureInjections = getOption(options, "disableFailureInjections"_sr, true);
resolverBug.ignoreTooOldProbability = getOption(options, "ignoreTooOldProbability"_sr, 0.0);
resolverBug.ignoreWriteSetProbability = getOption(options, "ignoreWriteSetProbability"_sr, 0.0);
resolverBug.ignoreReadSetProbability = getOption(options, "ignoreReadSetProbability"_sr, 0.0);
for (auto& o : options) {
if (o.key.startsWith("cycle_"_sr)) {
KeyValueRef option;
option.key = o.key.removePrefix("cycle_"_sr);
option.value = o.value;
cycleOptions.push_back_deep(cycleOptions.arena(), option);
// presumably clearing the value marks the option as consumed for the test framework -- TODO confirm
o.value = ""_sr;
}
}
if (clientId == 0) {
// The injector has to be enabled to register the bug; it is switched off again right away so that
// nothing is injected before driveWorkload() enables it for phase 2.
SimBugInjector().enable();
auto bug = SimBugInjector().enable<ResolverBug>(ResolverBugID());
*bug = resolverBug;
// one phase slot per client, all starting at 0
bug->cycleState.resize(clientCount, 0);
SimBugInjector().disable();
}
}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
if (disableFailureInjections) {
out.insert("all");
}
}
// Creates a "Cycle" workload whose context copies this workload's client/cluster fields and uses cycleOptions.
Reference<TestWorkload> createCycle() {
WorkloadContext wcx;
wcx.clientId = clientId;
wcx.clientCount = clientCount;
wcx.ccr = ccr;
wcx.dbInfo = dbInfo;
wcx.options = cycleOptions;
wcx.sharedRandomNumber = sharedRandomNumber;
wcx.defaultTenant = defaultTenant.castTo<TenantName>();
return IWorkloadFactory::create("Cycle", wcx);
}
// Polls every 0.5 seconds until bug->currentPhase equals `phase`.
ACTOR static Future<Void> waitForPhase(std::shared_ptr<ResolverBug> bug, int phase) {
while (bug->currentPhase != phase) {
wait(delay(0.5));
}
return Void();
}
// Polls every 0.5 seconds until exactly `clientCount` entries of bug->cycleState equal `phase`.
ACTOR static Future<Void> waitForPhaseDone(std::shared_ptr<ResolverBug> bug, int phase, int clientCount) {
while (std::count(bug->cycleState.begin(), bug->cycleState.end(), phase) != clientCount) {
wait(delay(0.5));
}
return Void();
}
// Scope guard: sets g_traceProcessEvents to true for its lifetime and back to false on destruction.
struct ReportTraces {
ReportTraces() { g_traceProcessEvents = true; }
~ReportTraces() { g_traceProcessEvents = false; }
};
// Process-event callback: marks the bug as found when the reported trace event has severity SevError.
struct OnTestFailure {
std::shared_ptr<ResolverBug> bug;
OnTestFailure(std::shared_ptr<ResolverBug> bug) : bug(bug) {}
void operator()(StringRef, auto const& data, Error const&) {
// the payload is expected to hold a BaseTraceEvent*
BaseTraceEvent* trace = std::any_cast<BaseTraceEvent*>(data);
if (trace->getSeverity() == SevError) {
bug->bugFound = true;
}
}
};
// Phase driver (started on client 0 only). Loops forever over phases 1, 2 and 3, advancing only after
// every client has reported the current phase in cycleState. Bug injection is enabled during phase 2 only.
ACTOR static Future<Void> driveWorkload(std::shared_ptr<ResolverBug> bug, int clientCount) {
state ReportTraces _;
state OnTestFailure onTestFailure(bug);
state ProcessEvents::Event ev("TraceEvent::TestFailure"_sr, onTestFailure);
loop {
bug->currentPhase = 1;
wait(waitForPhaseDone(bug, 1, clientCount));
SimBugInjector().enable();
bug->currentPhase = 2;
wait(waitForPhaseDone(bug, 2, clientCount));
SimBugInjector().disable();
bug->currentPhase = 3;
wait(waitForPhaseDone(bug, 3, clientCount));
}
}
// Per-client loop: creates a fresh Cycle workload and runs its setup in phase 1, start in phase 2 and
// check in phase 3, recording each completed phase in bug->cycleState[clientId].
ACTOR static Future<Void> _start(ResolverBugWorkload* self, Database cx) {
state Reference<TestWorkload> cycle;
// getDisabled = true: the bug object is needed even while the injector is switched off
state std::shared_ptr<ResolverBug> bug = SimBugInjector().get<ResolverBug>(ResolverBugID(), true);
loop {
wait(waitForPhase(bug, 1));
cycle = self->createCycle();
wait(cycle->setup(cx));
bug->cycleState[self->clientId] = 1;
wait(waitForPhase(bug, 2));
wait(cycle->start(cx));
bug->cycleState[self->clientId] = 2;
wait(waitForPhase(bug, 3));
wait(success(cycle->check(cx)));
bug->cycleState[self->clientId] = 3;
}
}
// Polls every 0.5 seconds until bug->bugFound is set, then logs "NegativeTestSuccess" and returns.
ACTOR static Future<Void> onBug(std::shared_ptr<ResolverBug> bug) {
loop {
if (bug->bugFound) {
TraceEvent("NegativeTestSuccess").log();
return Void();
}
wait(delay(0.5));
}
}
// Starts the per-client loop (plus the phase driver on client 0).
// The returned future is `onBug(bug) || waitForAll(futures)`.
Future<Void> start(const Database& cx) override {
std::vector<Future<Void>> futures;
auto bug = SimBugInjector().get<ResolverBug>(ResolverBugID(), true);
if (clientId == 0) {
futures.push_back(driveWorkload(bug, clientCount));
}
futures.push_back(_start(this, cx->clone()));
return onBug(bug) || waitForAll(futures);
}
// Always reports success; the outcome of the negative test is signalled through the trace event above.
Future<bool> check(Database const& cx) override { return true; };
private:
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<ResolverBugWorkload> workloadFactory;
} // namespace

View File

@ -0,0 +1,78 @@
/*
* StorageCorruptionBug.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/StorageCorruptionBug.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/ProcessEvents.h"
#include "flow/actorcompiler.h" // has to be last include
namespace {
// Negative-test workload that enables the StorageCorruptionBug for `testDuration` seconds.
// Afterwards it registers a process-event callback that logs "NegativeTestSuccess" when a
// "ConsistencyCheckFailure" event carrying a SevError trace event is triggered.
struct StorageCorruptionWorkload : TestWorkload {
constexpr static auto NAME = "StorageCorruption";
using Self = StorageCorruptionWorkload;
// The registered bug object; its corruptionProbability is taken from the test options.
std::shared_ptr<StorageCorruptionBug> bug;
SimBugInjector bugInjector;
// Seconds during which bug injection stays enabled (option "testDuration", default 60.0).
double testDuration;
StorageCorruptionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
// The injector has to be enabled to register the bug; it is disabled again until _start() runs.
bugInjector.enable();
bug = bugInjector.enable<StorageCorruptionBug>(StorageCorruptionBugID());
bugInjector.disable();
bug->corruptionProbability = getOption(options, "corruptionProbability"_sr, 0.001);
testDuration = getOption(options, "testDuration"_sr, 60.0);
}
// This workload always asks for all failure-injection workloads to be disabled.
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("all"); }
ACTOR static Future<Void> _start(Self* self, Database cx) {
// presumably mode 0 turns data distribution off while corruption is injected -- TODO confirm
wait(success(setDDMode(cx, 0)));
self->bugInjector.enable();
wait(delay(self->testDuration));
// stop injecting and report how many times the bug was hit
self->bug->corruptionProbability = 0.0;
TraceEvent("CorruptionInjections").detail("NumCorruptions", self->bug->numHits()).log();
self->bugInjector.disable();
// The callback is registered through uncancellableEvent, so no Event handle is kept by this actor.
// The payload is expected to hold a BaseTraceEvent*.
ProcessEvents::uncancellableEvent("ConsistencyCheckFailure"_sr,
[](StringRef, std::any const& data, Error const&) {
if (std::any_cast<BaseTraceEvent*>(data)->getSeverity() == SevError) {
TraceEvent("NegativeTestSuccess");
}
});
wait(success(setDDMode(cx, 1)));
return Void();
}
// Only client 0 runs the injection; all other clients finish immediately.
Future<Void> start(Database const& cx) override {
if (clientId != 0) {
return Void();
}
return _start(this, cx->clone());
}
// Always reports success; the outcome of the negative test is signalled through the trace event above.
Future<bool> check(Database const& cx) override { return true; }
private:
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<StorageCorruptionWorkload> factory;
} // namespace

View File

@ -42,27 +42,44 @@ struct EventImpl {
struct ProcessEventsImpl {
struct Triggering {
bool& value;
explicit Triggering(bool& value) : value(value) {
ASSERT(!value);
value = true;
unsigned& value;
ProcessEventsImpl& processEvents;
explicit Triggering(unsigned& value, ProcessEventsImpl& processEvents)
: value(value), processEvents(processEvents) {
++value;
}
~Triggering() {
if (--value == 0) {
// merge modifications back into the event map
for (auto const& p : processEvents.toRemove) {
for (auto const& n : p.second) {
processEvents.events[n].erase(p.first);
}
}
processEvents.toRemove.clear();
for (auto const& p : processEvents.toInsert) {
processEvents.events[p.first].insert(p.second.begin(), p.second.end());
}
processEvents.toInsert.clear();
}
}
~Triggering() { value = false; }
};
using EventMap = std::unordered_map<StringRef, std::unordered_map<EventImpl::Id, EventImpl*>>;
bool triggering = false;
unsigned triggering = 0;
EventMap events;
std::map<EventImpl::Id, std::vector<StringRef>> toRemove;
EventMap toInsert;
void trigger(StringRef name, StringRef msg, Error const& e) {
Triggering _(triggering);
void trigger(StringRef name, std::any const& data, Error const& e) {
Triggering _(triggering, *this);
auto iter = events.find(name);
// strictly speaking this isn't a bug, but having callbacks that aren't caught
// by anyone could mean that something was misspelled. Therefore, the safe thing
// to do is to abort.
ASSERT(iter != events.end());
std::unordered_map<EventImpl::Id, EventImpl*> callbacks = iter->second;
if (iter == events.end()) {
return;
}
std::unordered_map<EventImpl::Id, EventImpl*>& callbacks = iter->second;
// after we collected all unique callbacks we can call each
for (auto const& c : callbacks) {
try {
@ -70,24 +87,13 @@ struct ProcessEventsImpl {
// which case attempting to call it will be undefined
// behavior.
if (toRemove.count(c.first) == 0) {
c.second->callback(name, msg, e);
c.second->callback(name, data, e);
}
} catch (...) {
// callbacks are not allowed to throw
UNSTOPPABLE_ASSERT(false);
}
}
// merge modifications back into the event map
for (auto const& p : toRemove) {
for (auto const& n : p.second) {
events[n].erase(p.first);
}
}
toRemove.clear();
for (auto const& p : toInsert) {
events[p.first].insert(p.second.begin(), p.second.end());
}
toInsert.clear();
}
void add(StringRef const& name, EventImpl* event) {
@ -103,7 +109,30 @@ struct ProcessEventsImpl {
void remove(std::vector<StringRef> names, EventImpl::Id id) {
if (triggering) {
toRemove.emplace(id, std::move(names));
// it's possible that the event hasn't been added yet
bool inInsertMap = false;
for (auto const& name : names) {
auto it = toInsert.find(name);
if (it == toInsert.end()) {
// either all are in the insert map or none
ASSERT(!inInsertMap);
break;
}
auto it2 = it->second.find(id);
if (it2 == it->second.end()) {
// either all are in the insert map or none
ASSERT(!inInsertMap);
break;
}
inInsertMap = true;
it->second.erase(it2);
if (it->second.empty()) {
toInsert.erase(it);
}
}
if (!inInsertMap) {
toRemove.emplace(id, std::move(names));
}
} else {
for (auto const& name : names) {
events[name].erase(id);
@ -126,8 +155,12 @@ void EventImpl::removeEvent() {
namespace ProcessEvents {
void trigger(StringRef name, StringRef msg, Error const& e) {
processEventsImpl.trigger(name, msg, e);
void trigger(StringRef name, std::any const& data, Error const& e) {
processEventsImpl.trigger(name, data, e);
}
// Registers `callback` for the event `name` without handing out an Event handle.
// The EventImpl is allocated with `new` and the pointer is not kept, so the caller has no way to remove it.
void uncancellableEvent(StringRef name, Callback callback) {
new EventImpl({ name }, std::move(callback));
}
Event::Event(StringRef name, Callback callback) {
@ -146,7 +179,7 @@ TEST_CASE("/flow/ProcessEvents") {
{
// Basic test
unsigned numHits = 0;
Event _("basic"_sr, [&numHits](StringRef n, StringRef, Error const& e) {
Event _("basic"_sr, [&numHits](StringRef n, std::any const&, Error const& e) {
ASSERT_EQ(n, "basic"_sr);
ASSERT_EQ(e.code(), error_code_success);
++numHits;
@ -161,18 +194,18 @@ TEST_CASE("/flow/ProcessEvents") {
std::vector<std::shared_ptr<Event>> createdEvents;
std::vector<unsigned> numHits;
numHits.reserve(2);
Event _("create"_sr, [&](StringRef n, StringRef, Error const& e) {
Event _("create"_sr, [&](StringRef n, std::any const&, Error const& e) {
ASSERT_EQ(n, "create"_sr);
ASSERT_EQ(e.code(), error_code_success);
++hits1;
numHits.push_back(0);
createdEvents.push_back(
std::make_shared<Event>(std::vector<StringRef>{ "create"_sr, "secondaries"_sr },
[&numHits, idx = numHits.size() - 1](StringRef n, StringRef, Error const& e) {
ASSERT(n == "create"_sr || n == "secondaries");
ASSERT_EQ(e.code(), error_code_success);
++numHits[idx];
}));
createdEvents.push_back(std::make_shared<Event>(
std::vector<StringRef>{ "create"_sr, "secondaries"_sr },
[&numHits, idx = numHits.size() - 1](StringRef n, std::any const&, Error const& e) {
ASSERT(n == "create"_sr || n == "secondaries");
ASSERT_EQ(e.code(), error_code_success);
++numHits[idx];
}));
});
trigger("create"_sr, ""_sr, success());
ASSERT_EQ(hits1, 1);
@ -197,7 +230,7 @@ TEST_CASE("/flow/ProcessEvents") {
// Remove self
unsigned called_self_delete = 0, called_non_delete = 0;
std::unique_ptr<Event> ev;
auto callback_del = [&](StringRef n, StringRef, Error const& e) {
auto callback_del = [&](StringRef n, std::any const&, Error const& e) {
ASSERT_EQ(n, "deletion"_sr);
ASSERT_EQ(e.code(), error_code_success);
++called_self_delete;
@ -205,7 +238,7 @@ TEST_CASE("/flow/ProcessEvents") {
ev.reset();
};
ev.reset(new Event("deletion"_sr, callback_del));
Event _("deletion"_sr, [&](StringRef n, StringRef, Error const& e) {
Event _("deletion"_sr, [&](StringRef n, std::any const&, Error const& e) {
ASSERT_EQ(n, "deletion"_sr);
ASSERT_EQ(e.code(), error_code_success);
++called_non_delete;
@ -215,6 +248,21 @@ TEST_CASE("/flow/ProcessEvents") {
ASSERT_EQ(called_self_delete, 1);
ASSERT_EQ(called_non_delete, 2);
}
{
// Reentrant safe
Event ev("reentrant"_sr, [](StringRef, std::any const& data, Error const&) {
// call depth of 5
auto v = std::any_cast<int>(data);
if (v < 5) {
Event doNotCall("reentrant"_sr, [](StringRef, std::any const&, Error const&) {
// should never be called
ASSERT(false);
});
trigger("reentrant"_sr, v + 1, success());
}
});
trigger("reentrant"_sr, 0, success());
}
return Void();
}

116
flow/SimBugInjector.cpp Normal file
View File

@ -0,0 +1,116 @@
/*
* SimBugInjector.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/SimBugInjector.h"
#include "flow/network.h"
#include <typeindex>
#include <boost/core/demangle.hpp>
namespace {
// Process-wide state behind SimBugInjector.
struct SimBugInjectorImpl {
	// Whether bug injection is currently switched on.
	bool isEnabled{ true };
	// Registered bugs, keyed by the dynamic type of their IBugIdentifier.
	std::unordered_map<std::type_index, std::shared_ptr<ISimBug>> bugs;
};
// Private state of an ISimBug, stored behind ISimBug's opaque `impl` pointer.
struct ISimBugImpl {
	// Number of times the bug has been hit.
	unsigned numHits{ 0 };

	// Converts the opaque pointer back to the implementation object it was created from.
	static ISimBugImpl* get(void* self) {
		return static_cast<ISimBugImpl*>(self);
	}
};
SimBugInjectorImpl* simBugInjector = nullptr;
} // namespace
// Allocates the private state (hit counter); it is released again in ~ISimBug().
ISimBug::ISimBug() : impl(new ISimBugImpl()) {}
// Releases the private state allocated by the constructor.
ISimBug::~ISimBug() {
	ISimBugImpl* state = ISimBugImpl::get(impl);
	delete state;
}
// Returns the demangled name of the dynamic (most derived) type of this bug.
std::string ISimBug::name() const {
	const char* mangled = typeid(*this).name();
	return boost::core::demangle(mangled);
}
void ISimBug::hit() {
++ISimBugImpl::get(impl)->numHits;
TraceEvent(SevWarnAlways, "BugInjected").detail("Name", name()).detail("NumHits", numHits()).log();
this->onHit();
}
void ISimBug::onHit() {}
unsigned ISimBug::numHits() const {
return ISimBugImpl::get(impl)->numHits;
}
IBugIdentifier::~IBugIdentifier() {}
// True only if the injector state has been created and injection is currently switched on.
bool SimBugInjector::isEnabled() const {
	if (simBugInjector == nullptr) {
		return false;
	}
	return simBugInjector->isEnabled;
}
// Switches bug injection on, creating the global injector state on first use.
void SimBugInjector::enable() {
	// SimBugInjector is very dangerous. It will corrupt your data! Therefore, using it outside of simulation is
	// not allowed
	UNSTOPPABLE_ASSERT(g_network->isSimulated());
	if (!simBugInjector) {
		simBugInjector = new SimBugInjectorImpl();
	}
	simBugInjector->isEnabled = true;
}
// Switches bug injection off. Registered bugs are kept. A no-op if the injector state was never created.
void SimBugInjector::disable() {
	if (simBugInjector == nullptr) {
		return;
	}
	simBugInjector->isEnabled = false;
}
// Destroys the global injector state together with all registered bugs.
// Afterwards the injector behaves as if it had never been enabled; a later enable() creates fresh state.
void SimBugInjector::reset() {
	// Deleting a null pointer is a no-op, so no guard is needed.
	delete simBugInjector;
	// Clear the pointer so that isEnabled(), disable(), getImpl() and enable() do not touch freed memory.
	simBugInjector = nullptr;
}
// Looks up the bug registered for the dynamic type of `id`.
// Returns an empty pointer if the injector state does not exist, if injection is disabled and
// `getDisabled` is false, or if no bug has been registered for `id`.
std::shared_ptr<ISimBug> SimBugInjector::getImpl(const IBugIdentifier& id, bool getDisabled /* = false */) const {
	if (simBugInjector == nullptr) {
		return {};
	}
	// Unless explicitly requested, bugs are hidden while injection is switched off.
	const bool visible = getDisabled || isEnabled();
	if (!visible) {
		return {};
	}
	const auto& registry = simBugInjector->bugs;
	const auto found = registry.find(std::type_index(typeid(id)));
	return found != registry.end() ? found->second : std::shared_ptr<ISimBug>{};
}
std::shared_ptr<ISimBug> SimBugInjector::enableImpl(const IBugIdentifier& id) {
UNSTOPPABLE_ASSERT(isEnabled());
auto& res = simBugInjector->bugs[std::type_index(typeid(id))];
if (!res) {
res = id.create();
}
return res;
}

View File

@ -42,6 +42,7 @@
#include "flow/TDMetric.actor.h"
#include "flow/MetricSample.h"
#include "flow/network.h"
#include "flow/SimBugInjector.h"
#ifdef _WIN32
#include <windows.h>
@ -60,6 +61,7 @@
thread_local int g_allocation_tracing_disabled = 1;
unsigned tracedLines = 0;
thread_local int failedLineOverflow = 0;
bool g_traceProcessEvents = false;
ITraceLogIssuesReporter::~ITraceLogIssuesReporter() {}
@ -1330,6 +1332,10 @@ void BaseTraceEvent::log() {
if (isNetworkThread()) {
TraceEvent::eventCounts[severity / 10]++;
}
if (g_traceProcessEvents) {
auto name = fmt::format("TraceEvent::{}", type);
ProcessEvents::trigger(StringRef(name), this, success());
}
g_traceLog.writeEvent(fields, trackingKey, severity > SevWarnAlways);
if (g_traceLog.isOpen()) {

View File

@ -21,13 +21,15 @@
#ifndef FLOW_PROCESS_EVENTS_H
#define FLOW_PROCESS_EVENTS_H
#include <functional>
#include <any>
#include "flow/flow.h"
namespace ProcessEvents {
// A callback is never allowed to throw. Since std::function can't
// take noexcept signatures, this is enforced at runtime
using Callback = std::function<void(StringRef, StringRef, Error const&)>;
using Callback = std::function<void(StringRef, std::any const&, Error const&)>;
class Event : NonCopyable {
void* impl;
@ -38,7 +40,8 @@ public:
~Event();
};
void trigger(StringRef name, StringRef msg, Error const& e);
void uncancellableEvent(StringRef name, Callback callback);
void trigger(StringRef name, std::any const& data, Error const& e);
} // namespace ProcessEvents

View File

@ -0,0 +1,150 @@
/*
* SimBugInjector.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_SIM_BUG_INJECTOR_H
#define FLOW_SIM_BUG_INJECTOR_H
#pragma once
#include <cstddef>
#include <memory>
#include <string>
/*
* This file provides a general framework to control how bugs should be injected into FDB. This must not be confused
* with Buggify. Buggify is intended to inject faults and set parameters in ways that FDB has to be able to handle
* correctly. This framework is meant to be used to inject actual bugs into FDB. The use-case for this is to implement
* negative tests. This way we can verify that tests actually catch the bugs that we're expecting them to find.
*/
/**
* An ISimBug is a logical, polymorphic representation of a bug and is the main means of communication for code across
* multiple places within the codebase. A user isn't supposed to create instances of ISimBug themselves but instead they
* should implement a corresponding IBugIdentifier subclass which can then be used for indexing through the
* SimBugInjector.
*/
// Note: the std::enable_shared_from_this base must be *public*. std::shared_ptr only wires up the internal
// weak reference when the base is publicly and unambiguously accessible; with the class-key default (private)
// inheritance, shared_from_this() would be unusable.
class ISimBug : public std::enable_shared_from_this<ISimBug> {
    // Opaque pointer to the implementation state (which holds, among other things, the hit counter).
    void* impl;
    /**
     * Hook that is invoked by `hit()`. The default implementation does nothing; subclasses may override it to
     * react to a hit.
     */
    virtual void onHit();

public:
    ISimBug();
    virtual ~ISimBug();
    /**
     * The name of the bug. By default this will return the class name (using typeid and boost::core::demangle).
     * This is supposed to be a human-readable string which can be used to identify the bug when it appears in
     * traces.
     *
     * @return A human-readable string for this bug.
     */
    virtual std::string name() const;
    /**
     * Should be called every time this bug is hit. This method can't be overridden. However, it will call `onHit`,
     * which can be overridden by a child.
     */
    void hit();
    /**
     * @return Number of times this bug has been hit (since the last call to `SimBugInjector::reset`)
     */
    unsigned numHits() const;
};
/*
* Each SimBug class should have a corresponding BugIdentifier
*/
// Abstract factory/key type for a bug: SimBugInjector indexes bugs by the runtime type (typeid) of the
// identifier passed to it and calls create() to build the ISimBug the first time it is enabled.
class IBugIdentifier {
public:
virtual ~IBugIdentifier();
/**
* Creates an instance of the SimBug associated with this
* identifier. The reason we use shared_ptr instead of Reference here
* is so we (1) can use weak references and, more importantly,
* (2) shared_ptr has much better support for polymorphic types
*
* @return A newly created ISimBug for this identifier
*/
virtual std::shared_ptr<ISimBug> create() const = 0;
};
/*
* SimBugInjector is a wrapper for a singleton and can therefore be instantiated cheaply as often as one wants.
*/
class SimBugInjector {
public:
    explicit SimBugInjector() {}
    /**
     * Globally enable SimBugInjector
     *
     * Precondition: g_network->isSimulated()
     */
    void enable();
    /**
     * Globally disable SimBugInjector. If enable is called later, all current state is restored
     *
     * Precondition: true
     */
    void disable();
    /**
     * Globally disable SimBugInjector. Unlike disable, this will also remove all existing state
     *
     * Precondition: true
     */
    void reset();
    /**
     * Check whether SimBugInjector is globally enabled
     */
    bool isEnabled() const;
    /**
     * This method can be used to check whether a given bug has been enabled and then fetch the corresponding
     * ISimBug object, downcast to the concrete type T.
     *
     * This is a read-only lookup and is therefore const, like `getImpl` which it forwards to.
     *
     * @param id The IBugIdentifier corresponding to this bug
     * @param getDisabled If true, the bug is looked up even while SimBugInjector is globally disabled
     * @return A valid shared_ptr if the bug has been enabled and is of type T, nullptr otherwise
     * @post enabled(bug(id)) and bug(id) is a T -> result is valid else result is nullptr
     */
    template <class T>
    std::shared_ptr<T> get(IBugIdentifier const& id, bool getDisabled = false) const {
        // dynamic_pointer_cast yields an empty pointer for an empty input, so no separate null check is needed.
        return std::dynamic_pointer_cast<T>(getImpl(id, getDisabled));
    }
    /**
     * Type-erased variant of `get`.
     *
     * @param id The IBugIdentifier corresponding to this bug
     * @param getDisabled If true, the bug is looked up even while SimBugInjector is globally disabled
     * @return The registered ISimBug, or nullptr if there is none or the lookup is not permitted
     */
    std::shared_ptr<ISimBug> getImpl(IBugIdentifier const& id, bool getDisabled = false) const;
    /**
     * Returns the ISimBug instance associated with id if it already exists. Otherwise it will first create it. It is
     * a bug to call this method if bug injection isn't turned on.
     *
     * @param id The IBugIdentifier corresponding to this bug
     * @return the SimBug instance downcast to T; nullptr if the registered bug is not a T
     * @pre isEnabled()
     * @post result.get() != nullptr, provided the registered bug is a T
     */
    template <class T>
    std::shared_ptr<T> enable(IBugIdentifier const& id) {
        return std::dynamic_pointer_cast<T>(enableImpl(id));
    }
    /**
     * Type-erased variant of `enable(id)`.
     *
     * @param id The IBugIdentifier corresponding to this bug
     * @return the SimBug instance
     * @pre isEnabled()
     */
    std::shared_ptr<ISimBug> enableImpl(IBugIdentifier const& id);
};
#endif // FLOW_SIM_BUG_INJECTOR_H

View File

@ -51,6 +51,7 @@ inline static bool TRACE_SAMPLE() {
}
extern thread_local int g_allocation_tracing_disabled;
extern bool g_traceProcessEvents;
// Each major level of severity has 10 levels of minor levels, which are not all
// used. when the numbers of severity events in each level are counted, they are
@ -413,6 +414,7 @@ public:
std::unique_ptr<DynamicEventMetric> tmpEventMetric; // This just just a place to store fields
const TraceEventFields& getFields() const { return fields; }
Severity getSeverity() const { return severity; }
template <class Object>
void moveTo(Object& obj) {

View File

@ -449,6 +449,10 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessCycle.toml)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.toml)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.toml)
add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml)
add_fdb_test(TEST_FILES negative/ResolverIgnoreReads.toml)
add_fdb_test(TEST_FILES negative/ResolverIgnoreWrites.toml)
add_fdb_test(TEST_FILES negative/StorageCorruption.toml)
add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE)
# Note that status tests are not deterministic.

View File

@ -0,0 +1,6 @@
# Negative test spec (registered in CMake as negative/ResolverIgnoreReads.toml): runs the ResolverBug workload.
# NOTE(review): ignoreReadSetProbability is presumably the probability with which a read set is ignored;
# confirm against the ResolverBug workload.
[[test]]
testTitle = "ResolverIgnoreReads"
[[test.workload]]
testName = "ResolverBug"
ignoreReadSetProbability = 0.01

View File

@ -0,0 +1,11 @@
# Negative test spec (registered in CMake as negative/ResolverIgnoreTooOld.toml): runs the ResolverBug workload
# with version vector disabled and the read/write transaction life knobs pinned to 1000000 versions.
# NOTE(review): ignoreTooOldProbability is presumably the probability with which a "too old" check is skipped;
# confirm against the ResolverBug workload.
[[knobs]]
enable_version_vector = false
max_read_transaction_life_versions = 1000000
max_write_transaction_life_versions = 1000000
[[test]]
testTitle = "ResolverIgnoreTooOld"
[[test.workload]]
testName = "ResolverBug"
ignoreTooOldProbability = 0.2

View File

@ -0,0 +1,6 @@
# Negative test spec (registered in CMake as negative/ResolverIgnoreWrites.toml): runs the ResolverBug workload
# with ignoreWriteSetProbability set. The title names the write-set variant so that its output is not confused
# with the ResolverIgnoreReads test.
[[test]]
testTitle = "ResolverIgnoreWrites"
[[test.workload]]
testName = "ResolverBug"
ignoreWriteSetProbability = 0.01

View File

@ -0,0 +1,21 @@
# Negative test spec (registered in CMake as negative/StorageCorruption.toml): runs the StorageCorruption
# workload together with a ReadWrite workload, both for 60 seconds.
# NOTE(review): corruptionProbability is presumably the per-operation chance of injecting a corruption;
# confirm against the StorageCorruption workload.
[[test]]
testTitle = "StorageCorruption"
[[test.workload]]
testName = "StorageCorruption"
corruptionProbability = 0.001
testDuration = 60.0
[[test.workload]]
testName = 'ReadWrite'
testDuration = 60.0
transactionsPerSecond = 200
writesPerTransactionA = 5
readsPerTransactionA = 1
writesPerTransactionB = 10
readsPerTransactionB = 1
alpha = 0.5
nodeCount = 10000
valueBytes = 128
discardEdgeMeasurements = false
warmingDelay = 10.0