From 8c33aa7b1dcc65344a1d30cbbe437ddde64d6e0b Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Mon, 8 Aug 2022 15:45:52 +0100 Subject: [PATCH 01/15] blob: create named values for blobRangeActive/blobRangeInactive blobRangeActive = LiteralStringRef("1") blobRangeInactive = LiteralStringRef("") --- fdbclient/SystemData.cpp | 3 ++ fdbclient/include/fdbclient/SystemData.h | 2 ++ fdbserver/BlobManager.actor.cpp | 39 +++++++++++------------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index c2c59f14b2..f912d691bc 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1331,6 +1331,9 @@ int64_t decodeBlobManagerEpochValue(ValueRef const& value) { } // blob granule data +const KeyRef blobRangeActive = LiteralStringRef("1"); +const KeyRef blobRangeInactive = LiteralStringRef("0"); + const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff\x02/bgf/"), LiteralStringRef("\xff\x02/bgf0")); const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), LiteralStringRef("\xff\x02/bgm0")); const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0")); diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 4f028087bb..b41809691e 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -594,6 +594,8 @@ const Value blobManagerEpochValueFor(int64_t epoch); int64_t decodeBlobManagerEpochValue(ValueRef const& value); // blob granule keys +extern const StringRef blobRangeActive; +extern const StringRef blobRangeInactive; extern const uint8_t BG_FILE_TYPE_DELTA; extern const uint8_t BG_FILE_TYPE_SNAPSHOT; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 7a11eae30a..0b38703808 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -130,7 +130,7 @@ void updateClientBlobRanges(KeyRangeMap* knownBlobRanges, } break; } - bool active = dbBlobRanges[i].value == LiteralStringRef("1"); + bool active = dbBlobRanges[i].value == blobRangeActive; if (active) { if (BM_DEBUG) { fmt::print("BM sees client range [{0} - {1})\n", @@ -1280,7 +1280,7 @@ ACTOR Future monitorClientRanges(Reference bmData) { needToCoalesce = false; for (int i = 0; i < results.size() - 1; i++) { - bool active = results[i].value == LiteralStringRef("1"); + bool active = results[i].value == blobRangeActive; bmData->knownBlobRanges.insert(KeyRangeRef(results[i].key, results[i + 1].key), active); } } @@ -5013,9 +5013,6 @@ TEST_CASE("/blobmanager/updateranges") { VectorRef added; VectorRef removed; - StringRef active = LiteralStringRef("1"); - StringRef inactive = StringRef(); - RangeResult dbDataEmpty; std::vector> kbrRanges; @@ -5026,34 +5023,34 @@ TEST_CASE("/blobmanager/updateranges") { // db data setup RangeResult dbDataAB; - dbDataAB.emplace_back(ar, keyA, active); - dbDataAB.emplace_back(ar, keyB, inactive); + dbDataAB.emplace_back(ar, keyA, blobRangeActive); + dbDataAB.emplace_back(ar, keyB, blobRangeInactive); RangeResult dbDataAC; - dbDataAC.emplace_back(ar, keyA, active); - dbDataAC.emplace_back(ar, keyC, inactive); + dbDataAC.emplace_back(ar, keyA, blobRangeActive); + dbDataAC.emplace_back(ar, keyC, blobRangeInactive); RangeResult dbDataAD; - dbDataAD.emplace_back(ar, keyA, active); - dbDataAD.emplace_back(ar, keyD, inactive); + dbDataAD.emplace_back(ar, keyA, blobRangeActive); + dbDataAD.emplace_back(ar, keyD, blobRangeInactive); RangeResult dbDataBC; - dbDataBC.emplace_back(ar, keyB, active); - dbDataBC.emplace_back(ar, keyC, inactive); + dbDataBC.emplace_back(ar, keyB, blobRangeActive); + dbDataBC.emplace_back(ar, keyC, blobRangeInactive); RangeResult dbDataBD; - dbDataBD.emplace_back(ar, keyB, active); - dbDataBD.emplace_back(ar, keyD, inactive); + dbDataBD.emplace_back(ar, keyB, blobRangeActive); + dbDataBD.emplace_back(ar, keyD, blobRangeInactive); RangeResult dbDataCD; - dbDataCD.emplace_back(ar, keyC, active); - dbDataCD.emplace_back(ar, keyD, inactive); + dbDataCD.emplace_back(ar, keyC, blobRangeActive); + dbDataCD.emplace_back(ar, keyD, blobRangeInactive); RangeResult dbDataAB_CD; - dbDataAB_CD.emplace_back(ar, keyA, active); - dbDataAB_CD.emplace_back(ar, keyB, inactive); - dbDataAB_CD.emplace_back(ar, keyC, active); - dbDataAB_CD.emplace_back(ar, keyD, inactive); + dbDataAB_CD.emplace_back(ar, keyA, blobRangeActive); + dbDataAB_CD.emplace_back(ar, keyB, blobRangeInactive); + dbDataAB_CD.emplace_back(ar, keyC, blobRangeActive); + dbDataAB_CD.emplace_back(ar, keyD, blobRangeInactive); // key ranges setup KeyRangeRef rangeAB = KeyRangeRef(keyA, keyB); From 736ef4f2c9a70a957888158febb8019203ab7801 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Thu, 4 Aug 2022 11:21:06 +0100 Subject: [PATCH 02/15] blob: move blobrange command implementation to native api --- fdbcli/BlobRangeCommand.actor.cpp | 34 +++-------------- fdbclient/NativeAPI.actor.cpp | 38 +++++++++++++++++++ fdbclient/include/fdbclient/DatabaseContext.h | 4 ++ .../BlobGranuleCorrectnessWorkload.actor.cpp | 23 +---------- 4 files changed, 49 insertions(+), 50 deletions(-) diff --git a/fdbcli/BlobRangeCommand.actor.cpp b/fdbcli/BlobRangeCommand.actor.cpp index a2bbb21ce5..a05dff4b44 100644 --- a/fdbcli/BlobRangeCommand.actor.cpp +++ b/fdbcli/BlobRangeCommand.actor.cpp @@ -23,6 +23,7 @@ #include "fdbclient/FDBOptions.g.h" #include "fdbclient/IClientApi.h" #include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/NativeAPI.actor.h" #include "flow/Arena.h" #include "flow/FastRef.h" @@ -31,33 +32,6 @@ namespace { -// copy to standalones for krm -ACTOR Future setBlobRange(Database db, Key startKey, Key endKey, Value value) { - state Reference tr = makeReference(db); - - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - - // FIXME: check that the set range is currently inactive, and that a revoked range is currently its own - // range in the map and fully set. - - tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString()); - // This is not coalescing because we want to keep each range logically separate. - wait(krmSetRange(tr, blobRangeKeys.begin, KeyRange(KeyRangeRef(startKey, endKey)), value)); - wait(tr->commit()); - printf("Successfully updated blob range [%s - %s) to %s\n", - startKey.printable().c_str(), - endKey.printable().c_str(), - value.printable().c_str()); - return Void(); - } catch (Error& e) { - wait(tr->onError(e)); - } - } -} - ACTOR Future getLatestReadVersion(Database db) { state Transaction tr(db); loop { @@ -210,7 +184,11 @@ ACTOR Future blobRangeCommandActor(Database localDb, starting ? "Starting" : "Stopping", tokens[2].printable().c_str(), tokens[3].printable().c_str()); - wait(setBlobRange(localDb, begin, end, starting ? LiteralStringRef("1") : StringRef())); + if (starting) { + wait(localDb->blobbifyRange(KeyRangeRef(begin, end))); + } else { + wait(localDb->unblobbifyRange(KeyRangeRef(begin, end))); + } } else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge") || tokencmp(tokens[1], "check")) { bool purge = tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge"); bool forcePurge = tokencmp(tokens[1], "forcepurge"); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index c38ca04150..04c4062ff8 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -9716,6 +9716,7 @@ Reference DatabaseContext::createTransaction() { return makeReference(Database(Reference::addRef(this))); } +// BlobGranule API. ACTOR Future purgeBlobGranulesActor(Reference db, KeyRange range, Version purgeVersion, @@ -9807,6 +9808,43 @@ Future DatabaseContext::waitPurgeGranulesComplete(Key purgeKey) { return waitPurgeGranulesCompleteActor(Reference::addRef(this), purgeKey); } +ACTOR Future setBlobRangeActor(Reference cx, KeyRange range, bool active) { + state Database db(cx); + state Reference tr = makeReference(db); + + state Value value = active ? LiteralStringRef("1") : LiteralStringRef("0"); + + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + // FIXME: check that the set range is currently inactive, and that a revoked range is currently its own + // range in the map and fully set. + + tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString()); + // This is not coalescing because we want to keep each range logically separate. + wait(krmSetRange(tr, blobRangeKeys.begin, range, value)); + wait(tr->commit()); + printf("Successfully updated blob range [%s - %s) to %s\n", + range.begin.printable().c_str(), + range.end.printable().c_str(), + value.printable().c_str()); + return Void(); + } catch (Error& e) { + wait(tr->onError(e)); + } + } +} + +Future DatabaseContext::blobbifyRange(KeyRange range) { + return setBlobRangeActor(Reference::addRef(this), range, true); +} + +Future DatabaseContext::unblobbifyRange(KeyRange range) { + return setBlobRangeActor(Reference::addRef(this), range, false); +} + int64_t getMaxKeySize(KeyRef const& key) { return getMaxWriteKeySize(key, true); } diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index 086ae8930c..5f2d0c02e8 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -378,12 +378,16 @@ public: Future getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion); Future popChangeFeedMutations(Key rangeID, Version version); + // BlobGranule API. Future purgeBlobGranules(KeyRange keyRange, Version purgeVersion, Optional tenant, bool force = false); Future waitPurgeGranulesComplete(Key purgeKey); + Future blobbifyRange(KeyRange range); + Future unblobbifyRange(KeyRange range); + // private: explicit DatabaseContext(Reference>> connectionRecord, Reference> clientDBInfo, diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index 7d275b544e..498d89b04e 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -230,27 +230,6 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { } } - ACTOR Future setUpBlobRange(Database cx, KeyRange keyRange) { - state Reference tr = makeReference(cx); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString()); - wait(krmSetRange(tr, blobRangeKeys.begin, keyRange, LiteralStringRef("1"))); - wait(tr->commit()); - if (BGW_DEBUG) { - fmt::print("Successfully set up blob granule range for tenant range [{0} - {1})\n", - keyRange.begin.printable(), - keyRange.end.printable()); - } - return Void(); - } catch (Error& e) { - wait(tr->onError(e)); - } - } - } - ACTOR Future setUpTenant(Database cx, TenantName name) { if (BGW_DEBUG) { fmt::print("Setting up blob granule range for tenant {0}\n", name.printable()); @@ -291,7 +270,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { self->directories[directoryIdx]->directoryRange = KeyRangeRef(tenantEntry.prefix, tenantEntry.prefix.withSuffix(normalKeys.end)); tenants.push_back({ self->directories[directoryIdx]->tenantName, tenantEntry }); - wait(self->setUpBlobRange(cx, self->directories[directoryIdx]->directoryRange)); + wait(cx->blobbifyRange(self->directories[directoryIdx]->directoryRange)); } tenantData.addTenants(tenants); From 5085c21d0d67f4638ff886067467f02038702542 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Thu, 4 Aug 2022 12:00:23 +0100 Subject: [PATCH 03/15] blob: check if blob range exists and fail accordingly --- fdbcli/BlobRangeCommand.actor.cpp | 14 +++++++--- fdbclient/NativeAPI.actor.cpp | 26 ++++++++++++++----- fdbclient/include/fdbclient/DatabaseContext.h | 4 +-- .../BlobGranuleCorrectnessWorkload.actor.cpp | 3 ++- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/fdbcli/BlobRangeCommand.actor.cpp b/fdbcli/BlobRangeCommand.actor.cpp index a05dff4b44..56e75578bc 100644 --- a/fdbcli/BlobRangeCommand.actor.cpp +++ b/fdbcli/BlobRangeCommand.actor.cpp @@ -175,7 +175,7 @@ ACTOR Future blobRangeCommandActor(Database localDb, fmt::print("Invalid blob range [{0} - {1})\n", tokens[2].printable(), tokens[3].printable()); } else { if (tokencmp(tokens[1], "start") || tokencmp(tokens[1], "stop")) { - bool starting = tokencmp(tokens[1], "start"); + state bool starting = tokencmp(tokens[1], "start"); if (tokens.size() > 4) { printUsage(tokens[0]); return false; @@ -184,11 +184,19 @@ ACTOR Future blobRangeCommandActor(Database localDb, starting ? "Starting" : "Stopping", tokens[2].printable().c_str(), tokens[3].printable().c_str()); + state bool success = false; if (starting) { - wait(localDb->blobbifyRange(KeyRangeRef(begin, end))); + wait(store(success, localDb->blobbifyRange(KeyRangeRef(begin, end)))); } else { - wait(localDb->unblobbifyRange(KeyRangeRef(begin, end))); + wait(store(success, localDb->unblobbifyRange(KeyRangeRef(begin, end)))); } + if (!success) { + fmt::print("{0} blobbify range for [{1} - {2}) failed\n", + starting ? "Starting" : "Stopping", + tokens[2].printable().c_str(), + tokens[3].printable().c_str()); + } + return success; } else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge") || tokencmp(tokens[1], "check")) { bool purge = tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge"); bool forcePurge = tokencmp(tokens[1], "forcepurge"); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 04c4062ff8..7470957470 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -9808,19 +9808,31 @@ Future DatabaseContext::waitPurgeGranulesComplete(Key purgeKey) { return waitPurgeGranulesCompleteActor(Reference::addRef(this), purgeKey); } -ACTOR Future setBlobRangeActor(Reference cx, KeyRange range, bool active) { +ACTOR Future setBlobRangeActor(Reference cx, KeyRange range, bool active) { state Database db(cx); state Reference tr = makeReference(db); - state Value value = active ? LiteralStringRef("1") : LiteralStringRef("0"); + state Value value = active ? blobRangeActive : blobRangeInactive; loop { try { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - // FIXME: check that the set range is currently inactive, and that a revoked range is currently its own - // range in the map and fully set. + if (active) { + state RangeResult results = wait(krmGetRanges(tr, blobRangeKeys.begin, range)); + ASSERT(results.size() >= 2); + if (results[0].key == range.begin && results[1].key == range.end && + results[0].value == blobRangeActive) { + return true; + } else { + for (int i = 0; i < results.size(); i++) { + if (results[i].value == blobRangeActive) { + return false; + } + } + } + } tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString()); // This is not coalescing because we want to keep each range logically separate. @@ -9830,18 +9842,18 @@ ACTOR Future setBlobRangeActor(Reference cx, KeyRange ran range.begin.printable().c_str(), range.end.printable().c_str(), value.printable().c_str()); - return Void(); + return true; } catch (Error& e) { wait(tr->onError(e)); } } } -Future DatabaseContext::blobbifyRange(KeyRange range) { +Future DatabaseContext::blobbifyRange(KeyRange range) { return setBlobRangeActor(Reference::addRef(this), range, true); } -Future DatabaseContext::unblobbifyRange(KeyRange range) { +Future DatabaseContext::unblobbifyRange(KeyRange range) { return setBlobRangeActor(Reference::addRef(this), range, false); } diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index 5f2d0c02e8..cf5cc6eee2 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -385,8 +385,8 @@ public: bool force = false); Future waitPurgeGranulesComplete(Key purgeKey); - Future blobbifyRange(KeyRange range); - Future unblobbifyRange(KeyRange range); + Future blobbifyRange(KeyRange range); + Future unblobbifyRange(KeyRange range); // private: explicit DatabaseContext(Reference>> connectionRecord, diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index 498d89b04e..aa70e31748 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -270,7 +270,8 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { self->directories[directoryIdx]->directoryRange = KeyRangeRef(tenantEntry.prefix, tenantEntry.prefix.withSuffix(normalKeys.end)); tenants.push_back({ self->directories[directoryIdx]->tenantName, tenantEntry }); - wait(cx->blobbifyRange(self->directories[directoryIdx]->directoryRange)); + bool _success = wait(cx->blobbifyRange(self->directories[directoryIdx]->directoryRange)); + ASSERT(_success); } tenantData.addTenants(tenants); From de732c2603ce99f772bf71573ccb170de1443730 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Tue, 9 Aug 2022 09:53:13 +0100 Subject: [PATCH 04/15] bindings: add FutureBool future type --- bindings/c/fdb_c.cpp | 4 ++ bindings/c/foundationdb/fdb_c.h | 2 + bindings/java/CMakeLists.txt | 1 + bindings/java/fdbJNI.cpp | 18 +++++++++ .../com/apple/foundationdb/FutureBool.java | 37 +++++++++++++++++++ fdbclient/MultiVersionTransaction.actor.cpp | 1 + 6 files changed, 63 insertions(+) create mode 100644 bindings/java/src/main/com/apple/foundationdb/FutureBool.java diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index 751698ab67..6adfe14bfe 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -239,6 +239,10 @@ fdb_error_t fdb_future_get_version_v619(FDBFuture* f, int64_t* out_version) { CATCH_AND_RETURN(*out_version = TSAV(Version, f)->get();); } +extern "C" DLLEXPORT fdb_error_t fdb_future_get_bool(FDBFuture* f, fdb_bool_t* out_value) { + CATCH_AND_RETURN(*out_value = TSAV(bool, f)->get();); +} + extern "C" DLLEXPORT fdb_error_t fdb_future_get_int64(FDBFuture* f, int64_t* out_value) { CATCH_AND_RETURN(*out_value = TSAV(int64_t, f)->get();); } diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 409fd8ef55..bb14efe6bb 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -227,6 +227,8 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_set_callback(FDBFuture* f, DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_error(FDBFuture* f); #endif +DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_bool(FDBFuture* f, fdb_bool_t* out); + DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_int64(FDBFuture* f, int64_t* out); DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_uint64(FDBFuture* f, uint64_t* out); diff --git a/bindings/java/CMakeLists.txt b/bindings/java/CMakeLists.txt index 22564dccc8..c6361a34c7 100644 --- a/bindings/java/CMakeLists.txt +++ b/bindings/java/CMakeLists.txt @@ -34,6 +34,7 @@ set(JAVA_BINDING_SRCS src/main/com/apple/foundationdb/FDBDatabase.java src/main/com/apple/foundationdb/FDBTenant.java src/main/com/apple/foundationdb/FDBTransaction.java + src/main/com/apple/foundationdb/FutureBool.java src/main/com/apple/foundationdb/FutureInt64.java src/main/com/apple/foundationdb/FutureKey.java src/main/com/apple/foundationdb/FutureKeyArray.java diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index e685d3ee53..921a1117e9 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -25,6 +25,7 @@ #include "com_apple_foundationdb_FDB.h" #include "com_apple_foundationdb_FDBDatabase.h" #include "com_apple_foundationdb_FDBTransaction.h" +#include "com_apple_foundationdb_FutureBool.h" #include "com_apple_foundationdb_FutureInt64.h" #include "com_apple_foundationdb_FutureKey.h" #include "com_apple_foundationdb_FutureKeyArray.h" @@ -278,6 +279,23 @@ JNIEXPORT void JNICALL Java_com_apple_foundationdb_NativeFuture_Future_1releaseM fdb_future_release_memory(var); } +JNIEXPORT jboolean JNICALL Java_com_apple_foundationdb_FutureBool_FutureBool_1get(JNIEnv* jenv, jobject, jlong future) { + if (!future) { + throwParamNotNull(jenv); + return 0; + } + FDBFuture* f = (FDBFuture*)future; + + fdb_bool_t value = false; + fdb_error_t err = fdb_future_get_bool(f, &value); + if (err) { + safeThrow(jenv, getThrowable(jenv, err)); + return 0; + } + + return (jboolean)value; +} + JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureInt64_FutureInt64_1get(JNIEnv* jenv, jobject, jlong future) { if (!future) { throwParamNotNull(jenv); diff --git a/bindings/java/src/main/com/apple/foundationdb/FutureBool.java b/bindings/java/src/main/com/apple/foundationdb/FutureBool.java new file mode 100644 index 0000000000..ddbbd02649 --- /dev/null +++ b/bindings/java/src/main/com/apple/foundationdb/FutureBool.java @@ -0,0 +1,37 @@ +/* + * FutureBool.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb; + +import java.util.concurrent.Executor; + +class FutureBool extends NativeFuture { + FutureBool(long cPtr, Executor executor) { + super(cPtr); + registerMarshalCallback(executor); + } + + @Override + protected Boolean getIfDone_internal(long cPtr) throws FDBException { + return FutureBool_get(cPtr); + } + + private native boolean FutureBool_get(long cPtr) throws FDBException; +} diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 039224b50c..d6147910f1 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -744,6 +744,7 @@ void DLApi::init() { fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version", headerVersion >= 0); + loadClientFunction(&api->futureGetBool, lib, fdbCPath, "fdb_future_get_bool", headerVersion >= 720); loadClientFunction(&api->futureGetUInt64, lib, fdbCPath, "fdb_future_get_uint64", headerVersion >= 700); loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error", headerVersion >= 0); loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key", headerVersion >= 0); From cbe9fba5e9de656d68ae25e589838ed011407332 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Sat, 6 Aug 2022 21:40:38 +0100 Subject: [PATCH 05/15] blob: (un)blobbifyRange() c api --- bindings/c/fdb_c.cpp | 22 +++++++++ bindings/c/foundationdb/fdb_c.h | 12 +++++ fdbclient/MultiVersionTransaction.actor.cpp | 45 +++++++++++++++++++ fdbclient/ThreadSafeTransaction.cpp | 12 +++++ fdbclient/include/fdbclient/IClientApi.h | 3 ++ .../fdbclient/MultiVersionTransaction.h | 18 ++++++++ .../include/fdbclient/ThreadSafeTransaction.h | 3 ++ 7 files changed, 115 insertions(+) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index 6adfe14bfe..d0bf2c6a83 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -498,6 +498,28 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_wait_purge_granules_complete(FDBDat FDBFuture*)(DB(db)->waitPurgeGranulesComplete(StringRef(purge_key_name, purge_key_name_length)).extractPtr()); } +extern "C" DLLEXPORT FDBFuture* fdb_database_blobbify_range(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length) { + return (FDBFuture*)(DB(db) + ->blobbifyRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length), + StringRef(end_key_name, end_key_name_length))) + .extractPtr()); +} + +extern "C" DLLEXPORT FDBFuture* fdb_database_unblobbify_range(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length) { + return (FDBFuture*)(DB(db) + ->unblobbifyRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length), + StringRef(end_key_name, end_key_name_length))) + .extractPtr()); +} + extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) { CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr();); } diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index bb14efe6bb..1d6d464fbd 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -323,6 +323,18 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_wait_purge_granules_complet uint8_t const* purge_key_name, int purge_key_name_length); +DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_blobbify_range(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length); + +DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_unblobbify_range(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length); + DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction); diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index d6147910f1..e35f49d7ef 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -583,6 +583,36 @@ ThreadFuture DLDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); }); } +ThreadFuture DLDatabase::blobbifyRange(const KeyRangeRef& keyRange) { + if (!api->databaseBlobbifyRange) { + return unsupported_operation(); + } + + FdbCApi::FDBFuture* f = api->databaseBlobbifyRange( + db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size()); + + return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { + bool ret = false; + ASSERT(!api->futureGetBool(f, &ret)); + return ret; + }); +} + +ThreadFuture DLDatabase::unblobbifyRange(const KeyRangeRef& keyRange) { + if (!api->databaseUnblobbifyRange) { + return unsupported_operation(); + } + + FdbCApi::FDBFuture* f = api->databaseUnblobbifyRange( + db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size()); + + return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { + bool ret = false; + ASSERT(!api->futureGetBool(f, &ret)); + return ret; + }); +} + // DLApi // Loads the specified function from a dynamic library @@ -670,6 +700,9 @@ void DLApi::init() { fdbCPath, "fdb_database_wait_purge_granules_complete", headerVersion >= 710); + loadClientFunction(&api->databaseBlobbifyRange, lib, fdbCPath, "fdb_database_blobbify_range", headerVersion >= 720); + loadClientFunction( + &api->databaseUnblobbifyRange, lib, fdbCPath, "fdb_database_unblobbify_range", headerVersion >= 720); loadClientFunction( &api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710); @@ -1590,6 +1623,18 @@ ThreadFuture MultiVersionDatabase::waitPurgeGranulesComplete(const KeyRef& return abortableFuture(f, dbState->dbVar->get().onChange); } +ThreadFuture MultiVersionDatabase::blobbifyRange(const KeyRangeRef& keyRange) { + auto dbVar = dbState->dbVar->get(); + auto f = dbVar.value ? dbVar.value->blobbifyRange(keyRange) : ThreadFuture(Never()); + return abortableFuture(f, dbVar.onChange); +} + +ThreadFuture MultiVersionDatabase::unblobbifyRange(const KeyRangeRef& keyRange) { + auto dbVar = dbState->dbVar->get(); + auto f = dbVar.value ? dbVar.value->unblobbifyRange(keyRange) : ThreadFuture(Never()); + return abortableFuture(f, dbVar.onChange); +} + // Returns the protocol version reported by the coordinator this client is connected to // If an expected version is given, the future won't return until the protocol version is different than expected // Note: this will never return if the server is running a protocol from FDB 5.0 or older diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index f0e6ea0c43..57f4b8aebc 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -144,6 +144,18 @@ ThreadFuture ThreadSafeDatabase::waitPurgeGranulesComplete(const KeyRef& p return onMainThread([db, key]() -> Future { return db->waitPurgeGranulesComplete(key); }); } +ThreadFuture ThreadSafeDatabase::blobbifyRange(const KeyRangeRef& keyRange) { + DatabaseContext* db = this->db; + KeyRange range = keyRange; + return onMainThread([=]() -> Future { return db->blobbifyRange(range); }); +} + +ThreadFuture ThreadSafeDatabase::unblobbifyRange(const KeyRangeRef& keyRange) { + DatabaseContext* db = this->db; + KeyRange range = keyRange; + return onMainThread([=]() -> Future { return db->blobbifyRange(range); }); +} + ThreadSafeDatabase::ThreadSafeDatabase(ConnectionRecordType connectionRecordType, std::string connectionRecordString, int apiVersion) { diff --git a/fdbclient/include/fdbclient/IClientApi.h b/fdbclient/include/fdbclient/IClientApi.h index 06c54b7b39..65ae304ec2 100644 --- a/fdbclient/include/fdbclient/IClientApi.h +++ b/fdbclient/include/fdbclient/IClientApi.h @@ -172,6 +172,9 @@ public: virtual ThreadFuture purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) = 0; virtual ThreadFuture waitPurgeGranulesComplete(const KeyRef& purgeKey) = 0; + virtual ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) = 0; + virtual ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) = 0; + // Interface to manage shared state across multiple connections to the same Database virtual ThreadFuture createSharedState() = 0; virtual void setSharedState(DatabaseSharedState* p) = 0; diff --git a/fdbclient/include/fdbclient/MultiVersionTransaction.h b/fdbclient/include/fdbclient/MultiVersionTransaction.h index afc3112d9d..be2cac36e4 100644 --- a/fdbclient/include/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/include/fdbclient/MultiVersionTransaction.h @@ -171,6 +171,18 @@ struct FdbCApi : public ThreadSafeReferenceCounted { uint8_t const* purge_key_name, int purge_key_name_length); + FDBFuture* (*databaseBlobbifyRange)(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length); + + FDBFuture* (*databaseUnblobbifyRange)(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length); + // Tenant fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction); @@ -476,6 +488,9 @@ public: ThreadFuture purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override; ThreadFuture waitPurgeGranulesComplete(const KeyRef& purgeKey) override; + ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; @@ -817,6 +832,9 @@ public: ThreadFuture purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override; ThreadFuture waitPurgeGranulesComplete(const KeyRef& purgeKey) override; + ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; diff --git a/fdbclient/include/fdbclient/ThreadSafeTransaction.h b/fdbclient/include/fdbclient/ThreadSafeTransaction.h index 8e9e0cb960..1fbb543292 100644 --- a/fdbclient/include/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/include/fdbclient/ThreadSafeTransaction.h @@ -62,6 +62,9 @@ public: ThreadFuture purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override; ThreadFuture waitPurgeGranulesComplete(const KeyRef& purgeKey) override; + ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; From 96f3dd67b0976991064789db7d5c44d71d3ec3e1 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Mon, 8 Aug 2022 12:47:20 +0100 Subject: [PATCH 06/15] blob: add listBlobbifiedRanges() api --- bindings/c/fdb_c.cpp | 13 +++++++ bindings/c/foundationdb/fdb_c.h | 7 ++++ fdbclient/MultiVersionTransaction.actor.cpp | 30 ++++++++++++++++ fdbclient/NativeAPI.actor.cpp | 34 +++++++++++++++++++ fdbclient/ThreadSafeTransaction.cpp | 8 +++++ fdbclient/include/fdbclient/DatabaseContext.h | 1 + fdbclient/include/fdbclient/IClientApi.h | 2 ++ .../fdbclient/MultiVersionTransaction.h | 11 ++++++ .../include/fdbclient/ThreadSafeTransaction.h | 2 ++ 9 files changed, 108 insertions(+) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index d0bf2c6a83..e0bf0c44ae 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -520,6 +520,19 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_unblobbify_range(FDBDatabase* db, .extractPtr()); } +extern "C" DLLEXPORT FDBFuture* fdb_database_list_blobbified_ranges(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length, + int rangeLimit) { + return (FDBFuture*)(DB(db) + ->listBlobbifiedRanges(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length), + StringRef(end_key_name, end_key_name_length)), + rangeLimit) + .extractPtr()); +} + extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) { CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr();); } diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 1d6d464fbd..238510a6f9 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -335,6 +335,13 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_unblobbify_range(FDBDatabas uint8_t const* end_key_name, int end_key_name_length); +DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_list_blobbified_ranges(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length, + int rangeLimit); + DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction); diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index e35f49d7ef..341d210d55 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -613,6 +613,26 @@ ThreadFuture DLDatabase::unblobbifyRange(const KeyRangeRef& keyRange) { }); } +ThreadFuture>> DLDatabase::listBlobbifiedRanges(const KeyRangeRef& keyRange, + int rangeLimit) { + if (!api->databaseListBlobbifiedRanges) { + return unsupported_operation(); + } + + FdbCApi::FDBFuture* f = api->databaseListBlobbifiedRanges( + db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), rangeLimit); + + return toThreadFuture>>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { + const FdbCApi::FDBKeyRange* keyRanges; + int keyRangesLength; + FdbCApi::fdb_error_t error = api->futureGetKeyRangeArray(f, &keyRanges, &keyRangesLength); + ASSERT(!error); + // The memory for this is stored in the FDBFuture and is released when the future gets destroyed. + return Standalone>(VectorRef((KeyRangeRef*)keyRanges, keyRangesLength), + Arena()); + }); +} + // DLApi // Loads the specified function from a dynamic library @@ -703,6 +723,8 @@ void DLApi::init() { loadClientFunction(&api->databaseBlobbifyRange, lib, fdbCPath, "fdb_database_blobbify_range", headerVersion >= 720); loadClientFunction( &api->databaseUnblobbifyRange, lib, fdbCPath, "fdb_database_unblobbify_range", headerVersion >= 720); + loadClientFunction( + &api->databaseListBlobbifiedRanges, lib, fdbCPath, "fdb_database_list_blobbified_ranges", headerVersion >= 720); loadClientFunction( &api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710); @@ -1635,6 +1657,14 @@ ThreadFuture MultiVersionDatabase::unblobbifyRange(const KeyRangeRef& keyR return abortableFuture(f, dbVar.onChange); } +ThreadFuture>> MultiVersionDatabase::listBlobbifiedRanges(const KeyRangeRef& keyRange, + int rangeLimit) { + auto dbVar = dbState->dbVar->get(); + auto f = dbVar.value ? dbVar.value->listBlobbifiedRanges(keyRange, rangeLimit) + : ThreadFuture>>(Never()); + return abortableFuture(f, dbVar.onChange); +} + // Returns the protocol version reported by the coordinator this client is connected to // If an expected version is given, the future won't return until the protocol version is different than expected // Note: this will never return if the server is running a protocol from FDB 5.0 or older diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 7470957470..393a1d5537 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -9857,6 +9857,40 @@ Future DatabaseContext::unblobbifyRange(KeyRange range) { return setBlobRangeActor(Reference::addRef(this), range, false); } +ACTOR Future>> listBlobbifiedRangesActor(Reference cx, + KeyRange range, + int rangeLimit) { + state Database db(cx); + state Reference tr = makeReference(db); + state Standalone> blobRanges; + + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + state RangeResult results = wait(krmGetRanges(tr, blobRangeKeys.begin, range, 2 * rangeLimit + 2)); + + blobRanges.arena().dependsOn(results.arena()); + for (int i = 0; i < results.size() - 1; i++) { + if (results[i].value == LiteralStringRef("1")) { + blobRanges.push_back(blobRanges.arena(), KeyRangeRef(results[i].value, results[i + 1].value)); + } + if (blobRanges.size() == rangeLimit) { + return blobRanges; + } + } + + return blobRanges; + } catch (Error& e) { + wait(tr->onError(e)); + } + } +} + +Future>> DatabaseContext::listBlobbifiedRanges(KeyRange range, int rowLimit) { + return listBlobbifiedRangesActor(Reference::addRef(this), range, rowLimit); +} + int64_t getMaxKeySize(KeyRef const& key) { return getMaxWriteKeySize(key, true); } diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 57f4b8aebc..831cb6d67b 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -156,6 +156,14 @@ ThreadFuture ThreadSafeDatabase::unblobbifyRange(const KeyRangeRef& keyRan return onMainThread([=]() -> Future { return db->blobbifyRange(range); }); } +ThreadFuture>> ThreadSafeDatabase::listBlobbifiedRanges(const KeyRangeRef& keyRange, + int rangeLimit) { + DatabaseContext* db = this->db; + KeyRange range = keyRange; + return onMainThread( + [=]() -> Future>> { return db->listBlobbifiedRanges(range, rangeLimit); }); +} + ThreadSafeDatabase::ThreadSafeDatabase(ConnectionRecordType connectionRecordType, std::string connectionRecordString, int apiVersion) { diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index cf5cc6eee2..5a04e9325a 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -387,6 +387,7 @@ public: Future blobbifyRange(KeyRange range); Future unblobbifyRange(KeyRange range); + Future>> listBlobbifiedRanges(KeyRange range, int rangeLimit); // private: explicit DatabaseContext(Reference>> connectionRecord, diff --git a/fdbclient/include/fdbclient/IClientApi.h b/fdbclient/include/fdbclient/IClientApi.h index 65ae304ec2..794a2bb59c 100644 --- a/fdbclient/include/fdbclient/IClientApi.h +++ b/fdbclient/include/fdbclient/IClientApi.h @@ -174,6 +174,8 @@ public: virtual ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) = 0; virtual ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) = 0; + virtual ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, + int rangeLimit) = 0; // Interface to manage shared state across multiple connections to the same Database virtual ThreadFuture createSharedState() = 0; diff --git a/fdbclient/include/fdbclient/MultiVersionTransaction.h b/fdbclient/include/fdbclient/MultiVersionTransaction.h index be2cac36e4..f82302bd2a 100644 --- a/fdbclient/include/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/include/fdbclient/MultiVersionTransaction.h @@ -183,6 +183,13 @@ struct FdbCApi : public ThreadSafeReferenceCounted { uint8_t const* end_key_name, int end_key_name_length); + FDBFuture* (*databaseListBlobbifiedRanges)(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length, + int rangeLimit); + // Tenant fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction); @@ -490,6 +497,8 @@ public: ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) override; ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, + int rangeLimit) override; ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; @@ -834,6 +843,8 @@ public: ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) override; ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, + int rangeLimit) override; ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; diff --git a/fdbclient/include/fdbclient/ThreadSafeTransaction.h b/fdbclient/include/fdbclient/ThreadSafeTransaction.h index 1fbb543292..0abe57692a 100644 --- a/fdbclient/include/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/include/fdbclient/ThreadSafeTransaction.h @@ -64,6 +64,8 @@ public: ThreadFuture blobbifyRange(const KeyRangeRef& keyRange) override; ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; + ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, + int rangeLimit) override; ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; From ba148ba6f1d634bec1c5dfbc53f1e123e4d56d8d Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Mon, 8 Aug 2022 14:48:00 +0100 Subject: [PATCH 07/15] blob/java: add default executor functions for purgeBlobGranules --- .../main/com/apple/foundationdb/Database.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/bindings/java/src/main/com/apple/foundationdb/Database.java b/bindings/java/src/main/com/apple/foundationdb/Database.java index 8608effe53..f3da6bf2d6 100644 --- a/bindings/java/src/main/com/apple/foundationdb/Database.java +++ b/bindings/java/src/main/com/apple/foundationdb/Database.java @@ -161,6 +161,20 @@ public interface Database extends AutoCloseable, TransactionContext { */ double getMainThreadBusyness(); + /** + * Runs {@link #purgeBlobGranules(Function)} on the default executor. + * + * @param beginKey start of the key range + * @param endKey end of the key range + * @param purgeVersion version to purge at + * @param force if true delete all data, if not keep data >= purgeVersion + * + * @return the key to watch for purge complete + */ + default CompletableFuture purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force) { + return purgeBlobGranules(beginKey, endKey, purgeVersion, force, getExecutor()); + } + /** * Queues a purge of blob granules for the specified key range, at the specified version. * @@ -168,15 +182,28 @@ public interface Database extends AutoCloseable, TransactionContext { * @param endKey end of the key range * @param purgeVersion version to purge at * @param force if true delete all data, if not keep data >= purgeVersion + * @param e the {@link Executor} to use for asynchronous callbacks + * @return the key to watch for purge complete */ CompletableFuture purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor e); + /** - * Wait for a previous call to purgeBlobGranules to complete + * Runs {@link #waitPurgeGranulesComplete(Function)} on the default executor. * * @param purgeKey key to watch */ + default CompletableFuture waitPurgeGranulesComplete(byte[] purgeKey) { + return waitPurgeGranulesComplete(purgeKey, getExecutor()); + } + + /** + * Wait for a previous call to purgeBlobGranules to complete. + * + * @param purgeKey key to watch + * @param e the {@link Executor} to use for asynchronous callbacks + */ CompletableFuture waitPurgeGranulesComplete(byte[] purgeKey, Executor e); /** From 2d6b2e490c9069d2b0e5952dd03c1a68789b7efc Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Mon, 8 Aug 2022 14:55:21 +0100 Subject: [PATCH 08/15] blob/java: make purgeBlobGranules signatures match --- .../java/src/main/com/apple/foundationdb/FDBDatabase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java index 50a63cc910..62974fb8ee 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java @@ -201,20 +201,20 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume } @Override - public CompletableFuture purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor executor) { + public CompletableFuture purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor e) { pointerReadLock.lock(); try { - return new FutureKey(Database_purgeBlobGranules(getPtr(), beginKey, endKey, purgeVersion, force), executor, eventKeeper); + return new FutureKey(Database_purgeBlobGranules(getPtr(), beginKey, endKey, purgeVersion, force), e, eventKeeper); } finally { pointerReadLock.unlock(); } } @Override - public CompletableFuture waitPurgeGranulesComplete(byte[] purgeKey, Executor executor) { + public CompletableFuture waitPurgeGranulesComplete(byte[] purgeKey, Executor e) { pointerReadLock.lock(); try { - return new FutureVoid(Database_waitPurgeGranulesComplete(getPtr(), purgeKey), executor); + return new FutureVoid(Database_waitPurgeGranulesComplete(getPtr(), purgeKey), e); } finally { pointerReadLock.unlock(); } From d2fd29dc52ccd5664f09c347cb05d8fa9b0db9c5 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Mon, 8 Aug 2022 15:30:37 +0100 Subject: [PATCH 09/15] blob/java: add (un)blobbifyRange api --- bindings/java/fdbJNI.cpp | 68 +++++++++++++++++++ .../main/com/apple/foundationdb/Database.java | 46 +++++++++++++ .../com/apple/foundationdb/FDBDatabase.java | 22 ++++++ 3 files changed, 136 insertions(+) diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index 921a1117e9..0415f094bd 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -848,6 +848,74 @@ Java_com_apple_foundationdb_FDBDatabase_Database_1waitPurgeGranulesComplete(JNIE return (jlong)f; } +JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1blobbifyRange(JNIEnv* jenv, + jobject, + jlong dbPtr, + jbyteArray beginKeyBytes, + jbyteArray endKeyBytes) { + if (!dbPtr || !beginKeyBytes || !endKeyBytes) { + throwParamNotNull(jenv); + return 0; + } + + FDBDatabase* database = (FDBDatabase*)dbPtr; + + uint8_t* beginKeyArr = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL); + if (!beginKeyArr) { + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + uint8_t* endKeyArr = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL); + if (!endKeyArr) { + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT); + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + FDBFuture* f = fdb_database_blobbify_range( + database, beginKeyArr, jenv->GetArrayLength(beginKeyBytes), endKeyArr, jenv->GetArrayLength(endKeyBytes)); + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT); + jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKeyArr, JNI_ABORT); + return (jlong)f; +} + +JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1unblobbifyRange(JNIEnv* jenv, + jobject, + jlong dbPtr, + jbyteArray beginKeyBytes, + jbyteArray endKeyBytes) { + if (!dbPtr || !beginKeyBytes || !endKeyBytes) { + throwParamNotNull(jenv); + return 0; + } + + FDBDatabase* database = (FDBDatabase*)dbPtr; + + uint8_t* beginKeyArr = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL); + if (!beginKeyArr) { + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + uint8_t* endKeyArr = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL); + if (!endKeyArr) { + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT); + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + FDBFuture* f = fdb_database_unblobbify_range( + database, beginKeyArr, jenv->GetArrayLength(beginKeyBytes), endKeyArr, jenv->GetArrayLength(endKeyBytes)); + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT); + jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKeyArr, JNI_ABORT); + return (jlong)f; +} + JNIEXPORT jboolean JNICALL Java_com_apple_foundationdb_FDB_Error_1predicate(JNIEnv* jenv, jobject, jint predicate, diff --git a/bindings/java/src/main/com/apple/foundationdb/Database.java b/bindings/java/src/main/com/apple/foundationdb/Database.java index f3da6bf2d6..fdec34efff 100644 --- a/bindings/java/src/main/com/apple/foundationdb/Database.java +++ b/bindings/java/src/main/com/apple/foundationdb/Database.java @@ -206,6 +206,52 @@ public interface Database extends AutoCloseable, TransactionContext { */ CompletableFuture waitPurgeGranulesComplete(byte[] purgeKey, Executor e); + /** + * Runs {@link #blobbifyRange(Function)} on the default executor. + * + * @param beginKey start of the key range + * @param endKey end of the key range + + * @return if the recording of the range was successful + */ + default CompletableFuture blobbifyRange(byte[] beginKey, byte[] endKey) { + return blobbifyRange(beginKey, endKey, getExecutor()); + } + + /** + * Sets a range to be blobbified in the database. Must be a completely unblobbified range. + * + * @param beginKey start of the key range + * @param endKey end of the key range + * @param e the {@link Executor} to use for asynchronous callbacks + + * @return if the recording of the range was successful + */ + CompletableFuture blobbifyRange(byte[] beginKey, byte[] endKey, Executor e); + + /** + * Runs {@link #unblobbifyRange(Function)} on the default executor. + * + * @param beginKey start of the key range + * @param endKey end of the key range + + * @return if the recording of the range was successful + */ + default CompletableFuture unblobbifyRange(byte[] beginKey, byte[] endKey) { + return unblobbifyRange(beginKey, endKey, getExecutor()); + } + + /** + * Sets a range to be unblobbified in the database. + * + * @param beginKey start of the key range + * @param endKey end of the key range + * @param e the {@link Executor} to use for asynchronous callbacks + + * @return if the recording of the range was successful + */ + CompletableFuture unblobbifyRange(byte[] beginKey, byte[] endKey, Executor e); + /** * Runs a read-only transactional function against this {@code Database} with retry logic. * {@link Function#apply(Object) apply(ReadTransaction)} will be called on the diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java index 62974fb8ee..340495d0d0 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java @@ -220,6 +220,26 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume } } + @Override + public CompletableFuture blobbifyRange(byte[] beginKey, byte[] endKey, Executor e) { + pointerReadLock.lock(); + try { + return new FutureBool(Database_blobbifyRange(getPtr(), beginKey, endKey), e); + } finally { + pointerReadLock.unlock(); + } + } + + @Override + public CompletableFuture unblobbifyRange(byte[] beginKey, byte[] endKey, Executor e) { + pointerReadLock.lock(); + try { + return new FutureBool(Database_unblobbifyRange(getPtr(), beginKey, endKey), e); + } finally { + pointerReadLock.unlock(); + } + } + @Override public Executor getExecutor() { return executor; @@ -237,4 +257,6 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume private native double Database_getMainThreadBusyness(long cPtr); private native long Database_purgeBlobGranules(long cPtr, byte[] beginKey, byte[] endKey, long purgeVersion, boolean force); private native long Database_waitPurgeGranulesComplete(long cPtr, byte[] purgeKey); + private native long Database_blobbifyRange(long cPtr, byte[] beginKey, byte[] endKey); + private native long Database_unblobbifyRange(long cPtr, byte[] beginKey, byte[] endKey); } \ No newline at end of file From e4f433a480bed0a774b4c8b0503bf880e843c82a Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Tue, 9 Aug 2022 12:52:42 +0100 Subject: [PATCH 10/15] bindings/java: add KeyRange bindings --- bindings/java/CMakeLists.txt | 2 + bindings/java/fdbJNI.cpp | 75 +++++++++++++++++++ .../foundationdb/FutureKeyRangeArray.java | 37 +++++++++ .../foundationdb/KeyRangeArrayResult.java | 36 +++++++++ 4 files changed, 150 insertions(+) create mode 100644 bindings/java/src/main/com/apple/foundationdb/FutureKeyRangeArray.java create mode 100644 bindings/java/src/main/com/apple/foundationdb/KeyRangeArrayResult.java diff --git a/bindings/java/CMakeLists.txt b/bindings/java/CMakeLists.txt index c6361a34c7..7057f22384 100644 --- a/bindings/java/CMakeLists.txt +++ b/bindings/java/CMakeLists.txt @@ -38,6 +38,7 @@ set(JAVA_BINDING_SRCS src/main/com/apple/foundationdb/FutureInt64.java src/main/com/apple/foundationdb/FutureKey.java src/main/com/apple/foundationdb/FutureKeyArray.java + src/main/com/apple/foundationdb/FutureKeyRangeArray.java src/main/com/apple/foundationdb/FutureResult.java src/main/com/apple/foundationdb/FutureResults.java src/main/com/apple/foundationdb/FutureMappedResults.java @@ -57,6 +58,7 @@ set(JAVA_BINDING_SRCS src/main/com/apple/foundationdb/RangeQuery.java src/main/com/apple/foundationdb/MappedRangeQuery.java src/main/com/apple/foundationdb/KeyArrayResult.java + src/main/com/apple/foundationdb/KeyRangeArrayResult.java src/main/com/apple/foundationdb/RangeResult.java src/main/com/apple/foundationdb/MappedRangeResult.java src/main/com/apple/foundationdb/RangeResultInfo.java diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index 0415f094bd..bb935ccf62 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -29,6 +29,7 @@ #include "com_apple_foundationdb_FutureInt64.h" #include "com_apple_foundationdb_FutureKey.h" #include "com_apple_foundationdb_FutureKeyArray.h" +#include "com_apple_foundationdb_FutureKeyRangeArray.h" #include "com_apple_foundationdb_FutureResult.h" #include "com_apple_foundationdb_FutureResults.h" #include "com_apple_foundationdb_FutureStrings.h" @@ -56,7 +57,11 @@ static jclass mapped_range_result_class; static jclass mapped_key_value_class; static jclass string_class; static jclass key_array_result_class; +static jclass keyrange_class; +static jclass keyrange_array_result_class; static jmethodID key_array_result_init; +static jmethodID keyrange_init; +static jmethodID keyrange_array_result_init; static jmethodID range_result_init; static jmethodID mapped_range_result_init; static jmethodID mapped_key_value_from_bytes; @@ -425,6 +430,61 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureKeyArray_FutureKeyAr return result; } +JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureKeyRangeArray_FutureKeyRangeArray_1get(JNIEnv* jenv, + jobject, + jlong future) { + if (!future) { + throwParamNotNull(jenv); + return JNI_NULL; + } + + FDBFuture* f = (FDBFuture*)future; + + const FDBKeyRange* fdbKr; + int count; + fdb_error_t err = fdb_future_get_keyrange_array(f, &fdbKr, &count); + if (err) { + safeThrow(jenv, getThrowable(jenv, err)); + return JNI_NULL; + } + + jobjectArray kr_values = jenv->NewObjectArray(count, keyrange_class, NULL); + if (!kr_values) { + if (!jenv->ExceptionOccurred()) + throwOutOfMem(jenv); + return JNI_NULL; + } + + for (int i = 0; i < count; i++) { + jbyteArray beginArr = jenv->NewByteArray(fdbKr[i].begin_key_length); + if (!beginArr) { + if (!jenv->ExceptionOccurred()) + throwOutOfMem(jenv); + return JNI_NULL; + } + jbyteArray endArr = jenv->NewByteArray(fdbKr[i].end_key_length); + if (!endArr) { + if (!jenv->ExceptionOccurred()) + throwOutOfMem(jenv); + return JNI_NULL; + } + jenv->SetByteArrayRegion(beginArr, 0, fdbKr[i].begin_key_length, (const jbyte*)fdbKr[i].begin_key); + jenv->SetByteArrayRegion(endArr, 0, fdbKr[i].end_key_length, (const jbyte*)fdbKr[i].end_key); + + jobject kr = jenv->NewObject(keyrange_class, keyrange_init, beginArr, endArr); + if (jenv->ExceptionOccurred()) + return JNI_NULL; + jenv->SetObjectArrayElement(kr_values, i, kr); + if (jenv->ExceptionOccurred()) + return JNI_NULL; + } + jobject krarr = jenv->NewObject(keyrange_array_result_class, keyrange_array_result_init, kr_values); + if (jenv->ExceptionOccurred()) + return JNI_NULL; + + return krarr; +} + // SOMEDAY: explore doing this more efficiently with Direct ByteBuffers JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1get(JNIEnv* jenv, jobject, @@ -1832,6 +1892,15 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { key_array_result_init = env->GetMethodID(local_key_array_result_class, "", "([B[I)V"); key_array_result_class = (jclass)(env)->NewGlobalRef(local_key_array_result_class); + jclass local_keyrange_class = env->FindClass("com/apple/foundationdb/Range"); + keyrange_init = env->GetMethodID(local_keyrange_class, "", "([B[B)V"); + keyrange_class = (jclass)(env)->NewGlobalRef(local_keyrange_class); + + jclass local_keyrange_array_result_class = env->FindClass("com/apple/foundationdb/KeyRangeArrayResult"); + keyrange_array_result_init = + env->GetMethodID(local_keyrange_array_result_class, "", "([Lcom/apple/foundationdb/Range;)V"); + keyrange_array_result_class = (jclass)(env)->NewGlobalRef(local_keyrange_array_result_class); + jclass local_range_result_summary_class = env->FindClass("com/apple/foundationdb/RangeResultSummary"); range_result_summary_init = env->GetMethodID(local_range_result_summary_class, "", "([BIZ)V"); range_result_summary_class = (jclass)(env)->NewGlobalRef(local_range_result_summary_class); @@ -1856,6 +1925,12 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { if (range_result_class != JNI_NULL) { env->DeleteGlobalRef(range_result_class); } + if (keyrange_array_result_class != JNI_NULL) { + env->DeleteGlobalRef(keyrange_array_result_class); + } + if (keyrange_class != JNI_NULL) { + env->DeleteGlobalRef(keyrange_class); + } if (mapped_range_result_class != JNI_NULL) { env->DeleteGlobalRef(mapped_range_result_class); } diff --git a/bindings/java/src/main/com/apple/foundationdb/FutureKeyRangeArray.java b/bindings/java/src/main/com/apple/foundationdb/FutureKeyRangeArray.java new file mode 100644 index 0000000000..d866e9fca4 --- /dev/null +++ b/bindings/java/src/main/com/apple/foundationdb/FutureKeyRangeArray.java @@ -0,0 +1,37 @@ +/* + * FutureKeyRangeArray.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb; + +import java.util.concurrent.Executor; + +class FutureKeyRangeArray extends NativeFuture { + FutureKeyRangeArray(long cPtr, Executor executor) { + super(cPtr); + registerMarshalCallback(executor); + } + + @Override + protected KeyRangeArrayResult getIfDone_internal(long cPtr) throws FDBException { + return FutureKeyRangeArray_get(cPtr); + } + + private native KeyRangeArrayResult FutureKeyRangeArray_get(long cPtr) throws FDBException; +} diff --git a/bindings/java/src/main/com/apple/foundationdb/KeyRangeArrayResult.java b/bindings/java/src/main/com/apple/foundationdb/KeyRangeArrayResult.java new file mode 100644 index 0000000000..7385b8fe0a --- /dev/null +++ b/bindings/java/src/main/com/apple/foundationdb/KeyRangeArrayResult.java @@ -0,0 +1,36 @@ +/* + * KeyRangeArrayResult.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb; + +import java.util.Arrays; +import java.util.List; + +public class KeyRangeArrayResult { + final List keyRanges; + + public KeyRangeArrayResult(Range[] keyRangeArr) { + this.keyRanges = Arrays.asList(keyRangeArr); + } + + public List getKeyRanges() { + return keyRanges; + } +} From 03d59428593a79c91d58bf6f460d65c02e027aa2 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Tue, 9 Aug 2022 10:31:39 +0100 Subject: [PATCH 11/15] blob/java: listBlobbifiedRanges java bindings --- bindings/java/fdbJNI.cpp | 34 +++++++++++++++++++ .../main/com/apple/foundationdb/Database.java | 26 ++++++++++++++ .../com/apple/foundationdb/FDBDatabase.java | 11 ++++++ 3 files changed, 71 insertions(+) diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index bb935ccf62..d8b2dea3d7 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -976,6 +976,40 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1unblob return (jlong)f; } +JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1listBlobbifiedRanges(JNIEnv* jenv, + jobject, + jlong dbPtr, + jbyteArray beginKeyBytes, + jbyteArray endKeyBytes, + jint rangeLimit) { + if (!dbPtr || !beginKeyBytes || !endKeyBytes) { + throwParamNotNull(jenv); + return 0; + } + FDBDatabase* tr = (FDBDatabase*)dbPtr; + + uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL); + if (!startKey) { + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL); + if (!endKey) { + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT); + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + FDBFuture* f = fdb_database_list_blobbified_ranges( + tr, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), rangeLimit); + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT); + jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT); + return (jlong)f; +} + JNIEXPORT jboolean JNICALL Java_com_apple_foundationdb_FDB_Error_1predicate(JNIEnv* jenv, jobject, jint predicate, diff --git a/bindings/java/src/main/com/apple/foundationdb/Database.java b/bindings/java/src/main/com/apple/foundationdb/Database.java index fdec34efff..52aab63ca1 100644 --- a/bindings/java/src/main/com/apple/foundationdb/Database.java +++ b/bindings/java/src/main/com/apple/foundationdb/Database.java @@ -252,6 +252,32 @@ public interface Database extends AutoCloseable, TransactionContext { */ CompletableFuture unblobbifyRange(byte[] beginKey, byte[] endKey, Executor e); + /** + * Runs {@link #listBlobbifiedRanges(Function)} on the default executor. + * + * @param beginKey start of the key range + * @param endKey end of the key range + * @param rangeLimit batch size + * @param e the {@link Executor} to use for asynchronous callbacks + + * @return a future with the list of blobbified ranges. + */ + default CompletableFuture listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit) { + return listBlobbifiedRanges(beginKey, endKey, rangeLimit, getExecutor()); + } + + /** + * Lists blobbified ranges in the database. There may be more if result.size() == rangeLimit. + * + * @param beginKey start of the key range + * @param endKey end of the key range + * @param rangeLimit batch size + * @param e the {@link Executor} to use for asynchronous callbacks + + * @return a future with the list of blobbified ranges. + */ + CompletableFuture listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e); + /** * Runs a read-only transactional function against this {@code Database} with retry logic. * {@link Function#apply(Object) apply(ReadTransaction)} will be called on the diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java index 340495d0d0..80afa5f091 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java @@ -240,6 +240,16 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume } } + @Override + public CompletableFuture listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e) { + pointerReadLock.lock(); + try { + return new FutureKeyRangeArray(Database_listBlobbifiedRanges(getPtr(), beginKey, endKey, rangeLimit), e); + } finally { + pointerReadLock.unlock(); + } + } + @Override public Executor getExecutor() { return executor; @@ -259,4 +269,5 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume private native long Database_waitPurgeGranulesComplete(long cPtr, byte[] purgeKey); private native long Database_blobbifyRange(long cPtr, byte[] beginKey, byte[] endKey); private native long Database_unblobbifyRange(long cPtr, byte[] beginKey, byte[] endKey); + private native long Database_listBlobbifiedRanges(long cPtr, byte[] beginKey, byte[] endKey, int rangeLimit); } \ No newline at end of file From 1c2109dcbdcd54575589f83ef58d24c0bb81f812 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Tue, 9 Aug 2022 16:26:28 +0100 Subject: [PATCH 12/15] blob: add rangeLimit to getBlobGranuleRanges() --- bindings/c/fdb_c.cpp | 5 +- bindings/c/foundationdb/fdb_c.h | 3 +- .../TesterBlobGranuleCorrectnessWorkload.cpp | 2 +- bindings/c/test/fdb_api.hpp | 4 +- bindings/c/test/unit/fdb_api.cpp | 12 ++- bindings/c/test/unit/fdb_api.hpp | 2 +- bindings/c/test/unit/unit_tests.cpp | 2 +- fdbcli/BlobRangeCommand.actor.cpp | 77 ++++++++++--------- fdbclient/MultiVersionTransaction.actor.cpp | 10 ++- fdbclient/NativeAPI.actor.cpp | 13 +++- fdbclient/ReadYourWrites.actor.cpp | 5 +- fdbclient/ThreadSafeTransaction.cpp | 7 +- fdbclient/include/fdbclient/IClientApi.h | 3 +- .../include/fdbclient/IConfigTransaction.h | 2 +- .../fdbclient/ISingleThreadTransaction.h | 2 +- .../fdbclient/MultiVersionTransaction.h | 9 ++- fdbclient/include/fdbclient/NativeAPI.actor.h | 2 +- fdbclient/include/fdbclient/ReadYourWrites.h | 2 +- .../include/fdbclient/ThreadSafeTransaction.h | 3 +- fdbserver/BlobGranuleValidation.actor.cpp | 2 +- .../BlobGranuleCorrectnessWorkload.actor.cpp | 4 +- .../workloads/BlobGranuleVerifier.actor.cpp | 2 +- 22 files changed, 100 insertions(+), 73 deletions(-) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index e0bf0c44ae..f137ad952a 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -895,11 +895,12 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_blob_granule_ranges(FDBTrans uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, - int end_key_name_length) { + int end_key_name_length, + int rangeLimit) { RETURN_FUTURE_ON_ERROR( Standalone>, KeyRangeRef range(KeyRef(begin_key_name, begin_key_name_length), KeyRef(end_key_name, end_key_name_length)); - return (FDBFuture*)(TXN(tr)->getBlobGranuleRanges(range).extractPtr());); + return (FDBFuture*)(TXN(tr)->getBlobGranuleRanges(range, rangeLimit).extractPtr());); } extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* tr, diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 238510a6f9..a9b7862019 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -500,7 +500,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges( uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, - int end_key_name_length); + int end_key_name_length, + int rangeLimit); /* LatestVersion (-2) for readVersion means get read version from transaction Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */ diff --git a/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp b/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp index 81fa3d8d81..97e4e5bf01 100644 --- a/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp +++ b/bindings/c/test/apitester/TesterBlobGranuleCorrectnessWorkload.cpp @@ -180,7 +180,7 @@ private: } execTransaction( [begin, end, results](auto ctx) { - fdb::Future f = ctx->tx().getBlobGranuleRanges(begin, end).eraseType(); + fdb::Future f = ctx->tx().getBlobGranuleRanges(begin, end, 1000).eraseType(); ctx->continueAfter( f, [ctx, f, results]() { diff --git a/bindings/c/test/fdb_api.hpp b/bindings/c/test/fdb_api.hpp index bee40981c3..6d0db008a2 100644 --- a/bindings/c/test/fdb_api.hpp +++ b/bindings/c/test/fdb_api.hpp @@ -559,9 +559,9 @@ public: reverse); } - TypedFuture getBlobGranuleRanges(KeyRef begin, KeyRef end) { + TypedFuture getBlobGranuleRanges(KeyRef begin, KeyRef end, int rangeLimit) { return native::fdb_transaction_get_blob_granule_ranges( - tr.get(), begin.data(), intSize(begin), end.data(), intSize(end)); + tr.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit); } Result readBlobGranules(KeyRef begin, diff --git a/bindings/c/test/unit/fdb_api.cpp b/bindings/c/test/unit/fdb_api.cpp index d3c1dec30d..d454082af3 100644 --- a/bindings/c/test/unit/fdb_api.cpp +++ b/bindings/c/test/unit/fdb_api.cpp @@ -356,9 +356,15 @@ fdb_error_t Transaction::add_conflict_range(std::string_view begin_key, tr_, (const uint8_t*)begin_key.data(), begin_key.size(), (const uint8_t*)end_key.data(), end_key.size(), type); } -KeyRangeArrayFuture Transaction::get_blob_granule_ranges(std::string_view begin_key, std::string_view end_key) { - return KeyRangeArrayFuture(fdb_transaction_get_blob_granule_ranges( - tr_, (const uint8_t*)begin_key.data(), begin_key.size(), (const uint8_t*)end_key.data(), end_key.size())); +KeyRangeArrayFuture Transaction::get_blob_granule_ranges(std::string_view begin_key, + std::string_view end_key, + int rangeLimit) { + return KeyRangeArrayFuture(fdb_transaction_get_blob_granule_ranges(tr_, + (const uint8_t*)begin_key.data(), + begin_key.size(), + (const uint8_t*)end_key.data(), + end_key.size(), + rangeLimit)); } KeyValueArrayResult Transaction::read_blob_granules(std::string_view begin_key, std::string_view end_key, diff --git a/bindings/c/test/unit/fdb_api.hpp b/bindings/c/test/unit/fdb_api.hpp index 7d44a30a9a..d0c4abd8db 100644 --- a/bindings/c/test/unit/fdb_api.hpp +++ b/bindings/c/test/unit/fdb_api.hpp @@ -348,7 +348,7 @@ public: // Wrapper around fdb_transaction_add_conflict_range. fdb_error_t add_conflict_range(std::string_view begin_key, std::string_view end_key, FDBConflictRangeType type); - KeyRangeArrayFuture get_blob_granule_ranges(std::string_view begin_key, std::string_view end_key); + KeyRangeArrayFuture get_blob_granule_ranges(std::string_view begin_key, std::string_view end_key, int rangeLimit); KeyValueArrayResult read_blob_granules(std::string_view begin_key, std::string_view end_key, int64_t beginVersion, diff --git a/bindings/c/test/unit/unit_tests.cpp b/bindings/c/test/unit/unit_tests.cpp index 9f5c015bfb..2ab80cf90c 100644 --- a/bindings/c/test/unit/unit_tests.cpp +++ b/bindings/c/test/unit/unit_tests.cpp @@ -2853,7 +2853,7 @@ TEST_CASE("Blob Granule Functions") { // test ranges while (1) { - fdb::KeyRangeArrayFuture f = tr.get_blob_granule_ranges(key("bg"), key("bh")); + fdb::KeyRangeArrayFuture f = tr.get_blob_granule_ranges(key("bg"), key("bh"), 1000); fdb_error_t err = wait_future(f); if (err) { fdb::EmptyFuture f2 = tr.on_error(err); diff --git a/fdbcli/BlobRangeCommand.actor.cpp b/fdbcli/BlobRangeCommand.actor.cpp index 56e75578bc..7b41126b3b 100644 --- a/fdbcli/BlobRangeCommand.actor.cpp +++ b/fdbcli/BlobRangeCommand.actor.cpp @@ -86,52 +86,57 @@ ACTOR Future checkBlobSubrange(Database db, KeyRange keyRange, Optional } } -ACTOR Future doBlobCheck(Database db, Key startKey, Key endKey, Optional version) { +ACTOR Future verifyBlobRange(Database db, KeyRange range, Optional version) { state Transaction tr(db); - state Version readVersionOut = invalidVersion; - state double elapsed = -timer_monotonic(); - state KeyRange range = KeyRange(KeyRangeRef(startKey, endKey)); state Standalone> allRanges; + state KeyRange curRegion = KeyRangeRef(range.begin, range.begin); + state Version readVersionOut = invalidVersion; + state int batchSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2; loop { try { - wait(store(allRanges, tr.getBlobGranuleRanges(range))); - break; + wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), 20 * batchSize))); } catch (Error& e) { wait(tr.onError(e)); } - } - if (allRanges.empty()) { - fmt::print("ERROR: No blob ranges for [{0} - {1})\n", startKey.printable(), endKey.printable()); - return Void(); - } - fmt::print("Loaded {0} blob ranges to check\n", allRanges.size()); - state std::vector> checkParts; - // Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit - int maxChunkSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2; - KeyRange currentChunk; - int currentChunkSize = 0; - for (auto& it : allRanges) { - if (currentChunkSize == maxChunkSize) { - checkParts.push_back(checkBlobSubrange(db, currentChunk, version)); - currentChunkSize = 0; + if (allRanges.empty()) { + if (curRegion.begin < range.end) { + return invalidVersion; + } + return readVersionOut; } - if (currentChunkSize == 0) { - currentChunk = it; - } else if (it.begin != currentChunk.end) { - fmt::print("ERROR: Blobrange check failed, gap in blob ranges from [{0} - {1})\n", - currentChunk.end.printable(), - it.begin.printable()); - return Void(); - } else { - currentChunk = KeyRangeRef(currentChunk.begin, it.end); - } - currentChunkSize++; - } - checkParts.push_back(checkBlobSubrange(db, currentChunk, version)); - wait(waitForAll(checkParts)); - readVersionOut = checkParts.back().get(); + state std::vector> checkParts; + // Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit + int batchCount = 0; + for (auto& it : allRanges) { + if (it.begin != curRegion.end) { + return invalidVersion; + } + + curRegion = KeyRangeRef(curRegion.begin, it.end); + batchCount++; + + if (batchCount == batchSize) { + checkParts.push_back(checkBlobSubrange(db, curRegion, version)); + batchCount = 0; + curRegion = KeyRangeRef(curRegion.end, curRegion.end); + } + } + if (!curRegion.empty()) { + checkParts.push_back(checkBlobSubrange(db, curRegion, version)); + } + + wait(waitForAll(checkParts)); + readVersionOut = checkParts.back().get(); + curRegion = KeyRangeRef(curRegion.end, curRegion.end); + } +} + +ACTOR Future doBlobCheck(Database db, Key startKey, Key endKey, Optional version) { + state double elapsed = -timer_monotonic(); + + state Version readVersionOut = wait(verifyBlobRange(db, KeyRangeRef(startKey, endKey), version)); elapsed += timer_monotonic(); diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 341d210d55..45411bf991 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -257,13 +257,14 @@ ThreadFuture>> DLTransaction::getRangeSplitPoints(c }); } -ThreadFuture>> DLTransaction::getBlobGranuleRanges(const KeyRangeRef& keyRange) { +ThreadFuture>> DLTransaction::getBlobGranuleRanges(const KeyRangeRef& keyRange, + int rangeLimit) { if (!api->transactionGetBlobGranuleRanges) { return unsupported_operation(); } FdbCApi::FDBFuture* f = api->transactionGetBlobGranuleRanges( - tr, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size()); + tr, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), rangeLimit); return toThreadFuture>>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { const FdbCApi::FDBKeyRange* keyRanges; int keyRangesLength; @@ -1135,9 +1136,10 @@ ThreadFuture>> MultiVersionTransaction::getRangeSpl } ThreadFuture>> MultiVersionTransaction::getBlobGranuleRanges( - const KeyRangeRef& keyRange) { + const KeyRangeRef& keyRange, + int rangeLimit) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getBlobGranuleRanges(keyRange) + auto f = tr.transaction ? tr.transaction->getBlobGranuleRanges(keyRange, rangeLimit) : makeTimeout>>(); return abortableFuture(f, tr.onChange); } diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 393a1d5537..813b06299e 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7638,7 +7638,9 @@ Future>> Transaction::getRangeSplitPoints(KeyRange // the blob granule requests are a bit funky because they piggyback off the existing transaction to read from the system // keyspace -ACTOR Future>> getBlobGranuleRangesActor(Transaction* self, KeyRange keyRange) { +ACTOR Future>> getBlobGranuleRangesActor(Transaction* self, + KeyRange keyRange, + int rangeLimit) { // FIXME: use streaming range read state KeyRange currentRange = keyRange; state Standalone> results; @@ -7661,7 +7663,7 @@ ACTOR Future>> getBlobGranuleRangesActor(Trans // basically krmGetRange, but enable it to not use tenant without RAW_ACCESS by doing manual getRange with // UseTenant::False - GetRangeLimits limits(1000); + GetRangeLimits limits(2 * rangeLimit + 2); limits.minRows = 2; RangeResult rawMapping = wait(getRange(self->trState, self->getReadVersion(), @@ -7683,6 +7685,9 @@ ACTOR Future>> getBlobGranuleRangesActor(Trans if (blobGranuleMapping[i].value.size()) { results.push_back(results.arena(), KeyRangeRef(blobGranuleMapping[i].key, blobGranuleMapping[i + 1].key)); + if (results.size() == rangeLimit) { + return results; + } } } results.arena().dependsOn(blobGranuleMapping.arena()); @@ -7694,8 +7699,8 @@ ACTOR Future>> getBlobGranuleRangesActor(Trans } } -Future>> Transaction::getBlobGranuleRanges(const KeyRange& range) { - return ::getBlobGranuleRangesActor(this, range); +Future>> Transaction::getBlobGranuleRanges(const KeyRange& range, int rangeLimit) { + return ::getBlobGranuleRangesActor(this, range, rangeLimit); } // hack (for now) to get blob worker interface into load balance diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 0635358402..d6437c7403 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1783,7 +1783,8 @@ Future>> ReadYourWritesTransaction::getRangeSplitPo return waitOrError(tr.getRangeSplitPoints(range, chunkSize), resetPromise.getFuture()); } -Future>> ReadYourWritesTransaction::getBlobGranuleRanges(const KeyRange& range) { +Future>> ReadYourWritesTransaction::getBlobGranuleRanges(const KeyRange& range, + int rangeLimit) { if (checkUsedDuringCommit()) { return used_during_commit(); } @@ -1794,7 +1795,7 @@ Future>> ReadYourWritesTransaction::getBlobGra if (range.begin > maxKey || range.end > maxKey) return key_outside_legal_range(); - return waitOrError(tr.getBlobGranuleRanges(range), resetPromise.getFuture()); + return waitOrError(tr.getBlobGranuleRanges(range, rangeLimit), resetPromise.getFuture()); } Future>> ReadYourWritesTransaction::readBlobGranules( diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 831cb6d67b..195a45b59a 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -379,13 +379,14 @@ ThreadFuture>> ThreadSafeTransaction::getAddre } ThreadFuture>> ThreadSafeTransaction::getBlobGranuleRanges( - const KeyRangeRef& keyRange) { + const KeyRangeRef& keyRange, + int rangeLimit) { ISingleThreadTransaction* tr = this->tr; KeyRange r = keyRange; - return onMainThread([tr, r]() -> Future>> { + return onMainThread([=]() -> Future>> { tr->checkDeferredError(); - return tr->getBlobGranuleRanges(r); + return tr->getBlobGranuleRanges(r, rangeLimit); }); } diff --git a/fdbclient/include/fdbclient/IClientApi.h b/fdbclient/include/fdbclient/IClientApi.h index 794a2bb59c..87f2f486e3 100644 --- a/fdbclient/include/fdbclient/IClientApi.h +++ b/fdbclient/include/fdbclient/IClientApi.h @@ -78,7 +78,8 @@ public: virtual ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, int64_t chunkSize) = 0; - virtual ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange) = 0; + virtual ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange, + int rowLimit) = 0; virtual ThreadResult readBlobGranules(const KeyRangeRef& keyRange, Version beginVersion, diff --git a/fdbclient/include/fdbclient/IConfigTransaction.h b/fdbclient/include/fdbclient/IConfigTransaction.h index 8f21679e27..9246e4016e 100644 --- a/fdbclient/include/fdbclient/IConfigTransaction.h +++ b/fdbclient/include/fdbclient/IConfigTransaction.h @@ -55,7 +55,7 @@ public: Future>> getRangeSplitPoints(KeyRange const& range, int64_t chunkSize) override { throw client_invalid_operation(); } - Future>> getBlobGranuleRanges(KeyRange const& range) override { + Future>> getBlobGranuleRanges(KeyRange const& range, int rowLimit) override { throw client_invalid_operation(); } Future>> readBlobGranules(KeyRange const& range, diff --git a/fdbclient/include/fdbclient/ISingleThreadTransaction.h b/fdbclient/include/fdbclient/ISingleThreadTransaction.h index b44f58b464..6143ec8605 100644 --- a/fdbclient/include/fdbclient/ISingleThreadTransaction.h +++ b/fdbclient/include/fdbclient/ISingleThreadTransaction.h @@ -80,7 +80,7 @@ public: virtual Future>> getAddressesForKey(Key const& key) = 0; virtual Future>> getRangeSplitPoints(KeyRange const& range, int64_t chunkSize) = 0; virtual Future getEstimatedRangeSizeBytes(KeyRange const& keys) = 0; - virtual Future>> getBlobGranuleRanges(KeyRange const& range) = 0; + virtual Future>> getBlobGranuleRanges(KeyRange const& range, int rangeLimit) = 0; virtual Future>> readBlobGranules(KeyRange const& range, Version begin, Optional readVersion, diff --git a/fdbclient/include/fdbclient/MultiVersionTransaction.h b/fdbclient/include/fdbclient/MultiVersionTransaction.h index f82302bd2a..5d31faebb4 100644 --- a/fdbclient/include/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/include/fdbclient/MultiVersionTransaction.h @@ -295,7 +295,8 @@ struct FdbCApi : public ThreadSafeReferenceCounted { uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, - int end_key_name_length); + int end_key_name_length, + int rangeLimit); FDBResult* (*transactionReadBlobGranules)(FDBTransaction* db, uint8_t const* begin_key_name, @@ -395,7 +396,8 @@ public: ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, int64_t chunkSize) override; - ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override; + ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange, + int rangeLimit) override; ThreadResult readBlobGranules(const KeyRangeRef& keyRange, Version beginVersion, @@ -598,7 +600,8 @@ public: ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, int64_t chunkSize) override; - ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override; + ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange, + int rangeLimit) override; ThreadResult readBlobGranules(const KeyRangeRef& keyRange, Version beginVersion, diff --git a/fdbclient/include/fdbclient/NativeAPI.actor.h b/fdbclient/include/fdbclient/NativeAPI.actor.h index a7f9a8a97e..79c1aa09fb 100644 --- a/fdbclient/include/fdbclient/NativeAPI.actor.h +++ b/fdbclient/include/fdbclient/NativeAPI.actor.h @@ -415,7 +415,7 @@ public: // The returned list would still be in form of [keys.begin, splitPoint1, splitPoint2, ... , keys.end] Future>> getRangeSplitPoints(KeyRange const& keys, int64_t chunkSize); - Future>> getBlobGranuleRanges(const KeyRange& range); + Future>> getBlobGranuleRanges(const KeyRange& range, int rangeLimit); Future>> readBlobGranules(const KeyRange& range, Version begin, Optional readVersion, diff --git a/fdbclient/include/fdbclient/ReadYourWrites.h b/fdbclient/include/fdbclient/ReadYourWrites.h index 4e20bd68b8..46650be3d3 100644 --- a/fdbclient/include/fdbclient/ReadYourWrites.h +++ b/fdbclient/include/fdbclient/ReadYourWrites.h @@ -121,7 +121,7 @@ public: Future>> getRangeSplitPoints(const KeyRange& range, int64_t chunkSize) override; Future getEstimatedRangeSizeBytes(const KeyRange& keys) override; - Future>> getBlobGranuleRanges(const KeyRange& range) override; + Future>> getBlobGranuleRanges(const KeyRange& range, int rangeLimit) override; Future>> readBlobGranules(const KeyRange& range, Version begin, Optional readVersion, diff --git a/fdbclient/include/fdbclient/ThreadSafeTransaction.h b/fdbclient/include/fdbclient/ThreadSafeTransaction.h index 0abe57692a..4b979f09e6 100644 --- a/fdbclient/include/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/include/fdbclient/ThreadSafeTransaction.h @@ -154,7 +154,8 @@ public: ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, int64_t chunkSize) override; - ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override; + ThreadFuture>> getBlobGranuleRanges(const KeyRangeRef& keyRange, + int rangeLimit) override; ThreadResult readBlobGranules(const KeyRangeRef& keyRange, Version beginVersion, diff --git a/fdbserver/BlobGranuleValidation.actor.cpp b/fdbserver/BlobGranuleValidation.actor.cpp index cffff4cdd9..53d7efe76b 100644 --- a/fdbserver/BlobGranuleValidation.actor.cpp +++ b/fdbserver/BlobGranuleValidation.actor.cpp @@ -189,7 +189,7 @@ ACTOR Future clearAndAwaitMerge(Database cx, KeyRange range) { state int reClearInterval = 1; // do quadratic backoff on clear rate, b/c large keys can keep it not write-cold loop { try { - Standalone> ranges = wait(tr.getBlobGranuleRanges(range)); + Standalone> ranges = wait(tr.getBlobGranuleRanges(range, 2)); if (ranges.size() == 1) { return Void(); } diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index aa70e31748..fb4edbf7c0 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -891,8 +891,8 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { loop { state Transaction tr(cx, threadData->tenantName); try { - Standalone> ranges = wait(tr.getBlobGranuleRanges(normalKeys)); - ASSERT(ranges.size() >= 1); + Standalone> ranges = wait(tr.getBlobGranuleRanges(normalKeys, 1000000)); + ASSERT(ranges.size() >= 1 && ranges.size() < 1000000); ASSERT(ranges.front().begin == normalKeys.begin); ASSERT(ranges.back().end == normalKeys.end); for (int i = 0; i < ranges.size() - 1; i++) { diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index 94bdddfc75..4f0aed31bf 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -169,7 +169,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload { state Transaction tr(cx); loop { try { - Standalone> allGranules = wait(tr.getBlobGranuleRanges(normalKeys)); + Standalone> allGranules = wait(tr.getBlobGranuleRanges(normalKeys, 1000000)); self->granuleRanges.set(allGranules); break; } catch (Error& e) { From 045076339d551514e4880ebe51db4d28816a6bfb Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Tue, 9 Aug 2022 14:50:27 +0100 Subject: [PATCH 13/15] bindings/java: implement getBlobGranuleRanges() bindings --- bindings/java/fdbJNI.cpp | 35 +++++++++++++++++++ .../apple/foundationdb/FDBTransaction.java | 16 +++++++++ .../apple/foundationdb/ReadTransaction.java | 11 ++++++ 3 files changed, 62 insertions(+) diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index d8b2dea3d7..a4958759d6 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -1487,6 +1487,41 @@ Java_com_apple_foundationdb_FDBTransaction_Transaction_1getRangeSplitPoints(JNIE return (jlong)f; } +JNIEXPORT jlong JNICALL +Java_com_apple_foundationdb_FDBTransaction_Transaction_1getBlobGranuleRanges(JNIEnv* jenv, + jobject, + jlong tPtr, + jbyteArray beginKeyBytes, + jbyteArray endKeyBytes, + jint rowLimit) { + if (!tPtr || !beginKeyBytes || !endKeyBytes || !rowLimit) { + throwParamNotNull(jenv); + return 0; + } + FDBTransaction* tr = (FDBTransaction*)tPtr; + + uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL); + if (!startKey) { + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL); + if (!endKey) { + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT); + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + FDBFuture* f = fdb_transaction_get_blob_granule_ranges( + tr, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), rowLimit); + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT); + jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT); + return (jlong)f; +} + JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1set(JNIEnv* jenv, jobject, jlong tPtr, diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java index b35196c146..7943c5e9d1 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java @@ -97,6 +97,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC return FDBTransaction.this.getRangeSplitPoints(range, chunkSize); } + @Override + public CompletableFuture getBlobGranuleRanges(byte[] begin, byte[] end, int rowLimit) { + return FDBTransaction.this.getBlobGranuleRanges(begin, end, rowLimit); + } + @Override public AsyncIterable getMappedRange(KeySelector begin, KeySelector end, byte[] mapper, int limit, int matchIndex, boolean reverse, @@ -352,6 +357,16 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC return this.getRangeSplitPoints(range.begin, range.end, chunkSize); } + @Override + public CompletableFuture getBlobGranuleRanges(byte[] begin, byte[] end, int rowLimit) { + pointerReadLock.lock(); + try { + return new FutureKeyRangeArray(Transaction_getBlobGranuleRanges(getPtr(), begin, end, rowLimit), executor); + } finally { + pointerReadLock.unlock(); + } + } + @Override public AsyncIterable getMappedRange(KeySelector begin, KeySelector end, byte[] mapper, int limit, int matchIndex, boolean reverse, StreamingMode mode) { @@ -842,4 +857,5 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC private native long Transaction_getKeyLocations(long cPtr, byte[] key); private native long Transaction_getEstimatedRangeSizeBytes(long cPtr, byte[] keyBegin, byte[] keyEnd); private native long Transaction_getRangeSplitPoints(long cPtr, byte[] keyBegin, byte[] keyEnd, long chunkSize); + private native long Transaction_getBlobGranuleRanges(long cPtr, byte[] keyBegin, byte[] keyEnd, int rowLimit); } diff --git a/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java b/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java index 11ed7e900c..04050de6fb 100644 --- a/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/ReadTransaction.java @@ -513,6 +513,17 @@ public interface ReadTransaction extends ReadTransactionContext { */ CompletableFuture getRangeSplitPoints(Range range, long chunkSize); + /** + * Gets the blob granule ranges for a given region. + * Returned in batches, requires calling again moving the begin key up. + * + * @param begin beginning of the range (inclusive) + * @param end end of the range (exclusive) + + * @return list of blob granules in the given range. May not be all. + */ + CompletableFuture getBlobGranuleRanges(byte[] begin, byte[] end, int rowLimit); + /** * Returns a set of options that can be set on a {@code Transaction} From 3d400cff644cb512d68f05b86de97aa7d5a17cac Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Wed, 10 Aug 2022 00:19:28 +0100 Subject: [PATCH 14/15] blob: verifyBlobRange() c api --- bindings/c/fdb_c.cpp | 13 ++++ bindings/c/foundationdb/fdb_c.h | 7 ++ fdbcli/BlobRangeCommand.actor.cpp | 62 +----------------- fdbclient/MultiVersionTransaction.actor.cpp | 23 +++++++ fdbclient/NativeAPI.actor.cpp | 65 +++++++++++++++++++ fdbclient/ThreadSafeTransaction.cpp | 6 ++ fdbclient/include/fdbclient/DatabaseContext.h | 1 + fdbclient/include/fdbclient/IClientApi.h | 2 + .../fdbclient/MultiVersionTransaction.h | 9 +++ .../include/fdbclient/ThreadSafeTransaction.h | 2 + 10 files changed, 129 insertions(+), 61 deletions(-) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index f137ad952a..fdba399204 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -533,6 +533,19 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_list_blobbified_ranges(FDBDatabase* .extractPtr()); } +extern "C" DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_verify_blob_range(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length, + int64_t version) { + return (FDBFuture*)(DB(db) + ->verifyBlobRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length), + StringRef(end_key_name, end_key_name_length)), + version) + .extractPtr()); +} + extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) { CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr();); } diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index a9b7862019..10534a94dc 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -342,6 +342,13 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_list_blobbified_ranges(FDBD int end_key_name_length, int rangeLimit); +DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_verify_blob_range(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length, + int64_t version); + DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction); diff --git a/fdbcli/BlobRangeCommand.actor.cpp b/fdbcli/BlobRangeCommand.actor.cpp index 7b41126b3b..4c6bdf9614 100644 --- a/fdbcli/BlobRangeCommand.actor.cpp +++ b/fdbcli/BlobRangeCommand.actor.cpp @@ -73,70 +73,10 @@ ACTOR Future doBlobPurge(Database db, Key startKey, Key endKey, Optional checkBlobSubrange(Database db, KeyRange keyRange, Optional version) { - state Transaction tr(db); - state Version readVersionOut = invalidVersion; - loop { - try { - wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut))); - return readVersionOut; - } catch (Error& e) { - wait(tr.onError(e)); - } - } -} - -ACTOR Future verifyBlobRange(Database db, KeyRange range, Optional version) { - state Transaction tr(db); - state Standalone> allRanges; - state KeyRange curRegion = KeyRangeRef(range.begin, range.begin); - state Version readVersionOut = invalidVersion; - state int batchSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2; - loop { - try { - wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), 20 * batchSize))); - } catch (Error& e) { - wait(tr.onError(e)); - } - - if (allRanges.empty()) { - if (curRegion.begin < range.end) { - return invalidVersion; - } - return readVersionOut; - } - - state std::vector> checkParts; - // Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit - int batchCount = 0; - for (auto& it : allRanges) { - if (it.begin != curRegion.end) { - return invalidVersion; - } - - curRegion = KeyRangeRef(curRegion.begin, it.end); - batchCount++; - - if (batchCount == batchSize) { - checkParts.push_back(checkBlobSubrange(db, curRegion, version)); - batchCount = 0; - curRegion = KeyRangeRef(curRegion.end, curRegion.end); - } - } - if (!curRegion.empty()) { - checkParts.push_back(checkBlobSubrange(db, curRegion, version)); - } - - wait(waitForAll(checkParts)); - readVersionOut = checkParts.back().get(); - curRegion = KeyRangeRef(curRegion.end, curRegion.end); - } -} - ACTOR Future doBlobCheck(Database db, Key startKey, Key endKey, Optional version) { state double elapsed = -timer_monotonic(); - state Version readVersionOut = wait(verifyBlobRange(db, KeyRangeRef(startKey, endKey), version)); + state Version readVersionOut = wait(db->verifyBlobRange(KeyRangeRef(startKey, endKey), version)); elapsed += timer_monotonic(); diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 45411bf991..1d88e37b1a 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -634,6 +634,21 @@ ThreadFuture>> DLDatabase::listBlobbifiedRange }); } +ThreadFuture DLDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional version) { + if (!api->databaseVerifyBlobRange) { + return unsupported_operation(); + } + + FdbCApi::FDBFuture* f = api->databaseVerifyBlobRange( + db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), version); + + return toThreadFuture(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { + Version version = invalidVersion; + ASSERT(!api->futureGetInt64(f, &version)); + return version; + }); +} + // DLApi // Loads the specified function from a dynamic library @@ -726,6 +741,8 @@ void DLApi::init() { &api->databaseUnblobbifyRange, lib, fdbCPath, "fdb_database_unblobbify_range", headerVersion >= 720); loadClientFunction( &api->databaseListBlobbifiedRanges, lib, fdbCPath, "fdb_database_list_blobbified_ranges", headerVersion >= 720); + loadClientFunction( + &api->databaseVerifyBlobRange, lib, fdbCPath, "fdb_database_verify_blob_range", headerVersion >= 720); loadClientFunction( &api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710); @@ -1667,6 +1684,12 @@ ThreadFuture>> MultiVersionDatabase::listBlobb return abortableFuture(f, dbVar.onChange); } +ThreadFuture MultiVersionDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional version) { + auto dbVar = dbState->dbVar->get(); + auto f = dbVar.value ? dbVar.value->verifyBlobRange(keyRange, version) : ThreadFuture(Never()); + return abortableFuture(f, dbVar.onChange); +} + // Returns the protocol version reported by the coordinator this client is connected to // If an expected version is given, the future won't return until the protocol version is different than expected // Note: this will never return if the server is running a protocol from FDB 5.0 or older diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 813b06299e..dfa106f048 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -8012,6 +8012,71 @@ ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAw return version; } +ACTOR Future checkBlobSubrange(Database db, KeyRange keyRange, Optional version) { + state Transaction tr(db); + state Version readVersionOut = invalidVersion; + loop { + try { + wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut))); + return readVersionOut; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +ACTOR Future verifyBlobRangeActor(Reference cx, KeyRange range, Optional version) { + state Database db(cx); + state Transaction tr(db); + state Standalone> allRanges; + state KeyRange curRegion = KeyRangeRef(range.begin, range.begin); + state Version readVersionOut = invalidVersion; + state int batchSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2; + loop { + try { + wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), 20 * batchSize))); + } catch (Error& e) { + wait(tr.onError(e)); + } + + if (allRanges.empty()) { + if (curRegion.begin < range.end) { + return invalidVersion; + } + return readVersionOut; + } + + state std::vector> checkParts; + // Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit + int batchCount = 0; + for (auto& it : allRanges) { + if (it.begin != curRegion.end) { + return invalidVersion; + } + + curRegion = KeyRangeRef(curRegion.begin, it.end); + batchCount++; + + if (batchCount == batchSize) { + checkParts.push_back(checkBlobSubrange(db, curRegion, version)); + batchCount = 0; + curRegion = KeyRangeRef(curRegion.end, curRegion.end); + } + } + if (!curRegion.empty()) { + checkParts.push_back(checkBlobSubrange(db, curRegion, version)); + } + + wait(waitForAll(checkParts)); + readVersionOut = checkParts.back().get(); + curRegion = KeyRangeRef(curRegion.end, curRegion.end); + } +} + +Future DatabaseContext::verifyBlobRange(const KeyRange& range, Optional version) { + return verifyBlobRangeActor(Reference::addRef(this), range, version); +} + ACTOR Future>> readStorageWiggleValues(Database cx, bool primary, bool use_system_priority) { diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 195a45b59a..7684980c7b 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -164,6 +164,12 @@ ThreadFuture>> ThreadSafeDatabase::listBlobbif [=]() -> Future>> { return db->listBlobbifiedRanges(range, rangeLimit); }); } +ThreadFuture ThreadSafeDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional version) { + DatabaseContext* db = this->db; + KeyRange range = keyRange; + return onMainThread([=]() -> Future { return db->verifyBlobRange(range, version); }); +} + ThreadSafeDatabase::ThreadSafeDatabase(ConnectionRecordType connectionRecordType, std::string connectionRecordString, int apiVersion) { diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index 5a04e9325a..5d36b4fe8c 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -388,6 +388,7 @@ public: Future blobbifyRange(KeyRange range); Future unblobbifyRange(KeyRange range); Future>> listBlobbifiedRanges(KeyRange range, int rangeLimit); + Future verifyBlobRange(const KeyRange& range, Optional version); // private: explicit DatabaseContext(Reference>> connectionRecord, diff --git a/fdbclient/include/fdbclient/IClientApi.h b/fdbclient/include/fdbclient/IClientApi.h index 87f2f486e3..90a5612acd 100644 --- a/fdbclient/include/fdbclient/IClientApi.h +++ b/fdbclient/include/fdbclient/IClientApi.h @@ -178,6 +178,8 @@ public: virtual ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, int rangeLimit) = 0; + virtual ThreadFuture verifyBlobRange(const KeyRangeRef& keyRange, Optional version) = 0; + // Interface to manage shared state across multiple connections to the same Database virtual ThreadFuture createSharedState() = 0; virtual void setSharedState(DatabaseSharedState* p) = 0; diff --git a/fdbclient/include/fdbclient/MultiVersionTransaction.h b/fdbclient/include/fdbclient/MultiVersionTransaction.h index 5d31faebb4..dc4088a57b 100644 --- a/fdbclient/include/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/include/fdbclient/MultiVersionTransaction.h @@ -190,6 +190,13 @@ struct FdbCApi : public ThreadSafeReferenceCounted { int end_key_name_length, int rangeLimit); + FDBFuture* (*databaseVerifyBlobRange)(FDBDatabase* db, + uint8_t const* begin_key_name, + int begin_key_name_length, + uint8_t const* end_key_name, + int end_key_name_length, + Optional version); + // Tenant fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction); @@ -501,6 +508,7 @@ public: ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, int rangeLimit) override; + ThreadFuture verifyBlobRange(const KeyRangeRef& keyRange, Optional version) override; ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; @@ -848,6 +856,7 @@ public: ThreadFuture unblobbifyRange(const KeyRangeRef& keyRange) override; ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, int rangeLimit) override; + ThreadFuture verifyBlobRange(const KeyRangeRef& keyRange, Optional version) override; ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; diff --git a/fdbclient/include/fdbclient/ThreadSafeTransaction.h b/fdbclient/include/fdbclient/ThreadSafeTransaction.h index 4b979f09e6..a069c648be 100644 --- a/fdbclient/include/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/include/fdbclient/ThreadSafeTransaction.h @@ -67,6 +67,8 @@ public: ThreadFuture>> listBlobbifiedRanges(const KeyRangeRef& keyRange, int rangeLimit) override; + ThreadFuture verifyBlobRange(const KeyRangeRef& keyRange, Optional version) override; + ThreadFuture createSharedState() override; void setSharedState(DatabaseSharedState* p) override; From fb3ce21d195327150db581d6b9f31de66495bc51 Mon Sep 17 00:00:00 2001 From: Dennis Zhou Date: Wed, 10 Aug 2022 00:31:55 +0100 Subject: [PATCH 15/15] blob/java: verifyBlobRange() java bindings --- bindings/java/fdbJNI.cpp | 34 +++++++++++++++++++ .../main/com/apple/foundationdb/Database.java | 24 +++++++++++++ .../com/apple/foundationdb/FDBDatabase.java | 11 ++++++ 3 files changed, 69 insertions(+) diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index a4958759d6..c2b5ea90cc 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -1010,6 +1010,40 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1listBl return (jlong)f; } +JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1verifyBlobRange(JNIEnv* jenv, + jobject, + jlong dbPtr, + jbyteArray beginKeyBytes, + jbyteArray endKeyBytes, + jlong version) { + if (!dbPtr || !beginKeyBytes || !endKeyBytes) { + throwParamNotNull(jenv); + return 0; + } + FDBDatabase* tr = (FDBDatabase*)dbPtr; + + uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL); + if (!startKey) { + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL); + if (!endKey) { + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT); + if (!jenv->ExceptionOccurred()) + throwRuntimeEx(jenv, "Error getting handle to native resources"); + return 0; + } + + FDBFuture* f = fdb_database_list_blobbified_ranges( + tr, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), version); + jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT); + jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT); + return (jlong)f; +} + JNIEXPORT jboolean JNICALL Java_com_apple_foundationdb_FDB_Error_1predicate(JNIEnv* jenv, jobject, jint predicate, diff --git a/bindings/java/src/main/com/apple/foundationdb/Database.java b/bindings/java/src/main/com/apple/foundationdb/Database.java index 52aab63ca1..6be76fdf32 100644 --- a/bindings/java/src/main/com/apple/foundationdb/Database.java +++ b/bindings/java/src/main/com/apple/foundationdb/Database.java @@ -278,6 +278,30 @@ public interface Database extends AutoCloseable, TransactionContext { */ CompletableFuture listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e); + /** + * Runs {@link #verifyBlobRange(Function)} on the default executor. + * + * @param beginKey start of the key range + * @param endKey end of the key range + * @param version version to read at + * + * @return a future with the version of the last blob granule. + */ + default CompletableFuture verifyBlobRange(byte[] beginKey, byte[] endKey, long version) { + return verifyBlobRange(beginKey, endKey, version, getExecutor()); + } + + /** + * Checks if a blob range is blobbified. + * + * @param beginKey start of the key range + * @param endKey end of the key range + * @param version version to read at + * + * @return a future with the version of the last blob granule. + */ + CompletableFuture verifyBlobRange(byte[] beginKey, byte[] endKey, long version, Executor e); + /** * Runs a read-only transactional function against this {@code Database} with retry logic. * {@link Function#apply(Object) apply(ReadTransaction)} will be called on the diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java index 80afa5f091..98c001a1b0 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBDatabase.java @@ -250,6 +250,16 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume } } + @Override + public CompletableFuture verifyBlobRange(byte[] beginKey, byte[] endKey, long version, Executor e) { + pointerReadLock.lock(); + try { + return new FutureInt64(Database_verifyBlobRange(getPtr(), beginKey, endKey, version), e); + } finally { + pointerReadLock.unlock(); + } + } + @Override public Executor getExecutor() { return executor; @@ -270,4 +280,5 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume private native long Database_blobbifyRange(long cPtr, byte[] beginKey, byte[] endKey); private native long Database_unblobbifyRange(long cPtr, byte[] beginKey, byte[] endKey); private native long Database_listBlobbifiedRanges(long cPtr, byte[] beginKey, byte[] endKey, int rangeLimit); + private native long Database_verifyBlobRange(long cPtr, byte[] beginKey, byte[] endKey, long version); } \ No newline at end of file