diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index f46d491a28..e077e6a975 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -62,6 +62,7 @@ set(FDBCLIENT_SRCS GlobalConfig.actor.h GlobalConfig.actor.cpp GrvProxyInterface.h + HighContentionAllocator.actor.h HTTP.actor.cpp IClientApi.h IConfigTransaction.cpp diff --git a/fdbclient/HighContentionAllocator.actor.h b/fdbclient/HighContentionAllocator.actor.h new file mode 100644 index 0000000000..f077939296 --- /dev/null +++ b/fdbclient/HighContentionAllocator.actor.h @@ -0,0 +1,141 @@ +/* + * HighContentionAllocator.actor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source +// version. +#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_HIGHCONTENTIONALLOCATOR_ACTOR_G_H) +#define FDBCLIENT_HIGHCONTENTIONALLOCATOR_ACTOR_G_H +#include "fdbclient/HighContentionAllocator.actor.g.h" +#elif !defined(FDBCLIENT_HIGHCONTENTIONALLOCATOR_ACTOR_H) +#define FDBCLIENT_HIGHCONTENTIONALLOCATOR_ACTOR_H + +#include "fdbclient/ClientBooleanParams.h" +#include "fdbclient/CommitTransaction.h" +#include "fdbclient/FDBOptions.g.h" +#include "fdbclient/Subspace.h" +#include "flow/UnitTest.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +class HighContentionAllocator { +public: + HighContentionAllocator(Subspace subspace) : counters(subspace.get(0)), recent(subspace.get(1)) {} + + template + Future> allocate(Reference tr) { + return allocate(this, tr); + } + + static int64_t windowSize(int64_t start) { + if (start < 255) { + return 64; + } + if (start < 65535) { + return 1024; + } + + return 8192; + } + +private: + Subspace counters; + Subspace recent; + + ACTOR template + Future> allocate(HighContentionAllocator* self, Reference tr) { + state int64_t start = 0; + state int64_t window = 0; + + loop { + RangeResult range = + wait(safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True))); + + if (range.size() > 0) { + start = self->counters.unpack(range[0].key).getInt(0); + } + + state bool windowAdvanced = false; + loop { + // if thread safety is needed, this should be locked { + if (windowAdvanced) { + tr->clear(KeyRangeRef(self->counters.key(), self->counters.get(start).key())); + tr->setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); + tr->clear(KeyRangeRef(self->recent.key(), self->recent.get(start).key())); + } + + int64_t inc = 1; + tr->atomicOp(self->counters.get(start).key(), StringRef((uint8_t*)&inc, 8), MutationRef::AddValue); + Future> countFuture = + safeThreadFutureToFuture(tr->get(self->counters.get(start).key(), Snapshot::True)); + // } + + Optional countValue = wait(countFuture); + + int64_t count = 0; + if (countValue.present()) { + if (countValue.get().size() != 8) { + throw invalid_directory_layer_metadata(); + } + count = *(int64_t*)countValue.get().begin(); + } + + window = HighContentionAllocator::windowSize(start); + if (count * 2 < window) { + break; + } + + start += window; + windowAdvanced = true; + } + + loop { + state int64_t candidate = deterministicRandom()->randomInt(start, start + window); + + // if thread safety is needed, this should be locked { + state Future latestCounterFuture = + safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True)); + state Future> candidateValueFuture = + safeThreadFutureToFuture(tr->get(self->recent.get(candidate).key())); + tr->setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); + tr->set(self->recent.get(candidate).key(), ValueRef()); + // } + + wait(success(latestCounterFuture) && success(candidateValueFuture)); + int64_t currentWindowStart = 0; + if (latestCounterFuture.get().size() > 0) { + currentWindowStart = self->counters.unpack(latestCounterFuture.get()[0].key).getInt(0); + } + + if (currentWindowStart > start) { + break; + } + + if (!candidateValueFuture.get().present()) { + tr->addWriteConflictRange(singleKeyRange(self->recent.get(candidate).key())); + return Tuple().append(candidate).pack(); + } + } + } + } +}; + +#include "flow/unactorcompiler.h" +#endif \ No newline at end of file diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 39338739a2..cd93fed41a 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -197,6 +197,7 @@ set(FDBSERVER_SRCS workloads/FuzzApiCorrectness.actor.cpp workloads/GetRangeStream.actor.cpp workloads/HealthMetricsApi.actor.cpp + workloads/HighContentionAllocatorWorkload.actor.cpp workloads/IncrementalBackup.actor.cpp workloads/Increment.actor.cpp workloads/IndexScan.actor.cpp diff --git a/fdbserver/workloads/HighContentionAllocatorWorkload.actor.cpp b/fdbserver/workloads/HighContentionAllocatorWorkload.actor.cpp new file mode 100644 index 0000000000..11adafe3e5 --- /dev/null +++ b/fdbserver/workloads/HighContentionAllocatorWorkload.actor.cpp @@ -0,0 +1,159 @@ +/* + * HighContentionAllocatorWorkload.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/HighContentionAllocator.actor.h" +#include "fdbserver/TesterInterface.actor.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +// This workload tests the basic contract of the high contention allocator +struct HighContentionAllocatorWorkload : TestWorkload { + static constexpr const char* NAME = "HighContentionAllocator"; + + Subspace allocatorSubspace; + HighContentionAllocator allocator; + int numRounds; + int maxTransactionsPerRound; + int maxAllocationsPerTransaction; + + int expectedPrefixes = 0; + std::set allocatedPrefixes; + + HighContentionAllocatorWorkload(WorkloadContext const& wcx) + : TestWorkload(wcx), allocatorSubspace("test_subspace"_sr), allocator(allocatorSubspace) { + numRounds = getOption(options, LiteralStringRef("numRounds"), 500); + maxTransactionsPerRound = getOption(options, LiteralStringRef("maxTransactionsPerRound"), 20); + maxAllocationsPerTransaction = getOption(options, LiteralStringRef("maxAllocationsPerTransaction"), 20); + } + + std::string description() const override { return HighContentionAllocatorWorkload::NAME; } + + Future setup(Database const& cx) override { return Void(); } + + ACTOR static Future runAllocationTransaction(Database cx, HighContentionAllocatorWorkload* self) { + state Reference tr = cx->createTransaction(); + + state int numAllocations = deterministicRandom()->randomInt(1, self->maxAllocationsPerTransaction + 1); + self->expectedPrefixes += numAllocations; + + loop { + try { + state std::vector> futures; + for (int i = 0; i < numAllocations; ++i) { + futures.push_back(self->allocator.allocate(tr)); + } + + wait(waitForAll(futures)); + wait(tr->commit()); + + for (auto f : futures) { + Key prefix = f.get(); + + // There should be no previously allocated prefix that is prefixed by our newly allocated one + auto itr = self->allocatedPrefixes.lower_bound(prefix); + if (itr != self->allocatedPrefixes.end() && itr->startsWith(prefix)) { + TraceEvent(SevError, "HighContentionAllocationWorkloadFailure") + .detail("Reason", "Prefix collision") + .detail("AllocatedPrefix", prefix) + .detail("PreviousPrefix", *itr); + + ASSERT(false); + } + + // There should be no previously allocated prefix that is a prefix of our newly allocated one + if (itr != self->allocatedPrefixes.begin()) { + --itr; + + if (prefix.startsWith(*itr)) { + TraceEvent(SevError, "HighContentionAllocationWorkloadFailure") + .detail("Reason", "Prefix collision") + .detail("AllocatedPrefix", prefix) + .detail("PreviousPrefix", *itr); + + ASSERT(false); + } + } + + // This is technically redundant, but the prefix should not have been allocated previously + ASSERT(self->allocatedPrefixes.insert(f.get()).second); + } + + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + return Void(); + } + + ACTOR static Future runTest(Database cx, HighContentionAllocatorWorkload* self) { + state int roundNum = 0; + for (; roundNum < self->numRounds; ++roundNum) { + std::vector> futures; + int numTransactions = deterministicRandom()->randomInt(1, self->maxTransactionsPerRound + 1); + for (int i = 0; i < numTransactions; ++i) { + futures.push_back(runAllocationTransaction(cx, self)); + } + + wait(waitForAll(futures)); + } + + return Void(); + } + + Future start(Database const& cx) override { return runTest(cx, this); } + + ACTOR static Future _check(Database cx, HighContentionAllocatorWorkload* self) { + if (self->expectedPrefixes != self->allocatedPrefixes.size()) { + TraceEvent(SevError, "HighContentionAllocationWorkloadFailure") + .detail("Reason", "Incorrect Number of Prefixes Allocated") + .detail("NumAllocated", self->allocatedPrefixes.size()) + .detail("Expected", self->expectedPrefixes); + + return false; + } + + state Reference tr = cx->createTransaction(); + loop { + try { + state Key k1 = wait(tr->getKey(firstGreaterOrEqual(""_sr))); + Key k2 = wait(tr->getKey(lastLessThan("\xff"_sr))); + if (!k1.startsWith(self->allocatorSubspace.key()) || !k2.startsWith(self->allocatorSubspace.key())) { + TraceEvent(SevError, "HighContentionAllocationWorkloadFailure") + .detail("Reason", "Keys written outside allocator subspace") + .detail("MinKey", k1) + .detail("MaxKey", k2); + + return false; + } + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + return true; + } + Future check(Database const& cx) override { return _check(cx, this); } + + void getMetrics(std::vector& m) override {} +}; +WorkloadFactory HighContentionAllocatorWorkload(HighContentionAllocatorWorkload::NAME); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4c1b09be55..fff1693645 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -186,6 +186,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES rare/CycleRollbackClogged.toml) add_fdb_test(TEST_FILES rare/CycleWithKills.toml) add_fdb_test(TEST_FILES rare/FuzzTest.toml) + add_fdb_test(TEST_FILES rare/HighContentionAllocator.toml) add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml) add_fdb_test(TEST_FILES rare/LargeApiCorrectness.toml) add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.toml) diff --git a/tests/rare/HighContentionAllocator.toml b/tests/rare/HighContentionAllocator.toml new file mode 100644 index 0000000000..940365d1f0 --- /dev/null +++ b/tests/rare/HighContentionAllocator.toml @@ -0,0 +1,5 @@ +[[test]] +testTitle = 'HighContentionAllocator' + + [[test.workload]] + testName = 'HighContentionAllocator'