blob: verifyBlobRange() c api
This commit is contained in:
parent
045076339d
commit
3d400cff64
|
@ -533,6 +533,19 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_list_blobbified_ranges(FDBDatabase*
|
|||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_verify_blob_range(FDBDatabase* db,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int64_t version) {
|
||||
return (FDBFuture*)(DB(db)
|
||||
->verifyBlobRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
|
||||
StringRef(end_key_name, end_key_name_length)),
|
||||
version)
|
||||
.extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) {
|
||||
CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr(););
|
||||
}
|
||||
|
|
|
@ -342,6 +342,13 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_list_blobbified_ranges(FDBD
|
|||
int end_key_name_length,
|
||||
int rangeLimit);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_verify_blob_range(FDBDatabase* db,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
int64_t version);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant,
|
||||
FDBTransaction** out_transaction);
|
||||
|
||||
|
|
|
@ -73,70 +73,10 @@ ACTOR Future<Void> doBlobPurge(Database db, Key startKey, Key endKey, Optional<V
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Version> checkBlobSubrange(Database db, KeyRange keyRange, Optional<Version> version) {
|
||||
state Transaction tr(db);
|
||||
state Version readVersionOut = invalidVersion;
|
||||
loop {
|
||||
try {
|
||||
wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut)));
|
||||
return readVersionOut;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> verifyBlobRange(Database db, KeyRange range, Optional<Version> version) {
|
||||
state Transaction tr(db);
|
||||
state Standalone<VectorRef<KeyRangeRef>> allRanges;
|
||||
state KeyRange curRegion = KeyRangeRef(range.begin, range.begin);
|
||||
state Version readVersionOut = invalidVersion;
|
||||
state int batchSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2;
|
||||
loop {
|
||||
try {
|
||||
wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), 20 * batchSize)));
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
||||
if (allRanges.empty()) {
|
||||
if (curRegion.begin < range.end) {
|
||||
return invalidVersion;
|
||||
}
|
||||
return readVersionOut;
|
||||
}
|
||||
|
||||
state std::vector<Future<Version>> checkParts;
|
||||
// Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit
|
||||
int batchCount = 0;
|
||||
for (auto& it : allRanges) {
|
||||
if (it.begin != curRegion.end) {
|
||||
return invalidVersion;
|
||||
}
|
||||
|
||||
curRegion = KeyRangeRef(curRegion.begin, it.end);
|
||||
batchCount++;
|
||||
|
||||
if (batchCount == batchSize) {
|
||||
checkParts.push_back(checkBlobSubrange(db, curRegion, version));
|
||||
batchCount = 0;
|
||||
curRegion = KeyRangeRef(curRegion.end, curRegion.end);
|
||||
}
|
||||
}
|
||||
if (!curRegion.empty()) {
|
||||
checkParts.push_back(checkBlobSubrange(db, curRegion, version));
|
||||
}
|
||||
|
||||
wait(waitForAll(checkParts));
|
||||
readVersionOut = checkParts.back().get();
|
||||
curRegion = KeyRangeRef(curRegion.end, curRegion.end);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doBlobCheck(Database db, Key startKey, Key endKey, Optional<Version> version) {
|
||||
state double elapsed = -timer_monotonic();
|
||||
|
||||
state Version readVersionOut = wait(verifyBlobRange(db, KeyRangeRef(startKey, endKey), version));
|
||||
state Version readVersionOut = wait(db->verifyBlobRange(KeyRangeRef(startKey, endKey), version));
|
||||
|
||||
elapsed += timer_monotonic();
|
||||
|
||||
|
|
|
@ -634,6 +634,21 @@ ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> DLDatabase::listBlobbifiedRange
|
|||
});
|
||||
}
|
||||
|
||||
ThreadFuture<Version> DLDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
|
||||
if (!api->databaseVerifyBlobRange) {
|
||||
return unsupported_operation();
|
||||
}
|
||||
|
||||
FdbCApi::FDBFuture* f = api->databaseVerifyBlobRange(
|
||||
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), version);
|
||||
|
||||
return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
|
||||
Version version = invalidVersion;
|
||||
ASSERT(!api->futureGetInt64(f, &version));
|
||||
return version;
|
||||
});
|
||||
}
|
||||
|
||||
// DLApi
|
||||
|
||||
// Loads the specified function from a dynamic library
|
||||
|
@ -726,6 +741,8 @@ void DLApi::init() {
|
|||
&api->databaseUnblobbifyRange, lib, fdbCPath, "fdb_database_unblobbify_range", headerVersion >= 720);
|
||||
loadClientFunction(
|
||||
&api->databaseListBlobbifiedRanges, lib, fdbCPath, "fdb_database_list_blobbified_ranges", headerVersion >= 720);
|
||||
loadClientFunction(
|
||||
&api->databaseVerifyBlobRange, lib, fdbCPath, "fdb_database_verify_blob_range", headerVersion >= 720);
|
||||
|
||||
loadClientFunction(
|
||||
&api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710);
|
||||
|
@ -1667,6 +1684,12 @@ ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> MultiVersionDatabase::listBlobb
|
|||
return abortableFuture(f, dbVar.onChange);
|
||||
}
|
||||
|
||||
ThreadFuture<Version> MultiVersionDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
|
||||
auto dbVar = dbState->dbVar->get();
|
||||
auto f = dbVar.value ? dbVar.value->verifyBlobRange(keyRange, version) : ThreadFuture<Version>(Never());
|
||||
return abortableFuture(f, dbVar.onChange);
|
||||
}
|
||||
|
||||
// Returns the protocol version reported by the coordinator this client is connected to
|
||||
// If an expected version is given, the future won't return until the protocol version is different than expected
|
||||
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
||||
|
|
|
@ -8012,6 +8012,71 @@ ACTOR Future<Version> setPerpetualStorageWiggle(Database cx, bool enable, LockAw
|
|||
return version;
|
||||
}
|
||||
|
||||
ACTOR Future<Version> checkBlobSubrange(Database db, KeyRange keyRange, Optional<Version> version) {
|
||||
state Transaction tr(db);
|
||||
state Version readVersionOut = invalidVersion;
|
||||
loop {
|
||||
try {
|
||||
wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut)));
|
||||
return readVersionOut;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx, KeyRange range, Optional<Version> version) {
|
||||
state Database db(cx);
|
||||
state Transaction tr(db);
|
||||
state Standalone<VectorRef<KeyRangeRef>> allRanges;
|
||||
state KeyRange curRegion = KeyRangeRef(range.begin, range.begin);
|
||||
state Version readVersionOut = invalidVersion;
|
||||
state int batchSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2;
|
||||
loop {
|
||||
try {
|
||||
wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), 20 * batchSize)));
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
||||
if (allRanges.empty()) {
|
||||
if (curRegion.begin < range.end) {
|
||||
return invalidVersion;
|
||||
}
|
||||
return readVersionOut;
|
||||
}
|
||||
|
||||
state std::vector<Future<Version>> checkParts;
|
||||
// Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit
|
||||
int batchCount = 0;
|
||||
for (auto& it : allRanges) {
|
||||
if (it.begin != curRegion.end) {
|
||||
return invalidVersion;
|
||||
}
|
||||
|
||||
curRegion = KeyRangeRef(curRegion.begin, it.end);
|
||||
batchCount++;
|
||||
|
||||
if (batchCount == batchSize) {
|
||||
checkParts.push_back(checkBlobSubrange(db, curRegion, version));
|
||||
batchCount = 0;
|
||||
curRegion = KeyRangeRef(curRegion.end, curRegion.end);
|
||||
}
|
||||
}
|
||||
if (!curRegion.empty()) {
|
||||
checkParts.push_back(checkBlobSubrange(db, curRegion, version));
|
||||
}
|
||||
|
||||
wait(waitForAll(checkParts));
|
||||
readVersionOut = checkParts.back().get();
|
||||
curRegion = KeyRangeRef(curRegion.end, curRegion.end);
|
||||
}
|
||||
}
|
||||
|
||||
Future<Version> DatabaseContext::verifyBlobRange(const KeyRange& range, Optional<Version> version) {
|
||||
return verifyBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, version);
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<std::pair<UID, StorageWiggleValue>>> readStorageWiggleValues(Database cx,
|
||||
bool primary,
|
||||
bool use_system_priority) {
|
||||
|
|
|
@ -164,6 +164,12 @@ ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> ThreadSafeDatabase::listBlobbif
|
|||
[=]() -> Future<Standalone<VectorRef<KeyRangeRef>>> { return db->listBlobbifiedRanges(range, rangeLimit); });
|
||||
}
|
||||
|
||||
ThreadFuture<Version> ThreadSafeDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
|
||||
DatabaseContext* db = this->db;
|
||||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<Version> { return db->verifyBlobRange(range, version); });
|
||||
}
|
||||
|
||||
ThreadSafeDatabase::ThreadSafeDatabase(ConnectionRecordType connectionRecordType,
|
||||
std::string connectionRecordString,
|
||||
int apiVersion) {
|
||||
|
|
|
@ -388,6 +388,7 @@ public:
|
|||
Future<bool> blobbifyRange(KeyRange range);
|
||||
Future<bool> unblobbifyRange(KeyRange range);
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(KeyRange range, int rangeLimit);
|
||||
Future<Version> verifyBlobRange(const KeyRange& range, Optional<Version> version);
|
||||
|
||||
// private:
|
||||
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
|
||||
|
|
|
@ -178,6 +178,8 @@ public:
|
|||
virtual ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) = 0;
|
||||
|
||||
virtual ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) = 0;
|
||||
|
||||
// Interface to manage shared state across multiple connections to the same Database
|
||||
virtual ThreadFuture<DatabaseSharedState*> createSharedState() = 0;
|
||||
virtual void setSharedState(DatabaseSharedState* p) = 0;
|
||||
|
|
|
@ -190,6 +190,13 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
|||
int end_key_name_length,
|
||||
int rangeLimit);
|
||||
|
||||
FDBFuture* (*databaseVerifyBlobRange)(FDBDatabase* db,
|
||||
uint8_t const* begin_key_name,
|
||||
int begin_key_name_length,
|
||||
uint8_t const* end_key_name,
|
||||
int end_key_name_length,
|
||||
Optional<Version> version);
|
||||
|
||||
// Tenant
|
||||
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
|
||||
|
||||
|
@ -501,6 +508,7 @@ public:
|
|||
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) override;
|
||||
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
|
||||
|
||||
ThreadFuture<DatabaseSharedState*> createSharedState() override;
|
||||
void setSharedState(DatabaseSharedState* p) override;
|
||||
|
@ -848,6 +856,7 @@ public:
|
|||
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
|
||||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) override;
|
||||
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
|
||||
|
||||
ThreadFuture<DatabaseSharedState*> createSharedState() override;
|
||||
void setSharedState(DatabaseSharedState* p) override;
|
||||
|
|
|
@ -67,6 +67,8 @@ public:
|
|||
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
|
||||
int rangeLimit) override;
|
||||
|
||||
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
|
||||
|
||||
ThreadFuture<DatabaseSharedState*> createSharedState() override;
|
||||
void setSharedState(DatabaseSharedState* p) override;
|
||||
|
||||
|
|
Loading…
Reference in New Issue