Added plumbing for tenant-aware purge granules

This commit is contained in:
Josh Slocum 2022-06-16 08:42:56 -05:00
parent a3289f9cab
commit b3597ef3a8
12 changed files with 195 additions and 27 deletions

View File

@ -491,6 +491,28 @@ extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant
CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr(););
}
extern "C" DLLEXPORT FDBFuture* fdb_tenant_purge_blob_granules(FDBTenant* tenant,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force) {
return (FDBFuture*)(TENANT(tenant)
->purgeBlobGranules(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)),
purge_version,
force)
.extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_tenant_wait_purge_granules_complete(FDBTenant* tenant,
uint8_t const* purge_key_name,
int purge_key_name_length) {
return (FDBFuture*)(TENANT(tenant)
->waitPurgeGranulesComplete(StringRef(purge_key_name, purge_key_name_length))
.extractPtr());
}
extern "C" DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant) {
try {
TENANT(tenant)->delref();

View File

@ -315,6 +315,18 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_wait_purge_granules_complet
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant,
FDBTransaction** out_transaction);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_purge_blob_granules(FDBTenant* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_tenant_wait_purge_granules_complete(FDBTenant* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant);
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);

View File

@ -157,6 +157,25 @@ Tenant::Tenant(FDBDatabase* db, const uint8_t* name, int name_length) {
}
}
KeyFuture Tenant::purge_blob_granules(FDBDatabase* db,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force) {
return KeyFuture(fdb_database_purge_blob_granules(db,
(const uint8_t*)begin_key.data(),
begin_key.size(),
(const uint8_t*)end_key.data(),
end_key.size(),
purge_version,
force));
}
EmptyFuture Tenant::wait_purge_granules_complete(FDBDatabase* db, std::string_view purge_key) {
return EmptyFuture(
fdb_database_wait_purge_granules_complete(db, (const uint8_t*)purge_key.data(), purge_key.size()));
}
Tenant::~Tenant() {
if (tenant != nullptr) {
fdb_tenant_destroy(tenant);

View File

@ -98,6 +98,7 @@ public:
private:
friend class Transaction;
friend class Database;
friend class Tenant;
KeyFuture(FDBFuture* f) : Future(f) {}
};
@ -164,6 +165,7 @@ class EmptyFuture : public Future {
private:
friend class Transaction;
friend class Database;
friend class Tenant;
EmptyFuture(FDBFuture* f) : Future(f) {}
};
@ -221,6 +223,14 @@ public:
Tenant(Tenant&&) = delete;
Tenant& operator=(Tenant&&) = delete;
static KeyFuture purge_blob_granules(FDBDatabase* db,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force);
static EmptyFuture wait_purge_granules_complete(FDBDatabase* db, std::string_view purge_key);
private:
friend class Transaction;
FDBTenant* tenant;

View File

@ -378,7 +378,10 @@ public:
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
Future<Key> purgeBlobGranules(KeyRange keyRange, Version purgeVersion, bool force = false);
Future<Key> purgeBlobGranules(KeyRange keyRange,
Version purgeVersion,
Optional<TenantName> tenant,
bool force = false);
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
// private:

View File

@ -130,6 +130,9 @@ public:
virtual Reference<ITransaction> createTransaction() = 0;
virtual ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) = 0;
virtual ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};

View File

@ -398,6 +398,38 @@ Reference<ITransaction> DLTenant::createTransaction() {
return Reference<ITransaction>(new DLTransaction(api, tr));
}
ThreadFuture<Key> DLTenant::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
if (!api->tenantPurgeBlobGranules) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->tenantPurgeBlobGranules(tenant,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const uint8_t* key;
int keyLength;
FdbCApi::fdb_error_t error = api->futureGetKey(f, &key, &keyLength);
ASSERT(!error);
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
return Key(KeyRef(key, keyLength), Arena());
});
}
ThreadFuture<Void> DLTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
if (!api->tenantWaitPurgeGranulesComplete) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->tenantWaitPurgeGranulesComplete(tenant, purgeKey.begin(), purgeKey.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// DLDatabase
DLDatabase::DLDatabase(Reference<FdbCApi> api, ThreadFuture<FdbCApi::FDBDatabase*> dbFuture) : api(api), db(nullptr) {
addref();
@ -520,16 +552,16 @@ ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVer
}
ThreadFuture<Key> DLDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
if (!api->purgeBlobGranules) {
if (!api->databasePurgeBlobGranules) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->purgeBlobGranules(db,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
FdbCApi::FDBFuture* f = api->databasePurgeBlobGranules(db,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const uint8_t* key;
@ -543,11 +575,11 @@ ThreadFuture<Key> DLDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Ver
}
ThreadFuture<Void> DLDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
if (!api->waitPurgeGranulesComplete) {
if (!api->databaseWaitPurgeGranulesComplete) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->waitPurgeGranulesComplete(db, purgeKey.begin(), purgeKey.size());
FdbCApi::FDBFuture* f = api->databaseWaitPurgeGranulesComplete(db, purgeKey.begin(), purgeKey.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
@ -624,11 +656,9 @@ void DLApi::init() {
headerVersion >= 700);
loadClientFunction(
&api->databaseCreateSnapshot, lib, fdbCPath, "fdb_database_create_snapshot", headerVersion >= 700);
loadClientFunction(
&api->purgeBlobGranules, lib, fdbCPath, "fdb_database_purge_blob_granules", headerVersion >= 710);
loadClientFunction(&api->waitPurgeGranulesComplete,
&api->databasePurgeBlobGranules, lib, fdbCPath, "fdb_database_purge_blob_granules", headerVersion >= 710);
loadClientFunction(&api->databaseWaitPurgeGranulesComplete,
lib,
fdbCPath,
"fdb_database_wait_purge_granules_complete",
@ -636,6 +666,13 @@ void DLApi::init() {
loadClientFunction(
&api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710);
loadClientFunction(
&api->tenantPurgeBlobGranules, lib, fdbCPath, "fdb_tenant_purge_blob_granules", headerVersion >= 720);
loadClientFunction(&api->tenantWaitPurgeGranulesComplete,
lib,
fdbCPath,
"fdb_tenant_wait_purge_granules_complete",
headerVersion >= 720);
loadClientFunction(&api->tenantDestroy, lib, fdbCPath, "fdb_tenant_destroy", headerVersion >= 710);
loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option", headerVersion >= 0);
@ -1316,6 +1353,16 @@ Reference<ITransaction> MultiVersionTenant::createTransaction() {
tenantState->db->dbState->transactionDefaultOptions));
}
ThreadFuture<Key> MultiVersionTenant::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
auto f = tenantState->db ? tenantState->db->purgeBlobGranules(keyRange, purgeVersion, force)
: ThreadFuture<Key>(Never());
return abortableFuture(f, tenantState->db->dbState->dbVar->get().onChange);
}
ThreadFuture<Void> MultiVersionTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
auto f = tenantState->db ? tenantState->db->waitPurgeGranulesComplete(purgeKey) : ThreadFuture<Void>(Never());
return abortableFuture(f, tenantState->db->dbState->dbVar->get().onChange);
}
MultiVersionTenant::TenantState::TenantState(Reference<MultiVersionDatabase> db, StringRef tenantName)
: tenantVar(new ThreadSafeAsyncVar<Reference<ITenant>>(Reference<ITenant>(nullptr))), tenantName(tenantName), db(db),
closed(false) {

View File

@ -157,18 +157,33 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
FDBFuture* (*purgeBlobGranules)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*databasePurgeBlobGranules)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*waitPurgeGranulesComplete)(FDBDatabase* db, uint8_t const* purge_key_name, int purge_key_name_length);
FDBFuture* (*databaseWaitPurgeGranulesComplete)(FDBDatabase* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
// Tenant
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
FDBFuture* (*tenantPurgeBlobGranules)(FDBTenant* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*tenantWaitPurgeGranulesComplete)(FDBTenant* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
void (*tenantDestroy)(FDBTenant* tenant);
// Transaction
@ -413,6 +428,9 @@ public:
Reference<ITransaction> createTransaction() override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
void addref() override { ThreadSafeReferenceCounted<DLTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLTenant>::delref(); }
@ -672,6 +690,9 @@ public:
Reference<ITransaction> createTransaction() override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::delref(); }

View File

@ -9476,20 +9476,30 @@ Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
KeyRange range,
Version purgeVersion,
Optional<TenantName> tenant,
bool force) {
state Database cx(db);
state Transaction tr(cx);
state Key purgeKey;
state KeyRange purgeRange = range;
state bool loadedTenantPrefix = false;
// FIXME: implement force
if (!force) {
throw unsupported_operation();
}
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
if (tenant.present() && !loadedTenantPrefix) {
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin));
loadedTenantPrefix = true;
purgeRange = purgeRange.withPrefix(tenantEntry.prefix);
}
Value purgeValue = blobGranulePurgeValueFor(purgeVersion, range, force);
tr.atomicOp(
addVersionStampAtEnd(blobGranulePurgeKeys.begin), purgeValue, MutationRef::SetVersionstampedKey);
@ -9520,8 +9530,11 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
return purgeKey;
}
Future<Key> DatabaseContext::purgeBlobGranules(KeyRange range, Version purgeVersion, bool force) {
return purgeBlobGranulesActor(Reference<DatabaseContext>::addRef(this), range, purgeVersion, force);
Future<Key> DatabaseContext::purgeBlobGranules(KeyRange range,
Version purgeVersion,
Optional<TenantName> tenant,
bool force) {
return purgeBlobGranulesActor(Reference<DatabaseContext>::addRef(this), range, purgeVersion, tenant, force);
}
ACTOR Future<Void> waitPurgeGranulesCompleteActor(Reference<DatabaseContext> db, Key purgeKey) {

View File

@ -131,7 +131,7 @@ ThreadFuture<Key> ThreadSafeDatabase::purgeBlobGranules(const KeyRangeRef& keyRa
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread([db, range, purgeVersion, force]() -> Future<Key> {
return db->purgeBlobGranules(range, purgeVersion, force);
return db->purgeBlobGranules(range, purgeVersion, {}, force);
});
}
@ -174,6 +174,21 @@ Reference<ITransaction> ThreadSafeTenant::createTransaction() {
return Reference<ITransaction>(new ThreadSafeTransaction(db->db, type, name));
}
ThreadFuture<Key> ThreadSafeTenant::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
DatabaseContext* db = this->db->db;
Standalone<StringRef> tenantName = this->name;
KeyRange range = keyRange;
return onMainThread([db, range, purgeVersion, tenantName, force]() -> Future<Key> {
return db->purgeBlobGranules(range, purgeVersion, tenantName, force);
});
}
ThreadFuture<Void> ThreadSafeTenant::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
DatabaseContext* db = this->db->db;
Key key = purgeKey;
return onMainThread([db, key]() -> Future<Void> { return db->waitPurgeGranulesComplete(key); });
}
ThreadSafeTenant::~ThreadSafeTenant() {}
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,

View File

@ -84,6 +84,9 @@ public:
Reference<ITransaction> createTransaction() override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
void addref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::delref(); }

View File

@ -254,7 +254,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (prevPurgeVersion < maxPurgeVersion) {
newPurgeVersion = deterministicRandom()->randomInt64(prevPurgeVersion, maxPurgeVersion);
prevPurgeVersion = std::max(prevPurgeVersion, newPurgeVersion);
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, false));
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, {}, false));
wait(cx->waitPurgeGranulesComplete(purgeKey));
self->purges++;
} else {