Merge remote-tracking branch 'origin/main' into ddsketch
commit 99d4bacf5d
@@ -70,10 +70,13 @@ void ApiWorkload::start() {
    schedule([this]() {
        // 1. Clear data
        clearData([this]() {
            // 2. Populate initial data
            populateData([this]() {
                // 3. Generate random workload
                runTests();
            // 2. Workload setup
            setup([this]() {
                // 3. Populate initial data
                populateData([this]() {
                    // 4. Generate random workload
                    runTests();
                });
            });
        });
    });
@@ -249,6 +252,10 @@ void ApiWorkload::populateData(TTaskFct cont) {
    }
}

void ApiWorkload::setup(TTaskFct cont) {
    schedule(cont);
}

void ApiWorkload::randomInsertOp(TTaskFct cont, std::optional<int> tenantId) {
    int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
    auto kvPairs = std::make_shared<std::vector<fdb::KeyValue>>();
@@ -322,4 +329,85 @@ std::optional<fdb::BytesRef> ApiWorkload::getTenant(std::optional<int> tenantId)
    }
}

std::string ApiWorkload::debugTenantStr(std::optional<int> tenantId) {
    return tenantId.has_value() ? fmt::format("(tenant {0})", tenantId.value()) : "()";
}

// BlobGranule setup.
// This blobbifies ['\x00', '\xff') per tenant or for the whole database if there are no tenants.
void ApiWorkload::setupBlobGranules(TTaskFct cont) {
    // This count is used to synchronize the # of tenant blobbifyRange() calls to ensure
    // we only start the workload once blobbification has fully finished.
    auto blobbifiedCount = std::make_shared<std::atomic<int>>(1);

    if (tenants.empty()) {
        blobbifiedCount->store(1);
        blobbifyTenant({}, blobbifiedCount, cont);
    } else {
        blobbifiedCount->store(tenants.size());
        for (int i = 0; i < tenants.size(); i++) {
            schedule([=]() { blobbifyTenant(i, blobbifiedCount, cont); });
        }
    }
}

void ApiWorkload::blobbifyTenant(std::optional<int> tenantId,
                                 std::shared_ptr<std::atomic<int>> blobbifiedCount,
                                 TTaskFct cont) {
    auto retBlobbifyRange = std::make_shared<bool>(false);
    execOperation(
        [=](auto ctx) {
            fdb::Key begin(1, '\x00');
            fdb::Key end(1, '\xff');

            info(fmt::format("setup: blobbifying {}: [\\x00 - \\xff)\n", debugTenantStr(tenantId)));

            fdb::Future f = ctx->dbOps()->blobbifyRange(begin, end).eraseType();
            ctx->continueAfter(f, [ctx, retBlobbifyRange, f]() {
                *retBlobbifyRange = f.get<fdb::future_var::Bool>();
                ctx->done();
            });
        },
        [=]() {
            if (!*retBlobbifyRange) {
                schedule([=]() { blobbifyTenant(tenantId, blobbifiedCount, cont); });
            } else {
                schedule([=]() { verifyTenant(tenantId, blobbifiedCount, cont); });
            }
        },
        /*tenant=*/getTenant(tenantId),
        /* failOnError = */ false);
}

void ApiWorkload::verifyTenant(std::optional<int> tenantId,
                               std::shared_ptr<std::atomic<int>> blobbifiedCount,
                               TTaskFct cont) {
    auto retVerifyVersion = std::make_shared<int64_t>(-1);

    execOperation(
        [=](auto ctx) {
            fdb::Key begin(1, '\x00');
            fdb::Key end(1, '\xff');

            info(fmt::format("setup: verifying {}: [\\x00 - \\xff)\n", debugTenantStr(tenantId)));

            fdb::Future f = ctx->dbOps()->verifyBlobRange(begin, end, /*latest_version*/ -2).eraseType();
            ctx->continueAfter(f, [ctx, retVerifyVersion, f]() {
                *retVerifyVersion = f.get<fdb::future_var::Int64>();
                ctx->done();
            });
        },
        [=]() {
            if (*retVerifyVersion == -1) {
                schedule([=]() { verifyTenant(tenantId, blobbifiedCount, cont); });
            } else {
                if (blobbifiedCount->fetch_sub(1) == 1) {
                    schedule(cont);
                }
            }
        },
        /*tenant=*/getTenant(tenantId),
        /* failOnError = */ false);
}

} // namespace FdbApiTester
@@ -41,6 +41,9 @@ public:
    virtual void checkProgress() override;

    // Workload specific setup phase.
    virtual void setup(TTaskFct cont);

    // Running specific tests
    // The default implementation generates a workload consisting of
    // random operations generated by randomOperation

@@ -126,6 +129,12 @@ protected:
    void randomClearRangeOp(TTaskFct cont, std::optional<int> tenantId);

    std::optional<fdb::BytesRef> getTenant(std::optional<int> tenantId);
    std::string debugTenantStr(std::optional<int> tenantId);

    // Generic BlobGranules setup.
    void setupBlobGranules(TTaskFct cont);
    void blobbifyTenant(std::optional<int> tenantId, std::shared_ptr<std::atomic<int>> blobbifiedCount, TTaskFct cont);
    void verifyTenant(std::optional<int> tenantId, std::shared_ptr<std::atomic<int>> blobbifiedCount, TTaskFct cont);

private:
    void populateDataTx(TTaskFct cont, std::optional<int> tenantId);
@@ -52,26 +52,23 @@ private:
    };
    std::vector<OpType> excludedOpTypes;

    void setup(TTaskFct cont) override { setupBlobGranules(cont); }

    // Allow reads at the start to get blob_granule_transaction_too_old if BG data isn't initialized yet
    // FIXME: should still guarantee a read succeeds eventually somehow
    // FIXME: this needs to be per tenant if tenant ids are set
    std::unordered_set<std::optional<int>> tenantsWithReadSuccess;

    inline void setReadSuccess(std::optional<int> tenantId) { tenantsWithReadSuccess.insert(tenantId); }

    inline bool seenReadSuccess(std::optional<int> tenantId) { return tenantsWithReadSuccess.count(tenantId); }

    std::string tenantDebugString(std::optional<int> tenantId) {
        return tenantId.has_value() ? fmt::format(" (tenant {0})", tenantId.value()) : "";
    }

    void debugOp(std::string opName, fdb::Key begin, fdb::Key end, std::optional<int> tenantId, std::string message) {
        if (BG_API_DEBUG_VERBOSE) {
            info(fmt::format("{0}: [{1} - {2}){3}: {4}",
            info(fmt::format("{0}: [{1} - {2}) {3}: {4}",
                             opName,
                             fdb::toCharsRef(begin),
                             fdb::toCharsRef(end),
                             tenantDebugString(tenantId),
                             debugTenantStr(tenantId),
                             message));
        }
    }

@@ -117,7 +114,7 @@ private:
            results.get()->assign(resVector.begin(), resVector.end());
            bool previousSuccess = seenReadSuccess(tenantId);
            if (!previousSuccess) {
                info(fmt::format("Read{0}: first success\n", tenantDebugString(tenantId)));
                info(fmt::format("Read {0}: first success\n", debugTenantStr(tenantId)));
                setReadSuccess(tenantId);
            } else {
                debugOp("Read", begin, end, tenantId, "complete");

@@ -289,20 +286,19 @@ private:
    }

    // TODO: tenant support
    void randomGetBlobRangesOp(TTaskFct cont) {
    void randomGetBlobRangesOp(TTaskFct cont, std::optional<int> tenantId) {
        fdb::Key begin = randomKeyName();
        fdb::Key end = randomKeyName();
        auto results = std::make_shared<std::vector<fdb::KeyRange>>();
        if (begin > end) {
            std::swap(begin, end);
        }
        std::optional<int> tenantId = {};

        debugOp("GetBlobRanges", begin, end, tenantId, "starting");

        execOperation(
            [begin, end, results](auto ctx) {
                fdb::Future f = ctx->db().listBlobbifiedRanges(begin, end, 1000).eraseType();
                fdb::Future f = ctx->dbOps()->listBlobbifiedRanges(begin, end, 1000).eraseType();
                ctx->continueAfter(f, [ctx, f, results]() {
                    *results = copyKeyRangeArray(f.get<fdb::future_var::KeyRangeRefArray>());
                    ctx->done();

@@ -314,25 +310,24 @@ private:
                this->validateRanges(results, begin, end, seenReadSuccess(tenantId));
                schedule(cont);
            },
            getTenant(tenantId),
            /* failOnError = */ false);
    }

    // TODO: tenant support
    void randomVerifyOp(TTaskFct cont) {
    void randomVerifyOp(TTaskFct cont, std::optional<int> tenantId) {
        fdb::Key begin = randomKeyName();
        fdb::Key end = randomKeyName();
        std::optional<int> tenantId;
        if (begin > end) {
            std::swap(begin, end);
        }

        auto verifyVersion = std::make_shared<int64_t>(false);

        debugOp("Verify", begin, end, tenantId, "starting");

        auto verifyVersion = std::make_shared<int64_t>(-1);
        execOperation(
            [begin, end, verifyVersion](auto ctx) {
                fdb::Future f = ctx->db().verifyBlobRange(begin, end, -2 /* latest version*/).eraseType();
                fdb::Future f = ctx->dbOps()->verifyBlobRange(begin, end, -2 /* latest version*/).eraseType();
                ctx->continueAfter(f, [ctx, verifyVersion, f]() {
                    *verifyVersion = f.get<fdb::future_var::Int64>();
                    ctx->done();

@@ -344,15 +339,16 @@ private:
                if (*verifyVersion == -1) {
                    ASSERT(!previousSuccess);
                } else if (!previousSuccess) {
                    info(fmt::format("Verify{0}: first success\n", tenantDebugString(tenantId)));
                    info(fmt::format("Verify {0}: first success\n", debugTenantStr(tenantId)));
                    setReadSuccess(tenantId);
                }
                schedule(cont);
            },
            getTenant(tenantId),
            /* failOnError = */ false);
    }

    void randomOperation(TTaskFct cont) {
    void randomOperation(TTaskFct cont) override {
        std::optional<int> tenantId = randomTenant();

        OpType txType = (stores[tenantId].size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);

@@ -380,10 +376,10 @@ private:
            randomSummarizeOp(cont, tenantId);
            break;
        case OP_GET_BLOB_RANGES:
            randomGetBlobRangesOp(cont);
            randomGetBlobRangesOp(cont, tenantId);
            break;
        case OP_VERIFY:
            randomVerifyOp(cont);
            randomVerifyOp(cont, tenantId);
            break;
        }
    }
@@ -47,6 +47,8 @@ private:
        OP_LAST = OP_CANCEL_PURGE
    };

    void setup(TTaskFct cont) override { setupBlobGranules(cont); }

    // could add summarize too old and verify too old as ops if desired but those are lower value

    // Allow reads at the start to get blob_granule_transaction_too_old if BG data isn't initialized yet
@@ -91,13 +91,15 @@ public:
            fdbDb = executor->selectDatabase();
        }

        if (tenantName) {
            fdbTenant = fdbDb.openTenant(*tenantName);
            fdbDbOps = std::make_shared<fdb::Tenant>(fdbTenant);
        } else {
            fdbDbOps = std::make_shared<fdb::Database>(fdbDb);
        }

        if (transactional) {
            if (tenantName) {
                fdb::Tenant tenant = fdbDb.openTenant(*tenantName);
                fdbTx = tenant.createTransaction();
            } else {
                fdbTx = fdbDb.createTransaction();
            }
            fdbTx = fdbDbOps->createTransaction();
        }
    }

@@ -109,6 +111,10 @@ public:

    fdb::Database db() override { return fdbDb.atomic_load(); }

    fdb::Tenant tenant() override { return fdbTenant.atomic_load(); }

    std::shared_ptr<fdb::IDatabaseOps> dbOps() override { return std::atomic_load(&fdbDbOps); }

    fdb::Transaction tx() override { return fdbTx.atomic_load(); }

    // Set a continuation to be executed when a future gets ready

@@ -272,13 +278,17 @@ protected:
        scheduler->schedule([thisRef]() {
            fdb::Database db = thisRef->executor->selectDatabase();
            thisRef->fdbDb.atomic_store(db);
            if (thisRef->tenantName) {
                fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
                thisRef->fdbTenant.atomic_store(tenant);
                std::atomic_store(&thisRef->fdbDbOps,
                                  std::dynamic_pointer_cast<fdb::IDatabaseOps>(std::make_shared<fdb::Tenant>(tenant)));
            } else {
                std::atomic_store(&thisRef->fdbDbOps,
                                  std::dynamic_pointer_cast<fdb::IDatabaseOps>(std::make_shared<fdb::Database>(db)));
            }
            if (thisRef->transactional) {
                if (thisRef->tenantName) {
                    fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
                    thisRef->fdbTx.atomic_store(tenant.createTransaction());
                } else {
                    thisRef->fdbTx.atomic_store(db.createTransaction());
                }
                thisRef->fdbTx.atomic_store(thisRef->fdbDbOps->createTransaction());
            }
            thisRef->restartTransaction();
        });
@@ -317,6 +327,14 @@ protected:
    // Provides a thread safe interface by itself (no need for mutex)
    fdb::Database fdbDb;

    // FDB tenant
    // Provides a thread safe interface by itself (no need for mutex)
    fdb::Tenant fdbTenant;

    // FDB IDatabaseOps to hide the database/tenant distinction.
    // Provides a shared pointer to database functions, backed by either the database or a tenant.
    std::shared_ptr<fdb::IDatabaseOps> fdbDbOps;

    // FDB transaction
    // Provides a thread safe interface by itself (no need for mutex)
    fdb::Transaction fdbTx;

@@ -41,6 +41,12 @@ public:
    // Current FDB database
    virtual fdb::Database db() = 0;

    // Current FDB tenant
    virtual fdb::Tenant tenant() = 0;

    // Current FDB IDatabaseOps
    virtual std::shared_ptr<fdb::IDatabaseOps> dbOps() = 0;

    // Current FDB transaction
    virtual fdb::Transaction tx() = 0;
@@ -117,8 +117,11 @@ void WorkloadBase::execTransaction(TOpStartFct startFct,
}

// Execute a non-transactional database operation within the workload
void WorkloadBase::execOperation(TOpStartFct startFct, TTaskFct cont, bool failOnError) {
    doExecute(startFct, cont, {}, failOnError, false);
void WorkloadBase::execOperation(TOpStartFct startFct,
                                 TTaskFct cont,
                                 std::optional<fdb::BytesRef> tenant,
                                 bool failOnError) {
    doExecute(startFct, cont, tenant, failOnError, false);
}

void WorkloadBase::doExecute(TOpStartFct startFct,

@@ -125,7 +125,10 @@ protected:
                         bool failOnError = true);

    // Execute a non-transactional database operation within the workload
    void execOperation(TOpStartFct startFct, TTaskFct cont, bool failOnError = true);
    void execOperation(TOpStartFct startFct,
                       TTaskFct cont,
                       std::optional<fdb::BytesRef> tenant = std::optional<fdb::BytesRef>(),
                       bool failOnError = true);

    // Log an error message, increase error counter
    void error(const std::string& msg);
@@ -677,7 +677,28 @@ public:
    }
};

class Tenant final {
// Handle this as an abstract class instead of an interface to preserve the lifetime of fdb objects owned by Tenant and
// Database.
class IDatabaseOps {
public:
    virtual ~IDatabaseOps() = default;

    virtual Transaction createTransaction() = 0;

    virtual TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) = 0;
    virtual TypedFuture<future_var::Bool> unblobbifyRange(KeyRef begin, KeyRef end) = 0;
    virtual TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin,
                                                                           KeyRef end,
                                                                           int rangeLimit) = 0;
    virtual TypedFuture<future_var::Int64> verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) = 0;
    virtual TypedFuture<future_var::KeyRef> purgeBlobGranules(KeyRef begin,
                                                              KeyRef end,
                                                              int64_t version,
                                                              bool force) = 0;
    virtual TypedFuture<future_var::None> waitPurgeGranulesComplete(KeyRef purgeKey) = 0;
};

class Tenant final : public IDatabaseOps {
    friend class Database;
    std::shared_ptr<native::FDBTenant> tenant;

@@ -694,6 +715,14 @@ public:
    Tenant& operator=(const Tenant&) noexcept = default;
    Tenant() noexcept : tenant(nullptr) {}

    void atomic_store(Tenant other) { std::atomic_store(&tenant, other.tenant); }

    Tenant atomic_load() {
        Tenant retVal;
        retVal.tenant = std::atomic_load(&tenant);
        return retVal;
    }

    static void createTenant(Transaction tr, BytesRef name) {
        tr.setOption(FDBTransactionOption::FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, BytesRef());
        tr.setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE, BytesRef());

@@ -715,7 +744,7 @@ public:
        return tr.get(toBytesRef(fmt::format("{}{}", tenantManagementMapPrefix, toCharsRef(name))), false);
    }

    Transaction createTransaction() {
    Transaction createTransaction() override {
        auto tx_native = static_cast<native::FDBTransaction*>(nullptr);
        auto err = Error(native::fdb_tenant_create_transaction(tenant.get(), &tx_native));
        if (err)

@@ -723,14 +752,49 @@ public:
        return Transaction(tx_native);
    }

    TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) {
    TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) override {
        if (!tenant)
            throw std::runtime_error("blobbifyRange from null tenant");
            throw std::runtime_error("blobbifyRange() from null tenant");
        return native::fdb_tenant_blobbify_range(tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end));
    }

    TypedFuture<future_var::Bool> unblobbifyRange(KeyRef begin, KeyRef end) override {
        if (!tenant)
            throw std::runtime_error("unblobbifyRange() from null tenant");
        return native::fdb_tenant_unblobbify_range(
            tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end));
    }

    TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) override {
        if (!tenant)
            throw std::runtime_error("listBlobbifiedRanges() from null tenant");
        return native::fdb_tenant_list_blobbified_ranges(
            tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit);
    }

    TypedFuture<future_var::Int64> verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) override {
        if (!tenant)
            throw std::runtime_error("verifyBlobRange() from null tenant");
        return native::fdb_tenant_verify_blob_range(
            tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end), version);
    }

    TypedFuture<future_var::KeyRef> purgeBlobGranules(KeyRef begin, KeyRef end, int64_t version, bool force) override {
        if (!tenant)
            throw std::runtime_error("purgeBlobGranules() from null tenant");
        native::fdb_bool_t forceBool = force;
        return native::fdb_tenant_purge_blob_granules(
            tenant.get(), begin.data(), intSize(begin), end.data(), intSize(end), version, forceBool);
    }

    TypedFuture<future_var::None> waitPurgeGranulesComplete(KeyRef purgeKey) override {
        if (!tenant)
            throw std::runtime_error("waitPurgeGranulesComplete() from null tenant");
        return native::fdb_tenant_wait_purge_granules_complete(tenant.get(), purgeKey.data(), intSize(purgeKey));
    }
};

class Database {
class Database : public IDatabaseOps {
    friend class Tenant;
    std::shared_ptr<native::FDBDatabase> db;
@@ -789,7 +853,7 @@ public:
        return Tenant(tenant_native);
    }

    Transaction createTransaction() {
    Transaction createTransaction() override {
        if (!db)
            throw std::runtime_error("create_transaction from null database");
        auto tx_native = static_cast<native::FDBTransaction*>(nullptr);

@@ -799,33 +863,33 @@ public:
        return Transaction(tx_native);
    }

    TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) {
    TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) override {
        if (!db)
            throw std::runtime_error("listBlobbifiedRanges from null database");
        return native::fdb_database_list_blobbified_ranges(
            db.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit);
    }

    TypedFuture<future_var::Int64> verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) {
    TypedFuture<future_var::Int64> verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) override {
        if (!db)
            throw std::runtime_error("verifyBlobRange from null database");
        return native::fdb_database_verify_blob_range(
            db.get(), begin.data(), intSize(begin), end.data(), intSize(end), version);
    }

    TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) {
    TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) override {
        if (!db)
            throw std::runtime_error("blobbifyRange from null database");
        return native::fdb_database_blobbify_range(db.get(), begin.data(), intSize(begin), end.data(), intSize(end));
    }

    TypedFuture<future_var::Bool> unblobbifyRange(KeyRef begin, KeyRef end) {
    TypedFuture<future_var::Bool> unblobbifyRange(KeyRef begin, KeyRef end) override {
        if (!db)
            throw std::runtime_error("unblobbifyRange from null database");
        return native::fdb_database_unblobbify_range(db.get(), begin.data(), intSize(begin), end.data(), intSize(end));
    }

    TypedFuture<future_var::KeyRef> purgeBlobGranules(KeyRef begin, KeyRef end, int64_t version, bool force) {
    TypedFuture<future_var::KeyRef> purgeBlobGranules(KeyRef begin, KeyRef end, int64_t version, bool force) override {
        if (!db)
            throw std::runtime_error("purgeBlobGranules from null database");
        native::fdb_bool_t forceBool = force;
@@ -833,7 +897,7 @@ public:
            db.get(), begin.data(), intSize(begin), end.data(), intSize(end), version, forceBool);
    }

    TypedFuture<future_var::None> waitPurgeGranulesComplete(KeyRef purgeKey) {
    TypedFuture<future_var::None> waitPurgeGranulesComplete(KeyRef purgeKey) override {
        if (!db)
            throw std::runtime_error("waitPurgeGranulesComplete from null database");
        return native::fdb_database_wait_purge_granules_complete(db.get(), purgeKey.data(), intSize(purgeKey));
@@ -116,12 +116,12 @@ If an individual zone is unhealthy, it may cause the throttling ratio for storag
### Client Rate Calculation
The smoothed per-client rate for each tag is tracked within `GlobalTagThrottlerImpl::PerTagStatistics`. Once a target rate has been computed, this is passed to `GlobalTagThrottlerImpl::PerTagStatistics::updateAndGetPerClientRate`, which adjusts the per-client rate. The per-client rate is meant to limit the busiest clients, so that at equilibrium, the per-client rate will remain constant and the sum of throughput from all clients will match the target rate.
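
A minimal sketch of the equilibrium adjustment described above (hypothetical signature and names; the actual logic lives in `GlobalTagThrottlerImpl::PerTagStatistics` and additionally applies smoothing):

```cpp
// Scale the per-client rate by the ratio of target to observed aggregate
// throughput. At equilibrium the ratio is 1, so the per-client rate stays
// constant while the summed client throughput matches the target rate.
double updateAndGetPerClientRate(double targetRate, double perClientRate, double observedTotalRate) {
    if (observedTotalRate > 0) {
        perClientRate *= targetRate / observedTotalRate;
    }
    return perClientRate;
}
```

Because only clients running above the per-client limit are clamped, the busiest clients are throttled first while lighter clients are unaffected.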
## Testing
The `GlobalTagThrottling.toml` test provides a simple end-to-end test using the global tag throttler. Quotas are set using the internal tag quota API in the `GlobalTagThrottling` workload. This is run in parallel with the `ReadWrite` workload, which tags transactions. The number of `transaction_tag_throttled` errors is reported, along with the throughput, which should be roughly predictable based on the quota parameters chosen.
## Simulation Testing
The `ThroughputQuota.toml` test provides a simple end-to-end test using the global tag throttler. Quotas are set using the internal tag quota API in the `ThroughputQuota` workload. This is run with the `Cycle` workload, which randomly tags transactions.

In addition to this end-to-end test, there is a suite of unit tests with the `/GlobalTagThrottler/` prefix. These tests run in a mock environment, with mock storage servers providing simulated storage queue statistics and tag busyness reports. Mock clients simulate workload on these mock storage servers, and get throttling feedback directly from a global tag throttler which is monitoring the mock storage servers.

In each test, the `GlobalTagThrottlerTesting::monitor` function is used to periodically check whether or not a desired equilibrium state has been reached. If the desired state is reached and maintained for a sufficient period of time, the test passes. If the unit test is unable to reach this desired equilibrium state before a timeout, the test will fail. Commonly, the desired state is for the global tag throttler to report a client rate sufficiently close to the desired rate specified as an input to the `GlobalTagThrottlerTesting::rateIsNear` function.
In each unit test, the `GlobalTagThrottlerTesting::monitor` function is used to periodically check whether or not a desired equilibrium state has been reached. If the desired state is reached and maintained for a sufficient period of time, the test passes. If the unit test is unable to reach this desired equilibrium state before a timeout, the test will fail. Commonly, the desired state is for the global tag throttler to report a client rate sufficiently close to the desired rate specified as an input to the `GlobalTagThrottlerTesting::rateIsNear` function.

## Visibility
@@ -19,11 +19,13 @@
 */

#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "flow/actorcompiler.h" // This must be the last include

namespace {

enum class LimitType { RESERVED, TOTAL };
enum class QuotaType { RESERVED, TOTAL, STORAGE };

Optional<TransactionTag> parseTag(StringRef token) {
    if (token.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) {

@@ -33,17 +35,19 @@ Optional<TransactionTag> parseTag(StringRef token) {
    }
}

Optional<LimitType> parseLimitType(StringRef token) {
Optional<QuotaType> parseQuotaType(StringRef token) {
    if (token == "reserved_throughput"_sr) {
        return LimitType::RESERVED;
        return QuotaType::RESERVED;
    } else if (token == "total_throughput"_sr) {
        return LimitType::TOTAL;
        return QuotaType::TOTAL;
    } else if (token == "storage"_sr) {
        return QuotaType::STORAGE;
    } else {
        return {};
    }
}

Optional<int64_t> parseLimitValue(StringRef token) {
Optional<int64_t> parseQuotaValue(StringRef token) {
    try {
        return std::stol(token.toString());
    } catch (...) {

@@ -51,20 +55,26 @@ Optional<int64_t> parseLimitValue(StringRef token) {
    }
}

ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType) {
ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, QuotaType quotaType) {
    state Reference<ITransaction> tr = db->createTransaction();
    loop {
        tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
        try {
            state ThreadFuture<Optional<Value>> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag));
            state ThreadFuture<Optional<Value>> resultFuture =
                tr->get(quotaType == QuotaType::STORAGE ? storageQuotaKey(tag) : ThrottleApi::getTagQuotaKey(tag));
            Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
            if (!v.present()) {
                fmt::print("<empty>\n");
            } else {
                if (quotaType == QuotaType::STORAGE) {
                    int64_t storageQuota = BinaryReader::fromStringRef<int64_t>(v.get(), Unversioned());
                    fmt::print("{}\n", storageQuota);
                    return Void();
                }
                auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
                if (limitType == LimitType::TOTAL) {
                if (quotaType == QuotaType::TOTAL) {
                    fmt::print("{}\n", quota.totalQuota);
                } else if (limitType == LimitType::RESERVED) {
                } else if (quotaType == QuotaType::RESERVED) {
                    fmt::print("{}\n", quota.reservedQuota);
                }
            }
@@ -75,32 +85,36 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
    }
}

ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, int64_t value) {
ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, QuotaType quotaType, int64_t value) {
    state Reference<ITransaction> tr = db->createTransaction();
    loop {
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        try {
            state ThreadFuture<Optional<Value>> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag));
            Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
            ThrottleApi::TagQuotaValue quota;
            if (v.present()) {
                quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
            if (quotaType == QuotaType::STORAGE) {
                tr->set(storageQuotaKey(tag), BinaryWriter::toValue<int64_t>(value, Unversioned()));
            } else {
                state ThreadFuture<Optional<Value>> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag));
                Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
                ThrottleApi::TagQuotaValue quota;
                if (v.present()) {
                    quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
                }
                // Internally, costs are stored in terms of pages, but in the API,
                // costs are specified in terms of bytes
                if (quotaType == QuotaType::TOTAL) {
                    // Round up to nearest page size
                    quota.totalQuota = ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) *
                                       CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
                } else if (quotaType == QuotaType::RESERVED) {
                    // Round up to nearest page size
                    quota.reservedQuota = ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) *
                                          CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
                }
                if (!quota.isValid()) {
                    throw invalid_throttle_quota_value();
                }
                ThrottleApi::setTagQuota(tr, tag, quota.reservedQuota, quota.totalQuota);
            }
            // Internally, costs are stored in terms of pages, but in the API,
            // costs are specified in terms of bytes
            if (limitType == LimitType::TOTAL) {
                // Round up to nearest page size
                quota.totalQuota =
                    ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
            } else if (limitType == LimitType::RESERVED) {
                // Round up to nearest page size
                quota.reservedQuota =
                    ((value - 1) / CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE + 1) * CLIENT_KNOBS->TAG_THROTTLING_PAGE_SIZE;
            }
            if (!quota.isValid()) {
                throw invalid_throttle_quota_value();
            }
            ThrottleApi::setTagQuota(tr, tag, quota.reservedQuota, quota.totalQuota);
            wait(safeThreadFutureToFuture(tr->commit()));
            fmt::print("Successfully updated quota.\n");
            return Void();

@@ -115,6 +129,7 @@ ACTOR Future<Void> clearQuota(Reference<IDatabase> db, TransactionTag tag) {
    loop {
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        try {
            tr->clear(storageQuotaKey(tag));
            tr->clear(ThrottleApi::getTagQuotaKey(tag));
            wait(safeThreadFutureToFuture(tr->commit()));
            fmt::print("Successfully cleared quota.\n");

@@ -125,8 +140,8 @@ ACTOR Future<Void> clearQuota(Reference<IDatabase> db, TransactionTag tag) {
    }
}

constexpr auto usage = "quota [get <tag> [reserved_throughput|total_throughput] | set <tag> "
                       "[reserved_throughput|total_throughput] <value> | clear <tag>]";
constexpr auto usage = "quota [get <tag> [reserved_throughput|total_throughput|storage] | set <tag> "
                       "[reserved_throughput|total_throughput|storage] <value> | clear <tag>]";

bool exitFailure() {
    fmt::print(usage);

@@ -150,22 +165,22 @@ ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<String
        if (tokens.size() != 4) {
            return exitFailure();
        }
        auto const limitType = parseLimitType(tokens[3]);
        if (!limitType.present()) {
        auto const quotaType = parseQuotaType(tokens[3]);
        if (!quotaType.present()) {
            return exitFailure();
        }
        wait(getQuota(db, tag.get(), limitType.get()));
        wait(getQuota(db, tag.get(), quotaType.get()));
        return true;
    } else if (tokens[1] == "set"_sr) {
        if (tokens.size() != 5) {
            return exitFailure();
        }
        auto const limitType = parseLimitType(tokens[3]);
        auto const limitValue = parseLimitValue(tokens[4]);
        if (!limitType.present() || !limitValue.present()) {
        auto const quotaType = parseQuotaType(tokens[3]);
        auto const quotaValue = parseQuotaValue(tokens[4]);
        if (!quotaType.present() || !quotaValue.present()) {
            return exitFailure();
        }
        wait(setQuota(db, tag.get(), limitType.get(), limitValue.get()));
        wait(setQuota(db, tag.get(), quotaType.get(), quotaValue.get()));
        return true;
    } else if (tokens[1] == "clear"_sr) {
        if (tokens.size() != 3) {
@@ -137,6 +137,11 @@ def quota(logger):
    logger.debug(command + ' : ' + output)
    assert output == 'Successfully updated quota.'

    command = 'quota set green storage 98765'
    output = run_fdbcli_command(command)
    logger.debug(command + ' : ' + output)
    assert output == 'Successfully updated quota.'

    command = 'quota get green total_throughput'
    output = run_fdbcli_command(command)
    logger.debug(command + ' : ' + output)

@@ -147,6 +152,11 @@ def quota(logger):
    logger.debug(command + ' : ' + output)
    assert output == '16384'

    command = 'quota get green storage'
    output = run_fdbcli_command(command)
    logger.debug(command + ' : ' + output)
    assert output == '98765'

    command = 'quota clear green'
    output = run_fdbcli_command(command)
    logger.debug(command + ' : ' + output)

@@ -157,6 +167,11 @@ def quota(logger):
    logger.debug(command + ' : ' + output)
    assert output == '<empty>'

    command = 'quota get green storage'
    output = run_fdbcli_command(command)
    logger.debug(command + ' : ' + output)
    assert output == '<empty>'

    # Too few arguments, should log help message
    command = 'quota get green'
    output = run_fdbcli_command(command)
@@ -971,6 +971,11 @@ void sortDeltasByKey(const Standalone<GranuleDeltas>& deltasByVersion,
    // clearVersion as previous guy)
}

void sortDeltasByKey(const Standalone<GranuleDeltas>& deltasByVersion, const KeyRangeRef& fileRange) {
    SortedDeltasT deltasByKey;
    sortDeltasByKey(deltasByVersion, fileRange, deltasByKey);
}

// FIXME: Could maybe reduce duplicated code between this and chunkedSnapshot for chunking
Value serializeChunkedDeltaFile(const Standalone<StringRef>& fileNameRef,
                                const Standalone<GranuleDeltas>& deltas,
@@ -5924,7 +5924,6 @@ public:
        printf("Restoring backup to version: %lld\n", (long long)targetVersion);
    }

    state int retryCount = 0;
    state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
    loop {
        try {

@@ -5948,17 +5947,9 @@ public:
            wait(tr->commit());
            break;
        } catch (Error& e) {
            if (e.code() == error_code_transaction_too_old) {
                retryCount++;
            }
            if (e.code() == error_code_restore_duplicate_tag) {
                throw;
            }
            if (g_network->isSimulated() && retryCount > 50) {
                CODE_PROBE(true, "submitRestore simulation speedup");
                // try to make the read window back to normal size (5 * version_per_sec)
                g_simulator->speedUpSimulation = true;
            }
            wait(tr->onError(e));
        }
    }
@@ -2565,6 +2565,12 @@ void setStorageQuota(Transaction& tr, StringRef tenantGroupName, int64_t quota)
    tr.set(key, BinaryWriter::toValue<int64_t>(quota, Unversioned()));
}

void clearStorageQuota(Transaction& tr, StringRef tenantGroupName) {
    tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    auto key = storageQuotaKey(tenantGroupName);
    tr.clear(key);
}

ACTOR Future<Optional<int64_t>> getStorageQuota(Transaction* tr, StringRef tenantGroupName) {
    tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
    state Optional<Value> v = wait(tr->get(storageQuotaKey(tenantGroupName)));
@@ -820,10 +820,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
    init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
    init( STORAGE_FEED_QUERY_HARD_LIMIT, 100000 );
    // Read priority definitions in the form of a list of their relative concurrency share weights
    init( STORAGESERVER_READ_PRIORITIES, "120,10,20,40,60" );
    // The total concurrency which will be shared by active priorities according to their relative weights
    init( STORAGE_SERVER_READ_CONCURRENCY, 70 );
    // Priorities which each ReadType maps to, in enumeration order
    init( STORAGESERVER_READ_RANKS, "0,2,1,1,1" );
    init( STORAGESERVER_READ_PRIORITIES, "48,32,8" );
    // The priority number which each ReadType maps to in enumeration order
    // This exists for flexibility but assigning each ReadType to its own unique priority number makes the most sense
    // The enumeration is currently: eager, fetch, low, normal, high
    init( STORAGESERVER_READTYPE_PRIORITY_MAP, "0,1,2,3,4" );

    //Wait Failure
    init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

@@ -947,7 +951,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );
    init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; }
    init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); }
    init( REDWOOD_PRIORITY_LAUNCHS, "32,32,32,32" );
    init( REDWOOD_IO_PRIORITIES, "32,32,32,32" );
    init( REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT, false );

    // Server request latency measurement

@@ -1021,6 +1025,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 30.0 );
    init( BLOB_FULL_RESTORE_MODE, false );
    init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0);
    init( BLOB_MANIFEST_RW_ROWS, isSimulated ? 10 : 1000);

    init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
    init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );
@@ -56,4 +56,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,

std::string randomBGFilename(UID blobWorkerID, UID granuleID, Version version, std::string suffix);

#endif
// For benchmark testing only. It should never be called in prod.
void sortDeltasByKey(const Standalone<GranuleDeltas>& deltasByVersion, const KeyRangeRef& fileRange);

#endif
@@ -163,8 +163,9 @@ bool schemaMatch(json_spirit::mValue const& schema,
// storage nodes
ACTOR Future<Void> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);

// Set and get the storage quota per tenant group
// Set/clear/get the storage quota for the given tenant group
void setStorageQuota(Transaction& tr, StringRef tenantGroupName, int64_t quota);
void clearStorageQuota(Transaction& tr, StringRef tenantGroupName);
ACTOR Future<Optional<int64_t>> getStorageQuota(Transaction* tr, StringRef tenantGroupName);

#include "flow/unactorcompiler.h"
@@ -770,9 +770,9 @@ public:
    int QUICK_GET_KEY_VALUES_LIMIT;
    int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
    int STORAGE_FEED_QUERY_HARD_LIMIT;
    int STORAGE_SERVER_READ_CONCURRENCY;
    std::string STORAGESERVER_READ_RANKS;
    std::string STORAGESERVER_READ_PRIORITIES;
    int STORAGE_SERVER_READ_CONCURRENCY;
    std::string STORAGESERVER_READTYPE_PRIORITY_MAP;

    // Wait Failure
    int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;

@@ -921,7 +921,7 @@ public:
    int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
    bool REDWOOD_SPLIT_ENCRYPTED_PAGES_BY_TENANT; // Whether to split pages by tenant if encryption is enabled

    std::string REDWOOD_PRIORITY_LAUNCHS;
    std::string REDWOOD_IO_PRIORITIES;

    // Server request latency measurement
    double LATENCY_SKETCH_ACCURACY;

@@ -996,6 +996,7 @@ public:
    double BLOB_MANIFEST_BACKUP_INTERVAL;
    bool BLOB_FULL_RESTORE_MODE;
    double BLOB_MIGRATOR_CHECK_INTERVAL;
    int BLOB_MANIFEST_RW_ROWS;

    // Blob metadata
    int64_t BLOB_METADATA_CACHE_TTL;
@@ -24,6 +24,7 @@

#include "fdbclient/BackupContainer.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/ClientBooleanParams.h"
#include "fdbserver/Knobs.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"

@@ -137,10 +138,23 @@ private:
        blobRangeKeys // Key ranges managed by blob
    };
    for (auto range : ranges) {
        // todo use getRangeStream for better performance
        RangeResult result = wait(tr.getRange(range, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
        for (auto& row : result) {
            rows.push_back_deep(rows.arena(), KeyValueRef(row.key, row.value));
        state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
        limits.minRows = 0;
        state KeySelectorRef begin = firstGreaterOrEqual(range.begin);
        state KeySelectorRef end = firstGreaterOrEqual(range.end);
        loop {
            RangeResult result = wait(tr.getRange(begin, end, limits, Snapshot::True));
            for (auto& row : result) {
                rows.push_back_deep(rows.arena(), KeyValueRef(row.key, row.value));
            }
            if (!result.more) {
                break;
            }
            if (result.readThrough.present()) {
                begin = firstGreaterOrEqual(result.readThrough.get());
            } else {
                begin = firstGreaterThan(result.end()[-1].key);
            }
        }
    }
    return rows;
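
The loop above replaces a single unbounded read with the readThrough-pagination idiom that recurs throughout this change. A condensed sketch of the pattern (assuming a flow ACTOR with an open Transaction tr over a KeyRangeRef range):

    state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
    limits.minRows = 0; // permit a batch to come back with zero rows
    state KeySelectorRef begin = firstGreaterOrEqual(range.begin);
    state KeySelectorRef end = firstGreaterOrEqual(range.end);
    loop {
        RangeResult r = wait(tr.getRange(begin, end, limits, Snapshot::True));
        // ... consume r ...
        if (!r.more) {
            break;
        }
        // Resume after the point the storage server read through, falling back
        // to the last returned key when readThrough is absent.
        begin = r.readThrough.present() ? firstGreaterOrEqual(r.readThrough.get())
                                        : firstGreaterThan(r.end()[-1].key);
    }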
@@ -152,6 +166,13 @@ private:

    // Write data to blob manifest file
    ACTOR static Future<Void> writeToFile(Reference<BlobManifestDumper> self, Value data) {
        static int32_t lastWrittenBytes = 0;
        if (data.size() == lastWrittenBytes) {
            dprint("Skip writing blob manifest with same size {}\n", lastWrittenBytes);
            return Void();
        }
        lastWrittenBytes = data.size();

        state Reference<BackupContainerFileSystem> writer;
        state std::string fullPath;

@@ -212,7 +233,7 @@ public:
    ACTOR static Future<Void> execute(Reference<BlobManifestLoader> self) {
        try {
            Value data = wait(readFromFile(self));
            Standalone<BlobManifest> manifest = decode(data);
            state Standalone<BlobManifest> manifest = decode(data);
            wait(writeSystemKeys(self, manifest.rows));
            BlobGranuleRestoreVersionVector _ = wait(listGranules(self));
        } catch (Error& e) {
@@ -231,13 +252,32 @@ public:
        tr.setOption(FDBTransactionOptions::LOCK_AWARE);

        try {
            std::vector<KeyRangeRef> granules;
            state Standalone<VectorRef<KeyRef>> blobRanges;
            // Read all granules
            state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
            limits.minRows = 0;
            state KeySelectorRef begin = firstGreaterOrEqual(blobGranuleMappingKeys.begin);
            state KeySelectorRef end = firstGreaterOrEqual(blobGranuleMappingKeys.end);
            loop {
                RangeResult rows = wait(tr.getRange(begin, end, limits, Snapshot::True));
                for (auto& row : rows) {
                    blobRanges.push_back_deep(blobRanges.arena(), row.key);
                }
                if (!rows.more) {
                    break;
                }
                if (rows.readThrough.present()) {
                    begin = firstGreaterOrEqual(rows.readThrough.get());
                } else {
                    begin = firstGreaterThan(rows.end()[-1].key);
                }
            }

            // check each granule range
            state int i = 0;
            auto limit = GetRangeLimits::BYTE_LIMIT_UNLIMITED;
            state RangeResult blobRanges = wait(tr.getRange(blobGranuleMappingKeys, limit));
            for (i = 0; i < blobRanges.size() - 1; i++) {
                Key startKey = blobRanges[i].key.removePrefix(blobGranuleMappingKeys.begin);
                Key endKey = blobRanges[i + 1].key.removePrefix(blobGranuleMappingKeys.begin);
                Key startKey = blobRanges[i].removePrefix(blobGranuleMappingKeys.begin);
                Key endKey = blobRanges[i + 1].removePrefix(blobGranuleMappingKeys.begin);
                state KeyRange granuleRange = KeyRangeRef(startKey, endKey);
                try {
                    Standalone<BlobGranuleRestoreVersion> granule = wait(getGranule(&tr, granuleRange));
@@ -300,17 +340,32 @@ private:

    // Write system keys to database
    ACTOR static Future<Void> writeSystemKeys(Reference<BlobManifestLoader> self, VectorRef<KeyValueRef> rows) {
        state int start = 0;
        state int end = 0;
        for (start = 0; start < rows.size(); start = end) {
            end = std::min(start + SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS, rows.size());
            wait(writeSystemKeys(self, rows, start, end));
        }
        return Void();
    }

    // Write system keys from the start index to end (exclusive), so that we don't exceed the transaction size limit
    ACTOR static Future<Void> writeSystemKeys(Reference<BlobManifestLoader> self,
                                              VectorRef<KeyValueRef> rows,
                                              int start,
                                              int end) {
        state Transaction tr(self->db_);
        loop {
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            try {
                for (auto& row : rows) {
                    tr.set(row.key, row.value);
                for (int i = start; i < end; ++i) {
                    tr.set(rows[i].key, rows[i].value);
                }
                wait(tr.commit());
                dprint("Blob manifest loaded {} rows\n", rows.size());
                dprint("Blob manifest loaded rows from {} to {}\n", start, end);
                TraceEvent("BlobManifestLoader").detail("RowStart", start).detail("RowEnd", end);
                return Void();
            } catch (Error& e) {
                wait(tr.onError(e));
@@ -324,8 +379,7 @@ private:
        KeyRange historyKeyRange = blobGranuleHistoryKeyRangeFor(range);
        // reverse lookup so that the first row is the newest version
        state RangeResult results =
            wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::False, Reverse::True));

            wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::True, Reverse::True));
        for (KeyValueRef row : results) {
            state KeyRange keyRange;
            state Version version;

@@ -367,24 +421,39 @@ private:

    // List all files for given granule
    ACTOR static Future<std::vector<GranuleFileVersion>> listGranuleFiles(Transaction* tr, UID granuleID) {
        state std::vector<GranuleFileVersion> files;

        state KeyRange fileKeyRange = blobGranuleFileKeyRangeFor(granuleID);
        RangeResult results = wait(tr->getRange(fileKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
        state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
        limits.minRows = 0;
        state KeySelectorRef begin = firstGreaterOrEqual(fileKeyRange.begin);
        state KeySelectorRef end = firstGreaterOrEqual(fileKeyRange.end);
        loop {
            RangeResult results = wait(tr->getRange(begin, end, limits, Snapshot::True));
            for (auto& row : results) {
                UID gid;
                Version version;
                uint8_t fileType;
                Standalone<StringRef> filename;
                int64_t offset;
                int64_t length;
                int64_t fullFileLength;
                Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;

        std::vector<GranuleFileVersion> files;
        for (auto& row : results) {
            UID gid;
            Version version;
            uint8_t fileType;
            Standalone<StringRef> filename;
            int64_t offset;
            int64_t length;
            int64_t fullFileLength;
            Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;

            std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key);
            std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = decodeBlobGranuleFileValue(row.value);
            GranuleFileVersion vs = { version, fileType, filename.toString(), length };
            files.push_back(vs);
                std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key);
                std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) =
                    decodeBlobGranuleFileValue(row.value);
                GranuleFileVersion vs = { version, fileType, filename.toString(), length };
                files.push_back(vs);
            }
            if (!results.more) {
                break;
            }
            if (results.readThrough.present()) {
                begin = firstGreaterOrEqual(results.readThrough.get());
            } else {
                begin = firstGreaterThan(results.end()[-1].key);
            }
        }
        return files;
    }
@@ -466,12 +535,26 @@ ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef keys) {
    tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    tr.setOption(FDBTransactionOptions::LOCK_AWARE);
    try {
        RangeResult ranges = wait(tr.getRange(blobRestoreCommandKeys, CLIENT_KNOBS->TOO_MANY));
        for (auto& r : ranges) {
            KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
            if (keyRange.contains(keys)) {
                Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
                return status.progress < 100; // progress is less than 100
        state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS);
        limits.minRows = 0;
        state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreCommandKeys.begin);
        state KeySelectorRef end = firstGreaterOrEqual(blobRestoreCommandKeys.end);
        loop {
            RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True));
            for (auto& r : ranges) {
                KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
                if (keyRange.contains(keys)) {
                    Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
                    return status.progress < 100; // progress is less than 100
                }
            }
            if (!ranges.more) {
                break;
            }
            if (ranges.readThrough.present()) {
                begin = firstGreaterOrEqual(ranges.readThrough.get());
            } else {
                begin = firstGreaterThan(ranges.end()[-1].key);
            }
        }
        return false;
@@ -697,6 +697,9 @@ struct DDQueue : public IDDRelocationQueue {
        RemoteTeamIsFull,
        RemoteTeamIsNotHealthy,
        NoAvailablePhysicalShard,
        UnknownForceNew,
        NoAnyHealthy,
        DstOverloaded,
        NumberOfTypes,
    };
    std::vector<int> retryFindDstReasonCount;
@@ -1626,6 +1629,13 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
    ASSERT(foundTeams);
    ShardsAffectedByTeamFailure::Team primaryTeam =
        ShardsAffectedByTeamFailure::Team(bestTeams[0].first->getServerIDs(), true);
    if (forceToUseNewPhysicalShard &&
        retryFindDstReason == DDQueue::RetryFindDstReason::None) {
        // This is an abnormal state where we try to create a new physical shard but
        // don't know why. It tracks the unknown reason for force-creating a new
        // physical shard.
        retryFindDstReason = DDQueue::RetryFindDstReason::UnknownForceNew;
    }
    physicalShardIDCandidate =
        self->physicalShardCollection->determinePhysicalShardIDGivenPrimaryTeam(
            primaryTeam, metrics, forceToUseNewPhysicalShard, debugID);
@@ -1648,6 +1658,14 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueue* self,
        break;
    }

    if (retryFindDstReason == DDQueue::RetryFindDstReason::None && foundTeams) {
        if (!anyHealthy) {
            retryFindDstReason = DDQueue::RetryFindDstReason::NoAnyHealthy;
        } else if (anyDestOverloaded) {
            retryFindDstReason = DDQueue::RetryFindDstReason::DstOverloaded;
        }
    }

    if (anyDestOverloaded) {
        CODE_PROBE(true, "Destination overloaded throttled move");
        destOverloadedCount++;

@@ -2519,6 +2537,12 @@ ACTOR Future<Void> dataDistributionQueue(Reference<IDDTxnProcessor> db,
                    self.retryFindDstReasonCount[DDQueue::RetryFindDstReason::RemoteTeamIsFull])
            .detail("RemoteTeamIsNotHealthy",
                    self.retryFindDstReasonCount[DDQueue::RetryFindDstReason::RemoteTeamIsNotHealthy])
            .detail("UnknownForceNew",
                    self.retryFindDstReasonCount[DDQueue::RetryFindDstReason::UnknownForceNew])
            .detail("NoAnyHealthy",
                    self.retryFindDstReasonCount[DDQueue::RetryFindDstReason::NoAnyHealthy])
            .detail("DstOverloaded",
                    self.retryFindDstReasonCount[DDQueue::RetryFindDstReason::DstOverloaded])
            .detail(
                "NoAvailablePhysicalShard",
                self.retryFindDstReasonCount[DDQueue::RetryFindDstReason::NoAvailablePhysicalShard]);
@@ -475,7 +475,7 @@ public:
        if (targetTps.present()) {
            auto const smoothedTargetTps = stats.updateAndGetTargetLimit(targetTps.get());
            te.detail("SmoothedTargetTps", smoothedTargetTps).detail("NumProxies", numProxies);
            result[tag] = smoothedTargetTps / numProxies;
            result[tag] = std::max(1.0, smoothedTargetTps / numProxies);
        } else {
            te.disable();
        }
@@ -58,6 +58,14 @@ void GrvProxyTagThrottler::TagQueue::rejectRequests(LatencyBandsMap& latencyBand
    }
}

void GrvProxyTagThrottler::TagQueue::endReleaseWindow(int64_t numStarted, double elapsed) {
    if (rateInfo.present()) {
        CODE_PROBE(requests.empty(), "Tag queue ending release window with empty request queue");
        CODE_PROBE(!requests.empty(), "Tag queue ending release window with requests still queued");
        rateInfo.get().endReleaseWindow(numStarted, requests.empty(), elapsed);
    }
}

GrvProxyTagThrottler::GrvProxyTagThrottler(double maxThrottleDuration)
  : maxThrottleDuration(maxThrottleDuration),
    latencyBandsMap("GrvProxyTagThrottler",

@@ -202,16 +210,14 @@ void GrvProxyTagThrottler::releaseTransactions(double elapsed,
        }
    }

    // End release windows for queues with valid rateInfo
    // End release windows for all tag queues
    {
        TransactionTagMap<uint32_t> transactionsReleasedMap;
        for (const auto& [tag, count] : transactionsReleased) {
            transactionsReleasedMap[tag] = count;
        }
        for (auto& [tag, queue] : queues) {
            if (queue.rateInfo.present()) {
                queue.rateInfo.get().endReleaseWindow(transactionsReleasedMap[tag], false, elapsed);
            }
            queue.endReleaseWindow(transactionsReleasedMap[tag], elapsed);
        }
    }
    // If the capacity is increased, that means the vector has been illegally resized, potentially
@@ -438,3 +444,33 @@ TEST_CASE("/GrvProxyTagThrottler/Fifo") {
    wait(mockFifoClient(&throttler));
    return Void();
}

// Tests that while throughput is low, the tag throttler
// does not accumulate too much budget.
//
// A server is set up to serve 10 transactions per second,
// then runs idly for 60 seconds. Then a client starts
// and attempts 20 transactions per second for 60 seconds.
// The server throttles the client to only achieve
// 10 transactions per second during this 60 second window.
// If the throttler is allowed to accumulate budget indefinitely
// during the idle 60 seconds, this test will fail.
TEST_CASE("/GrvProxyTagThrottler/LimitedIdleBudget") {
    state GrvProxyTagThrottler throttler(5.0);
    state TagSet tagSet;
    state TransactionTagMap<uint32_t> counters;
    {
        TransactionTagMap<double> rates;
        rates["sampleTag"_sr] = 10.0;
        throttler.updateRates(rates);
    }
    tagSet.addTag("sampleTag"_sr);

    state Future<Void> server = mockServer(&throttler);
    wait(delay(60.0));
    state Future<Void> client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 20.0, &counters);
    wait(timeout(client && server, 60.0, Void()));
    TraceEvent("TagQuotaTest_LimitedIdleBudget").detail("Counter", counters["sampleTag"_sr]);
    ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
    return Void();
}
@@ -35,7 +35,7 @@ bool GrvTransactionRateInfo::canStart(int64_t numAlreadyStarted, int64_t count)
         std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
 }
 
-void GrvTransactionRateInfo::endReleaseWindow(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) {
+void GrvTransactionRateInfo::endReleaseWindow(int64_t numStarted, bool queueEmpty, double elapsed) {
     // Update the budget to accumulate any extra capacity available or remove any excess that was used.
     // The actual delta is the portion of the limit we didn't use multiplied by the fraction of the rate window that
     // elapsed.
@@ -52,16 +52,15 @@ void GrvTransactionRateInfo::endReleaseWindow(int64_t numStartedAtPriority, bool
     //
     // Note that "rate window" here indicates a period of SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW seconds,
     // whereas "release window" is the period between wait statements, with duration indicated by "elapsed."
-    budget =
-        std::max(0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);
+    budget = std::max(0.0, budget + elapsed * (limit - numStarted) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);
 
     // If we are emptying out the queue of requests, then we don't need to carry much budget forward
     // If we did keep accumulating budget, then our responsiveness to changes in workload could be compromised
-    if (queueEmptyAtPriority) {
+    if (queueEmpty) {
         budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
     }
 
-    smoothReleased.addDelta(numStartedAtPriority);
+    smoothReleased.addDelta(numStarted);
 }
 
 void GrvTransactionRateInfo::disable() {
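A worked instance of the budget update may help; this is a minimal sketch, and the knob values here are assumptions rather than the shipped defaults:

    #include <algorithm>
    #include <cstdint>
    
    // Hedged sketch of the release-window budget math above (assumed knob values).
    double endReleaseWindowBudget(double budget, int64_t numStarted, bool queueEmpty, double elapsed) {
        const double RATE_WINDOW = 2.0;             // assumed START_TRANSACTION_RATE_WINDOW
        const double MAX_EMPTY_QUEUE_BUDGET = 10.0; // assumed START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET
        const double limit = 100.0;                 // transactions allowed per rate window
    
        // Unused capacity carried forward, e.g. 0.5 * (100 - 40) / 2.0 = 15.0
        budget = std::max(0.0, budget + elapsed * (limit - numStarted) / RATE_WINDOW);
        if (queueEmpty) {
            // An idle tag cannot bank unbounded credit; see the LimitedIdleBudget test above.
            budget = std::min(budget, MAX_EMPTY_QUEUE_BUDGET);
        }
        return budget;
    }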
@@ -289,11 +289,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
     // Detect conflicts
     double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
     ConflictBatch conflictBatch(self->conflictSet, &reply.conflictingKeyRangeMap, &reply.arena);
-    Version newOldestVersion = req.version - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
-    if (g_network->isSimulated() && g_simulator->speedUpSimulation) {
-        newOldestVersion = req.version - std::max(5 * SERVER_KNOBS->VERSIONS_PER_SECOND,
-                                                  SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS);
-    }
+    const Version newOldestVersion = req.version - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
     for (int t = 0; t < req.transactions.size(); t++) {
         conflictBatch.addTransaction(req.transactions[t], newOldestVersion);
         self->resolvedReadConflictRanges += req.transactions[t].read_conflict_ranges.size();
@@ -177,6 +177,11 @@ public:
         loop {
             try {
                 state RangeResult currentQuotas = wait(tr.getRange(storageQuotaKeys, CLIENT_KNOBS->TOO_MANY));
+                // Reset the quota for all groups; this essentially sets the quota to `max` for groups where the
+                // quota might have been cleared (i.e., groups that will not be returned by the `getRange` request above).
+                for (auto& [group, storage] : tenantCache->tenantStorageMap) {
+                    storage.quota = std::numeric_limits<int64_t>::max();
+                }
                 for (const auto kv : currentQuotas) {
                     const TenantGroupName group = kv.key.removePrefix(storageQuotaPrefix);
                     const int64_t quota = BinaryReader::fromStringRef<int64_t>(kv.value, Unversioned());
@@ -2025,7 +2025,8 @@ public:
              bool memoryOnly,
              Reference<IPageEncryptionKeyProvider> keyProvider,
              Promise<Void> errorPromise = {})
-  : keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_PRIORITY_LAUNCHS),
+  : keyProvider(keyProvider),
+    ioLock(makeReference<PriorityMultiLock>(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_IO_PRIORITIES)),
     pageCacheBytes(pageCacheSizeBytes), desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize),
     filename(filename), memoryOnly(memoryOnly), errorPromise(errorPromise),
     remapCleanupWindowBytes(remapCleanupWindowBytes), concurrentExtentReads(new FlowLock(concurrentExtentReads)) {

@@ -2037,7 +2038,7 @@ public:
         // This sets the page cache size for all PageCacheT instances using the same evictor
         pageCache.evictor().sizeLimit = pageCacheBytes;
 
-        g_redwoodMetrics.ioLock = &ioLock;
+        g_redwoodMetrics.ioLock = ioLock.getPtr();
         if (!g_redwoodMetricsActor.isValid()) {
             g_redwoodMetricsActor = redwoodMetricsLogger();
         }
@@ -2499,7 +2500,7 @@ public:
                                              unsigned int level,
                                              bool header) {
 
-        state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(header ? ioMaxPriority : ioMinPriority));
+        state PriorityMultiLock::Lock lock = wait(self->ioLock->lock(header ? ioMaxPriority : ioMinPriority));
         ++g_redwoodMetrics.metric.pagerDiskWrite;
         g_redwoodMetrics.level(level).metrics.events.addEventReason(PagerEvents::PageWrite, reason);
         if (self->memoryOnly) {
@@ -2779,7 +2780,7 @@ public:
                                                int blockSize,
                                                int64_t offset,
                                                int priority) {
-        state PriorityMultiLock::Lock lock = wait(self->ioLock.lock(std::min(priority, ioMaxPriority)));
+        state PriorityMultiLock::Lock lock = wait(self->ioLock->lock(std::min(priority, ioMaxPriority)));
         ++g_redwoodMetrics.metric.pagerDiskRead;
         int bytes = wait(self->pageFile->read(pageBuffer->rawData() + pageOffset, blockSize, offset));
         return bytes;
@@ -3593,7 +3594,7 @@ public:
 
         // The next section explicitly cancels all pending operations held in the pager
         debug_printf("DWALPager(%s) shutdown kill ioLock\n", self->filename.c_str());
-        self->ioLock.kill();
+        self->ioLock->kill();
 
         debug_printf("DWALPager(%s) shutdown cancel recovery\n", self->filename.c_str());
         self->recoverFuture.cancel();
@@ -3802,7 +3803,7 @@ private:
 
     Reference<IPageEncryptionKeyProvider> keyProvider;
 
-    PriorityMultiLock ioLock;
+    Reference<PriorityMultiLock> ioLock;
 
     int64_t pageCacheBytes;
@@ -8894,32 +8895,25 @@ void RedwoodMetrics::getIOLockFields(TraceEvent* e, std::string* s) {
     int maxPriority = ioLock->maxPriority();
 
     if (e != nullptr) {
-        e->detail("ActiveReads", ioLock->totalRunners());
-        e->detail("AwaitReads", ioLock->totalWaiters());
+        e->detail("IOActiveTotal", ioLock->getRunnersCount());
+        e->detail("IOWaitingTotal", ioLock->getWaitersCount());
 
         for (int priority = 0; priority <= maxPriority; ++priority) {
-            e->detail(format("ActiveP%d", priority), ioLock->numRunners(priority));
-            e->detail(format("AwaitP%d", priority), ioLock->numWaiters(priority));
+            e->detail(format("IOActiveP%d", priority), ioLock->getRunnersCount(priority));
+            e->detail(format("IOWaitingP%d", priority), ioLock->getWaitersCount(priority));
         }
     }
 
     if (s != nullptr) {
-        std::string active = "Active";
-        std::string await = "Await";
-
         *s += "\n";
-        *s += format("%-15s %-8u ", "ActiveReads", ioLock->totalRunners());
-        *s += format("%-15s %-8u ", "AwaitReads", ioLock->totalWaiters());
-        *s += "\n";
-
+        *s += format("%-15s %-8u ", "IOActiveTotal", ioLock->getRunnersCount());
         for (int priority = 0; priority <= maxPriority; ++priority) {
-            *s +=
-                format("%-15s %-8u ", (active + 'P' + std::to_string(priority)).c_str(), ioLock->numRunners(priority));
+            *s += format("IOActiveP%-6d %-8u ", priority, ioLock->getRunnersCount(priority));
         }
         *s += "\n";
+        *s += format("%-15s %-8u ", "IOWaitingTotal", ioLock->getWaitersCount());
         for (int priority = 0; priority <= maxPriority; ++priority) {
-            *s +=
-                format("%-15s %-8u ", (await + 'P' + std::to_string(priority)).c_str(), ioLock->numWaiters(priority));
+            *s += format("IOWaitingP%-5d %-8u ", priority, ioLock->getWaitersCount(priority));
         }
     }
 }
@@ -11407,57 +11401,3 @@ TEST_CASE(":/redwood/performance/histograms") {
 
     return Void();
 }
-
-ACTOR Future<Void> waitLockIncrement(PriorityMultiLock* pml, int priority, int* pout) {
-    state PriorityMultiLock::Lock lock = wait(pml->lock(priority));
-    wait(delay(deterministicRandom()->random01() * .1));
-    ++*pout;
-    return Void();
-}
-
-TEST_CASE("/redwood/PriorityMultiLock") {
-    state std::vector<int> priorities = { 10, 20, 40 };
-    state int concurrency = 25;
-    state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
-    state std::vector<int> counts;
-    counts.resize(priorities.size(), 0);
-
-    // Clog the lock buy taking concurrency locks at each level
-    state std::vector<Future<PriorityMultiLock::Lock>> lockFutures;
-    for (int i = 0; i < priorities.size(); ++i) {
-        for (int j = 0; j < concurrency; ++j) {
-            lockFutures.push_back(pml->lock(i));
-        }
-    }
-
-    // Wait for n = concurrency locks to be acquired
-    wait(quorum(lockFutures, concurrency));
-
-    state std::vector<Future<Void>> futures;
-    for (int i = 0; i < 10e3; ++i) {
-        int p = i % priorities.size();
-        futures.push_back(waitLockIncrement(pml, p, &counts[p]));
-    }
-
-    state Future<Void> f = waitForAll(futures);
-
-    // Release the locks
-    lockFutures.clear();
-
-    // Print stats and wait for all futures to be ready
-    loop {
-        choose {
-            when(wait(delay(1))) {
-                printf("counts: ");
-                for (auto c : counts) {
-                    printf("%d ", c);
-                }
-                printf(" pml: %s\n", pml->toString().c_str());
-            }
-            when(wait(f)) { break; }
-        }
-    }
-
-    delete pml;
-    return Void();
-}
@@ -60,6 +60,7 @@ class GrvProxyTagThrottler {
         void setRate(double rate);
         bool isMaxThrottled(double maxThrottleDuration) const;
         void rejectRequests(LatencyBandsMap&);
+        void endReleaseWindow(int64_t numStarted, double elapsed);
     };
 
     // Track the budgets for each tag
@@ -55,7 +55,7 @@ public:
 
     // Updates the budget to accumulate any extra capacity available or remove any excess that was used.
     // Call at the end of a release window.
-    void endReleaseWindow(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed);
+    void endReleaseWindow(int64_t numStarted, bool queueEmpty, double elapsed);
 
     // Smoothly sets rate. If currently disabled, reenable
     void setRate(double rate);
@@ -208,7 +208,7 @@ class Ratekeeper {
     Deque<std::pair<double, Version>> blobWorkerVersionHistory;
     Optional<Key> remoteDC;
 
-    double getRecoveryDuration(Version ver) {
+    double getRecoveryDuration(Version ver) const {
         auto it = version_recovery.lower_bound(ver);
         double recoveryDuration = 0;
         while (it != version_recovery.end()) {
@@ -1110,15 +1110,13 @@ public:
 
     FlowLock serveFetchCheckpointParallelismLock;
 
-    PriorityMultiLock ssLock;
+    Reference<PriorityMultiLock> ssLock;
     std::vector<int> readPriorityRanks;
 
     Future<PriorityMultiLock::Lock> getReadLock(const Optional<ReadOptions>& options) {
-        // TODO: Fix perf regression in 100% cache read case where taking this lock adds too much overhead
-        return PriorityMultiLock::Lock();
-        // int readType = (int)(options.present() ? options.get().type : ReadType::NORMAL);
-        // readType = std::clamp<int>(readType, 0, readPriorityRanks.size() - 1);
-        // return ssLock.lock(readPriorityRanks[readType]);
+        int readType = (int)(options.present() ? options.get().type : ReadType::NORMAL);
+        readType = std::clamp<int>(readType, 0, readPriorityRanks.size() - 1);
+        return ssLock->lock(readPriorityRanks[readType]);
     }
 
     FlowLock serveAuditStorageParallelismLock;
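How a read lands on a PriorityMultiLock priority id is worth spelling out; the knob string format below is an assumed example for illustration, not the shipped default:

    #include <algorithm>
    #include <vector>
    
    // Hedged sketch: STORAGESERVER_READTYPE_PRIORITY_MAP is treated here as a
    // comma-separated table indexed by ReadType; "0,1,1,2,3" is a made-up mapping.
    int readPriorityFor(int readType /* (int)ReadType::... */) {
        std::vector<int> readPriorityRanks = { 0, 1, 1, 2, 3 }; // parseStringToVector<int>("0,1,1,2,3", ',')
        readType = std::clamp<int>(readType, 0, (int)readPriorityRanks.size() - 1);
        return readPriorityRanks[readType]; // priority id handed to ssLock->lock()
    }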
@@ -1407,7 +1405,8 @@ public:
     fetchKeysParallelismFullLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_FULL),
     fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
     serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
-    ssLock(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
+    ssLock(makeReference<PriorityMultiLock>(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY,
+                                            SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES)),
     serveAuditStorageParallelismLock(SERVER_KNOBS->SERVE_AUDIT_STORAGE_PARALLELISM),
     instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
     versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
@@ -1415,7 +1414,7 @@ public:
     busiestWriteTagContext(ssi.id()), counters(this),
     storageServerSourceTLogIDEventHolder(
         makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
-    readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READ_RANKS, ',');
+    readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READTYPE_PRIORITY_MAP, ',');
     ASSERT(readPriorityRanks.size() > (int)ReadType::MAX);
     version.initMetric("StorageServer.Version"_sr, counters.cc.getId());
     oldestVersion.initMetric("StorageServer.OldestVersion"_sr, counters.cc.getId());
@@ -10431,20 +10430,20 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
             te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
             te.detail("Tag", self->tag.toString());
             std::vector<int> rpr = self->readPriorityRanks;
-            te.detail("ReadsActive", self->ssLock.totalRunners());
-            te.detail("ReadsWaiting", self->ssLock.totalWaiters());
+            te.detail("ReadsTotalActive", self->ssLock->getRunnersCount());
+            te.detail("ReadsTotalWaiting", self->ssLock->getWaitersCount());
             int type = (int)ReadType::FETCH;
-            te.detail("ReadFetchActive", self->ssLock.numRunners(rpr[type]));
-            te.detail("ReadFetchWaiting", self->ssLock.numWaiters(rpr[type]));
+            te.detail("ReadFetchActive", self->ssLock->getRunnersCount(rpr[type]));
+            te.detail("ReadFetchWaiting", self->ssLock->getWaitersCount(rpr[type]));
             type = (int)ReadType::LOW;
-            te.detail("ReadLowActive", self->ssLock.numRunners(rpr[type]));
-            te.detail("ReadLowWaiting", self->ssLock.numWaiters(rpr[type]));
+            te.detail("ReadLowActive", self->ssLock->getRunnersCount(rpr[type]));
+            te.detail("ReadLowWaiting", self->ssLock->getWaitersCount(rpr[type]));
             type = (int)ReadType::NORMAL;
-            te.detail("ReadNormalActive", self->ssLock.numRunners(rpr[type]));
-            te.detail("ReadNormalWaiting", self->ssLock.numWaiters(rpr[type]));
+            te.detail("ReadNormalActive", self->ssLock->getRunnersCount(rpr[type]));
+            te.detail("ReadNormalWaiting", self->ssLock->getWaitersCount(rpr[type]));
             type = (int)ReadType::HIGH;
-            te.detail("ReadHighActive", self->ssLock.numRunners(rpr[type]));
-            te.detail("ReadHighWaiting", self->ssLock.numWaiters(rpr[type]));
+            te.detail("ReadHighActive", self->ssLock->getRunnersCount(rpr[type]));
+            te.detail("ReadHighWaiting", self->ssLock->getWaitersCount(rpr[type]));
             StorageBytes sb = self->storage.getStorageBytes();
             te.detail("KvstoreBytesUsed", sb.used);
             te.detail("KvstoreBytesFree", sb.free);
@@ -11260,7 +11259,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
         // If the storage server dies while something that uses self is still on the stack,
         // we want that actor to complete before we terminate and that memory goes out of scope
 
-        self.ssLock.kill();
+        self.ssLock->kill();
 
         state Error err = e;
         if (storageServerTerminated(self, persistentData, err)) {
@@ -11358,7 +11357,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
         throw internal_error();
     } catch (Error& e) {
 
-        self.ssLock.kill();
+        self.ssLock->kill();
 
         if (self.byteSampleRecovery.isValid()) {
             self.byteSampleRecovery.cancel();
@@ -97,9 +97,13 @@ struct StorageQuotaWorkload : TestWorkload {
         state bool rejected2 = wait(tryWrite(self, cx, self->emptyTenant, /*expectOk=*/false));
         ASSERT(rejected2);
 
-        // Increase the quota. Check that writes to both the tenants are now able to commit.
-        quota = size * 2;
-        wait(setStorageQuotaHelper(cx, self->group, quota));
+        // Increase the quota or clear the quota. Check that writes to both the tenants are now able to commit.
+        if (deterministicRandom()->coinflip()) {
+            quota = size * 2;
+            wait(setStorageQuotaHelper(cx, self->group, quota));
+        } else {
+            wait(clearStorageQuotaHelper(cx, self->group));
+        }
         state bool committed1 = wait(tryWrite(self, cx, self->tenant, /*expectOk=*/true));
         ASSERT(committed1);
         state bool committed2 = wait(tryWrite(self, cx, self->emptyTenant, /*expectOk=*/true));
@@ -144,12 +148,24 @@
         }
     }
 
+    ACTOR static Future<Void> clearStorageQuotaHelper(Database cx, TenantGroupName tenantGroupName) {
+        state Transaction tr(cx);
+        loop {
+            try {
+                clearStorageQuota(tr, tenantGroupName);
+                wait(tr.commit());
+                return Void();
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+    }
+
     ACTOR static Future<Optional<int64_t>> getStorageQuotaHelper(Database cx, TenantGroupName tenantGroupName) {
         state Transaction tr(cx);
         loop {
             try {
                 state Optional<int64_t> quota = wait(getStorageQuota(&tr, tenantGroupName));
                 wait(tr.commit());
                 return quota;
             } catch (Error& e) {
                 wait(tr.onError(e));
@@ -26,8 +26,8 @@
 // This workload sets the throughput quota of a tag during the setup phase
 class ThroughputQuotaWorkload : public TestWorkload {
     TransactionTag transactionTag;
-    double reservedQuota{ 0.0 };
-    double totalQuota{ 0.0 };
+    int64_t reservedQuota{ 0 };
+    int64_t totalQuota{ 0 };
 
     ACTOR static Future<Void> setup(ThroughputQuotaWorkload* self, Database cx) {
         state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
@@ -29,21 +29,25 @@
 #define PRIORITYMULTILOCK_ACTOR_H
 
 #include "flow/flow.h"
+#include <boost/intrusive/list.hpp>
 #include "flow/actorcompiler.h" // This must be the last #include.
 
 #define PRIORITYMULTILOCK_DEBUG 0
 
 #if PRIORITYMULTILOCK_DEBUG || !defined(NO_INTELLISENSE)
 #define pml_debug_printf(...) \
-    if (now() > 0) \
-        printf(__VA_ARGS__)
+    if (now() > 0) { \
+        printf("pml line=%04d ", __LINE__); \
+        printf(__VA_ARGS__); \
+    }
 #else
 #define pml_debug_printf(...)
 #endif
 
 // A multi user lock with a concurrent holder limit where waiters request a lock with a priority
 // id and are granted locks based on a total concurrency and relative weights of the current active
-// priorities. Priority id's must start at 0 and are sequential integers.
+// priorities. Priority id's must start at 0 and are sequential integers. Priority id numbers
+// are not related to the importance of the priority in execution.
 //
 // Scheduling logic
 // Let
@@ -64,17 +68,17 @@
 // The interface is similar to FlowMutex except that lock holders can just drop the lock to release it.
 //
 // Usage:
-//   Lock lock = wait(prioritylock.lock(priorityLevel));
+//   Lock lock = wait(prioritylock.lock(priority_id));
 //   lock.release();  // Explicit release, or
 //   // let lock and all copies of lock go out of scope to release
-class PriorityMultiLock {
-
+class PriorityMultiLock : public ReferenceCounted<PriorityMultiLock> {
 public:
     // Waiting on the lock returns a Lock, which is really just a Promise<Void>
     // Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release
     // the Lock before it goes out of scope.
     struct Lock {
         void release() { promise.send(Void()); }
         bool isLocked() const { return promise.canBeSet(); }
 
         // This is exposed in case the caller wants to use/copy it directly
         Promise<Void> promise;
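Since the class is now reference counted, callers hold it through a Reference; a minimal hedged usage sketch (assumes flow actor context, and the concurrency of 16 and "8,1" weight string are arbitrary illustrative values):

    // Minimal sketch, not from the commit.
    state Reference<PriorityMultiLock> pml = makeReference<PriorityMultiLock>(16, "8,1");
    state PriorityMultiLock::Lock lock = wait(pml->lock(0)); // priority id 0
    // ... perform work limited by the lock ...
    lock.release(); // optional; dropping all copies of `lock` also releases it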
@@ -84,10 +88,11 @@ public:
       : PriorityMultiLock(concurrency, parseStringToVector<int>(weights, ',')) {}
 
     PriorityMultiLock(int concurrency, std::vector<int> weightsByPriority)
-      : concurrency(concurrency), available(concurrency), waiting(0), totalPendingWeights(0), releaseDebugID(0) {
+      : concurrency(concurrency), available(concurrency), waiting(0), totalPendingWeights(0) {
 
         priorities.resize(weightsByPriority.size());
         for (int i = 0; i < priorities.size(); ++i) {
+            priorities[i].priority = i;
             priorities[i].weight = weightsByPriority[i];
         }
@@ -102,7 +107,8 @@ public:
 
         // If this priority currently has no waiters
         if (q.empty()) {
-            // Add this priority's weight to the total for priorities with pending work
+            // Add this priority's weight to the total for priorities with pending work. This must be done
+            // so that currentCapacity() below will assign capacity to this priority.
             totalPendingWeights += p.weight;
 
             // If there are slots available and the priority has capacity then don't make the caller wait
@@ -114,80 +120,71 @@ public:
                 Lock lock;
                 addRunner(lock, &p);
 
-                pml_debug_printf("lock nowait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
+                pml_debug_printf("lock nowait priority %d %s\n", priority, toString().c_str());
                 return lock;
             }
 
+            // If we didn't return above then add the priority to the waitingPriorities list
+            waitingPriorities.push_back(p);
         }
 
-        Waiter w;
-        q.push_back(w);
+        Waiter& w = q.emplace_back();
         ++waiting;
 
-        pml_debug_printf("lock wait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
+        pml_debug_printf("lock wait priority %d %s\n", priority, toString().c_str());
         return w.lockPromise.getFuture();
     }
 
     void kill() {
+        pml_debug_printf("kill %s\n", toString().c_str());
         brokenOnDestruct.reset();
 
         // handleRelease will not free up any execution slots when it ends via cancel
         fRunner.cancel();
         available = 0;
-        runners.clear();
-        priorities.clear();
+
+        waitingPriorities.clear();
+        for (auto& p : priorities) {
+            p.queue.clear();
+        }
     }
 
     std::string toString() const {
-        int runnersDone = 0;
-        for (int i = 0; i < runners.size(); ++i) {
-            if (runners[i].isReady()) {
-                ++runnersDone;
-            }
-        }
-
-        std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d "
-                               "runnersDone=%d pendingWeights=%d ",
+        std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d "
+                               "pendingWeights=%d ",
                                this,
                                concurrency,
                                available,
                               concurrency - available,
                                waiting,
-                               runners.size(),
-                               runnersDone,
                                totalPendingWeights);
 
-        for (int i = 0; i < priorities.size(); ++i) {
-            s += format("p%d:{%s} ", i, priorities[i].toString(this).c_str());
+        for (auto& p : priorities) {
+            s += format("{%s} ", p.toString(this).c_str());
         }
 
         s += "}";
 
-        if (concurrency - available != runners.size() - runnersDone) {
-            pml_debug_printf("%s\n", s.c_str());
-            ASSERT_EQ(concurrency - available, runners.size() - runnersDone);
-        }
-
         return s;
     }
 
     int maxPriority() const { return priorities.size() - 1; }
 
-    int totalWaiters() const { return waiting; }
+    int getRunnersCount() const { return concurrency - available; }
+    int getWaitersCount() const { return waiting; }
 
-    int numWaiters(const unsigned int priority) const {
+    int getWaitersCount(const unsigned int priority) const {
         ASSERT(priority < priorities.size());
         return priorities[priority].queue.size();
     }
 
-    int totalRunners() const { return concurrency - available; }
-
-    int numRunners(const unsigned int priority) const {
+    int getRunnersCount(const unsigned int priority) const {
         ASSERT(priority < priorities.size());
         return priorities[priority].runners;
     }
 
 private:
     struct Waiter {
-        Waiter() {}
         Promise<Lock> lockPromise;
     };
@@ -202,8 +199,8 @@ private:
 
     typedef Deque<Waiter> Queue;
 
-    struct Priority {
-        Priority() : runners(0), weight(0) {}
+    struct Priority : boost::intrusive::list_base_hook<> {
+        Priority() : runners(0), weight(0), priority(-1) {}
 
         // Queue of waiters at this priority
         Queue queue;
@@ -211,9 +208,12 @@ private:
         int runners;
         // Configured weight for this priority
         int weight;
+        // Priority number for convenience, matches *this's index in PML priorities vector
+        int priority;
 
         std::string toString(const PriorityMultiLock* pml) const {
-            return format("weight=%d run=%d wait=%d cap=%d",
+            return format("priority=%d weight=%d run=%d wait=%d cap=%d",
+                          priority,
                           weight,
                           runners,
                           queue.size(),
@@ -222,51 +222,41 @@ private:
     };
 
     std::vector<Priority> priorities;
+    typedef boost::intrusive::list<Priority, boost::intrusive::constant_time_size<false>> WaitingPrioritiesList;
 
-    // Current or recent (ended) runners
-    Deque<Future<Void>> runners;
+    // List of all priorities with 1 or more waiters. This list exists so that the scheduling loop
+    // does not have to iterate over the priorities vector checking priorities without waiters.
+    WaitingPrioritiesList waitingPriorities;
 
     Future<Void> fRunner;
     AsyncTrigger wakeRunner;
     Promise<Void> brokenOnDestruct;
 
-    // Used for debugging, can roll over without issue
-    unsigned int releaseDebugID;
-
-    ACTOR static Future<Void> handleRelease(PriorityMultiLock* self, Future<Void> f, Priority* priority) {
-        state [[maybe_unused]] unsigned int id = self->releaseDebugID++;
-
-        pml_debug_printf("%f handleRelease self=%p id=%u start \n", now(), self, id);
+    ACTOR static void handleRelease(Reference<PriorityMultiLock> self, Priority* priority, Future<Void> holder) {
+        pml_debug_printf("%f handleRelease self=%p start\n", now(), self.getPtr());
         try {
-            wait(f);
-            pml_debug_printf("%f handleRelease self=%p id=%u success\n", now(), self, id);
+            wait(holder);
+            pml_debug_printf("%f handleRelease self=%p success\n", now(), self.getPtr());
         } catch (Error& e) {
-            pml_debug_printf("%f handleRelease self=%p id=%u error %s\n", now(), self, id, e.what());
-            if (e.code() == error_code_actor_cancelled) {
-                throw;
-            }
+            pml_debug_printf("%f handleRelease self=%p error %s\n", now(), self.getPtr(), e.what());
         }
 
-        pml_debug_printf("lock release line %d priority %d %s\n",
-                         __LINE__,
-                         (int)(priority - &self->priorities.front()),
-                         self->toString().c_str());
+        pml_debug_printf("lock release priority %d %s\n", (int)(priority->priority), self->toString().c_str());
 
-        pml_debug_printf("%f handleRelease self=%p id=%u releasing\n", now(), self, id);
+        pml_debug_printf("%f handleRelease self=%p releasing\n", now(), self.getPtr());
         ++self->available;
         priority->runners -= 1;
 
         // If there are any waiters or if the runners array is getting large, trigger the runner loop
-        if (self->waiting > 0 || self->runners.size() > 1000) {
+        if (self->waiting > 0) {
             self->wakeRunner.trigger();
         }
-        return Void();
     }
 
-    void addRunner(Lock& lock, Priority* p) {
-        p->runners += 1;
+    void addRunner(Lock& lock, Priority* priority) {
+        priority->runners += 1;
         --available;
-        runners.push_back(handleRelease(this, lock.promise.getFuture(), p));
+        handleRelease(Reference<PriorityMultiLock>::addRef(this), priority, lock.promise.getFuture());
    }
 
     // Current maximum running tasks for the specified priority, which must have waiters
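The switch from a stored runners deque to a detached void actor leans on the new reference counting: handleRelease pins the lock via the Reference it captures, so kill() no longer has to track and clear runner futures. A compressed hedged sketch of the pattern (generic names, not the commit's code):

    // Hedged sketch of the detached-actor keep-alive idiom used above.
    ACTOR static void watchHolder(Reference<PriorityMultiLock> self, Future<Void> holder) {
        try {
            wait(holder); // resolves (or breaks) when the last copy of the Lock is dropped
        } catch (Error&) {
            // broken_promise also means the holder is gone; fall through to release
        }
        // `self` is still valid here because this actor owns a reference to it.
    }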
@@ -278,76 +268,50 @@ private:
     }
 
     ACTOR static Future<Void> runner(PriorityMultiLock* self) {
-        state int sinceYield = 0;
-        state Future<Void> error = self->brokenOnDestruct.getFuture();
-
-        // Priority to try to run tasks from next
-        state int priority = 0;
+        state WaitingPrioritiesList::iterator p = self->waitingPriorities.end();
 
         loop {
-            pml_debug_printf(
-                "runner loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
-
-            // Cleanup finished runner futures at the front of the runner queue.
-            while (!self->runners.empty() && self->runners.front().isReady()) {
-                self->runners.pop_front();
-            }
+            pml_debug_printf("runner loop start priority=%d %s\n", p->priority, self->toString().c_str());
 
             // Wait for a runner to release its lock
-            pml_debug_printf(
-                "runner loop waitTrigger line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
+            pml_debug_printf("runner loop waitTrigger priority=%d %s\n", p->priority, self->toString().c_str());
             wait(self->wakeRunner.onTrigger());
-            pml_debug_printf(
-                "%f runner loop wake line %d priority=%d %s\n", now(), __LINE__, priority, self->toString().c_str());
-
-            if (++sinceYield == 100) {
-                sinceYield = 0;
-                pml_debug_printf(
-                    "  runner waitDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
-                wait(delay(0));
-                pml_debug_printf(
-                    "  runner afterDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
-            }
+            pml_debug_printf("%f runner loop wake priority=%d %s\n", now(), p->priority, self->toString().c_str());
 
             // While there are available slots and there are waiters, launch tasks
             while (self->available > 0 && self->waiting > 0) {
-                pml_debug_printf(
-                    "  launch loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
-
-                Priority* pPriority;
+                pml_debug_printf("  launch loop start priority=%d %s\n", p->priority, self->toString().c_str());
 
                 // Find the next priority with waiters and capacity. There must be at least one.
                 loop {
-                    // Rotate to next priority
-                    if (++priority == self->priorities.size()) {
-                        priority = 0;
+                    if (p == self->waitingPriorities.end()) {
+                        p = self->waitingPriorities.begin();
                     }
 
-                    pPriority = &self->priorities[priority];
+                    pml_debug_printf("  launch loop scan priority=%d %s\n", p->priority, self->toString().c_str());
 
-                    pml_debug_printf("  launch loop scan line %d priority=%d %s\n",
-                                     __LINE__,
-                                     priority,
-                                     self->toString().c_str());
-
-                    if (!pPriority->queue.empty() && pPriority->runners < self->currentCapacity(pPriority->weight)) {
+                    if (!p->queue.empty() && p->runners < self->currentCapacity(p->weight)) {
                         break;
                     }
+                    ++p;
                 }
 
-                Queue& queue = pPriority->queue;
-
+                Queue& queue = p->queue;
                 Waiter w = queue.front();
                 queue.pop_front();
 
-                // If this priority is now empty, subtract its weight from the total pending weights
+                // If this priority is now empty, subtract its weight from the total pending weights and remove it
+                // from the waitingPriorities list
+                Priority* pPriority = &*p;
                 if (queue.empty()) {
+                    p = self->waitingPriorities.erase(p);
                     self->totalPendingWeights -= pPriority->weight;
 
-                    pml_debug_printf("  emptied priority line %d priority=%d %s\n",
-                                     __LINE__,
-                                     priority,
-                                     self->toString().c_str());
+                    pml_debug_printf(
+                        "  emptied priority priority=%d %s\n", pPriority->priority, self->toString().c_str());
                 }
 
                 --self->waiting;

@@ -365,10 +329,9 @@ private:
                 self->addRunner(lock, pPriority);
             }
 
-            pml_debug_printf("  launched line %d alreadyDone=%d priority=%d %s\n",
-                             __LINE__,
+            pml_debug_printf("  launched alreadyDone=%d priority=%d %s\n",
                              !lock.promise.canBeSet(),
-                             priority,
+                             pPriority->priority,
                              self->toString().c_str());
         }
     }
@@ -0,0 +1,180 @@
+/*
+ * BenchBlobDeltaFiles.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "benchmark/benchmark.h"
+#include "fdbclient/FDBTypes.h"
+#include "fdbclient/SystemData.h"
+#include "flow/IRandom.h"
+#include "flow/DeterministicRandom.h"
+
+#include "fdbclient/BlobGranuleFiles.h"
+#include "flow/flow.h"
+#include <cstdlib>
+#include <stdexcept>
+
+// Pre-generated GranuleDelta size in bytes for benchmark.
+const static int PRE_GEN_TARGET_BYTES[] = { 128 * 1024, 512 * 1024, 1024 * 1024 };
+
+// Generate GranuleDeltas deterministically. Change the seed if you want to test a new data set.
+class DeltaGenerator {
+public:
+    DeltaGenerator(uint32_t seed = 12345678) {
+        randGen = Reference<IRandom>(new DeterministicRandom(seed));
+        // Generate key range
+        prefix = StringRef(ar, randGen->randomUniqueID().toString() + "_");
+        range = KeyRangeRef(prefix, StringRef(ar, strinc(prefix)));
+        // Generate version jump size
+        minVersionJump = randGen->randomExp(0, 25);
+        maxVersionJump = minVersionJump + randGen->randomExp(0, 25);
+        // Generate value size range
+        maxValueSize = randGen->randomExp(7, 9);
+        // Generate start version
+        version = randGen->randomUInt32();
+        // Generate probability of updating existing keys
+        updateExistingKeysProb = randGen->random01();
+        // Generate deltas
+        for (auto i : PRE_GEN_TARGET_BYTES) {
+            genDeltas(i);
+        }
+
+        fmt::print("key range: {} - {}\n", range.begin.printable(), range.end.printable());
+        fmt::print("start version: {}\n", version);
+        fmt::print("max value bytes: {}\n", maxValueSize);
+        fmt::print("version jump range: {} - {}\n", minVersionJump, maxVersionJump);
+        fmt::print("probability for update: {}\n", updateExistingKeysProb);
+        fmt::print("unseed: {}\n", randGen->randomUInt32());
+    }
+
+    KeyRange getRange() { return range; }
+
+    Standalone<GranuleDeltas> getDelta(int targetBytes) {
+        if (deltas.find(targetBytes) != deltas.end()) {
+            return deltas[targetBytes];
+        }
+        throw std::invalid_argument("Test delta file size is not pre-generated!");
+    }
+
+private:
+    void genDeltas(int targetBytes) {
+        Standalone<GranuleDeltas> data;
+        int totalDataBytes = 0;
+        while (totalDataBytes < targetBytes) {
+            data.push_back(ar, newDelta());
+            totalDataBytes += data.back().expectedSize();
+        }
+        deltas[targetBytes] = data;
+    }
+
+    MutationRef newMutation() { return MutationRef(ar, MutationRef::SetValue, key(), value()); }
+
+    MutationsAndVersionRef newDelta() {
+        version += randGen->randomInt(minVersionJump, maxVersionJump);
+        MutationsAndVersionRef ret(version, version);
+        for (int i = 0; i < 10; i++) {
+            ret.mutations.push_back_deep(ar, newMutation());
+        }
+        return ret;
+    }
+
+    StringRef key() {
+        // Pick an existing key
+        if (randGen->random01() < updateExistingKeysProb && !usedKeys.empty()) {
+            int r = randGen->randomUInt32() % usedKeys.size();
+            auto it = usedKeys.begin();
+            for (; r != 0; r--)
+                it++;
+            return StringRef(ar, *it);
+        }
+
+        // Create a new key
+        std::string key = prefix.toString() + randGen->randomUniqueID().toString();
+        usedKeys.insert(key);
+        return StringRef(ar, key);
+    }
+
+    StringRef value() {
+        int valueSize = randGen->randomInt(maxValueSize / 2, maxValueSize * 3 / 2);
+        std::string value = randGen->randomUniqueID().toString();
+        if (value.size() > valueSize) {
+            value = value.substr(0, valueSize);
+        }
+        if (value.size() < valueSize) {
+            // repeated string so it's compressible
+            value += std::string(valueSize - value.size(), 'x');
+        }
+        return StringRef(ar, value);
+    }
+
+    Reference<IRandom> randGen;
+    Arena ar;
+    KeyRangeRef range;
+    Key prefix;
+    int maxValueSize;
+    Version version;
+    int minVersionJump;
+    int maxVersionJump;
+    std::set<std::string> usedKeys;
+    double updateExistingKeysProb;
+    std::map<int, Standalone<GranuleDeltas>> deltas;
+};
+
+static DeltaGenerator deltaGen; // Pre-generate deltas
+
+// Benchmark serialization without compression/encryption. The main CPU cost should be sortDeltasByKey
+static void bench_serialize_deltas(benchmark::State& state) {
+    int targetBytes = state.range(0);
+    int chunkSize = state.range(1);
+
+    Standalone<GranuleDeltas> delta = deltaGen.getDelta(targetBytes);
+    KeyRange range = deltaGen.getRange();
+
+    Standalone<StringRef> fileName = "testdelta"_sr; // unused
+    Optional<CompressionFilter> compressFilter; // unused. no compression
+    Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx; // unused. no encryption
+
+    uint32_t serializedBytes = 0;
+    for (auto _ : state) {
+        Value serialized = serializeChunkedDeltaFile(fileName, delta, range, chunkSize, compressFilter, cipherKeysCtx);
+        serializedBytes += serialized.size();
+    }
+    state.SetBytesProcessed(static_cast<long>(state.iterations()) * targetBytes);
+    state.counters["serialized_bytes"] = serializedBytes;
+}
+
+// Benchmark sorting deltas
+static void bench_sort_deltas(benchmark::State& state) {
+    int targetBytes = state.range(0);
+    Standalone<GranuleDeltas> delta = deltaGen.getDelta(targetBytes);
+    KeyRange range = deltaGen.getRange();
+
+    for (auto _ : state) {
+        sortDeltasByKey(delta, range);
+    }
+    state.SetBytesProcessed(static_cast<long>(state.iterations()) * targetBytes);
+}
+
+// Benchmark serialization for granule deltas 128KB, 512KB and 1024KB. Chunk size 32KB
+BENCHMARK(bench_serialize_deltas)
+    ->Args({ 128 * 1024, 32 * 1024 })
+    ->Args({ 512 * 1024, 32 * 1024 })
+    ->Args({ 1024 * 1024, 32 * 1024 });
+
+// Benchmark sorting for granule deltas 128KB, 512KB and 1024KB
+BENCHMARK(bench_sort_deltas)->Args({ 128 * 1024 })->Args({ 512 * 1024 })->Args({ 1024 * 1024 });
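For readers unfamiliar with the Google Benchmark idioms used in this file, a minimal hedged skeleton of the registration pattern (bench_example and its arguments are illustrative only):

    #include "benchmark/benchmark.h"
    
    // Each Args({ ... }) tuple registers one benchmark instance; state.range(i)
    // reads the i-th argument of that tuple inside the benchmark body.
    static void bench_example(benchmark::State& state) {
        long targetBytes = state.range(0);
        for (auto _ : state) {
            benchmark::DoNotOptimize(targetBytes); // replace with the work under test
        }
        state.SetBytesProcessed(static_cast<long>(state.iterations()) * targetBytes);
    }
    BENCHMARK(bench_example)->Args({ 128 * 1024 })->Args({ 512 * 1024 });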
@@ -25,26 +25,28 @@
 #include "flow/PriorityMultiLock.actor.h"
 #include <deque>
 #include "flow/actorcompiler.h" // This must be the last #include.
+#include "fmt/printf.h"
 
 ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
-    state std::vector<int> priorities;
+    // Arg1 is the number of active priorities to use
+    // Arg2 is the number of inactive priorities to use
+    state int active = benchState->range(0);
+    state int inactive = benchState->range(1);
 
     // Set up priority list with limits 10, 20, 30, ...
-    while (priorities.size() < benchState->range(0)) {
+    state std::vector<int> priorities;
+    while (priorities.size() < active + inactive) {
         priorities.push_back(10 * (priorities.size() + 1));
     }
 
     state int concurrency = priorities.size() * 10;
-    state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
-    state std::vector<int> counts;
-    counts.resize(priorities.size(), 0);
+    state Reference<PriorityMultiLock> pml = makeReference<PriorityMultiLock>(concurrency, priorities);
 
-    // Clog the lock buy taking concurrency locks
+    // Clog the lock by taking n=concurrency locks
     state std::deque<Future<PriorityMultiLock::Lock>> lockFutures;
     for (int j = 0; j < concurrency; ++j) {
-        lockFutures.push_back(pml->lock(j % priorities.size()));
+        lockFutures.push_back(pml->lock(j % active));
     }
 
     // Wait for all of the initial locks to be taken
     // This will work regardless of their priorities as there are only n = concurrency of them
     wait(waitForAll(std::vector<Future<PriorityMultiLock::Lock>>(lockFutures.begin(), lockFutures.end())));

@@ -64,7 +66,7 @@ ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
         PriorityMultiLock::Lock lock = wait(f);
 
         // Rotate to another priority
-        if (++p == priorities.size()) {
+        if (++p == active) {
             p = 0;
         }

@@ -76,7 +78,6 @@ ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
 
     benchState->SetItemsProcessed(static_cast<long>(benchState->iterations()));
 
-    delete pml;
     return Void();
 }

@@ -84,4 +85,4 @@ static void bench_priorityMultiLock(benchmark::State& benchState) {
     onMainThread([&benchState]() { return benchPriorityMultiLock(&benchState); }).blockUntilReady();
 }
 
-BENCHMARK(bench_priorityMultiLock)->DenseRange(1, 8)->ReportAggregatesOnly(true);
+BENCHMARK(bench_priorityMultiLock)->Args({ 5, 0 })->Ranges({ { 1, 64 }, { 0, 128 } })->ReportAggregatesOnly(true);
@@ -240,7 +240,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
   add_fdb_test(TEST_FILES rare/RedwoodDeltaTree.toml)
   add_fdb_test(TEST_FILES rare/Throttling.toml)
-  add_fdb_test(TEST_FILES rare/ThroughputQuota.toml IGNORE)
+  add_fdb_test(TEST_FILES rare/ThroughputQuota.toml)
   add_fdb_test(TEST_FILES rare/TransactionCost.toml)
   add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml)
   add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml)
@@ -334,9 +334,6 @@ logdir = {logdir}
             db_config += " blob_granules_enabled:=1"
         self.fdbcli_exec(db_config)
 
-        if self.blob_granules_enabled:
-            self.fdbcli_exec("blobrange start \\x00 \\xff")
-
     # Generate and install test certificate chains and keys
     def create_tls_cert(self):
         assert self.tls_config is not None, "TLS not enabled"
@@ -6,6 +6,7 @@ enable_encryption = true
 enable_tlog_encryption = true
 enable_storage_server_encryption = false
 enable_blob_granule_encryption = true
+max_write_transaction_life_versions = 5000000
 
 [[test]]
 testTitle = 'EncryptedBackupAndRestore'
@@ -4,15 +4,10 @@ testTitle='ThroughputQuotaTest'
     [[test.workload]]
     testName='ThroughputQuota'
     transactionTag='a'
-    totalQuota=1.0
-
-    [[test.workload]]
-    testName='Status'
-    enableLatencyBands = true
-    testDuration = 60.0
+    totalQuota=16384
 
     [[test.workload]]
     testName = 'Cycle'
-    transactionsPerSecond = 2500.0
-    testDuration = 60.0
+    transactionsPerSecond = 250.0
+    testDuration = 30.0
     expectedRate = 0