Merge pull request #7835 from sfc-gh-dzhou/blobapi

blob: java bindings for various blob management functions
This commit is contained in:
Dennis Zhou 2022-08-16 15:11:22 -07:00 committed by GitHub
commit 750c5a9ca0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1088 additions and 170 deletions

View File

@ -239,6 +239,10 @@ fdb_error_t fdb_future_get_version_v619(FDBFuture* f, int64_t* out_version) {
CATCH_AND_RETURN(*out_version = TSAV(Version, f)->get(););
}
extern "C" DLLEXPORT fdb_error_t fdb_future_get_bool(FDBFuture* f, fdb_bool_t* out_value) {
CATCH_AND_RETURN(*out_value = TSAV(bool, f)->get(););
}
extern "C" DLLEXPORT fdb_error_t fdb_future_get_int64(FDBFuture* f, int64_t* out_value) {
CATCH_AND_RETURN(*out_value = TSAV(int64_t, f)->get(););
}
@ -494,6 +498,54 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_wait_purge_granules_complete(FDBDat
FDBFuture*)(DB(db)->waitPurgeGranulesComplete(StringRef(purge_key_name, purge_key_name_length)).extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_blobbify_range(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length) {
return (FDBFuture*)(DB(db)
->blobbifyRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)))
.extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_unblobbify_range(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length) {
return (FDBFuture*)(DB(db)
->unblobbifyRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)))
.extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_list_blobbified_ranges(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int rangeLimit) {
return (FDBFuture*)(DB(db)
->listBlobbifiedRanges(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)),
rangeLimit)
.extractPtr());
}
extern "C" DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_verify_blob_range(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t version) {
return (FDBFuture*)(DB(db)
->verifyBlobRange(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)),
version)
.extractPtr());
}
extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) {
CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr(););
}
@ -856,11 +908,12 @@ extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_blob_granule_ranges(FDBTrans
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length) {
int end_key_name_length,
int rangeLimit) {
RETURN_FUTURE_ON_ERROR(
Standalone<VectorRef<KeyRangeRef>>,
KeyRangeRef range(KeyRef(begin_key_name, begin_key_name_length), KeyRef(end_key_name, end_key_name_length));
return (FDBFuture*)(TXN(tr)->getBlobGranuleRanges(range).extractPtr()););
return (FDBFuture*)(TXN(tr)->getBlobGranuleRanges(range, rangeLimit).extractPtr()););
}
extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* tr,

View File

@ -227,6 +227,8 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_set_callback(FDBFuture* f,
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_error(FDBFuture* f);
#endif
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_bool(FDBFuture* f, fdb_bool_t* out);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_int64(FDBFuture* f, int64_t* out);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_uint64(FDBFuture* f, uint64_t* out);
@ -321,6 +323,32 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_wait_purge_granules_complet
uint8_t const* purge_key_name,
int purge_key_name_length);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_blobbify_range(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_unblobbify_range(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_list_blobbified_ranges(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int rangeLimit);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_verify_blob_range(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t version);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant,
FDBTransaction** out_transaction);
@ -479,7 +507,8 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length);
int end_key_name_length,
int rangeLimit);
/* LatestVersion (-2) for readVersion means get read version from transaction
Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */

View File

@ -180,7 +180,7 @@ private:
}
execTransaction(
[begin, end, results](auto ctx) {
fdb::Future f = ctx->tx().getBlobGranuleRanges(begin, end).eraseType();
fdb::Future f = ctx->tx().getBlobGranuleRanges(begin, end, 1000).eraseType();
ctx->continueAfter(
f,
[ctx, f, results]() {

View File

@ -559,9 +559,9 @@ public:
reverse);
}
TypedFuture<future_var::KeyRangeRefArray> getBlobGranuleRanges(KeyRef begin, KeyRef end) {
TypedFuture<future_var::KeyRangeRefArray> getBlobGranuleRanges(KeyRef begin, KeyRef end, int rangeLimit) {
return native::fdb_transaction_get_blob_granule_ranges(
tr.get(), begin.data(), intSize(begin), end.data(), intSize(end));
tr.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit);
}
Result readBlobGranules(KeyRef begin,

View File

@ -356,9 +356,15 @@ fdb_error_t Transaction::add_conflict_range(std::string_view begin_key,
tr_, (const uint8_t*)begin_key.data(), begin_key.size(), (const uint8_t*)end_key.data(), end_key.size(), type);
}
KeyRangeArrayFuture Transaction::get_blob_granule_ranges(std::string_view begin_key, std::string_view end_key) {
return KeyRangeArrayFuture(fdb_transaction_get_blob_granule_ranges(
tr_, (const uint8_t*)begin_key.data(), begin_key.size(), (const uint8_t*)end_key.data(), end_key.size()));
KeyRangeArrayFuture Transaction::get_blob_granule_ranges(std::string_view begin_key,
std::string_view end_key,
int rangeLimit) {
return KeyRangeArrayFuture(fdb_transaction_get_blob_granule_ranges(tr_,
(const uint8_t*)begin_key.data(),
begin_key.size(),
(const uint8_t*)end_key.data(),
end_key.size(),
rangeLimit));
}
KeyValueArrayResult Transaction::read_blob_granules(std::string_view begin_key,
std::string_view end_key,

View File

@ -348,7 +348,7 @@ public:
// Wrapper around fdb_transaction_add_conflict_range.
fdb_error_t add_conflict_range(std::string_view begin_key, std::string_view end_key, FDBConflictRangeType type);
KeyRangeArrayFuture get_blob_granule_ranges(std::string_view begin_key, std::string_view end_key);
KeyRangeArrayFuture get_blob_granule_ranges(std::string_view begin_key, std::string_view end_key, int rangeLimit);
KeyValueArrayResult read_blob_granules(std::string_view begin_key,
std::string_view end_key,
int64_t beginVersion,

View File

@ -2853,7 +2853,7 @@ TEST_CASE("Blob Granule Functions") {
// test ranges
while (1) {
fdb::KeyRangeArrayFuture f = tr.get_blob_granule_ranges(key("bg"), key("bh"));
fdb::KeyRangeArrayFuture f = tr.get_blob_granule_ranges(key("bg"), key("bh"), 1000);
fdb_error_t err = wait_future(f);
if (err) {
fdb::EmptyFuture f2 = tr.on_error(err);

View File

@ -34,9 +34,11 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/FDBDatabase.java
src/main/com/apple/foundationdb/FDBTenant.java
src/main/com/apple/foundationdb/FDBTransaction.java
src/main/com/apple/foundationdb/FutureBool.java
src/main/com/apple/foundationdb/FutureInt64.java
src/main/com/apple/foundationdb/FutureKey.java
src/main/com/apple/foundationdb/FutureKeyArray.java
src/main/com/apple/foundationdb/FutureKeyRangeArray.java
src/main/com/apple/foundationdb/FutureResult.java
src/main/com/apple/foundationdb/FutureResults.java
src/main/com/apple/foundationdb/FutureMappedResults.java
@ -56,6 +58,7 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/RangeQuery.java
src/main/com/apple/foundationdb/MappedRangeQuery.java
src/main/com/apple/foundationdb/KeyArrayResult.java
src/main/com/apple/foundationdb/KeyRangeArrayResult.java
src/main/com/apple/foundationdb/RangeResult.java
src/main/com/apple/foundationdb/MappedRangeResult.java
src/main/com/apple/foundationdb/RangeResultInfo.java

View File

@ -25,9 +25,11 @@
#include "com_apple_foundationdb_FDB.h"
#include "com_apple_foundationdb_FDBDatabase.h"
#include "com_apple_foundationdb_FDBTransaction.h"
#include "com_apple_foundationdb_FutureBool.h"
#include "com_apple_foundationdb_FutureInt64.h"
#include "com_apple_foundationdb_FutureKey.h"
#include "com_apple_foundationdb_FutureKeyArray.h"
#include "com_apple_foundationdb_FutureKeyRangeArray.h"
#include "com_apple_foundationdb_FutureResult.h"
#include "com_apple_foundationdb_FutureResults.h"
#include "com_apple_foundationdb_FutureStrings.h"
@ -55,7 +57,11 @@ static jclass mapped_range_result_class;
static jclass mapped_key_value_class;
static jclass string_class;
static jclass key_array_result_class;
static jclass keyrange_class;
static jclass keyrange_array_result_class;
static jmethodID key_array_result_init;
static jmethodID keyrange_init;
static jmethodID keyrange_array_result_init;
static jmethodID range_result_init;
static jmethodID mapped_range_result_init;
static jmethodID mapped_key_value_from_bytes;
@ -278,6 +284,23 @@ JNIEXPORT void JNICALL Java_com_apple_foundationdb_NativeFuture_Future_1releaseM
fdb_future_release_memory(var);
}
JNIEXPORT jboolean JNICALL Java_com_apple_foundationdb_FutureBool_FutureBool_1get(JNIEnv* jenv, jobject, jlong future) {
if (!future) {
throwParamNotNull(jenv);
return 0;
}
FDBFuture* f = (FDBFuture*)future;
fdb_bool_t value = false;
fdb_error_t err = fdb_future_get_bool(f, &value);
if (err) {
safeThrow(jenv, getThrowable(jenv, err));
return 0;
}
return (jboolean)value;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureInt64_FutureInt64_1get(JNIEnv* jenv, jobject, jlong future) {
if (!future) {
throwParamNotNull(jenv);
@ -407,6 +430,61 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureKeyArray_FutureKeyAr
return result;
}
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureKeyRangeArray_FutureKeyRangeArray_1get(JNIEnv* jenv,
jobject,
jlong future) {
if (!future) {
throwParamNotNull(jenv);
return JNI_NULL;
}
FDBFuture* f = (FDBFuture*)future;
const FDBKeyRange* fdbKr;
int count;
fdb_error_t err = fdb_future_get_keyrange_array(f, &fdbKr, &count);
if (err) {
safeThrow(jenv, getThrowable(jenv, err));
return JNI_NULL;
}
jobjectArray kr_values = jenv->NewObjectArray(count, keyrange_class, NULL);
if (!kr_values) {
if (!jenv->ExceptionOccurred())
throwOutOfMem(jenv);
return JNI_NULL;
}
for (int i = 0; i < count; i++) {
jbyteArray beginArr = jenv->NewByteArray(fdbKr[i].begin_key_length);
if (!beginArr) {
if (!jenv->ExceptionOccurred())
throwOutOfMem(jenv);
return JNI_NULL;
}
jbyteArray endArr = jenv->NewByteArray(fdbKr[i].end_key_length);
if (!endArr) {
if (!jenv->ExceptionOccurred())
throwOutOfMem(jenv);
return JNI_NULL;
}
jenv->SetByteArrayRegion(beginArr, 0, fdbKr[i].begin_key_length, (const jbyte*)fdbKr[i].begin_key);
jenv->SetByteArrayRegion(endArr, 0, fdbKr[i].end_key_length, (const jbyte*)fdbKr[i].end_key);
jobject kr = jenv->NewObject(keyrange_class, keyrange_init, beginArr, endArr);
if (jenv->ExceptionOccurred())
return JNI_NULL;
jenv->SetObjectArrayElement(kr_values, i, kr);
if (jenv->ExceptionOccurred())
return JNI_NULL;
}
jobject krarr = jenv->NewObject(keyrange_array_result_class, keyrange_array_result_init, kr_values);
if (jenv->ExceptionOccurred())
return JNI_NULL;
return krarr;
}
// SOMEDAY: explore doing this more efficiently with Direct ByteBuffers
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1get(JNIEnv* jenv,
jobject,
@ -830,6 +908,142 @@ Java_com_apple_foundationdb_FDBDatabase_Database_1waitPurgeGranulesComplete(JNIE
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1blobbifyRange(JNIEnv* jenv,
jobject,
jlong dbPtr,
jbyteArray beginKeyBytes,
jbyteArray endKeyBytes) {
if (!dbPtr || !beginKeyBytes || !endKeyBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBDatabase* database = (FDBDatabase*)dbPtr;
uint8_t* beginKeyArr = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
if (!beginKeyArr) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
uint8_t* endKeyArr = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
if (!endKeyArr) {
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
FDBFuture* f = fdb_database_blobbify_range(
database, beginKeyArr, jenv->GetArrayLength(beginKeyBytes), endKeyArr, jenv->GetArrayLength(endKeyBytes));
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKeyArr, JNI_ABORT);
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1unblobbifyRange(JNIEnv* jenv,
jobject,
jlong dbPtr,
jbyteArray beginKeyBytes,
jbyteArray endKeyBytes) {
if (!dbPtr || !beginKeyBytes || !endKeyBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBDatabase* database = (FDBDatabase*)dbPtr;
uint8_t* beginKeyArr = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
if (!beginKeyArr) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
uint8_t* endKeyArr = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
if (!endKeyArr) {
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
FDBFuture* f = fdb_database_unblobbify_range(
database, beginKeyArr, jenv->GetArrayLength(beginKeyBytes), endKeyArr, jenv->GetArrayLength(endKeyBytes));
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)beginKeyArr, JNI_ABORT);
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKeyArr, JNI_ABORT);
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1listBlobbifiedRanges(JNIEnv* jenv,
jobject,
jlong dbPtr,
jbyteArray beginKeyBytes,
jbyteArray endKeyBytes,
jint rangeLimit) {
if (!dbPtr || !beginKeyBytes || !endKeyBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBDatabase* tr = (FDBDatabase*)dbPtr;
uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
if (!startKey) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
if (!endKey) {
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
FDBFuture* f = fdb_database_list_blobbified_ranges(
tr, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), rangeLimit);
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT);
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1verifyBlobRange(JNIEnv* jenv,
jobject,
jlong dbPtr,
jbyteArray beginKeyBytes,
jbyteArray endKeyBytes,
jlong version) {
if (!dbPtr || !beginKeyBytes || !endKeyBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBDatabase* tr = (FDBDatabase*)dbPtr;
uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
if (!startKey) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
if (!endKey) {
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
FDBFuture* f = fdb_database_list_blobbified_ranges(
tr, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), version);
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT);
return (jlong)f;
}
JNIEXPORT jboolean JNICALL Java_com_apple_foundationdb_FDB_Error_1predicate(JNIEnv* jenv,
jobject,
jint predicate,
@ -1307,6 +1521,41 @@ Java_com_apple_foundationdb_FDBTransaction_Transaction_1getRangeSplitPoints(JNIE
return (jlong)f;
}
JNIEXPORT jlong JNICALL
Java_com_apple_foundationdb_FDBTransaction_Transaction_1getBlobGranuleRanges(JNIEnv* jenv,
jobject,
jlong tPtr,
jbyteArray beginKeyBytes,
jbyteArray endKeyBytes,
jint rowLimit) {
if (!tPtr || !beginKeyBytes || !endKeyBytes || !rowLimit) {
throwParamNotNull(jenv);
return 0;
}
FDBTransaction* tr = (FDBTransaction*)tPtr;
uint8_t* startKey = (uint8_t*)jenv->GetByteArrayElements(beginKeyBytes, JNI_NULL);
if (!startKey) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
uint8_t* endKey = (uint8_t*)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
if (!endKey) {
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
FDBFuture* f = fdb_transaction_get_blob_granule_ranges(
tr, startKey, jenv->GetArrayLength(beginKeyBytes), endKey, jenv->GetArrayLength(endKeyBytes), rowLimit);
jenv->ReleaseByteArrayElements(beginKeyBytes, (jbyte*)startKey, JNI_ABORT);
jenv->ReleaseByteArrayElements(endKeyBytes, (jbyte*)endKey, JNI_ABORT);
return (jlong)f;
}
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1set(JNIEnv* jenv,
jobject,
jlong tPtr,
@ -1746,6 +1995,15 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
key_array_result_init = env->GetMethodID(local_key_array_result_class, "<init>", "([B[I)V");
key_array_result_class = (jclass)(env)->NewGlobalRef(local_key_array_result_class);
jclass local_keyrange_class = env->FindClass("com/apple/foundationdb/Range");
keyrange_init = env->GetMethodID(local_keyrange_class, "<init>", "([B[B)V");
keyrange_class = (jclass)(env)->NewGlobalRef(local_keyrange_class);
jclass local_keyrange_array_result_class = env->FindClass("com/apple/foundationdb/KeyRangeArrayResult");
keyrange_array_result_init =
env->GetMethodID(local_keyrange_array_result_class, "<init>", "([Lcom/apple/foundationdb/Range;)V");
keyrange_array_result_class = (jclass)(env)->NewGlobalRef(local_keyrange_array_result_class);
jclass local_range_result_summary_class = env->FindClass("com/apple/foundationdb/RangeResultSummary");
range_result_summary_init = env->GetMethodID(local_range_result_summary_class, "<init>", "([BIZ)V");
range_result_summary_class = (jclass)(env)->NewGlobalRef(local_range_result_summary_class);
@ -1770,6 +2028,12 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
if (range_result_class != JNI_NULL) {
env->DeleteGlobalRef(range_result_class);
}
if (keyrange_array_result_class != JNI_NULL) {
env->DeleteGlobalRef(keyrange_array_result_class);
}
if (keyrange_class != JNI_NULL) {
env->DeleteGlobalRef(keyrange_class);
}
if (mapped_range_result_class != JNI_NULL) {
env->DeleteGlobalRef(mapped_range_result_class);
}

View File

@ -161,6 +161,20 @@ public interface Database extends AutoCloseable, TransactionContext {
*/
double getMainThreadBusyness();
/**
* Runs {@link #purgeBlobGranules(Function)} on the default executor.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @param purgeVersion version to purge at
* @param force if true delete all data, if not keep data >= purgeVersion
*
* @return the key to watch for purge complete
*/
default CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force) {
return purgeBlobGranules(beginKey, endKey, purgeVersion, force, getExecutor());
}
/**
* Queues a purge of blob granules for the specified key range, at the specified version.
*
@ -168,17 +182,126 @@ public interface Database extends AutoCloseable, TransactionContext {
* @param endKey end of the key range
* @param purgeVersion version to purge at
* @param force if true delete all data, if not keep data >= purgeVersion
* @param e the {@link Executor} to use for asynchronous callbacks
* @return the key to watch for purge complete
*/
CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor e);
/**
* Wait for a previous call to purgeBlobGranules to complete
* Runs {@link #waitPurgeGranulesComplete(Function)} on the default executor.
*
* @param purgeKey key to watch
*/
default CompletableFuture<Void> waitPurgeGranulesComplete(byte[] purgeKey) {
return waitPurgeGranulesComplete(purgeKey, getExecutor());
}
/**
* Wait for a previous call to purgeBlobGranules to complete.
*
* @param purgeKey key to watch
* @param e the {@link Executor} to use for asynchronous callbacks
*/
CompletableFuture<Void> waitPurgeGranulesComplete(byte[] purgeKey, Executor e);
/**
* Runs {@link #blobbifyRange(Function)} on the default executor.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @return if the recording of the range was successful
*/
default CompletableFuture<Boolean> blobbifyRange(byte[] beginKey, byte[] endKey) {
return blobbifyRange(beginKey, endKey, getExecutor());
}
/**
* Sets a range to be blobbified in the database. Must be a completely unblobbified range.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @param e the {@link Executor} to use for asynchronous callbacks
* @return if the recording of the range was successful
*/
CompletableFuture<Boolean> blobbifyRange(byte[] beginKey, byte[] endKey, Executor e);
/**
* Runs {@link #unblobbifyRange(Function)} on the default executor.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @return if the recording of the range was successful
*/
default CompletableFuture<Boolean> unblobbifyRange(byte[] beginKey, byte[] endKey) {
return unblobbifyRange(beginKey, endKey, getExecutor());
}
/**
* Sets a range to be unblobbified in the database.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @param e the {@link Executor} to use for asynchronous callbacks
* @return if the recording of the range was successful
*/
CompletableFuture<Boolean> unblobbifyRange(byte[] beginKey, byte[] endKey, Executor e);
/**
* Runs {@link #listBlobbifiedRanges(Function)} on the default executor.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @param rangeLimit batch size
* @param e the {@link Executor} to use for asynchronous callbacks
* @return a future with the list of blobbified ranges.
*/
default CompletableFuture<KeyRangeArrayResult> listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit) {
return listBlobbifiedRanges(beginKey, endKey, rangeLimit, getExecutor());
}
/**
* Lists blobbified ranges in the database. There may be more if result.size() == rangeLimit.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @param rangeLimit batch size
* @param e the {@link Executor} to use for asynchronous callbacks
* @return a future with the list of blobbified ranges.
*/
CompletableFuture<KeyRangeArrayResult> listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e);
/**
* Runs {@link #verifyBlobRange(Function)} on the default executor.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @param version version to read at
*
* @return a future with the version of the last blob granule.
*/
default CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey, long version) {
return verifyBlobRange(beginKey, endKey, version, getExecutor());
}
/**
* Checks if a blob range is blobbified.
*
* @param beginKey start of the key range
* @param endKey end of the key range
* @param version version to read at
*
* @return a future with the version of the last blob granule.
*/
CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey, long version, Executor e);
/**
* Runs a read-only transactional function against this {@code Database} with retry logic.
* {@link Function#apply(Object) apply(ReadTransaction)} will be called on the

View File

@ -201,20 +201,60 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume
}
@Override
public CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor executor) {
public CompletableFuture<byte[]> purgeBlobGranules(byte[] beginKey, byte[] endKey, long purgeVersion, boolean force, Executor e) {
pointerReadLock.lock();
try {
return new FutureKey(Database_purgeBlobGranules(getPtr(), beginKey, endKey, purgeVersion, force), executor, eventKeeper);
return new FutureKey(Database_purgeBlobGranules(getPtr(), beginKey, endKey, purgeVersion, force), e, eventKeeper);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<Void> waitPurgeGranulesComplete(byte[] purgeKey, Executor executor) {
public CompletableFuture<Void> waitPurgeGranulesComplete(byte[] purgeKey, Executor e) {
pointerReadLock.lock();
try {
return new FutureVoid(Database_waitPurgeGranulesComplete(getPtr(), purgeKey), executor);
return new FutureVoid(Database_waitPurgeGranulesComplete(getPtr(), purgeKey), e);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<Boolean> blobbifyRange(byte[] beginKey, byte[] endKey, Executor e) {
pointerReadLock.lock();
try {
return new FutureBool(Database_blobbifyRange(getPtr(), beginKey, endKey), e);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<Boolean> unblobbifyRange(byte[] beginKey, byte[] endKey, Executor e) {
pointerReadLock.lock();
try {
return new FutureBool(Database_unblobbifyRange(getPtr(), beginKey, endKey), e);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<KeyRangeArrayResult> listBlobbifiedRanges(byte[] beginKey, byte[] endKey, int rangeLimit, Executor e) {
pointerReadLock.lock();
try {
return new FutureKeyRangeArray(Database_listBlobbifiedRanges(getPtr(), beginKey, endKey, rangeLimit), e);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<Long> verifyBlobRange(byte[] beginKey, byte[] endKey, long version, Executor e) {
pointerReadLock.lock();
try {
return new FutureInt64(Database_verifyBlobRange(getPtr(), beginKey, endKey, version), e);
} finally {
pointerReadLock.unlock();
}
@ -237,4 +277,8 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume
private native double Database_getMainThreadBusyness(long cPtr);
private native long Database_purgeBlobGranules(long cPtr, byte[] beginKey, byte[] endKey, long purgeVersion, boolean force);
private native long Database_waitPurgeGranulesComplete(long cPtr, byte[] purgeKey);
private native long Database_blobbifyRange(long cPtr, byte[] beginKey, byte[] endKey);
private native long Database_unblobbifyRange(long cPtr, byte[] beginKey, byte[] endKey);
private native long Database_listBlobbifiedRanges(long cPtr, byte[] beginKey, byte[] endKey, int rangeLimit);
private native long Database_verifyBlobRange(long cPtr, byte[] beginKey, byte[] endKey, long version);
}

View File

@ -97,6 +97,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
return FDBTransaction.this.getRangeSplitPoints(range, chunkSize);
}
@Override
public CompletableFuture<KeyRangeArrayResult> getBlobGranuleRanges(byte[] begin, byte[] end, int rowLimit) {
return FDBTransaction.this.getBlobGranuleRanges(begin, end, rowLimit);
}
@Override
public AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper,
int limit, int matchIndex, boolean reverse,
@ -352,6 +357,16 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
return this.getRangeSplitPoints(range.begin, range.end, chunkSize);
}
@Override
public CompletableFuture<KeyRangeArrayResult> getBlobGranuleRanges(byte[] begin, byte[] end, int rowLimit) {
pointerReadLock.lock();
try {
return new FutureKeyRangeArray(Transaction_getBlobGranuleRanges(getPtr(), begin, end, rowLimit), executor);
} finally {
pointerReadLock.unlock();
}
}
@Override
public AsyncIterable<MappedKeyValue> getMappedRange(KeySelector begin, KeySelector end, byte[] mapper, int limit,
int matchIndex, boolean reverse, StreamingMode mode) {
@ -842,4 +857,5 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
private native long Transaction_getKeyLocations(long cPtr, byte[] key);
private native long Transaction_getEstimatedRangeSizeBytes(long cPtr, byte[] keyBegin, byte[] keyEnd);
private native long Transaction_getRangeSplitPoints(long cPtr, byte[] keyBegin, byte[] keyEnd, long chunkSize);
private native long Transaction_getBlobGranuleRanges(long cPtr, byte[] keyBegin, byte[] keyEnd, int rowLimit);
}

View File

@ -0,0 +1,37 @@
/*
* FutureBool.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.concurrent.Executor;
class FutureBool extends NativeFuture<Boolean> {
FutureBool(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback(executor);
}
@Override
protected Boolean getIfDone_internal(long cPtr) throws FDBException {
return FutureBool_get(cPtr);
}
private native boolean FutureBool_get(long cPtr) throws FDBException;
}

View File

@ -0,0 +1,37 @@
/*
* FutureKeyRangeArray.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.concurrent.Executor;
class FutureKeyRangeArray extends NativeFuture<KeyRangeArrayResult> {
FutureKeyRangeArray(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback(executor);
}
@Override
protected KeyRangeArrayResult getIfDone_internal(long cPtr) throws FDBException {
return FutureKeyRangeArray_get(cPtr);
}
private native KeyRangeArrayResult FutureKeyRangeArray_get(long cPtr) throws FDBException;
}

View File

@ -0,0 +1,36 @@
/*
* KeyRangeArrayResult.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.Arrays;
import java.util.List;
public class KeyRangeArrayResult {
final List<Range> keyRanges;
public KeyRangeArrayResult(Range[] keyRangeArr) {
this.keyRanges = Arrays.asList(keyRangeArr);
}
public List<Range> getKeyRanges() {
return keyRanges;
}
}

View File

@ -513,6 +513,17 @@ public interface ReadTransaction extends ReadTransactionContext {
*/
CompletableFuture<KeyArrayResult> getRangeSplitPoints(Range range, long chunkSize);
/**
* Gets the blob granule ranges for a given region.
* Returned in batches, requires calling again moving the begin key up.
*
* @param begin beginning of the range (inclusive)
* @param end end of the range (exclusive)
* @return list of blob granules in the given range. May not be all.
*/
CompletableFuture<KeyRangeArrayResult> getBlobGranuleRanges(byte[] begin, byte[] end, int rowLimit);
/**
* Returns a set of options that can be set on a {@code Transaction}

View File

@ -23,6 +23,7 @@
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
@ -31,33 +32,6 @@
namespace {
// copy to standalones for krm
ACTOR Future<Void> setBlobRange(Database db, Key startKey, Key endKey, Value value) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// FIXME: check that the set range is currently inactive, and that a revoked range is currently its own
// range in the map and fully set.
tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
// This is not coalescing because we want to keep each range logically separate.
wait(krmSetRange(tr, blobRangeKeys.begin, KeyRange(KeyRangeRef(startKey, endKey)), value));
wait(tr->commit());
printf("Successfully updated blob range [%s - %s) to %s\n",
startKey.printable().c_str(),
endKey.printable().c_str(),
value.printable().c_str());
return Void();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR Future<Version> getLatestReadVersion(Database db) {
state Transaction tr(db);
loop {
@ -99,65 +73,10 @@ ACTOR Future<Void> doBlobPurge(Database db, Key startKey, Key endKey, Optional<V
return Void();
}
ACTOR Future<Version> checkBlobSubrange(Database db, KeyRange keyRange, Optional<Version> version) {
state Transaction tr(db);
state Version readVersionOut = invalidVersion;
loop {
try {
wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut)));
return readVersionOut;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> doBlobCheck(Database db, Key startKey, Key endKey, Optional<Version> version) {
state Transaction tr(db);
state Version readVersionOut = invalidVersion;
state double elapsed = -timer_monotonic();
state KeyRange range = KeyRange(KeyRangeRef(startKey, endKey));
state Standalone<VectorRef<KeyRangeRef>> allRanges;
loop {
try {
wait(store(allRanges, tr.getBlobGranuleRanges(range)));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
if (allRanges.empty()) {
fmt::print("ERROR: No blob ranges for [{0} - {1})\n", startKey.printable(), endKey.printable());
return Void();
}
fmt::print("Loaded {0} blob ranges to check\n", allRanges.size());
state std::vector<Future<Version>> checkParts;
// Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit
int maxChunkSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2;
KeyRange currentChunk;
int currentChunkSize = 0;
for (auto& it : allRanges) {
if (currentChunkSize == maxChunkSize) {
checkParts.push_back(checkBlobSubrange(db, currentChunk, version));
currentChunkSize = 0;
}
if (currentChunkSize == 0) {
currentChunk = it;
} else if (it.begin != currentChunk.end) {
fmt::print("ERROR: Blobrange check failed, gap in blob ranges from [{0} - {1})\n",
currentChunk.end.printable(),
it.begin.printable());
return Void();
} else {
currentChunk = KeyRangeRef(currentChunk.begin, it.end);
}
currentChunkSize++;
}
checkParts.push_back(checkBlobSubrange(db, currentChunk, version));
wait(waitForAll(checkParts));
readVersionOut = checkParts.back().get();
state Version readVersionOut = wait(db->verifyBlobRange(KeyRangeRef(startKey, endKey), version));
elapsed += timer_monotonic();
@ -201,7 +120,7 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb,
fmt::print("Invalid blob range [{0} - {1})\n", tokens[2].printable(), tokens[3].printable());
} else {
if (tokencmp(tokens[1], "start") || tokencmp(tokens[1], "stop")) {
bool starting = tokencmp(tokens[1], "start");
state bool starting = tokencmp(tokens[1], "start");
if (tokens.size() > 4) {
printUsage(tokens[0]);
return false;
@ -210,7 +129,19 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb,
starting ? "Starting" : "Stopping",
tokens[2].printable().c_str(),
tokens[3].printable().c_str());
wait(setBlobRange(localDb, begin, end, starting ? LiteralStringRef("1") : StringRef()));
state bool success = false;
if (starting) {
wait(store(success, localDb->blobbifyRange(KeyRangeRef(begin, end))));
} else {
wait(store(success, localDb->unblobbifyRange(KeyRangeRef(begin, end))));
}
if (!success) {
fmt::print("{0} blobbify range for [{1} - {2}) failed\n",
starting ? "Starting" : "Stopping",
tokens[2].printable().c_str(),
tokens[3].printable().c_str());
}
return success;
} else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge") || tokencmp(tokens[1], "check")) {
bool purge = tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge");
bool forcePurge = tokencmp(tokens[1], "forcepurge");

View File

@ -257,13 +257,14 @@ ThreadFuture<Standalone<VectorRef<KeyRef>>> DLTransaction::getRangeSplitPoints(c
});
}
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> DLTransaction::getBlobGranuleRanges(const KeyRangeRef& keyRange) {
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> DLTransaction::getBlobGranuleRanges(const KeyRangeRef& keyRange,
int rangeLimit) {
if (!api->transactionGetBlobGranuleRanges) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->transactionGetBlobGranuleRanges(
tr, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size());
tr, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), rangeLimit);
return toThreadFuture<Standalone<VectorRef<KeyRangeRef>>>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const FdbCApi::FDBKeyRange* keyRanges;
int keyRangesLength;
@ -583,6 +584,71 @@ ThreadFuture<Void> DLDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey)
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
ThreadFuture<bool> DLDatabase::blobbifyRange(const KeyRangeRef& keyRange) {
if (!api->databaseBlobbifyRange) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->databaseBlobbifyRange(
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size());
return toThreadFuture<bool>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
bool ret = false;
ASSERT(!api->futureGetBool(f, &ret));
return ret;
});
}
ThreadFuture<bool> DLDatabase::unblobbifyRange(const KeyRangeRef& keyRange) {
if (!api->databaseUnblobbifyRange) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->databaseUnblobbifyRange(
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size());
return toThreadFuture<bool>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
bool ret = false;
ASSERT(!api->futureGetBool(f, &ret));
return ret;
});
}
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> DLDatabase::listBlobbifiedRanges(const KeyRangeRef& keyRange,
int rangeLimit) {
if (!api->databaseListBlobbifiedRanges) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->databaseListBlobbifiedRanges(
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), rangeLimit);
return toThreadFuture<Standalone<VectorRef<KeyRangeRef>>>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const FdbCApi::FDBKeyRange* keyRanges;
int keyRangesLength;
FdbCApi::fdb_error_t error = api->futureGetKeyRangeArray(f, &keyRanges, &keyRangesLength);
ASSERT(!error);
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed.
return Standalone<VectorRef<KeyRangeRef>>(VectorRef<KeyRangeRef>((KeyRangeRef*)keyRanges, keyRangesLength),
Arena());
});
}
ThreadFuture<Version> DLDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
if (!api->databaseVerifyBlobRange) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->databaseVerifyBlobRange(
db, keyRange.begin.begin(), keyRange.begin.size(), keyRange.end.begin(), keyRange.end.size(), version);
return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
Version version = invalidVersion;
ASSERT(!api->futureGetInt64(f, &version));
return version;
});
}
// DLApi
// Loads the specified function from a dynamic library
@ -670,6 +736,13 @@ void DLApi::init() {
fdbCPath,
"fdb_database_wait_purge_granules_complete",
headerVersion >= 710);
loadClientFunction(&api->databaseBlobbifyRange, lib, fdbCPath, "fdb_database_blobbify_range", headerVersion >= 720);
loadClientFunction(
&api->databaseUnblobbifyRange, lib, fdbCPath, "fdb_database_unblobbify_range", headerVersion >= 720);
loadClientFunction(
&api->databaseListBlobbifiedRanges, lib, fdbCPath, "fdb_database_list_blobbified_ranges", headerVersion >= 720);
loadClientFunction(
&api->databaseVerifyBlobRange, lib, fdbCPath, "fdb_database_verify_blob_range", headerVersion >= 720);
loadClientFunction(
&api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710);
@ -744,6 +817,7 @@ void DLApi::init() {
fdbCPath,
headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version",
headerVersion >= 0);
loadClientFunction(&api->futureGetBool, lib, fdbCPath, "fdb_future_get_bool", headerVersion >= 720);
loadClientFunction(&api->futureGetUInt64, lib, fdbCPath, "fdb_future_get_uint64", headerVersion >= 700);
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error", headerVersion >= 0);
loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key", headerVersion >= 0);
@ -1079,9 +1153,10 @@ ThreadFuture<Standalone<VectorRef<KeyRef>>> MultiVersionTransaction::getRangeSpl
}
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> MultiVersionTransaction::getBlobGranuleRanges(
const KeyRangeRef& keyRange) {
const KeyRangeRef& keyRange,
int rangeLimit) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getBlobGranuleRanges(keyRange)
auto f = tr.transaction ? tr.transaction->getBlobGranuleRanges(keyRange, rangeLimit)
: makeTimeout<Standalone<VectorRef<KeyRangeRef>>>();
return abortableFuture(f, tr.onChange);
}
@ -1589,6 +1664,32 @@ ThreadFuture<Void> MultiVersionDatabase::waitPurgeGranulesComplete(const KeyRef&
return abortableFuture(f, dbState->dbVar->get().onChange);
}
ThreadFuture<bool> MultiVersionDatabase::blobbifyRange(const KeyRangeRef& keyRange) {
auto dbVar = dbState->dbVar->get();
auto f = dbVar.value ? dbVar.value->blobbifyRange(keyRange) : ThreadFuture<bool>(Never());
return abortableFuture(f, dbVar.onChange);
}
ThreadFuture<bool> MultiVersionDatabase::unblobbifyRange(const KeyRangeRef& keyRange) {
auto dbVar = dbState->dbVar->get();
auto f = dbVar.value ? dbVar.value->unblobbifyRange(keyRange) : ThreadFuture<bool>(Never());
return abortableFuture(f, dbVar.onChange);
}
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> MultiVersionDatabase::listBlobbifiedRanges(const KeyRangeRef& keyRange,
int rangeLimit) {
auto dbVar = dbState->dbVar->get();
auto f = dbVar.value ? dbVar.value->listBlobbifiedRanges(keyRange, rangeLimit)
: ThreadFuture<Standalone<VectorRef<KeyRangeRef>>>(Never());
return abortableFuture(f, dbVar.onChange);
}
ThreadFuture<Version> MultiVersionDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
auto dbVar = dbState->dbVar->get();
auto f = dbVar.value ? dbVar.value->verifyBlobRange(keyRange, version) : ThreadFuture<Version>(Never());
return abortableFuture(f, dbVar.onChange);
}
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older

View File

@ -7638,7 +7638,9 @@ Future<Standalone<VectorRef<KeyRef>>> Transaction::getRangeSplitPoints(KeyRange
// the blob granule requests are a bit funky because they piggyback off the existing transaction to read from the system
// keyspace
ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Transaction* self, KeyRange keyRange) {
ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Transaction* self,
KeyRange keyRange,
int rangeLimit) {
// FIXME: use streaming range read
state KeyRange currentRange = keyRange;
state Standalone<VectorRef<KeyRangeRef>> results;
@ -7661,7 +7663,7 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Trans
// basically krmGetRange, but enable it to not use tenant without RAW_ACCESS by doing manual getRange with
// UseTenant::False
GetRangeLimits limits(1000);
GetRangeLimits limits(2 * rangeLimit + 2);
limits.minRows = 2;
RangeResult rawMapping = wait(getRange(self->trState,
self->getReadVersion(),
@ -7683,6 +7685,9 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Trans
if (blobGranuleMapping[i].value.size()) {
results.push_back(results.arena(),
KeyRangeRef(blobGranuleMapping[i].key, blobGranuleMapping[i + 1].key));
if (results.size() == rangeLimit) {
return results;
}
}
}
results.arena().dependsOn(blobGranuleMapping.arena());
@ -7694,8 +7699,8 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRangesActor(Trans
}
}
Future<Standalone<VectorRef<KeyRangeRef>>> Transaction::getBlobGranuleRanges(const KeyRange& range) {
return ::getBlobGranuleRangesActor(this, range);
Future<Standalone<VectorRef<KeyRangeRef>>> Transaction::getBlobGranuleRanges(const KeyRange& range, int rangeLimit) {
return ::getBlobGranuleRangesActor(this, range, rangeLimit);
}
// hack (for now) to get blob worker interface into load balance
@ -8007,6 +8012,71 @@ ACTOR Future<Version> setPerpetualStorageWiggle(Database cx, bool enable, LockAw
return version;
}
ACTOR Future<Version> checkBlobSubrange(Database db, KeyRange keyRange, Optional<Version> version) {
state Transaction tr(db);
state Version readVersionOut = invalidVersion;
loop {
try {
wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut)));
return readVersionOut;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx, KeyRange range, Optional<Version> version) {
state Database db(cx);
state Transaction tr(db);
state Standalone<VectorRef<KeyRangeRef>> allRanges;
state KeyRange curRegion = KeyRangeRef(range.begin, range.begin);
state Version readVersionOut = invalidVersion;
state int batchSize = CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2;
loop {
try {
wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), 20 * batchSize)));
} catch (Error& e) {
wait(tr.onError(e));
}
if (allRanges.empty()) {
if (curRegion.begin < range.end) {
return invalidVersion;
}
return readVersionOut;
}
state std::vector<Future<Version>> checkParts;
// Chunk up to smaller ranges than this limit. Must be smaller than BG_TOO_MANY_GRANULES to not hit the limit
int batchCount = 0;
for (auto& it : allRanges) {
if (it.begin != curRegion.end) {
return invalidVersion;
}
curRegion = KeyRangeRef(curRegion.begin, it.end);
batchCount++;
if (batchCount == batchSize) {
checkParts.push_back(checkBlobSubrange(db, curRegion, version));
batchCount = 0;
curRegion = KeyRangeRef(curRegion.end, curRegion.end);
}
}
if (!curRegion.empty()) {
checkParts.push_back(checkBlobSubrange(db, curRegion, version));
}
wait(waitForAll(checkParts));
readVersionOut = checkParts.back().get();
curRegion = KeyRangeRef(curRegion.end, curRegion.end);
}
}
Future<Version> DatabaseContext::verifyBlobRange(const KeyRange& range, Optional<Version> version) {
return verifyBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, version);
}
ACTOR Future<std::vector<std::pair<UID, StorageWiggleValue>>> readStorageWiggleValues(Database cx,
bool primary,
bool use_system_priority) {
@ -9716,6 +9786,7 @@ Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
}
// BlobGranule API.
ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
KeyRange range,
Version purgeVersion,
@ -9807,6 +9878,89 @@ Future<Void> DatabaseContext::waitPurgeGranulesComplete(Key purgeKey) {
return waitPurgeGranulesCompleteActor(Reference<DatabaseContext>::addRef(this), purgeKey);
}
ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx, KeyRange range, bool active) {
state Database db(cx);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
state Value value = active ? blobRangeActive : blobRangeInactive;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
if (active) {
state RangeResult results = wait(krmGetRanges(tr, blobRangeKeys.begin, range));
ASSERT(results.size() >= 2);
if (results[0].key == range.begin && results[1].key == range.end &&
results[0].value == blobRangeActive) {
return true;
} else {
for (int i = 0; i < results.size(); i++) {
if (results[i].value == blobRangeActive) {
return false;
}
}
}
}
tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
// This is not coalescing because we want to keep each range logically separate.
wait(krmSetRange(tr, blobRangeKeys.begin, range, value));
wait(tr->commit());
printf("Successfully updated blob range [%s - %s) to %s\n",
range.begin.printable().c_str(),
range.end.printable().c_str(),
value.printable().c_str());
return true;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
Future<bool> DatabaseContext::blobbifyRange(KeyRange range) {
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, true);
}
Future<bool> DatabaseContext::unblobbifyRange(KeyRange range) {
return setBlobRangeActor(Reference<DatabaseContext>::addRef(this), range, false);
}
ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRangesActor(Reference<DatabaseContext> cx,
KeyRange range,
int rangeLimit) {
state Database db(cx);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
state Standalone<VectorRef<KeyRangeRef>> blobRanges;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state RangeResult results = wait(krmGetRanges(tr, blobRangeKeys.begin, range, 2 * rangeLimit + 2));
blobRanges.arena().dependsOn(results.arena());
for (int i = 0; i < results.size() - 1; i++) {
if (results[i].value == LiteralStringRef("1")) {
blobRanges.push_back(blobRanges.arena(), KeyRangeRef(results[i].value, results[i + 1].value));
}
if (blobRanges.size() == rangeLimit) {
return blobRanges;
}
}
return blobRanges;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
Future<Standalone<VectorRef<KeyRangeRef>>> DatabaseContext::listBlobbifiedRanges(KeyRange range, int rowLimit) {
return listBlobbifiedRangesActor(Reference<DatabaseContext>::addRef(this), range, rowLimit);
}
int64_t getMaxKeySize(KeyRef const& key) {
return getMaxWriteKeySize(key, true);
}

View File

@ -1783,7 +1783,8 @@ Future<Standalone<VectorRef<KeyRef>>> ReadYourWritesTransaction::getRangeSplitPo
return waitOrError(tr.getRangeSplitPoints(range, chunkSize), resetPromise.getFuture());
}
Future<Standalone<VectorRef<KeyRangeRef>>> ReadYourWritesTransaction::getBlobGranuleRanges(const KeyRange& range) {
Future<Standalone<VectorRef<KeyRangeRef>>> ReadYourWritesTransaction::getBlobGranuleRanges(const KeyRange& range,
int rangeLimit) {
if (checkUsedDuringCommit()) {
return used_during_commit();
}
@ -1794,7 +1795,7 @@ Future<Standalone<VectorRef<KeyRangeRef>>> ReadYourWritesTransaction::getBlobGra
if (range.begin > maxKey || range.end > maxKey)
return key_outside_legal_range();
return waitOrError(tr.getBlobGranuleRanges(range), resetPromise.getFuture());
return waitOrError(tr.getBlobGranuleRanges(range, rangeLimit), resetPromise.getFuture());
}
Future<Standalone<VectorRef<BlobGranuleChunkRef>>> ReadYourWritesTransaction::readBlobGranules(

View File

@ -1331,6 +1331,9 @@ int64_t decodeBlobManagerEpochValue(ValueRef const& value) {
}
// blob granule data
const KeyRef blobRangeActive = LiteralStringRef("1");
const KeyRef blobRangeInactive = LiteralStringRef("0");
const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff\x02/bgf/"), LiteralStringRef("\xff\x02/bgf0"));
const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), LiteralStringRef("\xff\x02/bgm0"));
const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0"));

View File

@ -144,6 +144,32 @@ ThreadFuture<Void> ThreadSafeDatabase::waitPurgeGranulesComplete(const KeyRef& p
return onMainThread([db, key]() -> Future<Void> { return db->waitPurgeGranulesComplete(key); });
}
ThreadFuture<bool> ThreadSafeDatabase::blobbifyRange(const KeyRangeRef& keyRange) {
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread([=]() -> Future<bool> { return db->blobbifyRange(range); });
}
ThreadFuture<bool> ThreadSafeDatabase::unblobbifyRange(const KeyRangeRef& keyRange) {
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread([=]() -> Future<bool> { return db->blobbifyRange(range); });
}
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> ThreadSafeDatabase::listBlobbifiedRanges(const KeyRangeRef& keyRange,
int rangeLimit) {
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread(
[=]() -> Future<Standalone<VectorRef<KeyRangeRef>>> { return db->listBlobbifiedRanges(range, rangeLimit); });
}
ThreadFuture<Version> ThreadSafeDatabase::verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) {
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread([=]() -> Future<Version> { return db->verifyBlobRange(range, version); });
}
ThreadSafeDatabase::ThreadSafeDatabase(ConnectionRecordType connectionRecordType,
std::string connectionRecordString,
int apiVersion) {
@ -359,13 +385,14 @@ ThreadFuture<Standalone<VectorRef<const char*>>> ThreadSafeTransaction::getAddre
}
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> ThreadSafeTransaction::getBlobGranuleRanges(
const KeyRangeRef& keyRange) {
const KeyRangeRef& keyRange,
int rangeLimit) {
ISingleThreadTransaction* tr = this->tr;
KeyRange r = keyRange;
return onMainThread([tr, r]() -> Future<Standalone<VectorRef<KeyRangeRef>>> {
return onMainThread([=]() -> Future<Standalone<VectorRef<KeyRangeRef>>> {
tr->checkDeferredError();
return tr->getBlobGranuleRanges(r);
return tr->getBlobGranuleRanges(r, rangeLimit);
});
}

View File

@ -378,12 +378,18 @@ public:
Future<OverlappingChangeFeedsInfo> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
// BlobGranule API.
Future<Key> purgeBlobGranules(KeyRange keyRange,
Version purgeVersion,
Optional<TenantName> tenant,
bool force = false);
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
Future<bool> blobbifyRange(KeyRange range);
Future<bool> unblobbifyRange(KeyRange range);
Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(KeyRange range, int rangeLimit);
Future<Version> verifyBlobRange(const KeyRange& range, Optional<Version> version);
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,

View File

@ -78,7 +78,8 @@ public:
virtual ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) = 0;
virtual ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) = 0;
virtual ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange,
int rowLimit) = 0;
virtual ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
@ -172,6 +173,13 @@ public:
virtual ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) = 0;
virtual ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) = 0;
virtual ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) = 0;
virtual ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) = 0;
virtual ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
int rangeLimit) = 0;
virtual ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) = 0;
// Interface to manage shared state across multiple connections to the same Database
virtual ThreadFuture<DatabaseSharedState*> createSharedState() = 0;
virtual void setSharedState(DatabaseSharedState* p) = 0;

View File

@ -55,7 +55,7 @@ public:
Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(KeyRange const& range, int64_t chunkSize) override {
throw client_invalid_operation();
}
Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(KeyRange const& range) override {
Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(KeyRange const& range, int rowLimit) override {
throw client_invalid_operation();
}
Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranules(KeyRange const& range,

View File

@ -80,7 +80,7 @@ public:
virtual Future<Standalone<VectorRef<const char*>>> getAddressesForKey(Key const& key) = 0;
virtual Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(KeyRange const& range, int64_t chunkSize) = 0;
virtual Future<int64_t> getEstimatedRangeSizeBytes(KeyRange const& keys) = 0;
virtual Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(KeyRange const& range) = 0;
virtual Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(KeyRange const& range, int rangeLimit) = 0;
virtual Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranules(KeyRange const& range,
Version begin,
Optional<Version> readVersion,

View File

@ -171,6 +171,32 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
uint8_t const* purge_key_name,
int purge_key_name_length);
FDBFuture* (*databaseBlobbifyRange)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length);
FDBFuture* (*databaseUnblobbifyRange)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length);
FDBFuture* (*databaseListBlobbifiedRanges)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int rangeLimit);
FDBFuture* (*databaseVerifyBlobRange)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
Optional<Version> version);
// Tenant
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
@ -276,7 +302,8 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length);
int end_key_name_length,
int rangeLimit);
FDBResult* (*transactionReadBlobGranules)(FDBTransaction* db,
uint8_t const* begin_key_name,
@ -376,7 +403,8 @@ public:
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;
ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange,
int rangeLimit) override;
ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
@ -476,6 +504,12 @@ public:
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) override;
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
int rangeLimit) override;
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;
@ -574,7 +608,8 @@ public:
ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange,
int rangeLimit) override;
ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,
@ -817,6 +852,12 @@ public:
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) override;
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
int rangeLimit) override;
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;

View File

@ -415,7 +415,7 @@ public:
// The returned list would still be in form of [keys.begin, splitPoint1, splitPoint2, ... , keys.end]
Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(KeyRange const& keys, int64_t chunkSize);
Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRange& range);
Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRange& range, int rangeLimit);
Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranules(const KeyRange& range,
Version begin,
Optional<Version> readVersion,

View File

@ -121,7 +121,7 @@ public:
Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRange& range, int64_t chunkSize) override;
Future<int64_t> getEstimatedRangeSizeBytes(const KeyRange& keys) override;
Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRange& range) override;
Future<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRange& range, int rangeLimit) override;
Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranules(const KeyRange& range,
Version begin,
Optional<Version> readVersion,

View File

@ -594,6 +594,8 @@ const Value blobManagerEpochValueFor(int64_t epoch);
int64_t decodeBlobManagerEpochValue(ValueRef const& value);
// blob granule keys
extern const StringRef blobRangeActive;
extern const StringRef blobRangeInactive;
extern const uint8_t BG_FILE_TYPE_DELTA;
extern const uint8_t BG_FILE_TYPE_SNAPSHOT;

View File

@ -62,6 +62,13 @@ public:
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<bool> blobbifyRange(const KeyRangeRef& keyRange) override;
ThreadFuture<bool> unblobbifyRange(const KeyRangeRef& keyRange) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(const KeyRangeRef& keyRange,
int rangeLimit) override;
ThreadFuture<Version> verifyBlobRange(const KeyRangeRef& keyRange, Optional<Version> version) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;
@ -149,7 +156,8 @@ public:
ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange) override;
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> getBlobGranuleRanges(const KeyRangeRef& keyRange,
int rangeLimit) override;
ThreadResult<RangeResult> readBlobGranules(const KeyRangeRef& keyRange,
Version beginVersion,

View File

@ -189,7 +189,7 @@ ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
state int reClearInterval = 1; // do quadratic backoff on clear rate, b/c large keys can keep it not write-cold
loop {
try {
Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(range));
Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(range, 2));
if (ranges.size() == 1) {
return Void();
}

View File

@ -130,7 +130,7 @@ void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
}
break;
}
bool active = dbBlobRanges[i].value == LiteralStringRef("1");
bool active = dbBlobRanges[i].value == blobRangeActive;
if (active) {
if (BM_DEBUG) {
fmt::print("BM sees client range [{0} - {1})\n",
@ -1282,7 +1282,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
needToCoalesce = false;
for (int i = 0; i < results.size() - 1; i++) {
bool active = results[i].value == LiteralStringRef("1");
bool active = results[i].value == blobRangeActive;
bmData->knownBlobRanges.insert(KeyRangeRef(results[i].key, results[i + 1].key), active);
}
}
@ -5073,9 +5073,6 @@ TEST_CASE("/blobmanager/updateranges") {
VectorRef<KeyRangeRef> added;
VectorRef<KeyRangeRef> removed;
StringRef active = LiteralStringRef("1");
StringRef inactive = StringRef();
RangeResult dbDataEmpty;
std::vector<std::pair<KeyRangeRef, bool>> kbrRanges;
@ -5086,34 +5083,34 @@ TEST_CASE("/blobmanager/updateranges") {
// db data setup
RangeResult dbDataAB;
dbDataAB.emplace_back(ar, keyA, active);
dbDataAB.emplace_back(ar, keyB, inactive);
dbDataAB.emplace_back(ar, keyA, blobRangeActive);
dbDataAB.emplace_back(ar, keyB, blobRangeInactive);
RangeResult dbDataAC;
dbDataAC.emplace_back(ar, keyA, active);
dbDataAC.emplace_back(ar, keyC, inactive);
dbDataAC.emplace_back(ar, keyA, blobRangeActive);
dbDataAC.emplace_back(ar, keyC, blobRangeInactive);
RangeResult dbDataAD;
dbDataAD.emplace_back(ar, keyA, active);
dbDataAD.emplace_back(ar, keyD, inactive);
dbDataAD.emplace_back(ar, keyA, blobRangeActive);
dbDataAD.emplace_back(ar, keyD, blobRangeInactive);
RangeResult dbDataBC;
dbDataBC.emplace_back(ar, keyB, active);
dbDataBC.emplace_back(ar, keyC, inactive);
dbDataBC.emplace_back(ar, keyB, blobRangeActive);
dbDataBC.emplace_back(ar, keyC, blobRangeInactive);
RangeResult dbDataBD;
dbDataBD.emplace_back(ar, keyB, active);
dbDataBD.emplace_back(ar, keyD, inactive);
dbDataBD.emplace_back(ar, keyB, blobRangeActive);
dbDataBD.emplace_back(ar, keyD, blobRangeInactive);
RangeResult dbDataCD;
dbDataCD.emplace_back(ar, keyC, active);
dbDataCD.emplace_back(ar, keyD, inactive);
dbDataCD.emplace_back(ar, keyC, blobRangeActive);
dbDataCD.emplace_back(ar, keyD, blobRangeInactive);
RangeResult dbDataAB_CD;
dbDataAB_CD.emplace_back(ar, keyA, active);
dbDataAB_CD.emplace_back(ar, keyB, inactive);
dbDataAB_CD.emplace_back(ar, keyC, active);
dbDataAB_CD.emplace_back(ar, keyD, inactive);
dbDataAB_CD.emplace_back(ar, keyA, blobRangeActive);
dbDataAB_CD.emplace_back(ar, keyB, blobRangeInactive);
dbDataAB_CD.emplace_back(ar, keyC, blobRangeActive);
dbDataAB_CD.emplace_back(ar, keyD, blobRangeInactive);
// key ranges setup
KeyRangeRef rangeAB = KeyRangeRef(keyA, keyB);

View File

@ -230,27 +230,6 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
}
}
ACTOR Future<Void> setUpBlobRange(Database cx, KeyRange keyRange) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
wait(krmSetRange(tr, blobRangeKeys.begin, keyRange, LiteralStringRef("1")));
wait(tr->commit());
if (BGW_DEBUG) {
fmt::print("Successfully set up blob granule range for tenant range [{0} - {1})\n",
keyRange.begin.printable(),
keyRange.end.printable());
}
return Void();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR Future<TenantMapEntry> setUpTenant(Database cx, TenantName name) {
if (BGW_DEBUG) {
fmt::print("Setting up blob granule range for tenant {0}\n", name.printable());
@ -291,7 +270,8 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
self->directories[directoryIdx]->directoryRange =
KeyRangeRef(tenantEntry.prefix, tenantEntry.prefix.withSuffix(normalKeys.end));
tenants.push_back({ self->directories[directoryIdx]->tenantName, tenantEntry });
wait(self->setUpBlobRange(cx, self->directories[directoryIdx]->directoryRange));
bool _success = wait(cx->blobbifyRange(self->directories[directoryIdx]->directoryRange));
ASSERT(_success);
}
tenantData.addTenants(tenants);
@ -911,8 +891,8 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
loop {
state Transaction tr(cx, threadData->tenantName);
try {
Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(normalKeys));
ASSERT(ranges.size() >= 1);
Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(normalKeys, 1000000));
ASSERT(ranges.size() >= 1 && ranges.size() < 1000000);
ASSERT(ranges.front().begin == normalKeys.begin);
ASSERT(ranges.back().end == normalKeys.end);
for (int i = 0; i < ranges.size() - 1; i++) {

View File

@ -175,7 +175,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state Transaction tr(cx);
loop {
try {
Standalone<VectorRef<KeyRangeRef>> allGranules = wait(tr.getBlobGranuleRanges(normalKeys));
Standalone<VectorRef<KeyRangeRef>> allGranules = wait(tr.getBlobGranuleRanges(normalKeys, 1000000));
self->granuleRanges.set(allGranules);
break;
} catch (Error& e) {