Add framework for writing negative simulation tests
This commit is contained in:
parent
aa09baadab
commit
7a108a2768
|
@ -304,6 +304,10 @@ def is_restarting_test(test_file: Path):
|
|||
return False
|
||||
|
||||
|
||||
def is_negative(test_file: Path):
|
||||
return test_file.parts[-2] == "negative"
|
||||
|
||||
|
||||
def is_no_sim(test_file: Path):
|
||||
return test_file.parts[-2] == "noSim"
|
||||
|
||||
|
@ -449,7 +453,7 @@ class TestRun:
|
|||
command.append("--restarting")
|
||||
if self.buggify_enabled:
|
||||
command += ["-b", "on"]
|
||||
if config.crash_on_error:
|
||||
if config.crash_on_error and not is_negative(self.test_file):
|
||||
command.append("--crash")
|
||||
if config.long_running:
|
||||
# disable simulation speedup
|
||||
|
@ -488,6 +492,7 @@ class TestRun:
|
|||
resources.join()
|
||||
# we're rounding times up, otherwise we will prefer running very short tests (<1s)
|
||||
self.run_time = math.ceil(resources.time())
|
||||
self.summary.is_negative_test = is_negative(self.test_file)
|
||||
self.summary.runtime = resources.time()
|
||||
self.summary.max_rss = resources.max_rss
|
||||
self.summary.was_killed = did_kill
|
||||
|
@ -495,7 +500,7 @@ class TestRun:
|
|||
self.summary.error_out = err_out
|
||||
self.summary.summarize(self.temp_path, " ".join(command))
|
||||
|
||||
if not self.summary.ok():
|
||||
if not self.summary.is_negative_test and not self.summary.ok():
|
||||
self._run_rocksdb_logtool()
|
||||
return self.summary.ok()
|
||||
|
||||
|
|
|
@ -316,6 +316,8 @@ class Summary:
|
|||
self.stderr_severity: str = '40'
|
||||
self.will_restart: bool = will_restart
|
||||
self.test_dir: Path | None = None
|
||||
self.is_negative_test = False
|
||||
self.negative_test_success = False
|
||||
|
||||
if uid is not None:
|
||||
self.out.attributes['TestUID'] = str(uid)
|
||||
|
@ -323,6 +325,7 @@ class Summary:
|
|||
self.out.attributes['Statistics'] = stats
|
||||
self.out.attributes['JoshuaSeed'] = str(config.joshua_seed)
|
||||
self.out.attributes['WillRestart'] = '1' if self.will_restart else '0'
|
||||
self.out.attributes['NegativeTest'] = '1' if self.is_negative_test else '0'
|
||||
|
||||
self.handler = ParseHandler(self.out)
|
||||
self.register_handlers()
|
||||
|
@ -371,7 +374,8 @@ class Summary:
|
|||
return res
|
||||
|
||||
def ok(self):
|
||||
return not self.error
|
||||
# logical xor -- a test is successful if there was either no error or we expected errors (negative test)
|
||||
return (not self.error) != self.is_negative_test
|
||||
|
||||
def done(self):
|
||||
if config.print_coverage:
|
||||
|
@ -529,6 +533,17 @@ class Summary:
|
|||
|
||||
self.handler.add_handler(('Type', 'ProgramStart'), program_start)
|
||||
|
||||
def negative_test_success(attrs: Dict[str, str]):
|
||||
self.negative_test_success = True
|
||||
child = SummaryTree(attrs['Type'])
|
||||
for k, v in attrs:
|
||||
if k != 'Type':
|
||||
child.attributes[k] = v
|
||||
self.out.append(child)
|
||||
pass
|
||||
|
||||
self.handler.add_handler(('Type', 'NegativeTestSuccess'), negative_test_success)
|
||||
|
||||
def config_string(attrs: Dict[str, str]):
|
||||
self.out.attributes['ConfigString'] = attrs['ConfigString']
|
||||
|
||||
|
|
|
@ -793,8 +793,8 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
|
|||
}
|
||||
|
||||
struct QuietDatabaseChecker {
|
||||
ProcessEvents::Callback timeoutCallback = [this](StringRef name, StringRef msg, Error const& e) {
|
||||
logFailure(name, msg, e);
|
||||
ProcessEvents::Callback timeoutCallback = [this](StringRef name, std::any const& msg, Error const& e) {
|
||||
logFailure(name, std::any_cast<StringRef>(msg), e);
|
||||
};
|
||||
double start = now();
|
||||
double maxDDRunTime;
|
||||
|
|
|
@ -216,6 +216,31 @@ struct Resolver : ReferenceCounted<Resolver> {
|
|||
};
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Void> versionReady(Resolver* self, ProxyRequestsInfo* proxyInfo, Version prevVersion) {
|
||||
loop {
|
||||
if (self->recentStateTransactionsInfo.size() &&
|
||||
proxyInfo->lastVersion <= self->recentStateTransactionsInfo.firstVersion()) {
|
||||
self->neededVersion.set(std::max(self->neededVersion.get(), prevVersion));
|
||||
}
|
||||
|
||||
// Update queue depth metric before waiting. Check if we're going to be one of the waiters or not.
|
||||
int waiters = self->version.numWaiting();
|
||||
if (self->version.get() < prevVersion) {
|
||||
waiters++;
|
||||
}
|
||||
self->queueDepthDist->sampleRecordCounter(waiters);
|
||||
|
||||
choose {
|
||||
when(wait(self->version.whenAtLeast(prevVersion))) {
|
||||
// Update queue depth metric after waiting.
|
||||
self->queueDepthDist->sampleRecordCounter(self->version.numWaiting());
|
||||
return Void();
|
||||
}
|
||||
when(wait(self->checkNeededVersion.onTrigger())) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
||||
ResolveTransactionBatchRequest req,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||
|
@ -267,28 +292,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
|||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterQueueSizeCheck");
|
||||
}
|
||||
|
||||
loop {
|
||||
if (self->recentStateTransactionsInfo.size() &&
|
||||
proxyInfo.lastVersion <= self->recentStateTransactionsInfo.firstVersion()) {
|
||||
self->neededVersion.set(std::max(self->neededVersion.get(), req.prevVersion));
|
||||
}
|
||||
|
||||
// Update queue depth metric before waiting. Check if we're going to be one of the waiters or not.
|
||||
int waiters = self->version.numWaiting();
|
||||
if (self->version.get() < req.prevVersion) {
|
||||
waiters++;
|
||||
}
|
||||
self->queueDepthDist->sampleRecordCounter(waiters);
|
||||
|
||||
choose {
|
||||
when(wait(self->version.whenAtLeast(req.prevVersion))) {
|
||||
// Update queue depth metric after waiting.
|
||||
self->queueDepthDist->sampleRecordCounter(self->version.numWaiting());
|
||||
break;
|
||||
}
|
||||
when(wait(self->checkNeededVersion.onTrigger())) {}
|
||||
}
|
||||
}
|
||||
wait(versionReady(self.getPtr(), &proxyInfo, req.prevVersion));
|
||||
|
||||
if (check_yield(TaskPriority::DefaultEndpoint)) {
|
||||
wait(delay(0, TaskPriority::Low) || delay(SERVER_KNOBS->COMMIT_SLEEP_TIME)); // FIXME: Is this still right?
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* ResolverBug.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#include "fdbserver/ResolverBug.h"
|
||||
|
||||
std::shared_ptr<ISimBug> ResolverBugID::create() const {
|
||||
return std::make_shared<ResolverBug>();
|
||||
}
|
|
@ -816,6 +816,18 @@ struct TransactionInfo {
|
|||
bool reportConflictingKeys;
|
||||
};
|
||||
|
||||
bool ConflictBatch::ignoreTooOld() const {
|
||||
return bugs && deterministicRandom()->random01() < bugs->ignoreTooOldProbability;
|
||||
}
|
||||
|
||||
bool ConflictBatch::ignoreReadSet() const {
|
||||
return bugs && deterministicRandom()->random01() < bugs->ignoreReadSetProbability;
|
||||
}
|
||||
|
||||
bool ConflictBatch::ignoreWriteSet() const {
|
||||
return bugs && deterministicRandom()->random01() < bugs->ignoreWriteSetProbability;
|
||||
}
|
||||
|
||||
void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOldestVersion) {
|
||||
const int t = transactionCount++;
|
||||
|
||||
|
@ -823,14 +835,18 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl
|
|||
TransactionInfo* info = new (arena) TransactionInfo;
|
||||
info->reportConflictingKeys = tr.report_conflicting_keys;
|
||||
|
||||
if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size()) {
|
||||
if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size() && !ignoreTooOld()) {
|
||||
info->tooOld = true;
|
||||
} else {
|
||||
info->tooOld = false;
|
||||
info->readRanges.resize(arena, tr.read_conflict_ranges.size());
|
||||
info->writeRanges.resize(arena, tr.write_conflict_ranges.size());
|
||||
if (!ignoreReadSet()) {
|
||||
info->readRanges.resize(arena, tr.read_conflict_ranges.size());
|
||||
}
|
||||
if (!ignoreWriteSet()) {
|
||||
info->writeRanges.resize(arena, tr.write_conflict_ranges.size());
|
||||
}
|
||||
|
||||
for (int r = 0; r < tr.read_conflict_ranges.size(); r++) {
|
||||
for (int r = 0; r < info->readRanges.size(); r++) {
|
||||
const KeyRangeRef& range = tr.read_conflict_ranges[r];
|
||||
points.emplace_back(range.begin, true, false, t, &info->readRanges[r].first);
|
||||
points.emplace_back(range.end, false, false, t, &info->readRanges[r].second);
|
||||
|
@ -843,7 +859,7 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl
|
|||
: nullptr,
|
||||
tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr);
|
||||
}
|
||||
for (int r = 0; r < tr.write_conflict_ranges.size(); r++) {
|
||||
for (int r = 0; r < info->writeRanges.size(); r++) {
|
||||
const KeyRangeRef& range = tr.write_conflict_ranges[r];
|
||||
points.emplace_back(range.begin, true, true, t, &info->writeRanges[r].first);
|
||||
points.emplace_back(range.end, false, true, t, &info->writeRanges[r].second);
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbserver/ResolverBug.h"
|
||||
|
||||
struct ConflictSet;
|
||||
ConflictSet* newConflictSet();
|
||||
|
@ -63,6 +64,12 @@ private:
|
|||
// Stores the map: a transaction -> conflicted transactions' indices
|
||||
std::map<int, VectorRef<int>>* conflictingKeyRangeMap;
|
||||
Arena* resolveBatchReplyArena;
|
||||
std::shared_ptr<ResolverBug> bugs = SimBugInjector().get<ResolverBug>(ResolverBugID());
|
||||
|
||||
// bug injection
|
||||
bool ignoreTooOld() const;
|
||||
bool ignoreWriteSet() const;
|
||||
bool ignoreReadSet() const;
|
||||
|
||||
void checkIntraBatchConflicts();
|
||||
void combineWriteConflictRanges();
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* ResolverBug.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_RESOLVER_BUG_H
|
||||
#define FDBSERVER_RESOLVER_BUG_H
|
||||
#pragma once
|
||||
|
||||
#include "flow/SimBugInjector.h"
|
||||
#include <vector>
|
||||
|
||||
struct ResolverBug : public ISimBug {
|
||||
double ignoreTooOldProbability = 0.0;
|
||||
double ignoreWriteSetProbability = 0.0;
|
||||
double ignoreReadSetProbability = 0.0;
|
||||
|
||||
// data used to control lifetime of cycle clients
|
||||
bool bugFound = false;
|
||||
unsigned currentPhase = 0;
|
||||
std::vector<unsigned> cycleState;
|
||||
};
|
||||
|
||||
class ResolverBugID : public IBugIdentifier {
|
||||
public:
|
||||
std::shared_ptr<ISimBug> create() const override;
|
||||
};
|
||||
|
||||
#endif // FDBSERVER_RESOLVER_BUG_H
|
|
@ -49,11 +49,11 @@ struct ConsistencyCheckWorkload : TestWorkload {
|
|||
struct OnTimeout {
|
||||
ConsistencyCheckWorkload& self;
|
||||
explicit OnTimeout(ConsistencyCheckWorkload& self) : self(self) {}
|
||||
void operator()(StringRef name, StringRef msg, Error const& e) {
|
||||
void operator()(StringRef name, std::any const& msg, Error const& e) {
|
||||
TraceEvent(SevError, "ConsistencyCheckFailure")
|
||||
.error(e)
|
||||
.detail("EventName", name)
|
||||
.detail("EventMessage", msg)
|
||||
.detail("EventMessage", std::any_cast<StringRef>(msg))
|
||||
.log();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* ResolverBug.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#include "flow/ProcessEvents.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/ResolverBug.h"
|
||||
#include "fdbserver/ServerDBInfo.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
namespace {
|
||||
|
||||
struct ResolverBugWorkload : TestWorkload {
|
||||
constexpr static auto NAME = "ResolverBug";
|
||||
bool disableFailureInjections;
|
||||
ResolverBug resolverBug;
|
||||
Standalone<VectorRef<KeyValueRef>> cycleOptions;
|
||||
KeyRef controlKey = "workload_control"_sr;
|
||||
|
||||
ResolverBugWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
disableFailureInjections = getOption(options, "disableFailureInjections"_sr, true);
|
||||
resolverBug.ignoreTooOldProbability = getOption(options, "ignoreTooOldProbability"_sr, 0.0);
|
||||
resolverBug.ignoreWriteSetProbability = getOption(options, "ignoreWriteSetProbability"_sr, 0.0);
|
||||
resolverBug.ignoreReadSetProbability = getOption(options, "ignoreReadSetProbability"_sr, 0.0);
|
||||
|
||||
for (auto& o : options) {
|
||||
bool hadPrefix = false;
|
||||
if (o.key.startsWith("cycle_"_sr)) {
|
||||
KeyValueRef option;
|
||||
option.key = o.key.removePrefix("cycle_"_sr);
|
||||
option.value = o.value;
|
||||
cycleOptions.push_back_deep(cycleOptions.arena(), option);
|
||||
o.value = ""_sr;
|
||||
}
|
||||
}
|
||||
|
||||
if (clientId == 0) {
|
||||
SimBugInjector().enable();
|
||||
auto bug = SimBugInjector().enable<ResolverBug>(ResolverBugID());
|
||||
*bug = resolverBug;
|
||||
bug->cycleState.resize(clientCount, 0);
|
||||
SimBugInjector().disable();
|
||||
}
|
||||
}
|
||||
|
||||
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
|
||||
if (disableFailureInjections) {
|
||||
out.insert("all");
|
||||
}
|
||||
}
|
||||
|
||||
Reference<TestWorkload> createCycle() {
|
||||
WorkloadContext wcx;
|
||||
wcx.clientId = clientId;
|
||||
wcx.clientCount = clientCount;
|
||||
wcx.ccr = ccr;
|
||||
wcx.dbInfo = dbInfo;
|
||||
wcx.options = cycleOptions;
|
||||
wcx.sharedRandomNumber = sharedRandomNumber;
|
||||
wcx.defaultTenant = defaultTenant.castTo<TenantName>();
|
||||
return IWorkloadFactory::create("Cycle", wcx);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> waitForPhase(std::shared_ptr<ResolverBug> bug, int phase) {
|
||||
while (bug->currentPhase != phase) {
|
||||
wait(delay(0.5));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> waitForPhaseDone(std::shared_ptr<ResolverBug> bug, int phase, int clientCount) {
|
||||
while (std::count(bug->cycleState.begin(), bug->cycleState.end(), phase) != clientCount) {
|
||||
wait(delay(0.5));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
struct ReportTraces {
|
||||
ReportTraces() { g_traceProcessEvents = true; }
|
||||
~ReportTraces() { g_traceProcessEvents = false; }
|
||||
};
|
||||
|
||||
struct OnTestFailure {
|
||||
std::shared_ptr<ResolverBug> bug;
|
||||
OnTestFailure(std::shared_ptr<ResolverBug> bug) : bug(bug) {}
|
||||
|
||||
void operator()(StringRef, auto const& data, Error const&) {
|
||||
BaseTraceEvent* trace = std::any_cast<BaseTraceEvent*>(data);
|
||||
if (trace->getSeverity() == SevError) {
|
||||
bug->bugFound = true;
|
||||
TraceEvent("NegativeTestSuccess").log();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ACTOR static Future<Void> driveWorkload(std::shared_ptr<ResolverBug> bug, int clientCount) {
|
||||
state ReportTraces _;
|
||||
state OnTestFailure onTestFailure(bug);
|
||||
state ProcessEvents::Event ev("TraceEvent::TestFailure"_sr, onTestFailure);
|
||||
loop {
|
||||
bug->currentPhase = 1;
|
||||
wait(waitForPhaseDone(bug, 1, clientCount));
|
||||
SimBugInjector().enable();
|
||||
bug->currentPhase = 2;
|
||||
wait(waitForPhaseDone(bug, 2, clientCount));
|
||||
SimBugInjector().disable();
|
||||
bug->currentPhase = 3;
|
||||
wait(waitForPhaseDone(bug, 3, clientCount));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _start(ResolverBugWorkload* self, Database cx) {
|
||||
state Reference<TestWorkload> cycle;
|
||||
state std::shared_ptr<ResolverBug> bug = SimBugInjector().get<ResolverBug>(ResolverBugID());
|
||||
state Future<Void> f = Never();
|
||||
if (self->clientId == 0) {
|
||||
f = driveWorkload(bug, self->clientCount);
|
||||
}
|
||||
while (!bug->bugFound) {
|
||||
ASSERT(!f.isReady());
|
||||
wait(waitForPhase(bug, 1));
|
||||
cycle = self->createCycle();
|
||||
wait(cycle->setup(cx));
|
||||
bug->cycleState[self->clientId] = 1;
|
||||
wait(waitForPhase(bug, 2));
|
||||
wait(cycle->start(cx));
|
||||
bug->cycleState[self->clientId] = 2;
|
||||
wait(waitForPhase(bug, 3));
|
||||
wait(success(cycle->check(cx)));
|
||||
bug->cycleState[self->clientId] = 3;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> start(const Database& cx) override { return _start(this, cx->clone()); }
|
||||
|
||||
private:
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<ResolverBugWorkload> workloadFactory;
|
||||
|
||||
} // namespace
|
|
@ -55,13 +55,15 @@ struct ProcessEventsImpl {
|
|||
std::map<EventImpl::Id, std::vector<StringRef>> toRemove;
|
||||
EventMap toInsert;
|
||||
|
||||
void trigger(StringRef name, StringRef msg, Error const& e) {
|
||||
void trigger(StringRef name, std::any const& data, Error const& e) {
|
||||
Triggering _(triggering);
|
||||
auto iter = events.find(name);
|
||||
// strictly speaking this isn't a bug, but having callbacks that aren't caught
|
||||
// by anyone could mean that something was misspelled. Therefore, the safe thing
|
||||
// to do is to abort.
|
||||
ASSERT(iter != events.end());
|
||||
if (iter == events.end()) {
|
||||
return;
|
||||
}
|
||||
std::unordered_map<EventImpl::Id, EventImpl*> callbacks = iter->second;
|
||||
// after we collected all unique callbacks we can call each
|
||||
for (auto const& c : callbacks) {
|
||||
|
@ -70,7 +72,7 @@ struct ProcessEventsImpl {
|
|||
// which case attempting to call it will be undefined
|
||||
// behavior.
|
||||
if (toRemove.count(c.first) == 0) {
|
||||
c.second->callback(name, msg, e);
|
||||
c.second->callback(name, data, e);
|
||||
}
|
||||
} catch (...) {
|
||||
// callbacks are not allowed to throw
|
||||
|
@ -126,8 +128,8 @@ void EventImpl::removeEvent() {
|
|||
|
||||
namespace ProcessEvents {
|
||||
|
||||
void trigger(StringRef name, StringRef msg, Error const& e) {
|
||||
processEventsImpl.trigger(name, msg, e);
|
||||
void trigger(StringRef name, std::any const& data, Error const& e) {
|
||||
processEventsImpl.trigger(name, data, e);
|
||||
}
|
||||
|
||||
Event::Event(StringRef name, Callback callback) {
|
||||
|
@ -146,7 +148,7 @@ TEST_CASE("/flow/ProcessEvents") {
|
|||
{
|
||||
// Basic test
|
||||
unsigned numHits = 0;
|
||||
Event _("basic"_sr, [&numHits](StringRef n, StringRef, Error const& e) {
|
||||
Event _("basic"_sr, [&numHits](StringRef n, std::any const&, Error const& e) {
|
||||
ASSERT_EQ(n, "basic"_sr);
|
||||
ASSERT_EQ(e.code(), error_code_success);
|
||||
++numHits;
|
||||
|
@ -161,18 +163,18 @@ TEST_CASE("/flow/ProcessEvents") {
|
|||
std::vector<std::shared_ptr<Event>> createdEvents;
|
||||
std::vector<unsigned> numHits;
|
||||
numHits.reserve(2);
|
||||
Event _("create"_sr, [&](StringRef n, StringRef, Error const& e) {
|
||||
Event _("create"_sr, [&](StringRef n, std::any const&, Error const& e) {
|
||||
ASSERT_EQ(n, "create"_sr);
|
||||
ASSERT_EQ(e.code(), error_code_success);
|
||||
++hits1;
|
||||
numHits.push_back(0);
|
||||
createdEvents.push_back(
|
||||
std::make_shared<Event>(std::vector<StringRef>{ "create"_sr, "secondaries"_sr },
|
||||
[&numHits, idx = numHits.size() - 1](StringRef n, StringRef, Error const& e) {
|
||||
ASSERT(n == "create"_sr || n == "secondaries");
|
||||
ASSERT_EQ(e.code(), error_code_success);
|
||||
++numHits[idx];
|
||||
}));
|
||||
createdEvents.push_back(std::make_shared<Event>(
|
||||
std::vector<StringRef>{ "create"_sr, "secondaries"_sr },
|
||||
[&numHits, idx = numHits.size() - 1](StringRef n, std::any const&, Error const& e) {
|
||||
ASSERT(n == "create"_sr || n == "secondaries");
|
||||
ASSERT_EQ(e.code(), error_code_success);
|
||||
++numHits[idx];
|
||||
}));
|
||||
});
|
||||
trigger("create"_sr, ""_sr, success());
|
||||
ASSERT_EQ(hits1, 1);
|
||||
|
@ -197,7 +199,7 @@ TEST_CASE("/flow/ProcessEvents") {
|
|||
// Remove self
|
||||
unsigned called_self_delete = 0, called_non_delete = 0;
|
||||
std::unique_ptr<Event> ev;
|
||||
auto callback_del = [&](StringRef n, StringRef, Error const& e) {
|
||||
auto callback_del = [&](StringRef n, std::any const&, Error const& e) {
|
||||
ASSERT_EQ(n, "deletion"_sr);
|
||||
ASSERT_EQ(e.code(), error_code_success);
|
||||
++called_self_delete;
|
||||
|
@ -205,7 +207,7 @@ TEST_CASE("/flow/ProcessEvents") {
|
|||
ev.reset();
|
||||
};
|
||||
ev.reset(new Event("deletion"_sr, callback_del));
|
||||
Event _("deletion"_sr, [&](StringRef n, StringRef, Error const& e) {
|
||||
Event _("deletion"_sr, [&](StringRef n, std::any const&, Error const& e) {
|
||||
ASSERT_EQ(n, "deletion"_sr);
|
||||
ASSERT_EQ(e.code(), error_code_success);
|
||||
++called_non_delete;
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* SimBugInjector.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "flow/SimBugInjector.h"
|
||||
#include "flow/network.h"
|
||||
|
||||
#include <typeindex>
|
||||
|
||||
namespace {
|
||||
|
||||
struct SimBugInjectorImpl {
|
||||
bool isEnabled = true;
|
||||
std::unordered_map<std::type_index, std::shared_ptr<ISimBug>> bugs;
|
||||
};
|
||||
|
||||
SimBugInjectorImpl* simBugInjector = nullptr;
|
||||
|
||||
} // namespace
|
||||
|
||||
ISimBug::~ISimBug() {}
|
||||
IBugIdentifier::~IBugIdentifier() {}
|
||||
|
||||
bool SimBugInjector::isEnabled() const {
|
||||
return simBugInjector != nullptr && simBugInjector->isEnabled;
|
||||
}
|
||||
|
||||
void SimBugInjector::enable() {
|
||||
// SimBugInjector is very dangerous. It will corrupt your data! Therefore, using it outside of simulation is
|
||||
// not allowed
|
||||
UNSTOPPABLE_ASSERT(g_network->isSimulated());
|
||||
if (simBugInjector == nullptr) {
|
||||
simBugInjector = new SimBugInjectorImpl();
|
||||
}
|
||||
simBugInjector->isEnabled = true;
|
||||
}
|
||||
|
||||
void SimBugInjector::disable() {
|
||||
if (simBugInjector) {
|
||||
simBugInjector->isEnabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
void SimBugInjector::reset() {
|
||||
if (simBugInjector) {
|
||||
delete simBugInjector;
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<ISimBug> SimBugInjector::getImpl(const IBugIdentifier& id) const {
|
||||
if (!isEnabled()) {
|
||||
return {};
|
||||
}
|
||||
auto it = simBugInjector->bugs.find(std::type_index(typeid(id)));
|
||||
if (it == simBugInjector->bugs.end()) {
|
||||
return {};
|
||||
} else {
|
||||
return it->second;
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<ISimBug> SimBugInjector::enableImpl(const IBugIdentifier& id) {
|
||||
UNSTOPPABLE_ASSERT(isEnabled());
|
||||
auto& res = simBugInjector->bugs[std::type_index(typeid(id))];
|
||||
if (!res) {
|
||||
res = id.create();
|
||||
}
|
||||
return res;
|
||||
}
|
|
@ -42,6 +42,7 @@
|
|||
#include "flow/TDMetric.actor.h"
|
||||
#include "flow/MetricSample.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/SimBugInjector.h"
|
||||
|
||||
#ifdef _WIN32
|
||||
#include <windows.h>
|
||||
|
@ -60,6 +61,7 @@
|
|||
thread_local int g_allocation_tracing_disabled = 1;
|
||||
unsigned tracedLines = 0;
|
||||
thread_local int failedLineOverflow = 0;
|
||||
bool g_traceProcessEvents = false;
|
||||
|
||||
ITraceLogIssuesReporter::~ITraceLogIssuesReporter() {}
|
||||
|
||||
|
@ -1305,6 +1307,10 @@ BaseTraceEvent& BaseTraceEvent::backtrace(const std::string& prefix) {
|
|||
}
|
||||
|
||||
void BaseTraceEvent::log() {
|
||||
if (g_traceProcessEvents) {
|
||||
auto name = fmt::format("TraceEvent::{}", type);
|
||||
ProcessEvents::trigger(StringRef(name), this, success());
|
||||
}
|
||||
if (!logged) {
|
||||
init();
|
||||
++g_allocation_tracing_disabled;
|
||||
|
|
|
@ -21,13 +21,15 @@
|
|||
#ifndef FLOW_PROCESS_EVENTS_H
|
||||
#define FLOW_PROCESS_EVENTS_H
|
||||
#include <functional>
|
||||
#include <any>
|
||||
|
||||
#include "flow/flow.h"
|
||||
|
||||
namespace ProcessEvents {
|
||||
|
||||
// A callback is never allowed to throw. Since std::function can't
|
||||
// take noexcept signatures, this is enforced at runtime
|
||||
using Callback = std::function<void(StringRef, StringRef, Error const&)>;
|
||||
using Callback = std::function<void(StringRef, std::any const&, Error const&)>;
|
||||
|
||||
class Event : NonCopyable {
|
||||
void* impl;
|
||||
|
@ -38,7 +40,7 @@ public:
|
|||
~Event();
|
||||
};
|
||||
|
||||
void trigger(StringRef name, StringRef msg, Error const& e);
|
||||
void trigger(StringRef name, std::any const& data, Error const& e);
|
||||
|
||||
} // namespace ProcessEvents
|
||||
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* SimBugInjector.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FLOW_SIM_BUG_INJECTOR_H
|
||||
#define FLOW_SIM_BUG_INJECTOR_H
|
||||
#pragma once
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
|
||||
/*
|
||||
* This file provides a general framework to control how bugs should be injected into FDB. This must not be confused
|
||||
* with Buggify. Buggify is intended to inject faults and set parameters in ways that FDB has to be able to handle
|
||||
* correctly. This framework is meant to be used to inject actual bugs into FDB. The use-case for this is to implement
|
||||
* negative tests. This way we can verify that tests actually catch the bugs that we're expecting them to find.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A ISimBug is a logical, polymorphic, representation of a bug and is the main means of communication for code across
|
||||
* multiple places within the codebase. A user isn't supposed to create instances of ISimBug themselves but instead they
|
||||
* should implement a corresponding IBugIdentifier subclass which can then be used for indexing through the
|
||||
* SimBugInjector.
|
||||
*/
|
||||
class ISimBug : std::enable_shared_from_this<ISimBug> {
|
||||
public:
|
||||
virtual ~ISimBug();
|
||||
};
|
||||
|
||||
/*
|
||||
* Each SimBug class should have a corresponding BugIdentifier
|
||||
*/
|
||||
class IBugIdentifier {
|
||||
public:
|
||||
virtual ~IBugIdentifier();
|
||||
/**
|
||||
* Creates an instance of the SimBug associated with this
|
||||
* identifier. The reason we use shared_ptr instead of Reference here
|
||||
* is so we (1) can use weak references and, more importantly,
|
||||
* (2) shared_ptr has much better support for polymorphic types
|
||||
*/
|
||||
virtual std::shared_ptr<ISimBug> create() const = 0;
|
||||
};
|
||||
|
||||
/*
|
||||
* SimBugInjector is a wrapper for a singleton and can therefore be instantiated cheaply as often as one wants.
|
||||
*/
|
||||
class SimBugInjector {
|
||||
public:
|
||||
explicit SimBugInjector() {}
|
||||
/**
|
||||
* Globally enable SimBugInjector
|
||||
*
|
||||
* Precondition: g_network->isSimulated()
|
||||
*/
|
||||
void enable();
|
||||
/**
|
||||
* Globally disable SimBugInjector. If enable is called later, all current state is restored
|
||||
*
|
||||
* Precondition: true
|
||||
*/
|
||||
void disable();
|
||||
/**
|
||||
* Globally disable SimBugInjector. Unlike disable, this will also remove all existing state
|
||||
*
|
||||
* Precondition: true
|
||||
*/
|
||||
void reset();
|
||||
/**
|
||||
* Check whether SimBugInjector is globally enabled
|
||||
*/
|
||||
bool isEnabled() const;
|
||||
|
||||
/**
|
||||
* This method can be used to check whether a given bug has been enabled and then fetch the corresponding
|
||||
* ISimBug object.
|
||||
*
|
||||
* @param id The IBugIdentifier corresponding to this bug
|
||||
* @return A valid shared_ptr, if the bug has been enabled, nullptr otherwise
|
||||
* @post enabled(bug(id)) -> result is valid else result is nullptr
|
||||
*/
|
||||
template <class T>
|
||||
std::shared_ptr<T> get(IBugIdentifier const& id) {
|
||||
auto res = getImpl(id);
|
||||
if (!res) {
|
||||
return {};
|
||||
}
|
||||
return std::dynamic_pointer_cast<T>(res);
|
||||
}
|
||||
|
||||
std::shared_ptr<ISimBug> getImpl(IBugIdentifier const& id) const;
|
||||
|
||||
/**
|
||||
* Returns the ISimBug instance associated with id if it already exists. Otherwise it will first create it. It is a
|
||||
* bug to call this method if bug injection isn't turned on.
|
||||
*
|
||||
* @param id
|
||||
* @return the SimBug instance
|
||||
* @pre isEnabled()
|
||||
* @post result.get() != nullptr
|
||||
*/
|
||||
template <class T>
|
||||
std::shared_ptr<T> enable(IBugIdentifier const& id) {
|
||||
auto res = enableImpl(id);
|
||||
if (!res) {
|
||||
return {};
|
||||
}
|
||||
return std::dynamic_pointer_cast<T>(res);
|
||||
}
|
||||
|
||||
std::shared_ptr<ISimBug> enableImpl(IBugIdentifier const& id);
|
||||
};
|
||||
|
||||
#endif // FLOW_SIM_BUG_INJECTOR_H
|
|
@ -51,6 +51,7 @@ inline static bool TRACE_SAMPLE() {
|
|||
}
|
||||
|
||||
extern thread_local int g_allocation_tracing_disabled;
|
||||
extern bool g_traceProcessEvents;
|
||||
|
||||
// Each major level of severity has 10 levels of minor levels, which are not all
|
||||
// used. when the numbers of severity events in each level are counted, they are
|
||||
|
@ -413,6 +414,7 @@ public:
|
|||
std::unique_ptr<DynamicEventMetric> tmpEventMetric; // This just just a place to store fields
|
||||
|
||||
const TraceEventFields& getFields() const { return fields; }
|
||||
Severity getSeverity() const { return severity; }
|
||||
|
||||
template <class Object>
|
||||
void moveTo(Object& obj) {
|
||||
|
|
|
@ -449,6 +449,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessCycle.toml)
|
||||
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.toml)
|
||||
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.toml)
|
||||
add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml)
|
||||
add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE)
|
||||
|
||||
# Note that status tests are not deterministic.
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
[[test]]
|
||||
testTitle = "ResolverIgnoreTooOld"
|
||||
|
||||
[[test.workload]]
|
||||
testName = "ResolverBug"
|
||||
ignoreTooOldProbability = 0.1
|
Loading…
Reference in New Issue