diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp
index f698b06a54..9e731564f5 100644
--- a/bindings/c/fdb_c.cpp
+++ b/bindings/c/fdb_c.cpp
@@ -82,7 +82,8 @@ extern "C" DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_erro
 	       code == error_code_grv_proxy_memory_limit_exceeded ||
 	       code == error_code_commit_proxy_memory_limit_exceeded ||
 	       code == error_code_batch_transaction_throttled || code == error_code_process_behind ||
-	       code == error_code_tag_throttled || code == error_code_unknown_tenant;
+	       code == error_code_tag_throttled || code == error_code_unknown_tenant ||
+	       code == error_code_proxy_tag_throttled;
 	}
 	return false;
 }
diff --git a/bindings/c/test/apitester/TesterApiWorkload.cpp b/bindings/c/test/apitester/TesterApiWorkload.cpp
index 9001512d27..efbfe35a53 100644
--- a/bindings/c/test/apitester/TesterApiWorkload.cpp
+++ b/bindings/c/test/apitester/TesterApiWorkload.cpp
@@ -70,10 +70,13 @@ void ApiWorkload::start() {
 	schedule([this]() {
 		// 1. Clear data
 		clearData([this]() {
-			// 2. Populate initial data
-			populateData([this]() {
-				// 3. Generate random workload
-				runTests();
+			// 2. Workload setup
+			setup([this]() {
+				// 3. Populate initial data
+				populateData([this]() {
+					// 4. Generate random workload
+					runTests();
+				});
 			});
 		});
 	});
@@ -249,6 +252,10 @@ void ApiWorkload::populateData(TTaskFct cont) {
 	}
 }
 
+void ApiWorkload::setup(TTaskFct cont) {
+	schedule(cont);
+}
+
 void ApiWorkload::randomInsertOp(TTaskFct cont, std::optional<int> tenantId) {
 	int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
 	auto kvPairs = std::make_shared<std::vector<fdb::KeyValue>>();
@@ -322,4 +329,85 @@ std::optional<fdb::BytesRef> ApiWorkload::getTenant(std::optional<int> tenantId)
 	}
 }
 
+std::string ApiWorkload::debugTenantStr(std::optional<int> tenantId) {
+	return tenantId.has_value() ? fmt::format("(tenant {0})", tenantId.value()) : "()";
+}
+
+// BlobGranule setup.
+// This blobbifies ['\x00', '\xff') per tenant or for the whole database if there are no tenants.
+void ApiWorkload::setupBlobGranules(TTaskFct cont) {
+	// This count is used to synchronize the # of tenant blobbifyRange() calls to ensure
+	// we only start the workload once blobbification has fully finished.
+	auto blobbifiedCount = std::make_shared<std::atomic<int>>(1);
+
+	if (tenants.empty()) {
+		blobbifiedCount->store(1);
+		blobbifyTenant({}, blobbifiedCount, cont);
+	} else {
+		blobbifiedCount->store(tenants.size());
+		for (int i = 0; i < tenants.size(); i++) {
+			schedule([=]() { blobbifyTenant(i, blobbifiedCount, cont); });
+		}
+	}
+}
+
+void ApiWorkload::blobbifyTenant(std::optional<int> tenantId,
+                                 std::shared_ptr<std::atomic<int>> blobbifiedCount,
+                                 TTaskFct cont) {
+	auto retBlobbifyRange = std::make_shared<bool>(false);
+	execOperation(
+	    [=](auto ctx) {
+		    fdb::Key begin(1, '\x00');
+		    fdb::Key end(1, '\xff');
+
+		    info(fmt::format("setup: blobbifying {}: [\\x00 - \\xff)\n", debugTenantStr(tenantId)));
+
+		    fdb::Future f = ctx->dbOps()->blobbifyRange(begin, end).eraseType();
+		    ctx->continueAfter(f, [ctx, retBlobbifyRange, f]() {
+			    *retBlobbifyRange = f.get<fdb::future_var::Bool>();
+			    ctx->done();
+		    });
+	    },
+	    [=]() {
+		    if (!*retBlobbifyRange) {
+			    schedule([=]() { blobbifyTenant(tenantId, blobbifiedCount, cont); });
+		    } else {
+			    schedule([=]() { verifyTenant(tenantId, blobbifiedCount, cont); });
+		    }
+	    },
+	    /*tenant=*/getTenant(tenantId),
+	    /* failOnError = */ false);
+}
+
+void ApiWorkload::verifyTenant(std::optional<int> tenantId,
+                               std::shared_ptr<std::atomic<int>> blobbifiedCount,
+                               TTaskFct cont) {
+	auto retVerifyVersion = std::make_shared<int64_t>(-1);
+
+	execOperation(
+	    [=](auto ctx) {
+		    fdb::Key begin(1, '\x00');
+		    fdb::Key end(1, '\xff');
+
+		    info(fmt::format("setup: verifying {}: [\\x00 - \\xff)\n", debugTenantStr(tenantId)));
+
+		    fdb::Future f = ctx->dbOps()->verifyBlobRange(begin, end, /*latest_version*/ -2).eraseType();
+		    ctx->continueAfter(f, [ctx, retVerifyVersion, f]() {
+			    *retVerifyVersion = f.get<fdb::future_var::Int64>();
+			    ctx->done();
+		    });
+	    },
+	    [=]() {
+		    if (*retVerifyVersion == -1) {
+			    schedule([=]() { verifyTenant(tenantId, blobbifiedCount, cont); });
+		    } else {
+			    if (blobbifiedCount->fetch_sub(1) == 1) {
+				    schedule(cont);
+			    }
+		    }
+	    },
+	    /*tenant=*/getTenant(tenantId),
+	    /* failOnError = */ false);
+}
+
 } // namespace FdbApiTester
diff --git a/bindings/c/test/apitester/TesterApiWorkload.h b/bindings/c/test/apitester/TesterApiWorkload.h
index a3a13e964d..71590eeb65 100644
--- a/bindings/c/test/apitester/TesterApiWorkload.h
+++ b/bindings/c/test/apitester/TesterApiWorkload.h
@@ -41,6 +41,9 @@ public:
 
 	virtual void checkProgress() override;
 
+	// Workload specific setup phase.
+	virtual void setup(TTaskFct cont);
+
 	// Running specific tests
 	// The default implementation generates a workload consisting of
 	// random operations generated by randomOperation
@@ -126,6 +129,12 @@ protected:
 	void randomClearRangeOp(TTaskFct cont, std::optional<int> tenantId);
 	std::optional<fdb::BytesRef> getTenant(std::optional<int> tenantId);
+	std::string debugTenantStr(std::optional<int> tenantId);
+
+	// Generic BlobGranules setup.
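Editor's note: the std::shared_ptr<std::atomic<int>> threaded through the blobbifyTenant()/verifyTenant() declarations just below acts as a completion latch: each tenant's verify step decrements it, and only the last one to finish schedules the continuation. A minimal standalone sketch of the same pattern, with hypothetical names, not part of the patch:

	#include <atomic>
	#include <functional>
	#include <memory>

	// Each async task calls done() when it finishes. fetch_sub() returns the
	// counter's previous value, so exactly one caller observes 1 and fires cont.
	struct CompletionLatch {
		std::shared_ptr<std::atomic<int>> remaining;
		std::function<void()> cont;
		CompletionLatch(int n, std::function<void()> c)
		  : remaining(std::make_shared<std::atomic<int>>(n)), cont(std::move(c)) {}
		void done() {
			if (remaining->fetch_sub(1) == 1)
				cont();
		}
	};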
+ void setupBlobGranules(TTaskFct cont); + void blobbifyTenant(std::optional tenantId, std::shared_ptr> blobbifiedCount, TTaskFct cont); + void verifyTenant(std::optional tenantId, std::shared_ptr> blobbifiedCount, TTaskFct cont); private: void populateDataTx(TTaskFct cont, std::optional tenantId); diff --git a/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp b/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp index a4ce988c5c..6c8fc911fa 100644 --- a/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp +++ b/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp @@ -52,26 +52,23 @@ private: }; std::vector excludedOpTypes; + void setup(TTaskFct cont) override { setupBlobGranules(cont); } + // Allow reads at the start to get blob_granule_transaction_too_old if BG data isn't initialized yet // FIXME: should still guarantee a read succeeds eventually somehow - // FIXME: this needs to be per tenant if tenant ids are set std::unordered_set> tenantsWithReadSuccess; inline void setReadSuccess(std::optional tenantId) { tenantsWithReadSuccess.insert(tenantId); } inline bool seenReadSuccess(std::optional tenantId) { return tenantsWithReadSuccess.count(tenantId); } - std::string tenantDebugString(std::optional tenantId) { - return tenantId.has_value() ? fmt::format(" (tenant {0})", tenantId.value()) : ""; - } - void debugOp(std::string opName, fdb::Key begin, fdb::Key end, std::optional tenantId, std::string message) { if (BG_API_DEBUG_VERBOSE) { - info(fmt::format("{0}: [{1} - {2}){3}: {4}", + info(fmt::format("{0}: [{1} - {2}) {3}: {4}", opName, fdb::toCharsRef(begin), fdb::toCharsRef(end), - tenantDebugString(tenantId), + debugTenantStr(tenantId), message)); } } @@ -117,7 +114,7 @@ private: results.get()->assign(resVector.begin(), resVector.end()); bool previousSuccess = seenReadSuccess(tenantId); if (!previousSuccess) { - info(fmt::format("Read{0}: first success\n", tenantDebugString(tenantId))); + info(fmt::format("Read {0}: first success\n", debugTenantStr(tenantId))); setReadSuccess(tenantId); } else { debugOp("Read", begin, end, tenantId, "complete"); @@ -289,20 +286,19 @@ private: } // TODO: tenant support - void randomGetBlobRangesOp(TTaskFct cont) { + void randomGetBlobRangesOp(TTaskFct cont, std::optional tenantId) { fdb::Key begin = randomKeyName(); fdb::Key end = randomKeyName(); auto results = std::make_shared>(); if (begin > end) { std::swap(begin, end); } - std::optional tenantId = {}; debugOp("GetBlobRanges", begin, end, tenantId, "starting"); execOperation( [begin, end, results](auto ctx) { - fdb::Future f = ctx->db().listBlobbifiedRanges(begin, end, 1000).eraseType(); + fdb::Future f = ctx->dbOps()->listBlobbifiedRanges(begin, end, 1000).eraseType(); ctx->continueAfter(f, [ctx, f, results]() { *results = copyKeyRangeArray(f.get()); ctx->done(); @@ -314,25 +310,24 @@ private: this->validateRanges(results, begin, end, seenReadSuccess(tenantId)); schedule(cont); }, + getTenant(tenantId), /* failOnError = */ false); } // TODO: tenant support - void randomVerifyOp(TTaskFct cont) { + void randomVerifyOp(TTaskFct cont, std::optional tenantId) { fdb::Key begin = randomKeyName(); fdb::Key end = randomKeyName(); - std::optional tenantId; if (begin > end) { std::swap(begin, end); } - auto verifyVersion = std::make_shared(false); - debugOp("Verify", begin, end, tenantId, "starting"); + auto verifyVersion = std::make_shared(-1); execOperation( [begin, end, verifyVersion](auto ctx) { - fdb::Future f = ctx->db().verifyBlobRange(begin, 
end, -2 /* latest version*/).eraseType(); + fdb::Future f = ctx->dbOps()->verifyBlobRange(begin, end, -2 /* latest version*/).eraseType(); ctx->continueAfter(f, [ctx, verifyVersion, f]() { *verifyVersion = f.get(); ctx->done(); @@ -344,15 +339,16 @@ private: if (*verifyVersion == -1) { ASSERT(!previousSuccess); } else if (!previousSuccess) { - info(fmt::format("Verify{0}: first success\n", tenantDebugString(tenantId))); + info(fmt::format("Verify {0}: first success\n", debugTenantStr(tenantId))); setReadSuccess(tenantId); } schedule(cont); }, + getTenant(tenantId), /* failOnError = */ false); } - void randomOperation(TTaskFct cont) { + void randomOperation(TTaskFct cont) override { std::optional tenantId = randomTenant(); OpType txType = (stores[tenantId].size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST); @@ -380,10 +376,10 @@ private: randomSummarizeOp(cont, tenantId); break; case OP_GET_BLOB_RANGES: - randomGetBlobRangesOp(cont); + randomGetBlobRangesOp(cont, tenantId); break; case OP_VERIFY: - randomVerifyOp(cont); + randomVerifyOp(cont, tenantId); break; } } diff --git a/bindings/c/test/apitester/TesterBlobGranuleErrorsWorkload.cpp b/bindings/c/test/apitester/TesterBlobGranuleErrorsWorkload.cpp index b4bcaacdc6..d4a0383119 100644 --- a/bindings/c/test/apitester/TesterBlobGranuleErrorsWorkload.cpp +++ b/bindings/c/test/apitester/TesterBlobGranuleErrorsWorkload.cpp @@ -47,6 +47,8 @@ private: OP_LAST = OP_CANCEL_PURGE }; + void setup(TTaskFct cont) override { setupBlobGranules(cont); } + // could add summarize too old and verify too old as ops if desired but those are lower value // Allow reads at the start to get blob_granule_transaction_too_old if BG data isn't initialized yet diff --git a/bindings/c/test/apitester/TesterTransactionExecutor.cpp b/bindings/c/test/apitester/TesterTransactionExecutor.cpp index 9b52f80417..7189616d2e 100644 --- a/bindings/c/test/apitester/TesterTransactionExecutor.cpp +++ b/bindings/c/test/apitester/TesterTransactionExecutor.cpp @@ -91,13 +91,15 @@ public: fdbDb = executor->selectDatabase(); } + if (tenantName) { + fdbTenant = fdbDb.openTenant(*tenantName); + fdbDbOps = std::make_shared(fdbTenant); + } else { + fdbDbOps = std::make_shared(fdbDb); + } + if (transactional) { - if (tenantName) { - fdb::Tenant tenant = fdbDb.openTenant(*tenantName); - fdbTx = tenant.createTransaction(); - } else { - fdbTx = fdbDb.createTransaction(); - } + fdbTx = fdbDbOps->createTransaction(); } } @@ -109,6 +111,10 @@ public: fdb::Database db() override { return fdbDb.atomic_load(); } + fdb::Tenant tenant() override { return fdbTenant.atomic_load(); } + + std::shared_ptr dbOps() override { return std::atomic_load(&fdbDbOps); } + fdb::Transaction tx() override { return fdbTx.atomic_load(); } // Set a continuation to be executed when a future gets ready @@ -272,13 +278,17 @@ protected: scheduler->schedule([thisRef]() { fdb::Database db = thisRef->executor->selectDatabase(); thisRef->fdbDb.atomic_store(db); + if (thisRef->tenantName) { + fdb::Tenant tenant = db.openTenant(*thisRef->tenantName); + thisRef->fdbTenant.atomic_store(tenant); + std::atomic_store(&thisRef->fdbDbOps, + std::dynamic_pointer_cast(std::make_shared(tenant))); + } else { + std::atomic_store(&thisRef->fdbDbOps, + std::dynamic_pointer_cast(std::make_shared(db))); + } if (thisRef->transactional) { - if (thisRef->tenantName) { - fdb::Tenant tenant = db.openTenant(*thisRef->tenantName); - thisRef->fdbTx.atomic_store(tenant.createTransaction()); - } else { - 
thisRef->fdbTx.atomic_store(db.createTransaction()); - } + thisRef->fdbTx.atomic_store(thisRef->fdbDbOps->createTransaction()); } thisRef->restartTransaction(); }); @@ -317,6 +327,14 @@ protected: // Provides a thread safe interface by itself (no need for mutex) fdb::Database fdbDb; + // FDB tenant + // Provides a thread safe interface by itself (no need for mutex) + fdb::Tenant fdbTenant; + + // FDB IDatabaseOps to hide database/tenant accordingly. + // Provides a shared pointer to database functions based on if db or tenant. + std::shared_ptr fdbDbOps; + // FDB transaction // Provides a thread safe interface by itself (no need for mutex) fdb::Transaction fdbTx; diff --git a/bindings/c/test/apitester/TesterTransactionExecutor.h b/bindings/c/test/apitester/TesterTransactionExecutor.h index e0a4474d6a..3a3748faa7 100644 --- a/bindings/c/test/apitester/TesterTransactionExecutor.h +++ b/bindings/c/test/apitester/TesterTransactionExecutor.h @@ -41,6 +41,12 @@ public: // Current FDB database virtual fdb::Database db() = 0; + // Current FDB tenant + virtual fdb::Tenant tenant() = 0; + + // Current FDB IDatabaseOps + virtual std::shared_ptr dbOps() = 0; + // Current FDB transaction virtual fdb::Transaction tx() = 0; diff --git a/bindings/c/test/apitester/TesterWorkload.cpp b/bindings/c/test/apitester/TesterWorkload.cpp index d8790667ae..643e773705 100644 --- a/bindings/c/test/apitester/TesterWorkload.cpp +++ b/bindings/c/test/apitester/TesterWorkload.cpp @@ -117,8 +117,11 @@ void WorkloadBase::execTransaction(TOpStartFct startFct, } // Execute a non-transactional database operation within the workload -void WorkloadBase::execOperation(TOpStartFct startFct, TTaskFct cont, bool failOnError) { - doExecute(startFct, cont, {}, failOnError, false); +void WorkloadBase::execOperation(TOpStartFct startFct, + TTaskFct cont, + std::optional tenant, + bool failOnError) { + doExecute(startFct, cont, tenant, failOnError, false); } void WorkloadBase::doExecute(TOpStartFct startFct, diff --git a/bindings/c/test/apitester/TesterWorkload.h b/bindings/c/test/apitester/TesterWorkload.h index 0ba93b1a2b..f908d2bcfb 100644 --- a/bindings/c/test/apitester/TesterWorkload.h +++ b/bindings/c/test/apitester/TesterWorkload.h @@ -125,7 +125,10 @@ protected: bool failOnError = true); // Execute a non-transactional database operation within the workload - void execOperation(TOpStartFct startFct, TTaskFct cont, bool failOnError = true); + void execOperation(TOpStartFct startFct, + TTaskFct cont, + std::optional tenant = std::optional(), + bool failOnError = true); // Log an error message, increase error counter void error(const std::string& msg); diff --git a/bindings/c/test/fdb_api.hpp b/bindings/c/test/fdb_api.hpp index f18cfef0a1..c60ca32c44 100644 --- a/bindings/c/test/fdb_api.hpp +++ b/bindings/c/test/fdb_api.hpp @@ -677,7 +677,28 @@ public: } }; -class Tenant final { +// Handle this as an abstract class instead of interface to preserve lifetime of fdb objects owned by Tenant and +// Database. 
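Editor's note: the comment above is about ownership. Callers hold a std::shared_ptr<IDatabaseOps>, and the concrete Tenant or Database behind it owns the underlying native handle through its own shared_ptr, so the handle cannot be released while the ops pointer is alive. A minimal sketch of that ownership chain, with hypothetical names, not part of the patch:

	#include <memory>

	struct NativeHandle {}; // stand-in for native::FDB_tenant / native::FDB_database

	struct Ops {
		virtual ~Ops() = default;
		virtual void op() = 0;
	};

	struct TenantOps : Ops {
		std::shared_ptr<NativeHandle> handle; // the wrapper owns the handle
		explicit TenantOps(std::shared_ptr<NativeHandle> h) : handle(std::move(h)) {}
		void op() override { /* call the native API through handle.get() */ }
	};

	// The caller keeps only this pointer; the native handle lives as long as it does.
	std::shared_ptr<Ops> makeOps(std::shared_ptr<NativeHandle> h) {
		return std::make_shared<TenantOps>(std::move(h));
	}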
+class IDatabaseOps {
+public:
+	virtual ~IDatabaseOps() = default;
+
+	virtual Transaction createTransaction() = 0;
+
+	virtual TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) = 0;
+	virtual TypedFuture<future_var::Bool> unblobbifyRange(KeyRef begin, KeyRef end) = 0;
+	virtual TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin,
+	                                                                       KeyRef end,
+	                                                                       int rangeLimit) = 0;
+	virtual TypedFuture<future_var::Int64> verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) = 0;
+	virtual TypedFuture<future_var::KeyRef> purgeBlobGranules(KeyRef begin,
+	                                                          KeyRef end,
+	                                                          int64_t version,
+	                                                          bool force) = 0;
+	virtual TypedFuture<future_var::None> waitPurgeGranulesComplete(KeyRef purgeKey) = 0;
+};
+
+class Tenant final : public IDatabaseOps {
 	friend class Database;
 	std::shared_ptr<native::FDB_tenant> tenant;
@@ -694,6 +715,14 @@ public:
 	Tenant& operator=(const Tenant&) noexcept = default;
 	Tenant() noexcept : tenant(nullptr) {}
 
+	void atomic_store(Tenant other) { std::atomic_store(&tenant, other.tenant); }
+
+	Tenant atomic_load() {
+		Tenant retVal;
+		retVal.tenant = std::atomic_load(&tenant);
+		return retVal;
+	}
+
 	static void createTenant(Transaction tr, BytesRef name) {
 		tr.setOption(FDBTransactionOption::FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, BytesRef());
 		tr.setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE, BytesRef());
@@ -715,7 +744,7 @@ public:
 		return tr.get(toBytesRef(fmt::format("{}{}", tenantManagementMapPrefix, toCharsRef(name))), false);
 	}
 
-	Transaction createTransaction() {
+	Transaction createTransaction() override {
 		auto tx_native = static_cast<native::FDB_transaction*>(nullptr);
 		auto err = Error(native::fdb_tenant_create_transaction(tenant.get(), &tx_native));
 		if (err)
@@ -723,14 +752,49 @@ public:
 		return Transaction(tx_native);
 	}
 
-	TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) {
+	TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) override {
 		if (!tenant)
-			throw std::runtime_error("blobbifyRange from null tenant");
+			throw std::runtime_error("blobbifyRange() from null tenant");
 		return native::fdb_tenant_blobbify_range(tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end));
 	}
+
+	TypedFuture<future_var::Bool> unblobbifyRange(KeyRef begin, KeyRef end) override {
+		if (!tenant)
+			throw std::runtime_error("unblobbifyRange() from null tenant");
+		return native::fdb_tenant_unblobbify_range(
+		    tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end));
+	}
+
+	TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) override {
+		if (!tenant)
+			throw std::runtime_error("listBlobbifiedRanges() from null tenant");
+		return native::fdb_tenant_list_blobbified_ranges(
+		    tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit);
+	}
+
+	TypedFuture<future_var::Int64> verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) override {
+		if (!tenant)
+			throw std::runtime_error("verifyBlobRange() from null tenant");
+		return native::fdb_tenant_verify_blob_range(
+		    tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end), version);
+	}
+
+	TypedFuture<future_var::KeyRef> purgeBlobGranules(KeyRef begin, KeyRef end, int64_t version, bool force) override {
+		if (!tenant)
+			throw std::runtime_error("purgeBlobGranules() from null tenant");
+		native::fdb_bool_t forceBool = force;
+		return native::fdb_tenant_purge_blob_granules(
+		    tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end), version, forceBool);
+	}
+
+	TypedFuture<future_var::None> waitPurgeGranulesComplete(KeyRef purgeKey) override {
+		if (!tenant)
+			throw std::runtime_error("waitPurgeGranulesComplete() from null tenant");
+		return native::fdb_tenant_wait_purge_granules_complete(tenant.get(), purgeKey.data(), intSize(purgeKey));
+ } }; -class Database { +class Database : public IDatabaseOps { friend class Tenant; std::shared_ptr db; @@ -789,7 +853,7 @@ public: return Tenant(tenant_native); } - Transaction createTransaction() { + Transaction createTransaction() override { if (!db) throw std::runtime_error("create_transaction from null database"); auto tx_native = static_cast(nullptr); @@ -799,33 +863,33 @@ public: return Transaction(tx_native); } - TypedFuture listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) { + TypedFuture listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) override { if (!db) throw std::runtime_error("listBlobbifiedRanges from null database"); return native::fdb_database_list_blobbified_ranges( db.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit); } - TypedFuture verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) { + TypedFuture verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) override { if (!db) throw std::runtime_error("verifyBlobRange from null database"); return native::fdb_database_verify_blob_range( db.get(), begin.data(), intSize(begin), end.data(), intSize(end), version); } - TypedFuture blobbifyRange(KeyRef begin, KeyRef end) { + TypedFuture blobbifyRange(KeyRef begin, KeyRef end) override { if (!db) throw std::runtime_error("blobbifyRange from null database"); return native::fdb_database_blobbify_range(db.get(), begin.data(), intSize(begin), end.data(), intSize(end)); } - TypedFuture unblobbifyRange(KeyRef begin, KeyRef end) { + TypedFuture unblobbifyRange(KeyRef begin, KeyRef end) override { if (!db) throw std::runtime_error("unblobbifyRange from null database"); return native::fdb_database_unblobbify_range(db.get(), begin.data(), intSize(begin), end.data(), intSize(end)); } - TypedFuture purgeBlobGranules(KeyRef begin, KeyRef end, int64_t version, bool force) { + TypedFuture purgeBlobGranules(KeyRef begin, KeyRef end, int64_t version, bool force) override { if (!db) throw std::runtime_error("purgeBlobGranules from null database"); native::fdb_bool_t forceBool = force; @@ -833,7 +897,7 @@ public: db.get(), begin.data(), intSize(begin), end.data(), intSize(end), version, forceBool); } - TypedFuture waitPurgeGranulesComplete(KeyRef purgeKey) { + TypedFuture waitPurgeGranulesComplete(KeyRef purgeKey) override { if (!db) throw std::runtime_error("purgeBlobGranules from null database"); return native::fdb_database_wait_purge_granules_complete(db.get(), purgeKey.data(), intSize(purgeKey)); diff --git a/bindings/go/src/fdb/generated.go b/bindings/go/src/fdb/generated.go index a00e17d1a2..9a9104325e 100644 --- a/bindings/go/src/fdb/generated.go +++ b/bindings/go/src/fdb/generated.go @@ -497,6 +497,11 @@ func (o TransactionOptions) SetRawAccess() error { return o.setOpt(303, nil) } +// Allows this transaction to bypass storage quota enforcement. Should only be used for transactions that directly or indirectly decrease the size of the tenant group's data. +func (o TransactionOptions) SetBypassStorageQuota() error { + return o.setOpt(304, nil) +} + // Not yet implemented. 
func (o TransactionOptions) SetDebugRetryLogging(param string) error { return o.setOpt(401, []byte(param)) diff --git a/contrib/TestHarness2/test_harness/fdb.py b/contrib/TestHarness2/test_harness/fdb.py index 1e6afa3906..a0baae1598 100644 --- a/contrib/TestHarness2/test_harness/fdb.py +++ b/contrib/TestHarness2/test_harness/fdb.py @@ -54,7 +54,7 @@ def write_coverage_chunk(tr, path: Tuple[str, ...], metadata: Tuple[str, ...], initialized = v.present() for cov, covered in coverage: if not initialized or covered: - tr.add(cov_dir.pack((cov.file, cov.line, cov.comment)), struct.pack(' OrderedDict[Coverage, int]: res = collections.OrderedDict() cov_dir = fdb.directory.create_or_open(tr, cov_path) for k, v in tr[cov_dir.range()]: - file, line, comment = cov_dir.unpack(k) + file, line, comment, rare = cov_dir.unpack(k) count = struct.unpack(' count: self.min_coverage_hit = count self.coverage.sort(key=lambda x: (x[1], x[0].file, x[0].line)) @@ -63,9 +66,12 @@ class EnsembleResults: out.attributes['MinProbeHit'] = str(self.min_coverage_hit) out.attributes['TotalProbes'] = str(len(self.coverage)) out.attributes['MissedProbes'] = str(self.global_statistics.total_missed_probes) + out.attributes['MissedNonRareProbes'] = str(self.global_statistics.total_missed_nonrare_probes) for cov, count in self.coverage: - severity = 10 if count > self.ratio else 40 + severity = 10 + if count <= self.ratio: + severity = 30 if cov.rare else 40 if severity == 40: errors += 1 if (severity == 40 and errors <= config.max_errors) or config.details: @@ -75,6 +81,7 @@ class EnsembleResults: child.attributes['Line'] = str(cov.line) child.attributes['Comment'] = '' if cov.comment is None else cov.comment child.attributes['HitCount'] = str(count) + child.attributes['Rare'] = str(cov.rare) out.append(child) if config.details: diff --git a/contrib/TestHarness2/test_harness/summarize.py b/contrib/TestHarness2/test_harness/summarize.py index a0271c37dc..4cdf92d9e1 100644 --- a/contrib/TestHarness2/test_harness/summarize.py +++ b/contrib/TestHarness2/test_harness/summarize.py @@ -193,16 +193,17 @@ class JsonParser(Parser): class Coverage: - def __init__(self, file: str, line: str | int, comment: str | None = None): + def __init__(self, file: str, line: str | int, comment: str | None = None, rare: bool = False): self.file = file self.line = int(line) self.comment = comment + self.rare = rare def to_tuple(self) -> Tuple[str, int, str | None]: - return self.file, self.line, self.comment + return self.file, self.line, self.comment, self.rare def __eq__(self, other) -> bool: - if isinstance(other, tuple) and len(other) == 3: + if isinstance(other, tuple) and len(other) == 4: return self.to_tuple() == other elif isinstance(other, Coverage): return self.to_tuple() == other.to_tuple() @@ -210,7 +211,7 @@ class Coverage: return False def __lt__(self, other) -> bool: - if isinstance(other, tuple) and len(other) == 3: + if isinstance(other, tuple) and len(other) == 4: return self.to_tuple() < other elif isinstance(other, Coverage): return self.to_tuple() < other.to_tuple() @@ -218,7 +219,7 @@ class Coverage: return False def __le__(self, other) -> bool: - if isinstance(other, tuple) and len(other) == 3: + if isinstance(other, tuple) and len(other) == 4: return self.to_tuple() <= other elif isinstance(other, Coverage): return self.to_tuple() <= other.to_tuple() @@ -226,7 +227,7 @@ class Coverage: return False def __gt__(self, other: Coverage) -> bool: - if isinstance(other, tuple) and len(other) == 3: + if isinstance(other, tuple) and 
len(other) == 4:
             return self.to_tuple() > other
         elif isinstance(other, Coverage):
             return self.to_tuple() > other.to_tuple()
@@ -234,7 +235,7 @@ class Coverage:
             return False
 
     def __ge__(self, other):
-        if isinstance(other, tuple) and len(other) == 3:
+        if isinstance(other, tuple) and len(other) == 4:
             return self.to_tuple() >= other
         elif isinstance(other, Coverage):
             return self.to_tuple() >= other.to_tuple()
@@ -242,7 +243,7 @@ class Coverage:
             return False
 
     def __hash__(self):
-        return hash((self.file, self.line, self.comment))
+        return hash((self.file, self.line, self.comment, self.rare))
 
 
 class TraceFiles:
@@ -378,6 +379,7 @@ class Summary:
             child = SummaryTree('CodeCoverage')
             child.attributes['File'] = k.file
             child.attributes['Line'] = str(k.line)
+            child.attributes['Rare'] = str(k.rare)
             if not v:
                 child.attributes['Covered'] = '0'
             if k.comment is not None and len(k.comment):
@@ -595,7 +597,10 @@ class Summary:
             comment = ''
             if 'Comment' in attrs:
                 comment = attrs['Comment']
-            c = Coverage(attrs['File'], attrs['Line'], comment)
+            rare = False
+            if 'Rare' in attrs:
+                rare = bool(int(attrs['Rare']))
+            c = Coverage(attrs['File'], attrs['Line'], comment, rare)
             if covered or c not in self.coverage:
                 self.coverage[c] = covered
diff --git a/design/global-tag-throttling.md b/design/global-tag-throttling.md
index 0d5b3d813d..9166246212 100644
--- a/design/global-tag-throttling.md
+++ b/design/global-tag-throttling.md
@@ -116,12 +116,12 @@ If an individual zone is unhealthy, it may cause the throttling ratio for storag
 ### Client Rate Calculation
 The smoothed per-client rate for each tag is tracked within `GlobalTagThrottlerImpl::PerTagStatistics`. Once a target rate has been computed, this is passed to `GlobalTagThrottlerImpl::PerTagStatistics::updateAndGetPerClientRate` which adjusts the per-client rate. The per-client rate is meant to limit the busiest clients, so that at equilibrium, the per-client rate will remain constant and the sum of throughput from all clients will match the target rate.
 
-## Testing
-The `GlobalTagThrottling.toml` test provides a simple end-to-end test using the global tag throttler. Quotas are set using the internal tag quota API in the `GlobalTagThrottling` workload. This is run in parallel with the `ReadWrite` workload, which tags transactions. The number of `transaction_tag_throttled` errors is reported, along with the throughput, which should be roughly predictable based on the quota parameters chosen.
+## Simulation Testing
+The `ThroughputQuota.toml` test provides a simple end-to-end test using the global tag throttler. Quotas are set using the internal tag quota API in the `ThroughputQuota` workload. This is run with the `Cycle` workload, which randomly tags transactions.
 
 In addition to this end-to-end test, there is a suite of unit tests with the `/GlobalTagThrottler/` prefix. These tests run in a mock environment, with mock storage servers providing simulated storage queue statistics and tag busyness reports. Mock clients simulate workload on these mock storage servers, and get throttling feedback directly from a global tag throttler which is monitoring the mock storage servers.
 
-In each test, the `GlobalTagThrottlerTesting::monitor` function is used to periodically check whether or not a desired equilibrium state has been reached. If the desired state is reached and maintained for a sufficient period of time, the test passes. If the unit test is unable to reach this desired equilibrium state before a timeout, the test will fail.
Commonly, the desired state is for the global tag throttler to report a client rate sufficiently close to the desired rate specified as an input to the `GlobalTagThrottlerTesting::rateIsNear` function. +In each unit test, the `GlobalTagThrottlerTesting::monitor` function is used to periodically check whether or not a desired equilibrium state has been reached. If the desired state is reached and maintained for a sufficient period of time, the test passes. If the unit test is unable to reach this desired equilibrium state before a timeout, the test will fail. Commonly, the desired state is for the global tag throttler to report a client rate sufficiently close to the desired rate specified as an input to the `GlobalTagThrottlerTesting::rateIsNear` function. ## Visibility diff --git a/fdbbackup/FileConverter.actor.cpp b/fdbbackup/FileConverter.actor.cpp index 8aeea5017f..14f2adaba3 100644 --- a/fdbbackup/FileConverter.actor.cpp +++ b/fdbbackup/FileConverter.actor.cpp @@ -107,9 +107,9 @@ struct ConvertParams { bool log_enabled = false; std::string log_dir, trace_format, trace_log_group; - bool isValid() { return begin != invalidVersion && end != invalidVersion && !container_url.empty(); } + bool isValid() const { return begin != invalidVersion && end != invalidVersion && !container_url.empty(); } - std::string toString() { + std::string toString() const { std::string s; s.append("ContainerURL:"); s.append(container_url); diff --git a/fdbcli/QuotaCommand.actor.cpp b/fdbcli/QuotaCommand.actor.cpp index e9b49fa975..7b07de71d3 100644 --- a/fdbcli/QuotaCommand.actor.cpp +++ b/fdbcli/QuotaCommand.actor.cpp @@ -19,11 +19,13 @@ */ #include "fdbcli/fdbcli.actor.h" +#include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/SystemData.h" #include "flow/actorcompiler.h" // This must be the last include namespace { -enum class LimitType { RESERVED, TOTAL }; +enum class QuotaType { RESERVED, TOTAL, STORAGE }; Optional parseTag(StringRef token) { if (token.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) { @@ -33,17 +35,19 @@ Optional parseTag(StringRef token) { } } -Optional parseLimitType(StringRef token) { +Optional parseQuotaType(StringRef token) { if (token == "reserved_throughput"_sr) { - return LimitType::RESERVED; + return QuotaType::RESERVED; } else if (token == "total_throughput"_sr) { - return LimitType::TOTAL; + return QuotaType::TOTAL; + } else if (token == "storage"_sr) { + return QuotaType::STORAGE; } else { return {}; } } -Optional parseLimitValue(StringRef token) { +Optional parseQuotaValue(StringRef token) { try { return std::stol(token.toString()); } catch (...) { @@ -51,20 +55,26 @@ Optional parseLimitValue(StringRef token) { } } -ACTOR Future getQuota(Reference db, TransactionTag tag, LimitType limitType) { +ACTOR Future getQuota(Reference db, TransactionTag tag, QuotaType quotaType) { state Reference tr = db->createTransaction(); loop { tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); try { - state ThreadFuture> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag)); + state ThreadFuture> resultFuture = + tr->get(quotaType == QuotaType::STORAGE ? 
storageQuotaKey(tag) : ThrottleApi::getTagQuotaKey(tag)); Optional v = wait(safeThreadFutureToFuture(resultFuture)); if (!v.present()) { fmt::print("\n"); } else { + if (quotaType == QuotaType::STORAGE) { + int64_t storageQuota = BinaryReader::fromStringRef(v.get(), Unversioned()); + fmt::print("{}\n", storageQuota); + return Void(); + } auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get()); - if (limitType == LimitType::TOTAL) { + if (quotaType == QuotaType::TOTAL) { fmt::print("{}\n", quota.totalQuota); - } else if (limitType == LimitType::RESERVED) { + } else if (quotaType == QuotaType::RESERVED) { fmt::print("{}\n", quota.reservedQuota); } } @@ -75,32 +85,36 @@ ACTOR Future getQuota(Reference db, TransactionTag tag, LimitTy } } -ACTOR Future setQuota(Reference db, TransactionTag tag, LimitType limitType, int64_t value) { +ACTOR Future setQuota(Reference db, TransactionTag tag, QuotaType quotaType, int64_t value) { state Reference tr = db->createTransaction(); loop { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { - state ThreadFuture> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag)); - Optional v = wait(safeThreadFutureToFuture(resultFuture)); - ThrottleApi::TagQuotaValue quota; - if (v.present()) { - quota = ThrottleApi::TagQuotaValue::fromValue(v.get()); + if (quotaType == QuotaType::STORAGE) { + tr->set(storageQuotaKey(tag), BinaryWriter::toValue(value, Unversioned())); + } else { + state ThreadFuture> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag)); + Optional v = wait(safeThreadFutureToFuture(resultFuture)); + ThrottleApi::TagQuotaValue quota; + if (v.present()) { + quota = ThrottleApi::TagQuotaValue::fromValue(v.get()); + } + // Internally, costs are stored in terms of pages, but in the API, + // costs are specified in terms of bytes + if (quotaType == QuotaType::TOTAL) { + // Round up to nearest page size + quota.totalQuota = ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * + CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; + } else if (quotaType == QuotaType::RESERVED) { + // Round up to nearest page size + quota.reservedQuota = ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * + CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; + } + if (!quota.isValid()) { + throw invalid_throttle_quota_value(); + } + ThrottleApi::setTagQuota(tr, tag, quota.reservedQuota, quota.totalQuota); } - // Internally, costs are stored in terms of pages, but in the API, - // costs are specified in terms of bytes - if (limitType == LimitType::TOTAL) { - // Round up to nearest page size - quota.totalQuota = - ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; - } else if (limitType == LimitType::RESERVED) { - // Round up to nearest page size - quota.reservedQuota = - ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE; - } - if (!quota.isValid()) { - throw invalid_throttle_quota_value(); - } - ThrottleApi::setTagQuota(tr, tag, quota.reservedQuota, quota.totalQuota); wait(safeThreadFutureToFuture(tr->commit())); fmt::print("Successfully updated quota.\n"); return Void(); @@ -115,6 +129,7 @@ ACTOR Future clearQuota(Reference db, TransactionTag tag) { loop { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { + tr->clear(storageQuotaKey(tag)); tr->clear(ThrottleApi::getTagQuotaKey(tag)); wait(safeThreadFutureToFuture(tr->commit())); fmt::print("Successfully cleared quota.\n"); @@ -125,8 +140,8 @@ ACTOR Future clearQuota(Reference db, TransactionTag 
tag) { } } -constexpr auto usage = "quota [get [reserved_throughput|total_throughput] | set " - "[reserved_throughput|total_throughput] | clear ]"; +constexpr auto usage = "quota [get [reserved_throughput|total_throughput|storage] | set " + "[reserved_throughput|total_throughput|storage] | clear ]"; bool exitFailure() { fmt::print(usage); @@ -150,22 +165,22 @@ ACTOR Future quotaCommandActor(Reference db, std::vector' + command = 'quota get green storage' + output = run_fdbcli_command(command) + logger.debug(command + ' : ' + output) + assert output == '' + # Too few arguments, should log help message command = 'quota get green' output = run_fdbcli_command(command) diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp index e0646ec0a9..ec01a6d17d 100644 --- a/fdbclient/BlobGranuleFiles.cpp +++ b/fdbclient/BlobGranuleFiles.cpp @@ -971,6 +971,11 @@ void sortDeltasByKey(const Standalone& deltasByVersion, // clearVersion as previous guy) } +void sortDeltasByKey(const Standalone& deltasByVersion, const KeyRangeRef& fileRange) { + SortedDeltasT deltasByKey; + sortDeltasByKey(deltasByVersion, fileRange, deltasByKey); +} + // FIXME: Could maybe reduce duplicated code between this and chunkedSnapshot for chunking Value serializeChunkedDeltaFile(const Standalone& fileNameRef, const Standalone& deltas, diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index 85e840a106..b5e10f4a48 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -5924,7 +5924,6 @@ public: printf("Restoring backup to version: %lld\n", (long long)targetVersion); } - state int retryCount = 0; state Reference tr(new ReadYourWritesTransaction(cx)); loop { try { @@ -5948,17 +5947,9 @@ public: wait(tr->commit()); break; } catch (Error& e) { - if (e.code() == error_code_transaction_too_old) { - retryCount++; - } if (e.code() == error_code_restore_duplicate_tag) { throw; } - if (g_network->isSimulated() && retryCount > 50) { - CODE_PROBE(true, "submitRestore simulation speedup"); - // try to make the read window back to normal size (5 * version_per_sec) - g_simulator->speedUpSimulation = true; - } wait(tr->onError(e)); } } diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 1abf6e9bcf..e2ae1142ed 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -2145,6 +2145,9 @@ ACTOR Future lockDatabase(Reference tr, UID id) ACTOR Future lockDatabase(Database cx, UID id) { state Transaction tr(cx); + UID debugID = deterministicRandom()->randomUniqueID(); + TraceEvent("LockDatabaseTransaction", debugID).log(); + tr.debugTransaction(debugID); loop { try { wait(lockDatabase(&tr, id)); diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 6c34369e6a..1b909a0eb8 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -965,7 +965,8 @@ ACTOR Future monitorProxiesOneGeneration( allConnectionsFailed = false; } else { CODE_PROBE(rep.getError().code() == error_code_failed_to_progress, - "Coordinator cant talk to cluster controller"); + "Coordinator cant talk to cluster controller", + probe::decoration::rare); TraceEvent("MonitorProxiesConnectFailed") .detail("Error", rep.getError().name()) .detail("Coordinator", clientLeaderServer.getAddressString()); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 488e85f8e0..7f2753a8ab 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ 
b/fdbclient/NativeAPI.actor.cpp @@ -2175,7 +2175,7 @@ void DatabaseContext::removeWatch() { ASSERT(outstandingWatches >= 0); } -Future DatabaseContext::onConnected() { +Future DatabaseContext::onConnected() const { return connected; } @@ -2802,26 +2802,26 @@ void GetRangeLimits::decrement(MappedKeyValueRef const& data) { } // True if either the row or byte limit has been reached -bool GetRangeLimits::isReached() { +bool GetRangeLimits::isReached() const { return rows == 0 || (bytes == 0 && minRows == 0); } // True if data would cause the row or byte limit to be reached -bool GetRangeLimits::reachedBy(VectorRef const& data) { +bool GetRangeLimits::reachedBy(VectorRef const& data) const { return (rows != GetRangeLimits::ROW_LIMIT_UNLIMITED && data.size() >= rows) || (bytes != GetRangeLimits::BYTE_LIMIT_UNLIMITED && (int)data.expectedSize() + (8 - (int)sizeof(KeyValueRef)) * data.size() >= bytes && data.size() >= minRows); } -bool GetRangeLimits::hasByteLimit() { +bool GetRangeLimits::hasByteLimit() const { return bytes != GetRangeLimits::BYTE_LIMIT_UNLIMITED; } -bool GetRangeLimits::hasRowLimit() { +bool GetRangeLimits::hasRowLimit() const { return rows != GetRangeLimits::ROW_LIMIT_UNLIMITED; } -bool GetRangeLimits::hasSatisfiedMinRows() { +bool GetRangeLimits::hasSatisfiedMinRows() const { return hasByteLimit() && minRows == 0; } @@ -4771,7 +4771,8 @@ static Future tssStreamComparison(Request request, TSS_traceMismatch(mismatchEvent, request, ssReply.get(), tssReply.get()); CODE_PROBE(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL, - "Tracing Full TSS Mismatch in stream comparison"); + "Tracing Full TSS Mismatch in stream comparison", + probe::decoration::rare); CODE_PROBE(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL, "Tracing Partial TSS Mismatch in stream comparison and storing the rest in FDB"); @@ -4813,7 +4814,7 @@ maybeDuplicateTSSStreamFragment(Request& req, QueueModel* model, RequestStream tssData = model->getTssData(ssStream->getEndpoint().token.first()); if (tssData.present()) { - CODE_PROBE(true, "duplicating stream to TSS"); + CODE_PROBE(true, "duplicating stream to TSS", probe::decoration::rare); resetReply(req); // FIXME: optimize to avoid creating new netNotifiedQueueWithAcknowledgements for each stream duplication RequestStream tssRequestStream(tssData.get().endpoint); @@ -5952,6 +5953,7 @@ void TransactionOptions::clear() { useGrvCache = false; skipGrvCache = false; rawAccess = false; + bypassStorageQuota = false; } TransactionOptions::TransactionOptions() { @@ -6693,6 +6695,9 @@ Future Transaction::commitMutations() { if (trState->options.firstInBatch) { tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH; } + if (trState->options.bypassStorageQuota) { + tr.flags = tr.flags | CommitTransactionRequest::FLAG_BYPASS_STORAGE_QUOTA; + } if (trState->options.reportConflictingKeys) { tr.transaction.report_conflicting_keys = true; } @@ -6971,6 +6976,10 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optionaloptions.rawAccess = true; break; + case FDBTransactionOptions::BYPASS_STORAGE_QUOTA: + trState->options.bypassStorageQuota = true; + break; + case FDBTransactionOptions::AUTHORIZATION_TOKEN: if (value.present()) trState->authToken = Standalone(value.get()); @@ -9406,7 +9415,8 @@ void handleTSSChangeFeedMismatch(const ChangeFeedStreamRequest& request, mismatchEvent.detail("TSSVersion", tssVersion); CODE_PROBE(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL, - "Tracing Full TSS Feed Mismatch in stream comparison"); + "Tracing Full TSS 
Feed Mismatch in stream comparison", + probe::decoration::rare); CODE_PROBE(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL, "Tracing Partial TSS Feed Mismatch in stream comparison and storing the rest in FDB"); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 965a9b59ca..161eacb6d6 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1654,7 +1654,7 @@ Future ReadYourWritesTransaction::getRange(KeySelector begin, // This optimization prevents nullptr operations from being added to the conflict range if (limits.isReached()) { - CODE_PROBE(true, "RYW range read limit 0", probe::decoration::rare); + CODE_PROBE(true, "RYW range read limit 0"); return RangeResult(); } @@ -1668,7 +1668,7 @@ Future ReadYourWritesTransaction::getRange(KeySelector begin, end.removeOrEqual(end.arena()); if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) { - CODE_PROBE(true, "RYW range inverted", probe::decoration::rare); + CODE_PROBE(true, "RYW range inverted"); return RangeResult(); } @@ -1698,7 +1698,7 @@ Future ReadYourWritesTransaction::getMappedRange(KeySelector if (getDatabase()->apiVersionAtLeast(630)) { if (specialKeys.contains(begin.getKey()) && specialKeys.begin <= end.getKey() && end.getKey() <= specialKeys.end) { - CODE_PROBE(true, "Special key space get range (getMappedRange)"); + CODE_PROBE(true, "Special key space get range (getMappedRange)", probe::decoration::rare); throw client_invalid_operation(); // Not support special keys. } } else { @@ -1720,7 +1720,7 @@ Future ReadYourWritesTransaction::getMappedRange(KeySelector // This optimization prevents nullptr operations from being added to the conflict range if (limits.isReached()) { - CODE_PROBE(true, "RYW range read limit 0 (getMappedRange)"); + CODE_PROBE(true, "RYW range read limit 0 (getMappedRange)", probe::decoration::rare); return MappedRangeResult(); } @@ -1734,7 +1734,7 @@ Future ReadYourWritesTransaction::getMappedRange(KeySelector end.removeOrEqual(end.arena()); if (begin.offset >= end.offset && begin.getKey() >= end.getKey()) { - CODE_PROBE(true, "RYW range inverted (getMappedRange)"); + CODE_PROBE(true, "RYW range inverted (getMappedRange)", probe::decoration::rare); return MappedRangeResult(); } diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 2c63e176e4..7d7598cdcf 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -821,10 +821,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( QUICK_GET_KEY_VALUES_LIMIT, 2000 ); init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 ); init( STORAGE_FEED_QUERY_HARD_LIMIT, 100000 ); + // Read priority definitions in the form of a list of their relative concurrency share weights + init( STORAGESERVER_READ_PRIORITIES, "120,10,20,40,60" ); + // The total concurrency which will be shared by active priorities according to their relative weights init( STORAGE_SERVER_READ_CONCURRENCY, 70 ); - // Priorities which each ReadType maps to, in enumeration order - init( STORAGESERVER_READ_RANKS, "0,2,1,1,1" ); - init( STORAGESERVER_READ_PRIORITIES, "48,32,8" ); + // The priority number which each ReadType maps to in enumeration order + // This exists for flexibility but assigning each ReadType to its own unique priority number makes the most sense + // The enumeration is currently: eager, fetch, low, normal, high + init( STORAGESERVER_READTYPE_PRIORITY_MAP, "0,1,2,3,4" ); //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( 
randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; @@ -948,7 +952,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 ); init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; } init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); } - init( REDWOOD_PRIORITY_LAUNCHS, "32,32,32,32" ); + init( REDWOOD_IO_PRIORITIES, "32,32,32,32" ); init( REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT, false ); // Server request latency measurement @@ -1022,6 +1026,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 30.0 ); init( BLOB_FULL_RESTORE_MODE, false ); init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0); + init( BLOB_MANIFEST_RW_ROWS, isSimulated ? 10 : 1000); init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 ); init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 ); diff --git a/fdbclient/include/fdbclient/BlobGranuleFiles.h b/fdbclient/include/fdbclient/BlobGranuleFiles.h index b9c6ceaf8c..9bf9e0eeec 100644 --- a/fdbclient/include/fdbclient/BlobGranuleFiles.h +++ b/fdbclient/include/fdbclient/BlobGranuleFiles.h @@ -56,4 +56,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk, std::string randomBGFilename(UID blobWorkerID, UID granuleID, Version version, std::string suffix); -#endif \ No newline at end of file +// For benchmark testing only. It should never be called in prod. +void sortDeltasByKey(const Standalone& deltasByVersion, const KeyRangeRef& fileRange); + +#endif diff --git a/fdbclient/include/fdbclient/CommitProxyInterface.h b/fdbclient/include/fdbclient/CommitProxyInterface.h index 2791338b02..c07db40143 100644 --- a/fdbclient/include/fdbclient/CommitProxyInterface.h +++ b/fdbclient/include/fdbclient/CommitProxyInterface.h @@ -196,10 +196,11 @@ struct CommitID { struct CommitTransactionRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 93948; - enum { FLAG_IS_LOCK_AWARE = 0x1, FLAG_FIRST_IN_BATCH = 0x2 }; + enum { FLAG_IS_LOCK_AWARE = 0x1, FLAG_FIRST_IN_BATCH = 0x2, FLAG_BYPASS_STORAGE_QUOTA = 0x4 }; bool isLockAware() const { return (flags & FLAG_IS_LOCK_AWARE) != 0; } bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; } + bool bypassStorageQuota() const { return (flags & FLAG_BYPASS_STORAGE_QUOTA) != 0; } Arena arena; SpanContext spanContext; diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index a16baaffe2..8899d219a2 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -353,8 +353,9 @@ public: int apiVersionAtLeast(int minVersion) const { return apiVersion.version() >= minVersion; } - Future onConnected(); // Returns after a majority of coordination servers are available and have reported a - // leader. The cluster file therefore is valid, but the database might be unavailable. + Future onConnected() + const; // Returns after a majority of coordination servers are available and have reported a + // leader. The cluster file therefore is valid, but the database might be unavailable. Reference getConnectionRecord(); // Switch the database to use the new connection file, and recreate all pending watches for committed transactions. 
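Editor's note: several knobs touched in this patch (STORAGESERVER_READ_PRIORITIES, REDWOOD_IO_PRIORITIES) encode per-priority settings as a comma-separated string such as "120,10,20,40,60". A minimal sketch of splitting such a list into integer weights; the real parsing lives in server code not shown in this diff:

	#include <sstream>
	#include <string>
	#include <vector>

	std::vector<int> parsePriorityList(const std::string& s) {
		std::vector<int> out;
		std::stringstream ss(s);
		std::string token;
		while (std::getline(ss, token, ','))
			out.push_back(std::stoi(token)); // one relative weight per priority
		return out;
	}

	// parsePriorityList("120,10,20,40,60") -> {120, 10, 20, 40, 60}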
diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h
index 9a2205a1f3..46f28073fc 100644
--- a/fdbclient/include/fdbclient/FDBTypes.h
+++ b/fdbclient/include/fdbclient/FDBTypes.h
@@ -706,15 +706,15 @@ struct GetRangeLimits {
 	void decrement(MappedKeyValueRef const& data);
 
 	// True if either the row or byte limit has been reached
-	bool isReached();
+	bool isReached() const;
 
 	// True if data would cause the row or byte limit to be reached
-	bool reachedBy(VectorRef<KeyValueRef> const& data);
+	bool reachedBy(VectorRef<KeyValueRef> const& data) const;
 
-	bool hasByteLimit();
-	bool hasRowLimit();
+	bool hasByteLimit() const;
+	bool hasRowLimit() const;
 
-	bool hasSatisfiedMinRows();
+	bool hasSatisfiedMinRows() const;
 
 	bool isValid() const {
 		return (rows >= 0 || rows == ROW_LIMIT_UNLIMITED) && (bytes >= 0 || bytes == BYTE_LIMIT_UNLIMITED) &&
 		       minRows >= 0 && (minRows <= rows || rows == ROW_LIMIT_UNLIMITED);
diff --git a/fdbclient/include/fdbclient/NativeAPI.actor.h b/fdbclient/include/fdbclient/NativeAPI.actor.h
index 1ba7fe4323..aa5a5ebffd 100644
--- a/fdbclient/include/fdbclient/NativeAPI.actor.h
+++ b/fdbclient/include/fdbclient/NativeAPI.actor.h
@@ -161,6 +161,7 @@ struct TransactionOptions {
 	bool useGrvCache : 1;
 	bool skipGrvCache : 1;
 	bool rawAccess : 1;
+	bool bypassStorageQuota : 1;
 
 	TransactionPriority priority;
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 2be513d7bb..2f708a77b7 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -772,9 +772,9 @@ public:
 	int QUICK_GET_KEY_VALUES_LIMIT;
 	int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
 	int STORAGE_FEED_QUERY_HARD_LIMIT;
-	int STORAGE_SERVER_READ_CONCURRENCY;
-	std::string STORAGESERVER_READ_RANKS;
 	std::string STORAGESERVER_READ_PRIORITIES;
+	int STORAGE_SERVER_READ_CONCURRENCY;
+	std::string STORAGESERVER_READTYPE_PRIORITY_MAP;
 
 	// Wait Failure
 	int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
@@ -923,7 +923,7 @@ public:
 	int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
 	bool REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT; // Whether to split pages by tenant if encryption is enabled
 
-	std::string REDWOOD_PRIORITY_LAUNCHS;
+	std::string REDWOOD_IO_PRIORITIES;
 
 	// Server request latency measurement
 	int LATENCY_SAMPLE_SIZE;
@@ -998,6 +998,7 @@ public:
 	double BLOB_MANIFEST_BACKUP_INTERVAL;
 	bool BLOB_FULL_RESTORE_MODE;
 	double BLOB_MIGRATOR_CHECK_INTERVAL;
+	int BLOB_MANIFEST_RW_ROWS;
 
 	// Blob metadata
 	int64_t BLOB_METADATA_CACHE_TTL;
diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options
index 2bea2b3dcd..36962a86be 100644
--- a/fdbclient/vexillographer/fdb.options
+++ b/fdbclient/vexillographer/fdb.options
@@ -253,6 +253,8 @@ description is not currently required but encouraged.
 	<option name="read_system_keys" code="302"
 		description="Allows this transaction to read system keys (those that start with the byte 0xFF). Implies raw_access."/>
+	<option name="bypass_storage_quota" code="304"
+		description="Allows this transaction to bypass storage quota enforcement. Should only be used for transactions that directly or indirectly decrease the size of the tenant group's data."/>
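Editor's note: the quota values in QuotaCommand.actor.cpp are rounded up to a whole number of pages with the standard round-up-to-multiple idiom ((value - 1) / pageSize + 1) * pageSize. A small worked sketch, assuming a 4096-byte page purely for illustration (TAG_THROTTLING_PAGE_SIZE is a knob whose value is not shown in this diff):

	#include <cassert>
	#include <cstdint>

	// Round value up to the nearest multiple of pageSize (value > 0, pageSize > 0).
	int64_t roundUpToPage(int64_t value, int64_t pageSize) {
		return ((value - 1) / pageSize + 1) * pageSize;
	}

	int main() {
		assert(roundUpToPage(1, 4096) == 4096);
		assert(roundUpToPage(4096, 4096) == 4096); // exact multiples are unchanged
		assert(roundUpToPage(4097, 4096) == 8192);
	}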