blob: move blobrange command implementation to native api
This commit is contained in:
parent
8c33aa7b1d
commit
736ef4f2c9
|
@ -23,6 +23,7 @@
|
||||||
#include "fdbclient/FDBOptions.g.h"
|
#include "fdbclient/FDBOptions.g.h"
|
||||||
#include "fdbclient/IClientApi.h"
|
#include "fdbclient/IClientApi.h"
|
||||||
#include "fdbclient/ManagementAPI.actor.h"
|
#include "fdbclient/ManagementAPI.actor.h"
|
||||||
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
|
|
||||||
#include "flow/Arena.h"
|
#include "flow/Arena.h"
|
||||||
#include "flow/FastRef.h"
|
#include "flow/FastRef.h"
|
||||||
|
@ -31,33 +32,6 @@
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
// copy to standalones for krm
|
|
||||||
ACTOR Future<Void> setBlobRange(Database db, Key startKey, Key endKey, Value value) {
|
|
||||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
||||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
|
||||||
|
|
||||||
// FIXME: check that the set range is currently inactive, and that a revoked range is currently its own
|
|
||||||
// range in the map and fully set.
|
|
||||||
|
|
||||||
tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
|
|
||||||
// This is not coalescing because we want to keep each range logically separate.
|
|
||||||
wait(krmSetRange(tr, blobRangeKeys.begin, KeyRange(KeyRangeRef(startKey, endKey)), value));
|
|
||||||
wait(tr->commit());
|
|
||||||
printf("Successfully updated blob range [%s - %s) to %s\n",
|
|
||||||
startKey.printable().c_str(),
|
|
||||||
endKey.printable().c_str(),
|
|
||||||
value.printable().c_str());
|
|
||||||
return Void();
|
|
||||||
} catch (Error& e) {
|
|
||||||
wait(tr->onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<Version> getLatestReadVersion(Database db) {
|
ACTOR Future<Version> getLatestReadVersion(Database db) {
|
||||||
state Transaction tr(db);
|
state Transaction tr(db);
|
||||||
loop {
|
loop {
|
||||||
|
@ -210,7 +184,11 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb,
|
||||||
starting ? "Starting" : "Stopping",
|
starting ? "Starting" : "Stopping",
|
||||||
tokens[2].printable().c_str(),
|
tokens[2].printable().c_str(),
|
||||||
tokens[3].printable().c_str());
|
tokens[3].printable().c_str());
|
||||||
wait(setBlobRange(localDb, begin, end, starting ? LiteralStringRef("1") : StringRef()));
|
if (starting) {
|
||||||
|
wait(localDb->blobbifyRange(KeyRangeRef(begin, end)));
|
||||||
|
} else {
|
||||||
|
wait(localDb->unblobbifyRange(KeyRangeRef(begin, end)));
|
||||||
|
}
|
||||||
} else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge") || tokencmp(tokens[1], "check")) {
|
} else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge") || tokencmp(tokens[1], "check")) {
|
||||||
bool purge = tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge");
|
bool purge = tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge");
|
||||||
bool forcePurge = tokencmp(tokens[1], "forcepurge");
|
bool forcePurge = tokencmp(tokens[1], "forcepurge");
|
||||||
|
|
|
@ -9716,6 +9716,7 @@ Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
|
||||||
return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
|
return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlobGranule API.
|
||||||
ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
|
ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
|
||||||
KeyRange range,
|
KeyRange range,
|
||||||
Version purgeVersion,
|
Version purgeVersion,
|
||||||
|
@ -9807,6 +9808,43 @@ Future<Void> DatabaseContext::waitPurgeGranulesComplete(Key purgeKey) {
|
||||||
return waitPurgeGranulesCompleteActor(Reference<DatabaseContext>::addRef(this), purgeKey);
|
return waitPurgeGranulesCompleteActor(Reference<DatabaseContext>::addRef(this), purgeKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Void> setBlobRangeActor(Reference<DatabaseContext> cx, KeyRange range, bool active) {
|
||||||
|
state Database db(cx);
|
||||||
|
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
|
||||||
|
|
||||||
|
state Value value = active ? LiteralStringRef("1") : LiteralStringRef("0");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
try {
|
||||||
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||||
|
|
||||||
|
// FIXME: check that the set range is currently inactive, and that a revoked range is currently its own
|
||||||
|
// range in the map and fully set.
|
||||||
|
|
||||||
|
tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
|
||||||
|
// This is not coalescing because we want to keep each range logically separate.
|
||||||
|
wait(krmSetRange(tr, blobRangeKeys.begin, range, value));
|
||||||
|
wait(tr->commit());
|
||||||
|
printf("Successfully updated blob range [%s - %s) to %s\n",
|
||||||
|
range.begin.printable().c_str(),
|
||||||
|
range.end.printable().c_str(),
|
||||||
|
value.printable().c_str());
|
||||||
|
return Void();
|
||||||
|
} catch (Error& e) {
|
||||||
|
wait(tr->onError(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> DatabaseContext::blobbifyRange(KeyRange range) {
|
||||||
|
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> DatabaseContext::unblobbifyRange(KeyRange range) {
|
||||||
|
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, false);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t getMaxKeySize(KeyRef const& key) {
|
int64_t getMaxKeySize(KeyRef const& key) {
|
||||||
return getMaxWriteKeySize(key, true);
|
return getMaxWriteKeySize(key, true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -378,12 +378,16 @@ public:
|
||||||
Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
|
Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
|
||||||
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
|
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
|
||||||
|
|
||||||
|
// BlobGranule API.
|
||||||
Future<Key> purgeBlobGranules(KeyRange keyRange,
|
Future<Key> purgeBlobGranules(KeyRange keyRange,
|
||||||
Version purgeVersion,
|
Version purgeVersion,
|
||||||
Optional<TenantName> tenant,
|
Optional<TenantName> tenant,
|
||||||
bool force = false);
|
bool force = false);
|
||||||
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
|
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
|
||||||
|
|
||||||
|
Future<Void> blobbifyRange(KeyRange range);
|
||||||
|
Future<Void> unblobbifyRange(KeyRange range);
|
||||||
|
|
||||||
// private:
|
// private:
|
||||||
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
|
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
|
||||||
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
|
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
|
||||||
|
|
|
@ -230,27 +230,6 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> setUpBlobRange(Database cx, KeyRange keyRange) {
|
|
||||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
|
||||||
loop {
|
|
||||||
try {
|
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
|
||||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
|
||||||
tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
|
|
||||||
wait(krmSetRange(tr, blobRangeKeys.begin, keyRange, LiteralStringRef("1")));
|
|
||||||
wait(tr->commit());
|
|
||||||
if (BGW_DEBUG) {
|
|
||||||
fmt::print("Successfully set up blob granule range for tenant range [{0} - {1})\n",
|
|
||||||
keyRange.begin.printable(),
|
|
||||||
keyRange.end.printable());
|
|
||||||
}
|
|
||||||
return Void();
|
|
||||||
} catch (Error& e) {
|
|
||||||
wait(tr->onError(e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR Future<TenantMapEntry> setUpTenant(Database cx, TenantName name) {
|
ACTOR Future<TenantMapEntry> setUpTenant(Database cx, TenantName name) {
|
||||||
if (BGW_DEBUG) {
|
if (BGW_DEBUG) {
|
||||||
fmt::print("Setting up blob granule range for tenant {0}\n", name.printable());
|
fmt::print("Setting up blob granule range for tenant {0}\n", name.printable());
|
||||||
|
@ -291,7 +270,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
||||||
self->directories[directoryIdx]->directoryRange =
|
self->directories[directoryIdx]->directoryRange =
|
||||||
KeyRangeRef(tenantEntry.prefix, tenantEntry.prefix.withSuffix(normalKeys.end));
|
KeyRangeRef(tenantEntry.prefix, tenantEntry.prefix.withSuffix(normalKeys.end));
|
||||||
tenants.push_back({ self->directories[directoryIdx]->tenantName, tenantEntry });
|
tenants.push_back({ self->directories[directoryIdx]->tenantName, tenantEntry });
|
||||||
wait(self->setUpBlobRange(cx, self->directories[directoryIdx]->directoryRange));
|
wait(cx->blobbifyRange(self->directories[directoryIdx]->directoryRange));
|
||||||
}
|
}
|
||||||
tenantData.addTenants(tenants);
|
tenantData.addTenants(tenants);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue