From fb350edb44301ec2b76a224dd7ef05a47aaf7e0b Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 8 Mar 2023 17:09:50 -0700 Subject: [PATCH 01/13] Add framework for writing negative simulation tests --- contrib/TestHarness2/test_harness/run.py | 9 +- .../TestHarness2/test_harness/summarize.py | 17 +- fdbserver/QuietDatabase.actor.cpp | 4 +- fdbserver/Resolver.actor.cpp | 48 +++--- fdbserver/ResolverBug.cpp | 24 +++ fdbserver/SkipList.cpp | 26 ++- fdbserver/include/fdbserver/ConflictSet.h | 7 + fdbserver/include/fdbserver/ResolverBug.h | 44 +++++ .../workloads/ConsistencyCheck.actor.cpp | 4 +- fdbserver/workloads/ResolverBug.actor.cpp | 159 ++++++++++++++++++ flow/ProcessEvents.cpp | 34 ++-- flow/SimBugInjector.cpp | 85 ++++++++++ flow/Trace.cpp | 6 + flow/include/flow/ProcessEvents.h | 6 +- flow/include/flow/SimBugInjector.h | 129 ++++++++++++++ flow/include/flow/Trace.h | 2 + tests/CMakeLists.txt | 1 + tests/negative/ResolverIgnoreTooOld.toml | 6 + 18 files changed, 559 insertions(+), 52 deletions(-) create mode 100644 fdbserver/ResolverBug.cpp create mode 100644 fdbserver/include/fdbserver/ResolverBug.h create mode 100644 fdbserver/workloads/ResolverBug.actor.cpp create mode 100644 flow/SimBugInjector.cpp create mode 100644 flow/include/flow/SimBugInjector.h create mode 100644 tests/negative/ResolverIgnoreTooOld.toml diff --git a/contrib/TestHarness2/test_harness/run.py b/contrib/TestHarness2/test_harness/run.py index f922a1273f..a37a2fab50 100644 --- a/contrib/TestHarness2/test_harness/run.py +++ b/contrib/TestHarness2/test_harness/run.py @@ -304,6 +304,10 @@ def is_restarting_test(test_file: Path): return False +def is_negative(test_file: Path): + return test_file.parts[-2] == "negative" + + def is_no_sim(test_file: Path): return test_file.parts[-2] == "noSim" @@ -449,7 +453,7 @@ class TestRun: command.append("--restarting") if self.buggify_enabled: command += ["-b", "on"] - if config.crash_on_error: + if config.crash_on_error and not is_negative(self.test_file): command.append("--crash") if config.long_running: # disable simulation speedup @@ -488,6 +492,7 @@ class TestRun: resources.join() # we're rounding times up, otherwise we will prefer running very short tests (<1s) self.run_time = math.ceil(resources.time()) + self.summary.is_negative_test = is_negative(self.test_file) self.summary.runtime = resources.time() self.summary.max_rss = resources.max_rss self.summary.was_killed = did_kill @@ -495,7 +500,7 @@ class TestRun: self.summary.error_out = err_out self.summary.summarize(self.temp_path, " ".join(command)) - if not self.summary.ok(): + if not self.summary.is_negative_test and not self.summary.ok(): self._run_rocksdb_logtool() return self.summary.ok() diff --git a/contrib/TestHarness2/test_harness/summarize.py b/contrib/TestHarness2/test_harness/summarize.py index 0f868e3ab7..2d9898f155 100644 --- a/contrib/TestHarness2/test_harness/summarize.py +++ b/contrib/TestHarness2/test_harness/summarize.py @@ -316,6 +316,8 @@ class Summary: self.stderr_severity: str = '40' self.will_restart: bool = will_restart self.test_dir: Path | None = None + self.is_negative_test = False + self.negative_test_success = False if uid is not None: self.out.attributes['TestUID'] = str(uid) @@ -323,6 +325,7 @@ class Summary: self.out.attributes['Statistics'] = stats self.out.attributes['JoshuaSeed'] = str(config.joshua_seed) self.out.attributes['WillRestart'] = '1' if self.will_restart else '0' + self.out.attributes['NegativeTest'] = '1' if self.is_negative_test else '0' self.handler = ParseHandler(self.out) self.register_handlers() @@ -371,7 +374,8 @@ class Summary: return res def ok(self): - return not self.error + # logical xor -- a test is successful if there was either no error or we expected errors (negative test) + return (not self.error) != self.is_negative_test def done(self): if config.print_coverage: @@ -529,6 +533,17 @@ class Summary: self.handler.add_handler(('Type', 'ProgramStart'), program_start) + def negative_test_success(attrs: Dict[str, str]): + self.negative_test_success = True + child = SummaryTree(attrs['Type']) + for k, v in attrs: + if k != 'Type': + child.attributes[k] = v + self.out.append(child) + pass + + self.handler.add_handler(('Type', 'NegativeTestSuccess'), negative_test_success) + def config_string(attrs: Dict[str, str]): self.out.attributes['ConfigString'] = attrs['ConfigString'] diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 1a1ec4835c..fdf6fcda59 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -793,8 +793,8 @@ ACTOR Future reconfigureAfter(Database cx, } struct QuietDatabaseChecker { - ProcessEvents::Callback timeoutCallback = [this](StringRef name, StringRef msg, Error const& e) { - logFailure(name, msg, e); + ProcessEvents::Callback timeoutCallback = [this](StringRef name, std::any const& msg, Error const& e) { + logFailure(name, std::any_cast(msg), e); }; double start = now(); double maxDDRunTime; diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index 8c43f39024..9075f0bc9e 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -216,6 +216,31 @@ struct Resolver : ReferenceCounted { }; } // namespace +ACTOR Future versionReady(Resolver* self, ProxyRequestsInfo* proxyInfo, Version prevVersion) { + loop { + if (self->recentStateTransactionsInfo.size() && + proxyInfo->lastVersion <= self->recentStateTransactionsInfo.firstVersion()) { + self->neededVersion.set(std::max(self->neededVersion.get(), prevVersion)); + } + + // Update queue depth metric before waiting. Check if we're going to be one of the waiters or not. + int waiters = self->version.numWaiting(); + if (self->version.get() < prevVersion) { + waiters++; + } + self->queueDepthDist->sampleRecordCounter(waiters); + + choose { + when(wait(self->version.whenAtLeast(prevVersion))) { + // Update queue depth metric after waiting. + self->queueDepthDist->sampleRecordCounter(self->version.numWaiting()); + return Void(); + } + when(wait(self->checkNeededVersion.onTrigger())) {} + } + } +} + ACTOR Future resolveBatch(Reference self, ResolveTransactionBatchRequest req, Reference const> db) { @@ -266,28 +291,7 @@ ACTOR Future resolveBatch(Reference self, g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterQueueSizeCheck"); } - loop { - if (self->recentStateTransactionsInfo.size() && - proxyInfo.lastVersion <= self->recentStateTransactionsInfo.firstVersion()) { - self->neededVersion.set(std::max(self->neededVersion.get(), req.prevVersion)); - } - - // Update queue depth metric before waiting. Check if we're going to be one of the waiters or not. - int waiters = self->version.numWaiting(); - if (self->version.get() < req.prevVersion) { - waiters++; - } - self->queueDepthDist->sampleRecordCounter(waiters); - - choose { - when(wait(self->version.whenAtLeast(req.prevVersion))) { - // Update queue depth metric after waiting. - self->queueDepthDist->sampleRecordCounter(self->version.numWaiting()); - break; - } - when(wait(self->checkNeededVersion.onTrigger())) {} - } - } + wait(versionReady(self.getPtr(), &proxyInfo, req.prevVersion)); if (check_yield(TaskPriority::DefaultEndpoint)) { wait(delay(0, TaskPriority::Low) || delay(SERVER_KNOBS->COMMIT_SLEEP_TIME)); // FIXME: Is this still right? diff --git a/fdbserver/ResolverBug.cpp b/fdbserver/ResolverBug.cpp new file mode 100644 index 0000000000..08debec3aa --- /dev/null +++ b/fdbserver/ResolverBug.cpp @@ -0,0 +1,24 @@ +/* + * ResolverBug.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "fdbserver/ResolverBug.h" + +std::shared_ptr ResolverBugID::create() const { + return std::make_shared(); +} diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp index 9f32329931..2fb9363e03 100644 --- a/fdbserver/SkipList.cpp +++ b/fdbserver/SkipList.cpp @@ -816,6 +816,18 @@ struct TransactionInfo { bool reportConflictingKeys; }; +bool ConflictBatch::ignoreTooOld() const { + return bugs && deterministicRandom()->random01() < bugs->ignoreTooOldProbability; +} + +bool ConflictBatch::ignoreReadSet() const { + return bugs && deterministicRandom()->random01() < bugs->ignoreReadSetProbability; +} + +bool ConflictBatch::ignoreWriteSet() const { + return bugs && deterministicRandom()->random01() < bugs->ignoreWriteSetProbability; +} + void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOldestVersion) { const int t = transactionCount++; @@ -823,14 +835,18 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl TransactionInfo* info = new (arena) TransactionInfo; info->reportConflictingKeys = tr.report_conflicting_keys; - if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size()) { + if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size() && !ignoreTooOld()) { info->tooOld = true; } else { info->tooOld = false; - info->readRanges.resize(arena, tr.read_conflict_ranges.size()); - info->writeRanges.resize(arena, tr.write_conflict_ranges.size()); + if (!ignoreReadSet()) { + info->readRanges.resize(arena, tr.read_conflict_ranges.size()); + } + if (!ignoreWriteSet()) { + info->writeRanges.resize(arena, tr.write_conflict_ranges.size()); + } - for (int r = 0; r < tr.read_conflict_ranges.size(); r++) { + for (int r = 0; r < info->readRanges.size(); r++) { const KeyRangeRef& range = tr.read_conflict_ranges[r]; points.emplace_back(range.begin, true, false, t, &info->readRanges[r].first); points.emplace_back(range.end, false, false, t, &info->readRanges[r].second); @@ -843,7 +859,7 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl : nullptr, tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr); } - for (int r = 0; r < tr.write_conflict_ranges.size(); r++) { + for (int r = 0; r < info->writeRanges.size(); r++) { const KeyRangeRef& range = tr.write_conflict_ranges[r]; points.emplace_back(range.begin, true, true, t, &info->writeRanges[r].first); points.emplace_back(range.end, false, true, t, &info->writeRanges[r].second); diff --git a/fdbserver/include/fdbserver/ConflictSet.h b/fdbserver/include/fdbserver/ConflictSet.h index c8a14fcdd3..90ed2c4062 100644 --- a/fdbserver/include/fdbserver/ConflictSet.h +++ b/fdbserver/include/fdbserver/ConflictSet.h @@ -26,6 +26,7 @@ #include #include "fdbclient/CommitTransaction.h" +#include "fdbserver/ResolverBug.h" struct ConflictSet; ConflictSet* newConflictSet(); @@ -63,6 +64,12 @@ private: // Stores the map: a transaction -> conflicted transactions' indices std::map>* conflictingKeyRangeMap; Arena* resolveBatchReplyArena; + std::shared_ptr bugs = SimBugInjector().get(ResolverBugID()); + + // bug injection + bool ignoreTooOld() const; + bool ignoreWriteSet() const; + bool ignoreReadSet() const; void checkIntraBatchConflicts(); void combineWriteConflictRanges(); diff --git a/fdbserver/include/fdbserver/ResolverBug.h b/fdbserver/include/fdbserver/ResolverBug.h new file mode 100644 index 0000000000..de84d44eea --- /dev/null +++ b/fdbserver/include/fdbserver/ResolverBug.h @@ -0,0 +1,44 @@ +/* + * ResolverBug.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBSERVER_RESOLVER_BUG_H +#define FDBSERVER_RESOLVER_BUG_H +#pragma once + +#include "flow/SimBugInjector.h" +#include + +struct ResolverBug : public ISimBug { + double ignoreTooOldProbability = 0.0; + double ignoreWriteSetProbability = 0.0; + double ignoreReadSetProbability = 0.0; + + // data used to control lifetime of cycle clients + bool bugFound = false; + unsigned currentPhase = 0; + std::vector cycleState; +}; + +class ResolverBugID : public IBugIdentifier { +public: + std::shared_ptr create() const override; +}; + +#endif // FDBSERVER_RESOLVER_BUG_H diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index a459cd862c..d83a638b52 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -49,11 +49,11 @@ struct ConsistencyCheckWorkload : TestWorkload { struct OnTimeout { ConsistencyCheckWorkload& self; explicit OnTimeout(ConsistencyCheckWorkload& self) : self(self) {} - void operator()(StringRef name, StringRef msg, Error const& e) { + void operator()(StringRef name, std::any const& msg, Error const& e) { TraceEvent(SevError, "ConsistencyCheckFailure") .error(e) .detail("EventName", name) - .detail("EventMessage", msg) + .detail("EventMessage", std::any_cast(msg)) .log(); } }; diff --git a/fdbserver/workloads/ResolverBug.actor.cpp b/fdbserver/workloads/ResolverBug.actor.cpp new file mode 100644 index 0000000000..be18959dd9 --- /dev/null +++ b/fdbserver/workloads/ResolverBug.actor.cpp @@ -0,0 +1,159 @@ +/* + * ResolverBug.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "flow/ProcessEvents.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/ResolverBug.h" +#include "fdbserver/ServerDBInfo.actor.h" + +#include "flow/actorcompiler.h" // has to be last include + +namespace { + +struct ResolverBugWorkload : TestWorkload { + constexpr static auto NAME = "ResolverBug"; + bool disableFailureInjections; + ResolverBug resolverBug; + Standalone> cycleOptions; + KeyRef controlKey = "workload_control"_sr; + + ResolverBugWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + disableFailureInjections = getOption(options, "disableFailureInjections"_sr, true); + resolverBug.ignoreTooOldProbability = getOption(options, "ignoreTooOldProbability"_sr, 0.0); + resolverBug.ignoreWriteSetProbability = getOption(options, "ignoreWriteSetProbability"_sr, 0.0); + resolverBug.ignoreReadSetProbability = getOption(options, "ignoreReadSetProbability"_sr, 0.0); + + for (auto& o : options) { + bool hadPrefix = false; + if (o.key.startsWith("cycle_"_sr)) { + KeyValueRef option; + option.key = o.key.removePrefix("cycle_"_sr); + option.value = o.value; + cycleOptions.push_back_deep(cycleOptions.arena(), option); + o.value = ""_sr; + } + } + + if (clientId == 0) { + SimBugInjector().enable(); + auto bug = SimBugInjector().enable(ResolverBugID()); + *bug = resolverBug; + bug->cycleState.resize(clientCount, 0); + SimBugInjector().disable(); + } + } + + void disableFailureInjectionWorkloads(std::set& out) const override { + if (disableFailureInjections) { + out.insert("all"); + } + } + + Reference createCycle() { + WorkloadContext wcx; + wcx.clientId = clientId; + wcx.clientCount = clientCount; + wcx.ccr = ccr; + wcx.dbInfo = dbInfo; + wcx.options = cycleOptions; + wcx.sharedRandomNumber = sharedRandomNumber; + wcx.defaultTenant = defaultTenant.castTo(); + return IWorkloadFactory::create("Cycle", wcx); + } + + ACTOR static Future waitForPhase(std::shared_ptr bug, int phase) { + while (bug->currentPhase != phase) { + wait(delay(0.5)); + } + return Void(); + } + + ACTOR static Future waitForPhaseDone(std::shared_ptr bug, int phase, int clientCount) { + while (std::count(bug->cycleState.begin(), bug->cycleState.end(), phase) != clientCount) { + wait(delay(0.5)); + } + return Void(); + } + + struct ReportTraces { + ReportTraces() { g_traceProcessEvents = true; } + ~ReportTraces() { g_traceProcessEvents = false; } + }; + + struct OnTestFailure { + std::shared_ptr bug; + OnTestFailure(std::shared_ptr bug) : bug(bug) {} + + void operator()(StringRef, auto const& data, Error const&) { + BaseTraceEvent* trace = std::any_cast(data); + if (trace->getSeverity() == SevError) { + bug->bugFound = true; + TraceEvent("NegativeTestSuccess").log(); + } + } + }; + + ACTOR static Future driveWorkload(std::shared_ptr bug, int clientCount) { + state ReportTraces _; + state OnTestFailure onTestFailure(bug); + state ProcessEvents::Event ev("TraceEvent::TestFailure"_sr, onTestFailure); + loop { + bug->currentPhase = 1; + wait(waitForPhaseDone(bug, 1, clientCount)); + SimBugInjector().enable(); + bug->currentPhase = 2; + wait(waitForPhaseDone(bug, 2, clientCount)); + SimBugInjector().disable(); + bug->currentPhase = 3; + wait(waitForPhaseDone(bug, 3, clientCount)); + } + } + + ACTOR static Future _start(ResolverBugWorkload* self, Database cx) { + state Reference cycle; + state std::shared_ptr bug = SimBugInjector().get(ResolverBugID()); + state Future f = Never(); + if (self->clientId == 0) { + f = driveWorkload(bug, self->clientCount); + } + while (!bug->bugFound) { + ASSERT(!f.isReady()); + wait(waitForPhase(bug, 1)); + cycle = self->createCycle(); + wait(cycle->setup(cx)); + bug->cycleState[self->clientId] = 1; + wait(waitForPhase(bug, 2)); + wait(cycle->start(cx)); + bug->cycleState[self->clientId] = 2; + wait(waitForPhase(bug, 3)); + wait(success(cycle->check(cx))); + bug->cycleState[self->clientId] = 3; + } + return Void(); + } + + Future start(const Database& cx) override { return _start(this, cx->clone()); } + +private: + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory workloadFactory; + +} // namespace \ No newline at end of file diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index 6d4bdee719..8011fd917b 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -55,13 +55,15 @@ struct ProcessEventsImpl { std::map> toRemove; EventMap toInsert; - void trigger(StringRef name, StringRef msg, Error const& e) { + void trigger(StringRef name, std::any const& data, Error const& e) { Triggering _(triggering); auto iter = events.find(name); // strictly speaking this isn't a bug, but having callbacks that aren't caught // by anyone could mean that something was misspelled. Therefore, the safe thing // to do is to abort. - ASSERT(iter != events.end()); + if (iter == events.end()) { + return; + } std::unordered_map callbacks = iter->second; // after we collected all unique callbacks we can call each for (auto const& c : callbacks) { @@ -70,7 +72,7 @@ struct ProcessEventsImpl { // which case attempting to call it will be undefined // behavior. if (toRemove.count(c.first) == 0) { - c.second->callback(name, msg, e); + c.second->callback(name, data, e); } } catch (...) { // callbacks are not allowed to throw @@ -126,8 +128,8 @@ void EventImpl::removeEvent() { namespace ProcessEvents { -void trigger(StringRef name, StringRef msg, Error const& e) { - processEventsImpl.trigger(name, msg, e); +void trigger(StringRef name, std::any const& data, Error const& e) { + processEventsImpl.trigger(name, data, e); } Event::Event(StringRef name, Callback callback) { @@ -146,7 +148,7 @@ TEST_CASE("/flow/ProcessEvents") { { // Basic test unsigned numHits = 0; - Event _("basic"_sr, [&numHits](StringRef n, StringRef, Error const& e) { + Event _("basic"_sr, [&numHits](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "basic"_sr); ASSERT_EQ(e.code(), error_code_success); ++numHits; @@ -161,18 +163,18 @@ TEST_CASE("/flow/ProcessEvents") { std::vector> createdEvents; std::vector numHits; numHits.reserve(2); - Event _("create"_sr, [&](StringRef n, StringRef, Error const& e) { + Event _("create"_sr, [&](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "create"_sr); ASSERT_EQ(e.code(), error_code_success); ++hits1; numHits.push_back(0); - createdEvents.push_back( - std::make_shared(std::vector{ "create"_sr, "secondaries"_sr }, - [&numHits, idx = numHits.size() - 1](StringRef n, StringRef, Error const& e) { - ASSERT(n == "create"_sr || n == "secondaries"); - ASSERT_EQ(e.code(), error_code_success); - ++numHits[idx]; - })); + createdEvents.push_back(std::make_shared( + std::vector{ "create"_sr, "secondaries"_sr }, + [&numHits, idx = numHits.size() - 1](StringRef n, std::any const&, Error const& e) { + ASSERT(n == "create"_sr || n == "secondaries"); + ASSERT_EQ(e.code(), error_code_success); + ++numHits[idx]; + })); }); trigger("create"_sr, ""_sr, success()); ASSERT_EQ(hits1, 1); @@ -197,7 +199,7 @@ TEST_CASE("/flow/ProcessEvents") { // Remove self unsigned called_self_delete = 0, called_non_delete = 0; std::unique_ptr ev; - auto callback_del = [&](StringRef n, StringRef, Error const& e) { + auto callback_del = [&](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "deletion"_sr); ASSERT_EQ(e.code(), error_code_success); ++called_self_delete; @@ -205,7 +207,7 @@ TEST_CASE("/flow/ProcessEvents") { ev.reset(); }; ev.reset(new Event("deletion"_sr, callback_del)); - Event _("deletion"_sr, [&](StringRef n, StringRef, Error const& e) { + Event _("deletion"_sr, [&](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "deletion"_sr); ASSERT_EQ(e.code(), error_code_success); ++called_non_delete; diff --git a/flow/SimBugInjector.cpp b/flow/SimBugInjector.cpp new file mode 100644 index 0000000000..647708622b --- /dev/null +++ b/flow/SimBugInjector.cpp @@ -0,0 +1,85 @@ +/* + * SimBugInjector.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/SimBugInjector.h" +#include "flow/network.h" + +#include + +namespace { + +struct SimBugInjectorImpl { + bool isEnabled = true; + std::unordered_map> bugs; +}; + +SimBugInjectorImpl* simBugInjector = nullptr; + +} // namespace + +ISimBug::~ISimBug() {} +IBugIdentifier::~IBugIdentifier() {} + +bool SimBugInjector::isEnabled() const { + return simBugInjector != nullptr && simBugInjector->isEnabled; +} + +void SimBugInjector::enable() { + // SimBugInjector is very dangerous. It will corrupt your data! Therefore, using it outside of simulation is + // not allowed + UNSTOPPABLE_ASSERT(g_network->isSimulated()); + if (simBugInjector == nullptr) { + simBugInjector = new SimBugInjectorImpl(); + } + simBugInjector->isEnabled = true; +} + +void SimBugInjector::disable() { + if (simBugInjector) { + simBugInjector->isEnabled = false; + } +} + +void SimBugInjector::reset() { + if (simBugInjector) { + delete simBugInjector; + } +} + +std::shared_ptr SimBugInjector::getImpl(const IBugIdentifier& id) const { + if (!isEnabled()) { + return {}; + } + auto it = simBugInjector->bugs.find(std::type_index(typeid(id))); + if (it == simBugInjector->bugs.end()) { + return {}; + } else { + return it->second; + } +} + +std::shared_ptr SimBugInjector::enableImpl(const IBugIdentifier& id) { + UNSTOPPABLE_ASSERT(isEnabled()); + auto& res = simBugInjector->bugs[std::type_index(typeid(id))]; + if (!res) { + res = id.create(); + } + return res; +} \ No newline at end of file diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 26f17ea2ef..42b983754d 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -42,6 +42,7 @@ #include "flow/TDMetric.actor.h" #include "flow/MetricSample.h" #include "flow/network.h" +#include "flow/SimBugInjector.h" #ifdef _WIN32 #include @@ -60,6 +61,7 @@ thread_local int g_allocation_tracing_disabled = 1; unsigned tracedLines = 0; thread_local int failedLineOverflow = 0; +bool g_traceProcessEvents = false; ITraceLogIssuesReporter::~ITraceLogIssuesReporter() {} @@ -1305,6 +1307,10 @@ BaseTraceEvent& BaseTraceEvent::backtrace(const std::string& prefix) { } void BaseTraceEvent::log() { + if (g_traceProcessEvents) { + auto name = fmt::format("TraceEvent::{}", type); + ProcessEvents::trigger(StringRef(name), this, success()); + } if (!logged) { init(); ++g_allocation_tracing_disabled; diff --git a/flow/include/flow/ProcessEvents.h b/flow/include/flow/ProcessEvents.h index 2126eab3fe..0272d4e8f3 100644 --- a/flow/include/flow/ProcessEvents.h +++ b/flow/include/flow/ProcessEvents.h @@ -21,13 +21,15 @@ #ifndef FLOW_PROCESS_EVENTS_H #define FLOW_PROCESS_EVENTS_H #include +#include + #include "flow/flow.h" namespace ProcessEvents { // A callback is never allowed to throw. Since std::function can't // take noexcept signatures, this is enforced at runtime -using Callback = std::function; +using Callback = std::function; class Event : NonCopyable { void* impl; @@ -38,7 +40,7 @@ public: ~Event(); }; -void trigger(StringRef name, StringRef msg, Error const& e); +void trigger(StringRef name, std::any const& data, Error const& e); } // namespace ProcessEvents diff --git a/flow/include/flow/SimBugInjector.h b/flow/include/flow/SimBugInjector.h new file mode 100644 index 0000000000..6a3d0f50fa --- /dev/null +++ b/flow/include/flow/SimBugInjector.h @@ -0,0 +1,129 @@ +/* + * SimBugInjector.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_SIM_BUG_INJECTOR_H +#define FLOW_SIM_BUG_INJECTOR_H +#pragma once +#include +#include + +/* + * This file provides a general framework to control how bugs should be injected into FDB. This must not be confused + * with Buggify. Buggify is intended to inject faults and set parameters in ways that FDB has to be able to handle + * correctly. This framework is meant to be used to inject actual bugs into FDB. The use-case for this is to implement + * negative tests. This way we can verify that tests actually catch the bugs that we're expecting them to find. + */ + +/** + * A ISimBug is a logical, polymorphic, representation of a bug and is the main means of communication for code across + * multiple places within the codebase. A user isn't supposed to create instances of ISimBug themselves but instead they + * should implement a corresponding IBugIdentifier subclass which can then be used for indexing through the + * SimBugInjector. + */ +class ISimBug : std::enable_shared_from_this { +public: + virtual ~ISimBug(); +}; + +/* + * Each SimBug class should have a corresponding BugIdentifier + */ +class IBugIdentifier { +public: + virtual ~IBugIdentifier(); + /** + * Creates an instance of the SimBug associated with this + * identifier. The reason we use shared_ptr instead of Reference here + * is so we (1) can use weak references and, more importantly, + * (2) shared_ptr has much better support for polymorphic types + */ + virtual std::shared_ptr create() const = 0; +}; + +/* + * SimBugInjector is a wrapper for a singleton and can therefore be instantiated cheaply as often as one wants. + */ +class SimBugInjector { +public: + explicit SimBugInjector() {} + /** + * Globally enable SimBugInjector + * + * Precondition: g_network->isSimulated() + */ + void enable(); + /** + * Globally disable SimBugInjector. If enable is called later, all current state is restored + * + * Precondition: true + */ + void disable(); + /** + * Globally disable SimBugInjector. Unlike disable, this will also remove all existing state + * + * Precondition: true + */ + void reset(); + /** + * Check whether SimBugInjector is globally enabled + */ + bool isEnabled() const; + + /** + * This method can be used to check whether a given bug has been enabled and then fetch the corresponding + * ISimBug object. + * + * @param id The IBugIdentifier corresponding to this bug + * @return A valid shared_ptr, if the bug has been enabled, nullptr otherwise + * @post enabled(bug(id)) -> result is valid else result is nullptr + */ + template + std::shared_ptr get(IBugIdentifier const& id) { + auto res = getImpl(id); + if (!res) { + return {}; + } + return std::dynamic_pointer_cast(res); + } + + std::shared_ptr getImpl(IBugIdentifier const& id) const; + + /** + * Returns the ISimBug instance associated with id if it already exists. Otherwise it will first create it. It is a + * bug to call this method if bug injection isn't turned on. + * + * @param id + * @return the SimBug instance + * @pre isEnabled() + * @post result.get() != nullptr + */ + template + std::shared_ptr enable(IBugIdentifier const& id) { + auto res = enableImpl(id); + if (!res) { + return {}; + } + return std::dynamic_pointer_cast(res); + } + + std::shared_ptr enableImpl(IBugIdentifier const& id); +}; + +#endif // FLOW_SIM_BUG_INJECTOR_H diff --git a/flow/include/flow/Trace.h b/flow/include/flow/Trace.h index f813087679..324c52f642 100644 --- a/flow/include/flow/Trace.h +++ b/flow/include/flow/Trace.h @@ -51,6 +51,7 @@ inline static bool TRACE_SAMPLE() { } extern thread_local int g_allocation_tracing_disabled; +extern bool g_traceProcessEvents; // Each major level of severity has 10 levels of minor levels, which are not all // used. when the numbers of severity events in each level are counted, they are @@ -413,6 +414,7 @@ public: std::unique_ptr tmpEventMetric; // This just just a place to store fields const TraceEventFields& getFields() const { return fields; } + Severity getSeverity() const { return severity; } template void moveTo(Object& obj) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1c50897837..244d52c398 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -447,6 +447,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessCycle.toml) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.toml) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.toml) + add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml) add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE) # Note that status tests are not deterministic. diff --git a/tests/negative/ResolverIgnoreTooOld.toml b/tests/negative/ResolverIgnoreTooOld.toml new file mode 100644 index 0000000000..495a2acc33 --- /dev/null +++ b/tests/negative/ResolverIgnoreTooOld.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = "ResolverIgnoreTooOld" + +[[test.workload]] +testName = "ResolverBug" +ignoreTooOldProbability = 0.1 From 3dc9ae18f50d65733fa1e744b3c0754bacb5265f Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 8 Mar 2023 17:15:53 -0700 Subject: [PATCH 02/13] fix compiler error --- fdbserver/workloads/ResolverBug.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/workloads/ResolverBug.actor.cpp b/fdbserver/workloads/ResolverBug.actor.cpp index be18959dd9..e70ea59716 100644 --- a/fdbserver/workloads/ResolverBug.actor.cpp +++ b/fdbserver/workloads/ResolverBug.actor.cpp @@ -40,7 +40,6 @@ struct ResolverBugWorkload : TestWorkload { resolverBug.ignoreReadSetProbability = getOption(options, "ignoreReadSetProbability"_sr, 0.0); for (auto& o : options) { - bool hadPrefix = false; if (o.key.startsWith("cycle_"_sr)) { KeyValueRef option; option.key = o.key.removePrefix("cycle_"_sr); @@ -149,6 +148,7 @@ struct ResolverBugWorkload : TestWorkload { } Future start(const Database& cx) override { return _start(this, cx->clone()); } + Future check(Database const& cx) override { return true; }; private: void getMetrics(std::vector& m) override {} From 4b90a01f0b241be5ce1e9731c4d864342cbdc7f7 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Thu, 9 Mar 2023 13:05:00 -0700 Subject: [PATCH 03/13] First successful negative run --- fdbserver/SkipList.cpp | 11 ++++++- fdbserver/workloads/ResolverBug.actor.cpp | 32 ++++++++++++++------ flow/ProcessEvents.cpp | 4 ++- flow/SimBugInjector.cpp | 37 +++++++++++++++++++++-- flow/Trace.cpp | 8 ++--- flow/include/flow/SimBugInjector.h | 26 ++++++++++++++-- tests/negative/ResolverIgnoreTooOld.toml | 8 ++++- 7 files changed, 103 insertions(+), 23 deletions(-) diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp index 2fb9363e03..b48d32c6b6 100644 --- a/fdbserver/SkipList.cpp +++ b/fdbserver/SkipList.cpp @@ -834,16 +834,25 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl Arena& arena = transactionInfo.arena(); TransactionInfo* info = new (arena) TransactionInfo; info->reportConflictingKeys = tr.report_conflicting_keys; + bool tooOld = tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size(); + if (tooOld && ignoreTooOld()) { + bugs->hit(); + tooOld = false; + } - if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size() && !ignoreTooOld()) { + if (tooOld) { info->tooOld = true; } else { info->tooOld = false; if (!ignoreReadSet()) { info->readRanges.resize(arena, tr.read_conflict_ranges.size()); + } else { + bugs->hit(); } if (!ignoreWriteSet()) { info->writeRanges.resize(arena, tr.write_conflict_ranges.size()); + } else { + bugs->hit(); } for (int r = 0; r < info->readRanges.size(); r++) { diff --git a/fdbserver/workloads/ResolverBug.actor.cpp b/fdbserver/workloads/ResolverBug.actor.cpp index e70ea59716..789497f489 100644 --- a/fdbserver/workloads/ResolverBug.actor.cpp +++ b/fdbserver/workloads/ResolverBug.actor.cpp @@ -32,6 +32,7 @@ struct ResolverBugWorkload : TestWorkload { ResolverBug resolverBug; Standalone> cycleOptions; KeyRef controlKey = "workload_control"_sr; + Promise bugFound; ResolverBugWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { disableFailureInjections = getOption(options, "disableFailureInjections"_sr, true); @@ -103,7 +104,6 @@ struct ResolverBugWorkload : TestWorkload { BaseTraceEvent* trace = std::any_cast(data); if (trace->getSeverity() == SevError) { bug->bugFound = true; - TraceEvent("NegativeTestSuccess").log(); } } }; @@ -126,13 +126,8 @@ struct ResolverBugWorkload : TestWorkload { ACTOR static Future _start(ResolverBugWorkload* self, Database cx) { state Reference cycle; - state std::shared_ptr bug = SimBugInjector().get(ResolverBugID()); - state Future f = Never(); - if (self->clientId == 0) { - f = driveWorkload(bug, self->clientCount); - } - while (!bug->bugFound) { - ASSERT(!f.isReady()); + state std::shared_ptr bug = SimBugInjector().get(ResolverBugID(), true); + loop { wait(waitForPhase(bug, 1)); cycle = self->createCycle(); wait(cycle->setup(cx)); @@ -144,10 +139,27 @@ struct ResolverBugWorkload : TestWorkload { wait(success(cycle->check(cx))); bug->cycleState[self->clientId] = 3; } - return Void(); } - Future start(const Database& cx) override { return _start(this, cx->clone()); } + ACTOR static Future onBug(std::shared_ptr bug) { + loop { + if (bug->bugFound) { + TraceEvent("NegativeTestSuccess").log(); + return Void(); + } + wait(delay(0.5)); + } + } + + Future start(const Database& cx) override { + std::vector> futures; + auto bug = SimBugInjector().get(ResolverBugID(), true); + if (clientId == 0) { + futures.push_back(driveWorkload(bug, clientCount)); + } + futures.push_back(_start(this, cx->clone())); + return onBug(bug) || waitForAll(futures); + } Future check(Database const& cx) override { return true; }; private: diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index 8011fd917b..c56c56b578 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -44,7 +44,9 @@ struct ProcessEventsImpl { struct Triggering { bool& value; explicit Triggering(bool& value) : value(value) { - ASSERT(!value); + if (value) { + ASSERT(false); + } value = true; } ~Triggering() { value = false; } diff --git a/flow/SimBugInjector.cpp b/flow/SimBugInjector.cpp index 647708622b..42cf9b9da5 100644 --- a/flow/SimBugInjector.cpp +++ b/flow/SimBugInjector.cpp @@ -22,6 +22,7 @@ #include "flow/network.h" #include +#include namespace { @@ -30,11 +31,38 @@ struct SimBugInjectorImpl { std::unordered_map> bugs; }; +struct ISimBugImpl { + unsigned numHits = 0; + static ISimBugImpl* get(void* self) { return reinterpret_cast(self); } +}; + SimBugInjectorImpl* simBugInjector = nullptr; } // namespace -ISimBug::~ISimBug() {} +ISimBug::ISimBug() : impl(new ISimBugImpl()) {} + +ISimBug::~ISimBug() { + delete ISimBugImpl::get(impl); +} + +std::string ISimBug::name() const { + auto const& typeInfo = typeid(*this); + return boost::core::demangle(typeInfo.name()); +} + +void ISimBug::hit() { + ++ISimBugImpl::get(impl)->numHits; + TraceEvent(SevWarnAlways, "BugInjected").detail("Name", name()).detail("NumHits", numHits()).log(); + this->onHit(); +} + +void ISimBug::onHit() {} + +unsigned ISimBug::numHits() const { + return ISimBugImpl::get(impl)->numHits; +} + IBugIdentifier::~IBugIdentifier() {} bool SimBugInjector::isEnabled() const { @@ -63,8 +91,11 @@ void SimBugInjector::reset() { } } -std::shared_ptr SimBugInjector::getImpl(const IBugIdentifier& id) const { - if (!isEnabled()) { +std::shared_ptr SimBugInjector::getImpl(const IBugIdentifier& id, bool getDisabled /* = false */) const { + if (!simBugInjector) { + return {}; + } + if (!getDisabled && !isEnabled()) { return {}; } auto it = simBugInjector->bugs.find(std::type_index(typeid(id))); diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 42b983754d..e90a223f87 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -1307,10 +1307,6 @@ BaseTraceEvent& BaseTraceEvent::backtrace(const std::string& prefix) { } void BaseTraceEvent::log() { - if (g_traceProcessEvents) { - auto name = fmt::format("TraceEvent::{}", type); - ProcessEvents::trigger(StringRef(name), this, success()); - } if (!logged) { init(); ++g_allocation_tracing_disabled; @@ -1336,6 +1332,10 @@ void BaseTraceEvent::log() { if (isNetworkThread()) { TraceEvent::eventCounts[severity / 10]++; } + if (g_traceProcessEvents) { + auto name = fmt::format("TraceEvent::{}", type); + ProcessEvents::trigger(StringRef(name), this, success()); + } g_traceLog.writeEvent(fields, trackingKey, severity > SevWarnAlways); if (g_traceLog.isOpen()) { diff --git a/flow/include/flow/SimBugInjector.h b/flow/include/flow/SimBugInjector.h index 6a3d0f50fa..290f596ff3 100644 --- a/flow/include/flow/SimBugInjector.h +++ b/flow/include/flow/SimBugInjector.h @@ -38,8 +38,28 @@ * SimBugInjector. */ class ISimBug : std::enable_shared_from_this { + void* impl; + virtual void onHit(); + public: + ISimBug(); virtual ~ISimBug(); + /** + * The name of the bug. By default this will return the class name (using typeid and boost::core::demangle). This is + * supposed to be a human-readable string which can be used to identify the bug when it appears in traces. + * + * @return A human readable string for this bug. + */ + virtual std::string name() const; + /** + * Should be called every time this bug is hit. This method can't be overridden. However, it will call `onHit` which + * can be overriden by a child. + */ + void hit(); + /** + * @return Number of times this bug has been hit (since last call to `SimBugInjector::reset` + */ + unsigned numHits() const; }; /* @@ -95,15 +115,15 @@ public: * @post enabled(bug(id)) -> result is valid else result is nullptr */ template - std::shared_ptr get(IBugIdentifier const& id) { - auto res = getImpl(id); + std::shared_ptr get(IBugIdentifier const& id, bool getDisabled = false) { + auto res = getImpl(id, getDisabled); if (!res) { return {}; } return std::dynamic_pointer_cast(res); } - std::shared_ptr getImpl(IBugIdentifier const& id) const; + std::shared_ptr getImpl(IBugIdentifier const& id, bool getDisabled = false) const; /** * Returns the ISimBug instance associated with id if it already exists. Otherwise it will first create it. It is a diff --git a/tests/negative/ResolverIgnoreTooOld.toml b/tests/negative/ResolverIgnoreTooOld.toml index 495a2acc33..1d02c95c8e 100644 --- a/tests/negative/ResolverIgnoreTooOld.toml +++ b/tests/negative/ResolverIgnoreTooOld.toml @@ -1,6 +1,12 @@ +[[knobs]] +enable_version_vector = false +max_read_transaction_life_versions = 1000000 +max_write_transaction_life_versions = 1000000 + [[test]] testTitle = "ResolverIgnoreTooOld" [[test.workload]] testName = "ResolverBug" -ignoreTooOldProbability = 0.1 +# old transactions are rare in the first place, so we ignore all of them to make corruptions likely +ignoreTooOldProbability = 1.0 From fbc79c001cae02f20e4e4f229d834a617ca677eb Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Thu, 9 Mar 2023 13:55:54 -0700 Subject: [PATCH 04/13] Make process events reentrant save --- flow/ProcessEvents.cpp | 84 +++++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index c56c56b578..7eff0736a0 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -42,23 +42,36 @@ struct EventImpl { struct ProcessEventsImpl { struct Triggering { - bool& value; - explicit Triggering(bool& value) : value(value) { - if (value) { - ASSERT(false); - } - value = true; + unsigned& value; + ProcessEventsImpl& processEvents; + explicit Triggering(unsigned& value, ProcessEventsImpl& processEvents) + : value(value), processEvents(processEvents) { + ++value; + } + ~Triggering() { + if (--value == 0) { + // merge modifications back into the event map + for (auto const& p : processEvents.toRemove) { + for (auto const& n : p.second) { + processEvents.events[n].erase(p.first); + } + } + processEvents.toRemove.clear(); + for (auto const& p : processEvents.toInsert) { + processEvents.events[p.first].insert(p.second.begin(), p.second.end()); + } + processEvents.toInsert.clear(); + } } - ~Triggering() { value = false; } }; using EventMap = std::unordered_map>; - bool triggering = false; + unsigned triggering = 0; EventMap events; std::map> toRemove; EventMap toInsert; void trigger(StringRef name, std::any const& data, Error const& e) { - Triggering _(triggering); + Triggering _(triggering, *this); auto iter = events.find(name); // strictly speaking this isn't a bug, but having callbacks that aren't caught // by anyone could mean that something was misspelled. Therefore, the safe thing @@ -66,7 +79,7 @@ struct ProcessEventsImpl { if (iter == events.end()) { return; } - std::unordered_map callbacks = iter->second; + std::unordered_map& callbacks = iter->second; // after we collected all unique callbacks we can call each for (auto const& c : callbacks) { try { @@ -81,17 +94,6 @@ struct ProcessEventsImpl { UNSTOPPABLE_ASSERT(false); } } - // merge modifications back into the event map - for (auto const& p : toRemove) { - for (auto const& n : p.second) { - events[n].erase(p.first); - } - } - toRemove.clear(); - for (auto const& p : toInsert) { - events[p.first].insert(p.second.begin(), p.second.end()); - } - toInsert.clear(); } void add(StringRef const& name, EventImpl* event) { @@ -107,7 +109,30 @@ struct ProcessEventsImpl { void remove(std::vector names, EventImpl::Id id) { if (triggering) { - toRemove.emplace(id, std::move(names)); + // it's possible that the event hasn't been added yet + bool inInsertMap = false; + for (auto const& name : names) { + auto it = toInsert.find(name); + if (it == toInsert.end()) { + // either all are in the insert map or none + ASSERT(!inInsertMap); + break; + } + auto it2 = it->second.find(id); + if (it2 == it->second.end()) { + // either all are in the insert map or none + ASSERT(!inInsertMap); + break; + } + inInsertMap = true; + it->second.erase(it2); + if (it->second.empty()) { + toInsert.erase(it); + } + } + if (!inInsertMap) { + toRemove.emplace(id, std::move(names)); + } } else { for (auto const& name : names) { events[name].erase(id); @@ -219,6 +244,21 @@ TEST_CASE("/flow/ProcessEvents") { ASSERT_EQ(called_self_delete, 1); ASSERT_EQ(called_non_delete, 2); } + { + // Reentrant safe + Event ev("reentrant"_sr, [](StringRef, std::any const& data, Error const&) { + // call depth of 5 + auto v = std::any_cast(data); + if (v < 5) { + Event doNotCall("reentrant"_sr, [](StringRef, std::any const&, Error const&) { + // should never be called + ASSERT(false); + }); + trigger("reentrant"_sr, v + 1, success()); + } + }); + trigger("reentrant"_sr, 0, success()); + } return Void(); } From b5b9cec9745c79c3d335756678403a6b0fba6344 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Thu, 9 Mar 2023 17:25:05 -0700 Subject: [PATCH 05/13] Adding additional tests --- tests/CMakeLists.txt | 2 ++ tests/negative/ResolverIgnoreReads.toml | 6 ++++++ tests/negative/ResolverIgnoreTooOld.toml | 3 +-- tests/negative/ResolverIgnoreWrites.toml | 6 ++++++ 4 files changed, 15 insertions(+), 2 deletions(-) create mode 100644 tests/negative/ResolverIgnoreReads.toml create mode 100644 tests/negative/ResolverIgnoreWrites.toml diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 244d52c398..bed873af92 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -448,6 +448,8 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.toml) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.toml) add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml) + add_fdb_test(TEST_FILES negative/ResolverIgnoreReads.toml) + add_fdb_test(TEST_FILES negative/ResolverIgnoreWrites.toml) add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE) # Note that status tests are not deterministic. diff --git a/tests/negative/ResolverIgnoreReads.toml b/tests/negative/ResolverIgnoreReads.toml new file mode 100644 index 0000000000..47c79531de --- /dev/null +++ b/tests/negative/ResolverIgnoreReads.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = "ResolverIgnoreReads" + +[[test.workload]] +testName = "ResolverBug" +ignoreReadSetProbability = 0.01 diff --git a/tests/negative/ResolverIgnoreTooOld.toml b/tests/negative/ResolverIgnoreTooOld.toml index 1d02c95c8e..6ce5a17a33 100644 --- a/tests/negative/ResolverIgnoreTooOld.toml +++ b/tests/negative/ResolverIgnoreTooOld.toml @@ -8,5 +8,4 @@ testTitle = "ResolverIgnoreTooOld" [[test.workload]] testName = "ResolverBug" -# old transactions are rare in the first place, so we ignore all of them to make corruptions likely -ignoreTooOldProbability = 1.0 +ignoreTooOldProbability = 0.2 diff --git a/tests/negative/ResolverIgnoreWrites.toml b/tests/negative/ResolverIgnoreWrites.toml new file mode 100644 index 0000000000..3b5c834feb --- /dev/null +++ b/tests/negative/ResolverIgnoreWrites.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = "ResolverIgnoreReads" + +[[test.workload]] +testName = "ResolverBug" +ignoreWriteSetProbability = 0.01 From 2acc3e965d76c2039cd1bf68d760a5680c067a6f Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Fri, 10 Mar 2023 23:29:54 -0700 Subject: [PATCH 06/13] Adding data corruption test to verify consistency check --- .../include/fdbserver/StorageCorruptionBug.h | 34 ++++++++ fdbserver/storageserver.actor.cpp | 31 +++++++- .../workloads/StorageCorruption.actor.cpp | 78 +++++++++++++++++++ flow/ProcessEvents.cpp | 4 + flow/include/flow/ProcessEvents.h | 1 + tests/CMakeLists.txt | 1 + tests/negative/StorageCorruption.toml | 21 +++++ 7 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 fdbserver/include/fdbserver/StorageCorruptionBug.h create mode 100644 fdbserver/workloads/StorageCorruption.actor.cpp create mode 100644 tests/negative/StorageCorruption.toml diff --git a/fdbserver/include/fdbserver/StorageCorruptionBug.h b/fdbserver/include/fdbserver/StorageCorruptionBug.h new file mode 100644 index 0000000000..ea2c826dd4 --- /dev/null +++ b/fdbserver/include/fdbserver/StorageCorruptionBug.h @@ -0,0 +1,34 @@ +/* + * StorageCorruptionBug.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#ifndef FDBSERVER_STORAGE_CORRUPTION_BUG_H +#define FDBSERVER_STORAGE_CORRUPTION_BUG_H +#include "flow/SimBugInjector.h" + +class StorageCorruptionBug : public ISimBug { +public: + double corruptionProbability = 0.001; +}; +class StorageCorruptionBugID : public IBugIdentifier { +public: + std::shared_ptr create() const override { return std::make_shared(); } +}; + +#endif // FDBSERVER_STORAGE_CORRUPTION_BUG_H diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 42bbe2bddd..0d5283f39c 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -92,6 +92,7 @@ #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/BlobGranuleServerCommon.actor.h" +#include "fdbserver/StorageCorruptionBug.h" #include "flow/ActorCollection.h" #include "flow/Arena.h" #include "flow/Error.h" @@ -457,6 +458,7 @@ private: struct StorageServer* data; IKeyValueStore* storage; void writeMutations(const VectorRef& mutations, Version debugVersion, const char* debugContext); + void writeMutationsBuggy(const VectorRef& mutations, Version debugVersion, const char* debugContext); ACTOR static Future readFirstKey(IKeyValueStore* storage, KeyRangeRef range, Optional options) { RangeResult r = wait(storage->readRange(range, 1, 1 << 30, options)); @@ -10363,6 +10365,29 @@ void StorageServerDisk::writeMutation(MutationRef mutation) { ASSERT(false); } +void StorageServerDisk::writeMutationsBuggy(const VectorRef& mutations, + Version debugVersion, + const char* debugContext) { + auto bug = SimBugInjector().get(StorageCorruptionBugID()); + if (!bug) { + writeMutations(mutations, debugVersion, debugContext); + } + int begin = 0; + while (begin < mutations.size()) { + int i; + for (i = begin; i < mutations.size(); ++i) { + if (deterministicRandom()->random01() < bug->corruptionProbability) { + bug->hit(); + break; + } + } + writeMutations(mutations.slice(begin, i), debugVersion, debugContext); + // we want to drop the mutation at i (unless i == mutations.size(), in which case this will just finish the + // loop) + begin = i + 1; + } +} + void StorageServerDisk::writeMutations(const VectorRef& mutations, Version debugVersion, const char* debugContext) { @@ -10395,7 +10420,11 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion, ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion); // TODO(alexmiller): Update to version tracking. // DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef()); - writeMutations(v.mutations, v.version, "makeVersionDurable"); + if (!SimBugInjector().isEnabled()) { + writeMutations(v.mutations, v.version, "makeVersionDurable"); + } else { + writeMutationsBuggy(v.mutations, v.version, "makeVersionDurable"); + } for (const auto& m : v.mutations) bytesLeft -= mvccStorageBytes(m); prevStorageVersion = v.version; diff --git a/fdbserver/workloads/StorageCorruption.actor.cpp b/fdbserver/workloads/StorageCorruption.actor.cpp new file mode 100644 index 0000000000..89d0eb298a --- /dev/null +++ b/fdbserver/workloads/StorageCorruption.actor.cpp @@ -0,0 +1,78 @@ +/* + * StorageCorruptionBug.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/StorageCorruptionBug.h" +#include "fdbclient/ManagementAPI.actor.h" +#include "flow/ProcessEvents.h" + +#include "flow/actorcompiler.h" // has to be last include + +namespace { + +struct StorageCorruptionWorkload : TestWorkload { + constexpr static auto NAME = "StorageCorruption"; + using Self = StorageCorruptionWorkload; + std::shared_ptr bug; + SimBugInjector bugInjector; + double testDuration; + + StorageCorruptionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + bugInjector.enable(); + bug = bugInjector.enable(StorageCorruptionBugID()); + bugInjector.disable(); + bug->corruptionProbability = getOption(options, "corruptionProbability"_sr, 0.001); + testDuration = getOption(options, "testDuration"_sr, 60.0); + } + + void disableFailureInjectionWorkloads(std::set& out) const override { out.insert("all"); } + + ACTOR static Future _start(Self* self, Database cx) { + wait(success(setDDMode(cx, 0))); + self->bugInjector.enable(); + wait(delay(self->testDuration)); + self->bug->corruptionProbability = 0.0; + TraceEvent("CorruptionInjections").detail("NumCorruptions", self->bug->numHits()).log(); + self->bugInjector.disable(); + ProcessEvents::uncancellableEvent("ConsistencyCheckFailure"_sr, + [](StringRef, std::any const& data, Error const&) { + if (std::any_cast(data)->getSeverity() == SevError) { + TraceEvent("NegativeTestSuccess"); + } + }); + wait(success(setDDMode(cx, 1))); + return Void(); + } + + Future start(Database const& cx) override { + if (clientId != 0) { + return Void(); + } + return _start(this, cx->clone()); + } + Future check(Database const& cx) override { return true; } + +private: + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory factory; + +} // namespace \ No newline at end of file diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index 7eff0736a0..44b1cdc4e9 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -159,6 +159,10 @@ void trigger(StringRef name, std::any const& data, Error const& e) { processEventsImpl.trigger(name, data, e); } +void uncancellableEvent(StringRef name, Callback callback) { + new EventImpl({ name }, std::move(callback)); +} + Event::Event(StringRef name, Callback callback) { impl = new EventImpl({ std::move(name) }, std::move(callback)); } diff --git a/flow/include/flow/ProcessEvents.h b/flow/include/flow/ProcessEvents.h index 0272d4e8f3..dc13102cd4 100644 --- a/flow/include/flow/ProcessEvents.h +++ b/flow/include/flow/ProcessEvents.h @@ -40,6 +40,7 @@ public: ~Event(); }; +void uncancellableEvent(StringRef name, Callback callback); void trigger(StringRef name, std::any const& data, Error const& e); } // namespace ProcessEvents diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index bed873af92..a5449a159f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -450,6 +450,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml) add_fdb_test(TEST_FILES negative/ResolverIgnoreReads.toml) add_fdb_test(TEST_FILES negative/ResolverIgnoreWrites.toml) + add_fdb_test(TEST_FILES negative/StorageCorruption.toml) add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE) # Note that status tests are not deterministic. diff --git a/tests/negative/StorageCorruption.toml b/tests/negative/StorageCorruption.toml new file mode 100644 index 0000000000..b73c7d6a59 --- /dev/null +++ b/tests/negative/StorageCorruption.toml @@ -0,0 +1,21 @@ +[[test]] +testTitle = "StorageCorruption" + +[[test.workload]] +testName = "StorageCorruption" +corruptionProbability = 0.001 +testDuration = 60.0 + +[[test.workload]] +testName = 'ReadWrite' +testDuration = 60.0 +transactionsPerSecond = 200 +writesPerTransactionA = 5 +readsPerTransactionA = 1 +writesPerTransactionB = 10 +readsPerTransactionB = 1 +alpha = 0.5 +nodeCount = 10000 +valueBytes = 128 +discardEdgeMeasurements = false +warmingDelay = 10.0 \ No newline at end of file From 7a108a27681d9d64bbec106229cca9ecf3978f4e Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 8 Mar 2023 17:09:50 -0700 Subject: [PATCH 07/13] Add framework for writing negative simulation tests --- contrib/TestHarness2/test_harness/run.py | 9 +- .../TestHarness2/test_harness/summarize.py | 17 +- fdbserver/QuietDatabase.actor.cpp | 4 +- fdbserver/Resolver.actor.cpp | 48 +++--- fdbserver/ResolverBug.cpp | 24 +++ fdbserver/SkipList.cpp | 26 ++- fdbserver/include/fdbserver/ConflictSet.h | 7 + fdbserver/include/fdbserver/ResolverBug.h | 44 +++++ .../workloads/ConsistencyCheck.actor.cpp | 4 +- fdbserver/workloads/ResolverBug.actor.cpp | 159 ++++++++++++++++++ flow/ProcessEvents.cpp | 34 ++-- flow/SimBugInjector.cpp | 85 ++++++++++ flow/Trace.cpp | 6 + flow/include/flow/ProcessEvents.h | 6 +- flow/include/flow/SimBugInjector.h | 129 ++++++++++++++ flow/include/flow/Trace.h | 2 + tests/CMakeLists.txt | 1 + tests/negative/ResolverIgnoreTooOld.toml | 6 + 18 files changed, 559 insertions(+), 52 deletions(-) create mode 100644 fdbserver/ResolverBug.cpp create mode 100644 fdbserver/include/fdbserver/ResolverBug.h create mode 100644 fdbserver/workloads/ResolverBug.actor.cpp create mode 100644 flow/SimBugInjector.cpp create mode 100644 flow/include/flow/SimBugInjector.h create mode 100644 tests/negative/ResolverIgnoreTooOld.toml diff --git a/contrib/TestHarness2/test_harness/run.py b/contrib/TestHarness2/test_harness/run.py index f922a1273f..a37a2fab50 100644 --- a/contrib/TestHarness2/test_harness/run.py +++ b/contrib/TestHarness2/test_harness/run.py @@ -304,6 +304,10 @@ def is_restarting_test(test_file: Path): return False +def is_negative(test_file: Path): + return test_file.parts[-2] == "negative" + + def is_no_sim(test_file: Path): return test_file.parts[-2] == "noSim" @@ -449,7 +453,7 @@ class TestRun: command.append("--restarting") if self.buggify_enabled: command += ["-b", "on"] - if config.crash_on_error: + if config.crash_on_error and not is_negative(self.test_file): command.append("--crash") if config.long_running: # disable simulation speedup @@ -488,6 +492,7 @@ class TestRun: resources.join() # we're rounding times up, otherwise we will prefer running very short tests (<1s) self.run_time = math.ceil(resources.time()) + self.summary.is_negative_test = is_negative(self.test_file) self.summary.runtime = resources.time() self.summary.max_rss = resources.max_rss self.summary.was_killed = did_kill @@ -495,7 +500,7 @@ class TestRun: self.summary.error_out = err_out self.summary.summarize(self.temp_path, " ".join(command)) - if not self.summary.ok(): + if not self.summary.is_negative_test and not self.summary.ok(): self._run_rocksdb_logtool() return self.summary.ok() diff --git a/contrib/TestHarness2/test_harness/summarize.py b/contrib/TestHarness2/test_harness/summarize.py index 0f868e3ab7..2d9898f155 100644 --- a/contrib/TestHarness2/test_harness/summarize.py +++ b/contrib/TestHarness2/test_harness/summarize.py @@ -316,6 +316,8 @@ class Summary: self.stderr_severity: str = '40' self.will_restart: bool = will_restart self.test_dir: Path | None = None + self.is_negative_test = False + self.negative_test_success = False if uid is not None: self.out.attributes['TestUID'] = str(uid) @@ -323,6 +325,7 @@ class Summary: self.out.attributes['Statistics'] = stats self.out.attributes['JoshuaSeed'] = str(config.joshua_seed) self.out.attributes['WillRestart'] = '1' if self.will_restart else '0' + self.out.attributes['NegativeTest'] = '1' if self.is_negative_test else '0' self.handler = ParseHandler(self.out) self.register_handlers() @@ -371,7 +374,8 @@ class Summary: return res def ok(self): - return not self.error + # logical xor -- a test is successful if there was either no error or we expected errors (negative test) + return (not self.error) != self.is_negative_test def done(self): if config.print_coverage: @@ -529,6 +533,17 @@ class Summary: self.handler.add_handler(('Type', 'ProgramStart'), program_start) + def negative_test_success(attrs: Dict[str, str]): + self.negative_test_success = True + child = SummaryTree(attrs['Type']) + for k, v in attrs: + if k != 'Type': + child.attributes[k] = v + self.out.append(child) + pass + + self.handler.add_handler(('Type', 'NegativeTestSuccess'), negative_test_success) + def config_string(attrs: Dict[str, str]): self.out.attributes['ConfigString'] = attrs['ConfigString'] diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 1a1ec4835c..fdf6fcda59 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -793,8 +793,8 @@ ACTOR Future reconfigureAfter(Database cx, } struct QuietDatabaseChecker { - ProcessEvents::Callback timeoutCallback = [this](StringRef name, StringRef msg, Error const& e) { - logFailure(name, msg, e); + ProcessEvents::Callback timeoutCallback = [this](StringRef name, std::any const& msg, Error const& e) { + logFailure(name, std::any_cast(msg), e); }; double start = now(); double maxDDRunTime; diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index 468c294ab5..bf4118f5f2 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -216,6 +216,31 @@ struct Resolver : ReferenceCounted { }; } // namespace +ACTOR Future versionReady(Resolver* self, ProxyRequestsInfo* proxyInfo, Version prevVersion) { + loop { + if (self->recentStateTransactionsInfo.size() && + proxyInfo->lastVersion <= self->recentStateTransactionsInfo.firstVersion()) { + self->neededVersion.set(std::max(self->neededVersion.get(), prevVersion)); + } + + // Update queue depth metric before waiting. Check if we're going to be one of the waiters or not. + int waiters = self->version.numWaiting(); + if (self->version.get() < prevVersion) { + waiters++; + } + self->queueDepthDist->sampleRecordCounter(waiters); + + choose { + when(wait(self->version.whenAtLeast(prevVersion))) { + // Update queue depth metric after waiting. + self->queueDepthDist->sampleRecordCounter(self->version.numWaiting()); + return Void(); + } + when(wait(self->checkNeededVersion.onTrigger())) {} + } + } +} + ACTOR Future resolveBatch(Reference self, ResolveTransactionBatchRequest req, Reference const> db) { @@ -267,28 +292,7 @@ ACTOR Future resolveBatch(Reference self, g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterQueueSizeCheck"); } - loop { - if (self->recentStateTransactionsInfo.size() && - proxyInfo.lastVersion <= self->recentStateTransactionsInfo.firstVersion()) { - self->neededVersion.set(std::max(self->neededVersion.get(), req.prevVersion)); - } - - // Update queue depth metric before waiting. Check if we're going to be one of the waiters or not. - int waiters = self->version.numWaiting(); - if (self->version.get() < req.prevVersion) { - waiters++; - } - self->queueDepthDist->sampleRecordCounter(waiters); - - choose { - when(wait(self->version.whenAtLeast(req.prevVersion))) { - // Update queue depth metric after waiting. - self->queueDepthDist->sampleRecordCounter(self->version.numWaiting()); - break; - } - when(wait(self->checkNeededVersion.onTrigger())) {} - } - } + wait(versionReady(self.getPtr(), &proxyInfo, req.prevVersion)); if (check_yield(TaskPriority::DefaultEndpoint)) { wait(delay(0, TaskPriority::Low) || delay(SERVER_KNOBS->COMMIT_SLEEP_TIME)); // FIXME: Is this still right? diff --git a/fdbserver/ResolverBug.cpp b/fdbserver/ResolverBug.cpp new file mode 100644 index 0000000000..08debec3aa --- /dev/null +++ b/fdbserver/ResolverBug.cpp @@ -0,0 +1,24 @@ +/* + * ResolverBug.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "fdbserver/ResolverBug.h" + +std::shared_ptr ResolverBugID::create() const { + return std::make_shared(); +} diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp index 9f32329931..2fb9363e03 100644 --- a/fdbserver/SkipList.cpp +++ b/fdbserver/SkipList.cpp @@ -816,6 +816,18 @@ struct TransactionInfo { bool reportConflictingKeys; }; +bool ConflictBatch::ignoreTooOld() const { + return bugs && deterministicRandom()->random01() < bugs->ignoreTooOldProbability; +} + +bool ConflictBatch::ignoreReadSet() const { + return bugs && deterministicRandom()->random01() < bugs->ignoreReadSetProbability; +} + +bool ConflictBatch::ignoreWriteSet() const { + return bugs && deterministicRandom()->random01() < bugs->ignoreWriteSetProbability; +} + void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOldestVersion) { const int t = transactionCount++; @@ -823,14 +835,18 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl TransactionInfo* info = new (arena) TransactionInfo; info->reportConflictingKeys = tr.report_conflicting_keys; - if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size()) { + if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size() && !ignoreTooOld()) { info->tooOld = true; } else { info->tooOld = false; - info->readRanges.resize(arena, tr.read_conflict_ranges.size()); - info->writeRanges.resize(arena, tr.write_conflict_ranges.size()); + if (!ignoreReadSet()) { + info->readRanges.resize(arena, tr.read_conflict_ranges.size()); + } + if (!ignoreWriteSet()) { + info->writeRanges.resize(arena, tr.write_conflict_ranges.size()); + } - for (int r = 0; r < tr.read_conflict_ranges.size(); r++) { + for (int r = 0; r < info->readRanges.size(); r++) { const KeyRangeRef& range = tr.read_conflict_ranges[r]; points.emplace_back(range.begin, true, false, t, &info->readRanges[r].first); points.emplace_back(range.end, false, false, t, &info->readRanges[r].second); @@ -843,7 +859,7 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl : nullptr, tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr); } - for (int r = 0; r < tr.write_conflict_ranges.size(); r++) { + for (int r = 0; r < info->writeRanges.size(); r++) { const KeyRangeRef& range = tr.write_conflict_ranges[r]; points.emplace_back(range.begin, true, true, t, &info->writeRanges[r].first); points.emplace_back(range.end, false, true, t, &info->writeRanges[r].second); diff --git a/fdbserver/include/fdbserver/ConflictSet.h b/fdbserver/include/fdbserver/ConflictSet.h index c8a14fcdd3..90ed2c4062 100644 --- a/fdbserver/include/fdbserver/ConflictSet.h +++ b/fdbserver/include/fdbserver/ConflictSet.h @@ -26,6 +26,7 @@ #include #include "fdbclient/CommitTransaction.h" +#include "fdbserver/ResolverBug.h" struct ConflictSet; ConflictSet* newConflictSet(); @@ -63,6 +64,12 @@ private: // Stores the map: a transaction -> conflicted transactions' indices std::map>* conflictingKeyRangeMap; Arena* resolveBatchReplyArena; + std::shared_ptr bugs = SimBugInjector().get(ResolverBugID()); + + // bug injection + bool ignoreTooOld() const; + bool ignoreWriteSet() const; + bool ignoreReadSet() const; void checkIntraBatchConflicts(); void combineWriteConflictRanges(); diff --git a/fdbserver/include/fdbserver/ResolverBug.h b/fdbserver/include/fdbserver/ResolverBug.h new file mode 100644 index 0000000000..de84d44eea --- /dev/null +++ b/fdbserver/include/fdbserver/ResolverBug.h @@ -0,0 +1,44 @@ +/* + * ResolverBug.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBSERVER_RESOLVER_BUG_H +#define FDBSERVER_RESOLVER_BUG_H +#pragma once + +#include "flow/SimBugInjector.h" +#include + +struct ResolverBug : public ISimBug { + double ignoreTooOldProbability = 0.0; + double ignoreWriteSetProbability = 0.0; + double ignoreReadSetProbability = 0.0; + + // data used to control lifetime of cycle clients + bool bugFound = false; + unsigned currentPhase = 0; + std::vector cycleState; +}; + +class ResolverBugID : public IBugIdentifier { +public: + std::shared_ptr create() const override; +}; + +#endif // FDBSERVER_RESOLVER_BUG_H diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index 3933b3b465..d081729e33 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -49,11 +49,11 @@ struct ConsistencyCheckWorkload : TestWorkload { struct OnTimeout { ConsistencyCheckWorkload& self; explicit OnTimeout(ConsistencyCheckWorkload& self) : self(self) {} - void operator()(StringRef name, StringRef msg, Error const& e) { + void operator()(StringRef name, std::any const& msg, Error const& e) { TraceEvent(SevError, "ConsistencyCheckFailure") .error(e) .detail("EventName", name) - .detail("EventMessage", msg) + .detail("EventMessage", std::any_cast(msg)) .log(); } }; diff --git a/fdbserver/workloads/ResolverBug.actor.cpp b/fdbserver/workloads/ResolverBug.actor.cpp new file mode 100644 index 0000000000..be18959dd9 --- /dev/null +++ b/fdbserver/workloads/ResolverBug.actor.cpp @@ -0,0 +1,159 @@ +/* + * ResolverBug.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "flow/ProcessEvents.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/ResolverBug.h" +#include "fdbserver/ServerDBInfo.actor.h" + +#include "flow/actorcompiler.h" // has to be last include + +namespace { + +struct ResolverBugWorkload : TestWorkload { + constexpr static auto NAME = "ResolverBug"; + bool disableFailureInjections; + ResolverBug resolverBug; + Standalone> cycleOptions; + KeyRef controlKey = "workload_control"_sr; + + ResolverBugWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + disableFailureInjections = getOption(options, "disableFailureInjections"_sr, true); + resolverBug.ignoreTooOldProbability = getOption(options, "ignoreTooOldProbability"_sr, 0.0); + resolverBug.ignoreWriteSetProbability = getOption(options, "ignoreWriteSetProbability"_sr, 0.0); + resolverBug.ignoreReadSetProbability = getOption(options, "ignoreReadSetProbability"_sr, 0.0); + + for (auto& o : options) { + bool hadPrefix = false; + if (o.key.startsWith("cycle_"_sr)) { + KeyValueRef option; + option.key = o.key.removePrefix("cycle_"_sr); + option.value = o.value; + cycleOptions.push_back_deep(cycleOptions.arena(), option); + o.value = ""_sr; + } + } + + if (clientId == 0) { + SimBugInjector().enable(); + auto bug = SimBugInjector().enable(ResolverBugID()); + *bug = resolverBug; + bug->cycleState.resize(clientCount, 0); + SimBugInjector().disable(); + } + } + + void disableFailureInjectionWorkloads(std::set& out) const override { + if (disableFailureInjections) { + out.insert("all"); + } + } + + Reference createCycle() { + WorkloadContext wcx; + wcx.clientId = clientId; + wcx.clientCount = clientCount; + wcx.ccr = ccr; + wcx.dbInfo = dbInfo; + wcx.options = cycleOptions; + wcx.sharedRandomNumber = sharedRandomNumber; + wcx.defaultTenant = defaultTenant.castTo(); + return IWorkloadFactory::create("Cycle", wcx); + } + + ACTOR static Future waitForPhase(std::shared_ptr bug, int phase) { + while (bug->currentPhase != phase) { + wait(delay(0.5)); + } + return Void(); + } + + ACTOR static Future waitForPhaseDone(std::shared_ptr bug, int phase, int clientCount) { + while (std::count(bug->cycleState.begin(), bug->cycleState.end(), phase) != clientCount) { + wait(delay(0.5)); + } + return Void(); + } + + struct ReportTraces { + ReportTraces() { g_traceProcessEvents = true; } + ~ReportTraces() { g_traceProcessEvents = false; } + }; + + struct OnTestFailure { + std::shared_ptr bug; + OnTestFailure(std::shared_ptr bug) : bug(bug) {} + + void operator()(StringRef, auto const& data, Error const&) { + BaseTraceEvent* trace = std::any_cast(data); + if (trace->getSeverity() == SevError) { + bug->bugFound = true; + TraceEvent("NegativeTestSuccess").log(); + } + } + }; + + ACTOR static Future driveWorkload(std::shared_ptr bug, int clientCount) { + state ReportTraces _; + state OnTestFailure onTestFailure(bug); + state ProcessEvents::Event ev("TraceEvent::TestFailure"_sr, onTestFailure); + loop { + bug->currentPhase = 1; + wait(waitForPhaseDone(bug, 1, clientCount)); + SimBugInjector().enable(); + bug->currentPhase = 2; + wait(waitForPhaseDone(bug, 2, clientCount)); + SimBugInjector().disable(); + bug->currentPhase = 3; + wait(waitForPhaseDone(bug, 3, clientCount)); + } + } + + ACTOR static Future _start(ResolverBugWorkload* self, Database cx) { + state Reference cycle; + state std::shared_ptr bug = SimBugInjector().get(ResolverBugID()); + state Future f = Never(); + if (self->clientId == 0) { + f = driveWorkload(bug, self->clientCount); + } + while (!bug->bugFound) { + ASSERT(!f.isReady()); + wait(waitForPhase(bug, 1)); + cycle = self->createCycle(); + wait(cycle->setup(cx)); + bug->cycleState[self->clientId] = 1; + wait(waitForPhase(bug, 2)); + wait(cycle->start(cx)); + bug->cycleState[self->clientId] = 2; + wait(waitForPhase(bug, 3)); + wait(success(cycle->check(cx))); + bug->cycleState[self->clientId] = 3; + } + return Void(); + } + + Future start(const Database& cx) override { return _start(this, cx->clone()); } + +private: + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory workloadFactory; + +} // namespace \ No newline at end of file diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index 6d4bdee719..8011fd917b 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -55,13 +55,15 @@ struct ProcessEventsImpl { std::map> toRemove; EventMap toInsert; - void trigger(StringRef name, StringRef msg, Error const& e) { + void trigger(StringRef name, std::any const& data, Error const& e) { Triggering _(triggering); auto iter = events.find(name); // strictly speaking this isn't a bug, but having callbacks that aren't caught // by anyone could mean that something was misspelled. Therefore, the safe thing // to do is to abort. - ASSERT(iter != events.end()); + if (iter == events.end()) { + return; + } std::unordered_map callbacks = iter->second; // after we collected all unique callbacks we can call each for (auto const& c : callbacks) { @@ -70,7 +72,7 @@ struct ProcessEventsImpl { // which case attempting to call it will be undefined // behavior. if (toRemove.count(c.first) == 0) { - c.second->callback(name, msg, e); + c.second->callback(name, data, e); } } catch (...) { // callbacks are not allowed to throw @@ -126,8 +128,8 @@ void EventImpl::removeEvent() { namespace ProcessEvents { -void trigger(StringRef name, StringRef msg, Error const& e) { - processEventsImpl.trigger(name, msg, e); +void trigger(StringRef name, std::any const& data, Error const& e) { + processEventsImpl.trigger(name, data, e); } Event::Event(StringRef name, Callback callback) { @@ -146,7 +148,7 @@ TEST_CASE("/flow/ProcessEvents") { { // Basic test unsigned numHits = 0; - Event _("basic"_sr, [&numHits](StringRef n, StringRef, Error const& e) { + Event _("basic"_sr, [&numHits](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "basic"_sr); ASSERT_EQ(e.code(), error_code_success); ++numHits; @@ -161,18 +163,18 @@ TEST_CASE("/flow/ProcessEvents") { std::vector> createdEvents; std::vector numHits; numHits.reserve(2); - Event _("create"_sr, [&](StringRef n, StringRef, Error const& e) { + Event _("create"_sr, [&](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "create"_sr); ASSERT_EQ(e.code(), error_code_success); ++hits1; numHits.push_back(0); - createdEvents.push_back( - std::make_shared(std::vector{ "create"_sr, "secondaries"_sr }, - [&numHits, idx = numHits.size() - 1](StringRef n, StringRef, Error const& e) { - ASSERT(n == "create"_sr || n == "secondaries"); - ASSERT_EQ(e.code(), error_code_success); - ++numHits[idx]; - })); + createdEvents.push_back(std::make_shared( + std::vector{ "create"_sr, "secondaries"_sr }, + [&numHits, idx = numHits.size() - 1](StringRef n, std::any const&, Error const& e) { + ASSERT(n == "create"_sr || n == "secondaries"); + ASSERT_EQ(e.code(), error_code_success); + ++numHits[idx]; + })); }); trigger("create"_sr, ""_sr, success()); ASSERT_EQ(hits1, 1); @@ -197,7 +199,7 @@ TEST_CASE("/flow/ProcessEvents") { // Remove self unsigned called_self_delete = 0, called_non_delete = 0; std::unique_ptr ev; - auto callback_del = [&](StringRef n, StringRef, Error const& e) { + auto callback_del = [&](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "deletion"_sr); ASSERT_EQ(e.code(), error_code_success); ++called_self_delete; @@ -205,7 +207,7 @@ TEST_CASE("/flow/ProcessEvents") { ev.reset(); }; ev.reset(new Event("deletion"_sr, callback_del)); - Event _("deletion"_sr, [&](StringRef n, StringRef, Error const& e) { + Event _("deletion"_sr, [&](StringRef n, std::any const&, Error const& e) { ASSERT_EQ(n, "deletion"_sr); ASSERT_EQ(e.code(), error_code_success); ++called_non_delete; diff --git a/flow/SimBugInjector.cpp b/flow/SimBugInjector.cpp new file mode 100644 index 0000000000..647708622b --- /dev/null +++ b/flow/SimBugInjector.cpp @@ -0,0 +1,85 @@ +/* + * SimBugInjector.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/SimBugInjector.h" +#include "flow/network.h" + +#include + +namespace { + +struct SimBugInjectorImpl { + bool isEnabled = true; + std::unordered_map> bugs; +}; + +SimBugInjectorImpl* simBugInjector = nullptr; + +} // namespace + +ISimBug::~ISimBug() {} +IBugIdentifier::~IBugIdentifier() {} + +bool SimBugInjector::isEnabled() const { + return simBugInjector != nullptr && simBugInjector->isEnabled; +} + +void SimBugInjector::enable() { + // SimBugInjector is very dangerous. It will corrupt your data! Therefore, using it outside of simulation is + // not allowed + UNSTOPPABLE_ASSERT(g_network->isSimulated()); + if (simBugInjector == nullptr) { + simBugInjector = new SimBugInjectorImpl(); + } + simBugInjector->isEnabled = true; +} + +void SimBugInjector::disable() { + if (simBugInjector) { + simBugInjector->isEnabled = false; + } +} + +void SimBugInjector::reset() { + if (simBugInjector) { + delete simBugInjector; + } +} + +std::shared_ptr SimBugInjector::getImpl(const IBugIdentifier& id) const { + if (!isEnabled()) { + return {}; + } + auto it = simBugInjector->bugs.find(std::type_index(typeid(id))); + if (it == simBugInjector->bugs.end()) { + return {}; + } else { + return it->second; + } +} + +std::shared_ptr SimBugInjector::enableImpl(const IBugIdentifier& id) { + UNSTOPPABLE_ASSERT(isEnabled()); + auto& res = simBugInjector->bugs[std::type_index(typeid(id))]; + if (!res) { + res = id.create(); + } + return res; +} \ No newline at end of file diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 2f9a880157..1f29a88427 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -42,6 +42,7 @@ #include "flow/TDMetric.actor.h" #include "flow/MetricSample.h" #include "flow/network.h" +#include "flow/SimBugInjector.h" #ifdef _WIN32 #include @@ -60,6 +61,7 @@ thread_local int g_allocation_tracing_disabled = 1; unsigned tracedLines = 0; thread_local int failedLineOverflow = 0; +bool g_traceProcessEvents = false; ITraceLogIssuesReporter::~ITraceLogIssuesReporter() {} @@ -1305,6 +1307,10 @@ BaseTraceEvent& BaseTraceEvent::backtrace(const std::string& prefix) { } void BaseTraceEvent::log() { + if (g_traceProcessEvents) { + auto name = fmt::format("TraceEvent::{}", type); + ProcessEvents::trigger(StringRef(name), this, success()); + } if (!logged) { init(); ++g_allocation_tracing_disabled; diff --git a/flow/include/flow/ProcessEvents.h b/flow/include/flow/ProcessEvents.h index 2126eab3fe..0272d4e8f3 100644 --- a/flow/include/flow/ProcessEvents.h +++ b/flow/include/flow/ProcessEvents.h @@ -21,13 +21,15 @@ #ifndef FLOW_PROCESS_EVENTS_H #define FLOW_PROCESS_EVENTS_H #include +#include + #include "flow/flow.h" namespace ProcessEvents { // A callback is never allowed to throw. Since std::function can't // take noexcept signatures, this is enforced at runtime -using Callback = std::function; +using Callback = std::function; class Event : NonCopyable { void* impl; @@ -38,7 +40,7 @@ public: ~Event(); }; -void trigger(StringRef name, StringRef msg, Error const& e); +void trigger(StringRef name, std::any const& data, Error const& e); } // namespace ProcessEvents diff --git a/flow/include/flow/SimBugInjector.h b/flow/include/flow/SimBugInjector.h new file mode 100644 index 0000000000..6a3d0f50fa --- /dev/null +++ b/flow/include/flow/SimBugInjector.h @@ -0,0 +1,129 @@ +/* + * SimBugInjector.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_SIM_BUG_INJECTOR_H +#define FLOW_SIM_BUG_INJECTOR_H +#pragma once +#include +#include + +/* + * This file provides a general framework to control how bugs should be injected into FDB. This must not be confused + * with Buggify. Buggify is intended to inject faults and set parameters in ways that FDB has to be able to handle + * correctly. This framework is meant to be used to inject actual bugs into FDB. The use-case for this is to implement + * negative tests. This way we can verify that tests actually catch the bugs that we're expecting them to find. + */ + +/** + * A ISimBug is a logical, polymorphic, representation of a bug and is the main means of communication for code across + * multiple places within the codebase. A user isn't supposed to create instances of ISimBug themselves but instead they + * should implement a corresponding IBugIdentifier subclass which can then be used for indexing through the + * SimBugInjector. + */ +class ISimBug : std::enable_shared_from_this { +public: + virtual ~ISimBug(); +}; + +/* + * Each SimBug class should have a corresponding BugIdentifier + */ +class IBugIdentifier { +public: + virtual ~IBugIdentifier(); + /** + * Creates an instance of the SimBug associated with this + * identifier. The reason we use shared_ptr instead of Reference here + * is so we (1) can use weak references and, more importantly, + * (2) shared_ptr has much better support for polymorphic types + */ + virtual std::shared_ptr create() const = 0; +}; + +/* + * SimBugInjector is a wrapper for a singleton and can therefore be instantiated cheaply as often as one wants. + */ +class SimBugInjector { +public: + explicit SimBugInjector() {} + /** + * Globally enable SimBugInjector + * + * Precondition: g_network->isSimulated() + */ + void enable(); + /** + * Globally disable SimBugInjector. If enable is called later, all current state is restored + * + * Precondition: true + */ + void disable(); + /** + * Globally disable SimBugInjector. Unlike disable, this will also remove all existing state + * + * Precondition: true + */ + void reset(); + /** + * Check whether SimBugInjector is globally enabled + */ + bool isEnabled() const; + + /** + * This method can be used to check whether a given bug has been enabled and then fetch the corresponding + * ISimBug object. + * + * @param id The IBugIdentifier corresponding to this bug + * @return A valid shared_ptr, if the bug has been enabled, nullptr otherwise + * @post enabled(bug(id)) -> result is valid else result is nullptr + */ + template + std::shared_ptr get(IBugIdentifier const& id) { + auto res = getImpl(id); + if (!res) { + return {}; + } + return std::dynamic_pointer_cast(res); + } + + std::shared_ptr getImpl(IBugIdentifier const& id) const; + + /** + * Returns the ISimBug instance associated with id if it already exists. Otherwise it will first create it. It is a + * bug to call this method if bug injection isn't turned on. + * + * @param id + * @return the SimBug instance + * @pre isEnabled() + * @post result.get() != nullptr + */ + template + std::shared_ptr enable(IBugIdentifier const& id) { + auto res = enableImpl(id); + if (!res) { + return {}; + } + return std::dynamic_pointer_cast(res); + } + + std::shared_ptr enableImpl(IBugIdentifier const& id); +}; + +#endif // FLOW_SIM_BUG_INJECTOR_H diff --git a/flow/include/flow/Trace.h b/flow/include/flow/Trace.h index f813087679..324c52f642 100644 --- a/flow/include/flow/Trace.h +++ b/flow/include/flow/Trace.h @@ -51,6 +51,7 @@ inline static bool TRACE_SAMPLE() { } extern thread_local int g_allocation_tracing_disabled; +extern bool g_traceProcessEvents; // Each major level of severity has 10 levels of minor levels, which are not all // used. when the numbers of severity events in each level are counted, they are @@ -413,6 +414,7 @@ public: std::unique_ptr tmpEventMetric; // This just just a place to store fields const TraceEventFields& getFields() const { return fields; } + Severity getSeverity() const { return severity; } template void moveTo(Object& obj) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 73947b4a0d..4185ccf10e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -449,6 +449,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessCycle.toml) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.toml) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.toml) + add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml) add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE) # Note that status tests are not deterministic. diff --git a/tests/negative/ResolverIgnoreTooOld.toml b/tests/negative/ResolverIgnoreTooOld.toml new file mode 100644 index 0000000000..495a2acc33 --- /dev/null +++ b/tests/negative/ResolverIgnoreTooOld.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = "ResolverIgnoreTooOld" + +[[test.workload]] +testName = "ResolverBug" +ignoreTooOldProbability = 0.1 From 3894d5069e1cdebb9cb07089716f9f7374d0f09b Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 8 Mar 2023 17:15:53 -0700 Subject: [PATCH 08/13] fix compiler error --- fdbserver/workloads/ResolverBug.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/workloads/ResolverBug.actor.cpp b/fdbserver/workloads/ResolverBug.actor.cpp index be18959dd9..e70ea59716 100644 --- a/fdbserver/workloads/ResolverBug.actor.cpp +++ b/fdbserver/workloads/ResolverBug.actor.cpp @@ -40,7 +40,6 @@ struct ResolverBugWorkload : TestWorkload { resolverBug.ignoreReadSetProbability = getOption(options, "ignoreReadSetProbability"_sr, 0.0); for (auto& o : options) { - bool hadPrefix = false; if (o.key.startsWith("cycle_"_sr)) { KeyValueRef option; option.key = o.key.removePrefix("cycle_"_sr); @@ -149,6 +148,7 @@ struct ResolverBugWorkload : TestWorkload { } Future start(const Database& cx) override { return _start(this, cx->clone()); } + Future check(Database const& cx) override { return true; }; private: void getMetrics(std::vector& m) override {} From 79447c6e06221fe9797be4e350a7bf597a17a5a3 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Thu, 9 Mar 2023 13:05:00 -0700 Subject: [PATCH 09/13] First successful negative run --- fdbserver/SkipList.cpp | 11 ++++++- fdbserver/workloads/ResolverBug.actor.cpp | 32 ++++++++++++++------ flow/ProcessEvents.cpp | 4 ++- flow/SimBugInjector.cpp | 37 +++++++++++++++++++++-- flow/Trace.cpp | 8 ++--- flow/include/flow/SimBugInjector.h | 26 ++++++++++++++-- tests/negative/ResolverIgnoreTooOld.toml | 8 ++++- 7 files changed, 103 insertions(+), 23 deletions(-) diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp index 2fb9363e03..b48d32c6b6 100644 --- a/fdbserver/SkipList.cpp +++ b/fdbserver/SkipList.cpp @@ -834,16 +834,25 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr, Version newOl Arena& arena = transactionInfo.arena(); TransactionInfo* info = new (arena) TransactionInfo; info->reportConflictingKeys = tr.report_conflicting_keys; + bool tooOld = tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size(); + if (tooOld && ignoreTooOld()) { + bugs->hit(); + tooOld = false; + } - if (tr.read_snapshot < newOldestVersion && tr.read_conflict_ranges.size() && !ignoreTooOld()) { + if (tooOld) { info->tooOld = true; } else { info->tooOld = false; if (!ignoreReadSet()) { info->readRanges.resize(arena, tr.read_conflict_ranges.size()); + } else { + bugs->hit(); } if (!ignoreWriteSet()) { info->writeRanges.resize(arena, tr.write_conflict_ranges.size()); + } else { + bugs->hit(); } for (int r = 0; r < info->readRanges.size(); r++) { diff --git a/fdbserver/workloads/ResolverBug.actor.cpp b/fdbserver/workloads/ResolverBug.actor.cpp index e70ea59716..789497f489 100644 --- a/fdbserver/workloads/ResolverBug.actor.cpp +++ b/fdbserver/workloads/ResolverBug.actor.cpp @@ -32,6 +32,7 @@ struct ResolverBugWorkload : TestWorkload { ResolverBug resolverBug; Standalone> cycleOptions; KeyRef controlKey = "workload_control"_sr; + Promise bugFound; ResolverBugWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { disableFailureInjections = getOption(options, "disableFailureInjections"_sr, true); @@ -103,7 +104,6 @@ struct ResolverBugWorkload : TestWorkload { BaseTraceEvent* trace = std::any_cast(data); if (trace->getSeverity() == SevError) { bug->bugFound = true; - TraceEvent("NegativeTestSuccess").log(); } } }; @@ -126,13 +126,8 @@ struct ResolverBugWorkload : TestWorkload { ACTOR static Future _start(ResolverBugWorkload* self, Database cx) { state Reference cycle; - state std::shared_ptr bug = SimBugInjector().get(ResolverBugID()); - state Future f = Never(); - if (self->clientId == 0) { - f = driveWorkload(bug, self->clientCount); - } - while (!bug->bugFound) { - ASSERT(!f.isReady()); + state std::shared_ptr bug = SimBugInjector().get(ResolverBugID(), true); + loop { wait(waitForPhase(bug, 1)); cycle = self->createCycle(); wait(cycle->setup(cx)); @@ -144,10 +139,27 @@ struct ResolverBugWorkload : TestWorkload { wait(success(cycle->check(cx))); bug->cycleState[self->clientId] = 3; } - return Void(); } - Future start(const Database& cx) override { return _start(this, cx->clone()); } + ACTOR static Future onBug(std::shared_ptr bug) { + loop { + if (bug->bugFound) { + TraceEvent("NegativeTestSuccess").log(); + return Void(); + } + wait(delay(0.5)); + } + } + + Future start(const Database& cx) override { + std::vector> futures; + auto bug = SimBugInjector().get(ResolverBugID(), true); + if (clientId == 0) { + futures.push_back(driveWorkload(bug, clientCount)); + } + futures.push_back(_start(this, cx->clone())); + return onBug(bug) || waitForAll(futures); + } Future check(Database const& cx) override { return true; }; private: diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index 8011fd917b..c56c56b578 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -44,7 +44,9 @@ struct ProcessEventsImpl { struct Triggering { bool& value; explicit Triggering(bool& value) : value(value) { - ASSERT(!value); + if (value) { + ASSERT(false); + } value = true; } ~Triggering() { value = false; } diff --git a/flow/SimBugInjector.cpp b/flow/SimBugInjector.cpp index 647708622b..42cf9b9da5 100644 --- a/flow/SimBugInjector.cpp +++ b/flow/SimBugInjector.cpp @@ -22,6 +22,7 @@ #include "flow/network.h" #include +#include namespace { @@ -30,11 +31,38 @@ struct SimBugInjectorImpl { std::unordered_map> bugs; }; +struct ISimBugImpl { + unsigned numHits = 0; + static ISimBugImpl* get(void* self) { return reinterpret_cast(self); } +}; + SimBugInjectorImpl* simBugInjector = nullptr; } // namespace -ISimBug::~ISimBug() {} +ISimBug::ISimBug() : impl(new ISimBugImpl()) {} + +ISimBug::~ISimBug() { + delete ISimBugImpl::get(impl); +} + +std::string ISimBug::name() const { + auto const& typeInfo = typeid(*this); + return boost::core::demangle(typeInfo.name()); +} + +void ISimBug::hit() { + ++ISimBugImpl::get(impl)->numHits; + TraceEvent(SevWarnAlways, "BugInjected").detail("Name", name()).detail("NumHits", numHits()).log(); + this->onHit(); +} + +void ISimBug::onHit() {} + +unsigned ISimBug::numHits() const { + return ISimBugImpl::get(impl)->numHits; +} + IBugIdentifier::~IBugIdentifier() {} bool SimBugInjector::isEnabled() const { @@ -63,8 +91,11 @@ void SimBugInjector::reset() { } } -std::shared_ptr SimBugInjector::getImpl(const IBugIdentifier& id) const { - if (!isEnabled()) { +std::shared_ptr SimBugInjector::getImpl(const IBugIdentifier& id, bool getDisabled /* = false */) const { + if (!simBugInjector) { + return {}; + } + if (!getDisabled && !isEnabled()) { return {}; } auto it = simBugInjector->bugs.find(std::type_index(typeid(id))); diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 1f29a88427..1c681f9a40 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -1307,10 +1307,6 @@ BaseTraceEvent& BaseTraceEvent::backtrace(const std::string& prefix) { } void BaseTraceEvent::log() { - if (g_traceProcessEvents) { - auto name = fmt::format("TraceEvent::{}", type); - ProcessEvents::trigger(StringRef(name), this, success()); - } if (!logged) { init(); ++g_allocation_tracing_disabled; @@ -1336,6 +1332,10 @@ void BaseTraceEvent::log() { if (isNetworkThread()) { TraceEvent::eventCounts[severity / 10]++; } + if (g_traceProcessEvents) { + auto name = fmt::format("TraceEvent::{}", type); + ProcessEvents::trigger(StringRef(name), this, success()); + } g_traceLog.writeEvent(fields, trackingKey, severity > SevWarnAlways); if (g_traceLog.isOpen()) { diff --git a/flow/include/flow/SimBugInjector.h b/flow/include/flow/SimBugInjector.h index 6a3d0f50fa..290f596ff3 100644 --- a/flow/include/flow/SimBugInjector.h +++ b/flow/include/flow/SimBugInjector.h @@ -38,8 +38,28 @@ * SimBugInjector. */ class ISimBug : std::enable_shared_from_this { + void* impl; + virtual void onHit(); + public: + ISimBug(); virtual ~ISimBug(); + /** + * The name of the bug. By default this will return the class name (using typeid and boost::core::demangle). This is + * supposed to be a human-readable string which can be used to identify the bug when it appears in traces. + * + * @return A human readable string for this bug. + */ + virtual std::string name() const; + /** + * Should be called every time this bug is hit. This method can't be overridden. However, it will call `onHit` which + * can be overriden by a child. + */ + void hit(); + /** + * @return Number of times this bug has been hit (since last call to `SimBugInjector::reset` + */ + unsigned numHits() const; }; /* @@ -95,15 +115,15 @@ public: * @post enabled(bug(id)) -> result is valid else result is nullptr */ template - std::shared_ptr get(IBugIdentifier const& id) { - auto res = getImpl(id); + std::shared_ptr get(IBugIdentifier const& id, bool getDisabled = false) { + auto res = getImpl(id, getDisabled); if (!res) { return {}; } return std::dynamic_pointer_cast(res); } - std::shared_ptr getImpl(IBugIdentifier const& id) const; + std::shared_ptr getImpl(IBugIdentifier const& id, bool getDisabled = false) const; /** * Returns the ISimBug instance associated with id if it already exists. Otherwise it will first create it. It is a diff --git a/tests/negative/ResolverIgnoreTooOld.toml b/tests/negative/ResolverIgnoreTooOld.toml index 495a2acc33..1d02c95c8e 100644 --- a/tests/negative/ResolverIgnoreTooOld.toml +++ b/tests/negative/ResolverIgnoreTooOld.toml @@ -1,6 +1,12 @@ +[[knobs]] +enable_version_vector = false +max_read_transaction_life_versions = 1000000 +max_write_transaction_life_versions = 1000000 + [[test]] testTitle = "ResolverIgnoreTooOld" [[test.workload]] testName = "ResolverBug" -ignoreTooOldProbability = 0.1 +# old transactions are rare in the first place, so we ignore all of them to make corruptions likely +ignoreTooOldProbability = 1.0 From 3050aa611f3c4b12cc16fc6c67b041733bb9e10e Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Thu, 9 Mar 2023 13:55:54 -0700 Subject: [PATCH 10/13] Make process events reentrant save --- flow/ProcessEvents.cpp | 84 +++++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index c56c56b578..7eff0736a0 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -42,23 +42,36 @@ struct EventImpl { struct ProcessEventsImpl { struct Triggering { - bool& value; - explicit Triggering(bool& value) : value(value) { - if (value) { - ASSERT(false); - } - value = true; + unsigned& value; + ProcessEventsImpl& processEvents; + explicit Triggering(unsigned& value, ProcessEventsImpl& processEvents) + : value(value), processEvents(processEvents) { + ++value; + } + ~Triggering() { + if (--value == 0) { + // merge modifications back into the event map + for (auto const& p : processEvents.toRemove) { + for (auto const& n : p.second) { + processEvents.events[n].erase(p.first); + } + } + processEvents.toRemove.clear(); + for (auto const& p : processEvents.toInsert) { + processEvents.events[p.first].insert(p.second.begin(), p.second.end()); + } + processEvents.toInsert.clear(); + } } - ~Triggering() { value = false; } }; using EventMap = std::unordered_map>; - bool triggering = false; + unsigned triggering = 0; EventMap events; std::map> toRemove; EventMap toInsert; void trigger(StringRef name, std::any const& data, Error const& e) { - Triggering _(triggering); + Triggering _(triggering, *this); auto iter = events.find(name); // strictly speaking this isn't a bug, but having callbacks that aren't caught // by anyone could mean that something was misspelled. Therefore, the safe thing @@ -66,7 +79,7 @@ struct ProcessEventsImpl { if (iter == events.end()) { return; } - std::unordered_map callbacks = iter->second; + std::unordered_map& callbacks = iter->second; // after we collected all unique callbacks we can call each for (auto const& c : callbacks) { try { @@ -81,17 +94,6 @@ struct ProcessEventsImpl { UNSTOPPABLE_ASSERT(false); } } - // merge modifications back into the event map - for (auto const& p : toRemove) { - for (auto const& n : p.second) { - events[n].erase(p.first); - } - } - toRemove.clear(); - for (auto const& p : toInsert) { - events[p.first].insert(p.second.begin(), p.second.end()); - } - toInsert.clear(); } void add(StringRef const& name, EventImpl* event) { @@ -107,7 +109,30 @@ struct ProcessEventsImpl { void remove(std::vector names, EventImpl::Id id) { if (triggering) { - toRemove.emplace(id, std::move(names)); + // it's possible that the event hasn't been added yet + bool inInsertMap = false; + for (auto const& name : names) { + auto it = toInsert.find(name); + if (it == toInsert.end()) { + // either all are in the insert map or none + ASSERT(!inInsertMap); + break; + } + auto it2 = it->second.find(id); + if (it2 == it->second.end()) { + // either all are in the insert map or none + ASSERT(!inInsertMap); + break; + } + inInsertMap = true; + it->second.erase(it2); + if (it->second.empty()) { + toInsert.erase(it); + } + } + if (!inInsertMap) { + toRemove.emplace(id, std::move(names)); + } } else { for (auto const& name : names) { events[name].erase(id); @@ -219,6 +244,21 @@ TEST_CASE("/flow/ProcessEvents") { ASSERT_EQ(called_self_delete, 1); ASSERT_EQ(called_non_delete, 2); } + { + // Reentrant safe + Event ev("reentrant"_sr, [](StringRef, std::any const& data, Error const&) { + // call depth of 5 + auto v = std::any_cast(data); + if (v < 5) { + Event doNotCall("reentrant"_sr, [](StringRef, std::any const&, Error const&) { + // should never be called + ASSERT(false); + }); + trigger("reentrant"_sr, v + 1, success()); + } + }); + trigger("reentrant"_sr, 0, success()); + } return Void(); } From e5444dd4e158dc008c9526264b5f90e9fffd05b9 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Thu, 9 Mar 2023 17:25:05 -0700 Subject: [PATCH 11/13] Adding additional tests --- tests/CMakeLists.txt | 2 ++ tests/negative/ResolverIgnoreReads.toml | 6 ++++++ tests/negative/ResolverIgnoreTooOld.toml | 3 +-- tests/negative/ResolverIgnoreWrites.toml | 6 ++++++ 4 files changed, 15 insertions(+), 2 deletions(-) create mode 100644 tests/negative/ResolverIgnoreReads.toml create mode 100644 tests/negative/ResolverIgnoreWrites.toml diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4185ccf10e..e6eaa761eb 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -450,6 +450,8 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.toml) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.toml) add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml) + add_fdb_test(TEST_FILES negative/ResolverIgnoreReads.toml) + add_fdb_test(TEST_FILES negative/ResolverIgnoreWrites.toml) add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE) # Note that status tests are not deterministic. diff --git a/tests/negative/ResolverIgnoreReads.toml b/tests/negative/ResolverIgnoreReads.toml new file mode 100644 index 0000000000..47c79531de --- /dev/null +++ b/tests/negative/ResolverIgnoreReads.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = "ResolverIgnoreReads" + +[[test.workload]] +testName = "ResolverBug" +ignoreReadSetProbability = 0.01 diff --git a/tests/negative/ResolverIgnoreTooOld.toml b/tests/negative/ResolverIgnoreTooOld.toml index 1d02c95c8e..6ce5a17a33 100644 --- a/tests/negative/ResolverIgnoreTooOld.toml +++ b/tests/negative/ResolverIgnoreTooOld.toml @@ -8,5 +8,4 @@ testTitle = "ResolverIgnoreTooOld" [[test.workload]] testName = "ResolverBug" -# old transactions are rare in the first place, so we ignore all of them to make corruptions likely -ignoreTooOldProbability = 1.0 +ignoreTooOldProbability = 0.2 diff --git a/tests/negative/ResolverIgnoreWrites.toml b/tests/negative/ResolverIgnoreWrites.toml new file mode 100644 index 0000000000..3b5c834feb --- /dev/null +++ b/tests/negative/ResolverIgnoreWrites.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = "ResolverIgnoreReads" + +[[test.workload]] +testName = "ResolverBug" +ignoreWriteSetProbability = 0.01 From 303b833d7b23b1ded1962c26cafbe6275b0eef94 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Fri, 10 Mar 2023 23:29:54 -0700 Subject: [PATCH 12/13] Adding data corruption test to verify consistency check --- .../include/fdbserver/StorageCorruptionBug.h | 34 ++++++++ fdbserver/storageserver.actor.cpp | 31 +++++++- .../workloads/StorageCorruption.actor.cpp | 78 +++++++++++++++++++ flow/ProcessEvents.cpp | 4 + flow/include/flow/ProcessEvents.h | 1 + tests/CMakeLists.txt | 1 + tests/negative/StorageCorruption.toml | 21 +++++ 7 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 fdbserver/include/fdbserver/StorageCorruptionBug.h create mode 100644 fdbserver/workloads/StorageCorruption.actor.cpp create mode 100644 tests/negative/StorageCorruption.toml diff --git a/fdbserver/include/fdbserver/StorageCorruptionBug.h b/fdbserver/include/fdbserver/StorageCorruptionBug.h new file mode 100644 index 0000000000..ea2c826dd4 --- /dev/null +++ b/fdbserver/include/fdbserver/StorageCorruptionBug.h @@ -0,0 +1,34 @@ +/* + * StorageCorruptionBug.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#ifndef FDBSERVER_STORAGE_CORRUPTION_BUG_H +#define FDBSERVER_STORAGE_CORRUPTION_BUG_H +#include "flow/SimBugInjector.h" + +class StorageCorruptionBug : public ISimBug { +public: + double corruptionProbability = 0.001; +}; +class StorageCorruptionBugID : public IBugIdentifier { +public: + std::shared_ptr create() const override { return std::make_shared(); } +}; + +#endif // FDBSERVER_STORAGE_CORRUPTION_BUG_H diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index f3c59ae471..d8d6bee49d 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -92,6 +92,7 @@ #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/BlobGranuleServerCommon.actor.h" +#include "fdbserver/StorageCorruptionBug.h" #include "flow/ActorCollection.h" #include "flow/Arena.h" #include "flow/Error.h" @@ -458,6 +459,7 @@ private: struct StorageServer* data; IKeyValueStore* storage; void writeMutations(const VectorRef& mutations, Version debugVersion, const char* debugContext); + void writeMutationsBuggy(const VectorRef& mutations, Version debugVersion, const char* debugContext); ACTOR static Future readFirstKey(IKeyValueStore* storage, KeyRangeRef range, Optional options) { RangeResult r = wait(storage->readRange(range, 1, 1 << 30, options)); @@ -10368,6 +10370,29 @@ void StorageServerDisk::writeMutation(MutationRef mutation) { ASSERT(false); } +void StorageServerDisk::writeMutationsBuggy(const VectorRef& mutations, + Version debugVersion, + const char* debugContext) { + auto bug = SimBugInjector().get(StorageCorruptionBugID()); + if (!bug) { + writeMutations(mutations, debugVersion, debugContext); + } + int begin = 0; + while (begin < mutations.size()) { + int i; + for (i = begin; i < mutations.size(); ++i) { + if (deterministicRandom()->random01() < bug->corruptionProbability) { + bug->hit(); + break; + } + } + writeMutations(mutations.slice(begin, i), debugVersion, debugContext); + // we want to drop the mutation at i (unless i == mutations.size(), in which case this will just finish the + // loop) + begin = i + 1; + } +} + void StorageServerDisk::writeMutations(const VectorRef& mutations, Version debugVersion, const char* debugContext) { @@ -10400,7 +10425,11 @@ bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion, ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion); // TODO(alexmiller): Update to version tracking. // DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef()); - writeMutations(v.mutations, v.version, "makeVersionDurable"); + if (!SimBugInjector().isEnabled()) { + writeMutations(v.mutations, v.version, "makeVersionDurable"); + } else { + writeMutationsBuggy(v.mutations, v.version, "makeVersionDurable"); + } for (const auto& m : v.mutations) bytesLeft -= mvccStorageBytes(m); prevStorageVersion = v.version; diff --git a/fdbserver/workloads/StorageCorruption.actor.cpp b/fdbserver/workloads/StorageCorruption.actor.cpp new file mode 100644 index 0000000000..89d0eb298a --- /dev/null +++ b/fdbserver/workloads/StorageCorruption.actor.cpp @@ -0,0 +1,78 @@ +/* + * StorageCorruptionBug.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/StorageCorruptionBug.h" +#include "fdbclient/ManagementAPI.actor.h" +#include "flow/ProcessEvents.h" + +#include "flow/actorcompiler.h" // has to be last include + +namespace { + +struct StorageCorruptionWorkload : TestWorkload { + constexpr static auto NAME = "StorageCorruption"; + using Self = StorageCorruptionWorkload; + std::shared_ptr bug; + SimBugInjector bugInjector; + double testDuration; + + StorageCorruptionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + bugInjector.enable(); + bug = bugInjector.enable(StorageCorruptionBugID()); + bugInjector.disable(); + bug->corruptionProbability = getOption(options, "corruptionProbability"_sr, 0.001); + testDuration = getOption(options, "testDuration"_sr, 60.0); + } + + void disableFailureInjectionWorkloads(std::set& out) const override { out.insert("all"); } + + ACTOR static Future _start(Self* self, Database cx) { + wait(success(setDDMode(cx, 0))); + self->bugInjector.enable(); + wait(delay(self->testDuration)); + self->bug->corruptionProbability = 0.0; + TraceEvent("CorruptionInjections").detail("NumCorruptions", self->bug->numHits()).log(); + self->bugInjector.disable(); + ProcessEvents::uncancellableEvent("ConsistencyCheckFailure"_sr, + [](StringRef, std::any const& data, Error const&) { + if (std::any_cast(data)->getSeverity() == SevError) { + TraceEvent("NegativeTestSuccess"); + } + }); + wait(success(setDDMode(cx, 1))); + return Void(); + } + + Future start(Database const& cx) override { + if (clientId != 0) { + return Void(); + } + return _start(this, cx->clone()); + } + Future check(Database const& cx) override { return true; } + +private: + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory factory; + +} // namespace \ No newline at end of file diff --git a/flow/ProcessEvents.cpp b/flow/ProcessEvents.cpp index 7eff0736a0..44b1cdc4e9 100644 --- a/flow/ProcessEvents.cpp +++ b/flow/ProcessEvents.cpp @@ -159,6 +159,10 @@ void trigger(StringRef name, std::any const& data, Error const& e) { processEventsImpl.trigger(name, data, e); } +void uncancellableEvent(StringRef name, Callback callback) { + new EventImpl({ name }, std::move(callback)); +} + Event::Event(StringRef name, Callback callback) { impl = new EventImpl({ std::move(name) }, std::move(callback)); } diff --git a/flow/include/flow/ProcessEvents.h b/flow/include/flow/ProcessEvents.h index 0272d4e8f3..dc13102cd4 100644 --- a/flow/include/flow/ProcessEvents.h +++ b/flow/include/flow/ProcessEvents.h @@ -40,6 +40,7 @@ public: ~Event(); }; +void uncancellableEvent(StringRef name, Callback callback); void trigger(StringRef name, std::any const& data, Error const& e); } // namespace ProcessEvents diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e6eaa761eb..5519efd218 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -452,6 +452,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES negative/ResolverIgnoreTooOld.toml) add_fdb_test(TEST_FILES negative/ResolverIgnoreReads.toml) add_fdb_test(TEST_FILES negative/ResolverIgnoreWrites.toml) + add_fdb_test(TEST_FILES negative/StorageCorruption.toml) add_fdb_test(TEST_FILES ParallelRestoreOldBackupApiCorrectnessAtomicRestore.toml IGNORE) # Note that status tests are not deterministic. diff --git a/tests/negative/StorageCorruption.toml b/tests/negative/StorageCorruption.toml new file mode 100644 index 0000000000..b73c7d6a59 --- /dev/null +++ b/tests/negative/StorageCorruption.toml @@ -0,0 +1,21 @@ +[[test]] +testTitle = "StorageCorruption" + +[[test.workload]] +testName = "StorageCorruption" +corruptionProbability = 0.001 +testDuration = 60.0 + +[[test.workload]] +testName = 'ReadWrite' +testDuration = 60.0 +transactionsPerSecond = 200 +writesPerTransactionA = 5 +readsPerTransactionA = 1 +writesPerTransactionB = 10 +readsPerTransactionB = 1 +alpha = 0.5 +nodeCount = 10000 +valueBytes = 128 +discardEdgeMeasurements = false +warmingDelay = 10.0 \ No newline at end of file From b4bc24ae2c32d8486611af65dae78cbda49b94d6 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Wed, 15 Mar 2023 14:06:33 -0700 Subject: [PATCH 13/13] Fix includes (these broke the windows build) --- flow/include/flow/SimBugInjector.h | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/include/flow/SimBugInjector.h b/flow/include/flow/SimBugInjector.h index 290f596ff3..69924a744e 100644 --- a/flow/include/flow/SimBugInjector.h +++ b/flow/include/flow/SimBugInjector.h @@ -23,6 +23,7 @@ #pragma once #include #include +#include /* * This file provides a general framework to control how bugs should be injected into FDB. This must not be confused