Added client side support for range split

This commit is contained in:
Xin Dong 2020-06-18 09:41:50 -07:00
parent ea08bc5462
commit 2126f46195
7 changed files with 68 additions and 2 deletions

View File

@ -49,6 +49,8 @@ public:
virtual void addReadConflictRange(const KeyRangeRef& keys) = 0;
virtual ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) = 0;
virtual ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) = 0;
virtual void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) = 0;
virtual void set(const KeyRef& key, const ValueRef& value) = 0;

View File

@ -159,6 +159,23 @@ ThreadFuture<int64_t> DLTransaction::getEstimatedRangeSizeBytes(const KeyRangeRe
});
}
ThreadFuture<Standalone<VectorRef<KeyRef>>> DLTransaction::getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) {
if (!api->transactionGetRangeSplitPoints) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->transactionGetRangeSplitPoints(tr, range.begin.begin(), range.begin.size(),
range.end.begin(), range.end.size(), chunkSize);
return toThreadFuture<Standalone<VectorRef<KeyRef>>>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const FdbCApi::FDBKey* splitKeys;
int keysArrayLength;
FdbCApi::fdb_error_t error = api->futureGetKeyArray(f, &splitKeys, &keysArrayLength);
ASSERT(!error);
return Standalone<VectorRef<KeyRef>>(VectorRef<KeyRef>((KeyRef*)splitKeys, keysArrayLength), Arena());
});
}
void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ));
}
@ -322,12 +339,15 @@ void DLApi::init() {
loadClientFunction(&api->transactionCancel, lib, fdbCPath, "fdb_transaction_cancel");
loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range");
loadClientFunction(&api->transactionGetEstimatedRangeSizeBytes, lib, fdbCPath, "fdb_transaction_get_estimated_range_size_bytes", headerVersion >= 630);
loadClientFunction(&api->transactionGetRangeSplitPoints, lib, fdbCPath, "fdb_transaction_get_range_split_points",
headerVersion >= 630);
loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version");
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
loadClientFunction(&api->futureGetStringArray, lib, fdbCPath, "fdb_future_get_string_array");
loadClientFunction(&api->futureGetKeyArray, lib, fdbCPath, "fdb_future_get_key_array");
loadClientFunction(&api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array");
loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback");
loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel");
@ -568,6 +588,14 @@ ThreadFuture<int64_t> MultiVersionTransaction::getEstimatedRangeSizeBytes(const
return abortableFuture(f, tr.onChange);
}
ThreadFuture<Standalone<VectorRef<KeyRef>>> MultiVersionTransaction::getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getRangeSplitPoints(range, chunkSize)
: ThreadFuture<Standalone<VectorRef<KeyRef>>>(Never());
return abortableFuture(f, tr.onChange);
}
void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
auto tr = getTransaction();
if(tr.transaction) {

View File

@ -35,6 +35,10 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
typedef struct transaction FDBTransaction;
#pragma pack(push, 4)
typedef struct key {
const uint8_t* key;
int keyLength;
} FDBKey;
typedef struct keyvalue {
const void *key;
int keyLength;
@ -84,7 +88,11 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
FDBFuture* (*transactionGetEstimatedRangeSizeBytes)(FDBTransaction* tr, uint8_t const* begin_key_name,
int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length);
FDBFuture* (*transactionGetRangeSplitPoints)(FDBTransaction* tr, uint8_t const* begin_key_name,
int begin_key_name_length, uint8_t const* end_key_name,
int end_key_name_length, int64_t chunkSize);
FDBFuture* (*transactionCommit)(FDBTransaction *tr);
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion);
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr);
@ -103,6 +111,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
fdb_error_t (*futureGetKey)(FDBFuture *f, uint8_t const **outKey, int *outKeyLength);
fdb_error_t (*futureGetValue)(FDBFuture *f, fdb_bool_t *outPresent, uint8_t const **outValue, int *outValueLength);
fdb_error_t (*futureGetStringArray)(FDBFuture *f, const char ***outStrings, int *outCount);
fdb_error_t (*futureGetKeyArray)(FDBFuture* f, FDBKey const** outKeys, int* outCount);
fdb_error_t (*futureGetKeyValueArray)(FDBFuture *f, FDBKeyValue const ** outKV, int *outCount, fdb_bool_t *outMore);
fdb_error_t (*futureSetCallback)(FDBFuture *f, FDBCallback callback, void *callback_parameter);
void (*futureCancel)(FDBFuture *f);
@ -133,7 +142,9 @@ public:
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;
ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) override;
void addReadConflictRange(const KeyRangeRef& keys) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
@ -237,6 +248,8 @@ public:
void addReadConflictRange(const KeyRangeRef& keys) override;
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;
ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
void set(const KeyRef& key, const ValueRef& value) override;

View File

@ -1397,6 +1397,16 @@ Future<int64_t> ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyR
return map(waitOrError(tr.getStorageMetrics(keys, -1), resetPromise.getFuture()), [](const StorageMetrics& m) { return m.bytes; });
}
Future<Standalone<VectorRef<KeyRef>>> ReadYourWritesTransaction::getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) {
if (checkUsedDuringCommit()) {
throw used_during_commit();
}
if (resetPromise.isSet()) return resetPromise.getFuture().getError();
return waitOrError(tr.getRangeSplitPoints(range, chunkSize), resetPromise.getFuture());
}
void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) {
if(checkUsedDuringCommit()) {
throw used_during_commit();

View File

@ -86,6 +86,7 @@ public:
[[nodiscard]] Future<Standalone<VectorRef<const char*>>> getAddressesForKey(const Key& key);
Future<int64_t> getEstimatedRangeSizeBytes( const KeyRangeRef& keys );
Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range, int64_t chunkSize);
void addReadConflictRange( KeyRangeRef const& keys );
void makeSelfConflicting() { tr.makeSelfConflicting(); }

View File

@ -164,6 +164,16 @@ ThreadFuture<int64_t> ThreadSafeTransaction::getEstimatedRangeSizeBytes( const K
} );
}
ThreadFuture<Standalone<VectorRef<KeyRef>>> ThreadSafeTransaction::getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) {
KeyRange r = range;
ReadYourWritesTransaction* tr = this->tr;
return onMainThread([tr, r, chunkSize]() -> Future<Standalone<VectorRef<KeyRef>>> {
tr->checkDeferredError();
return tr->getRangeSplitPoints(r, chunkSize);
});
}
ThreadFuture< Standalone<RangeResultRef> > ThreadSafeTransaction::getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse ) {
KeySelector b = begin;

View File

@ -72,6 +72,8 @@ public:
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
ThreadFuture<int64_t> getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override;
ThreadFuture<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRangeRef& range,
int64_t chunkSize) override;
void addReadConflictRange( const KeyRangeRef& keys ) override;
void makeSelfConflicting();