Added bindings supports
This commit is contained in:
parent
2126f46195
commit
440630a0cb
|
@ -281,6 +281,17 @@ fdb_error_t fdb_future_get_string_array(
|
|||
);
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT
|
||||
fdb_error_t fdb_future_get_key_array(
|
||||
FDBFuture* f, FDBKey const** out_key_array, int* out_count)
|
||||
{
|
||||
CATCH_AND_RETURN(
|
||||
Standalone<VectorRef<KeyRef>> na = TSAV(Standalone<VectorRef<KeyRef>>, f)->get();
|
||||
*out_key_array = (FDBKey*) na.begin();
|
||||
*out_count = na.size();
|
||||
);
|
||||
}
|
||||
|
||||
FDBFuture* fdb_create_cluster_v609( const char* cluster_file_path ) {
|
||||
char *path;
|
||||
if(cluster_file_path) {
|
||||
|
@ -634,6 +645,13 @@ FDBFuture* fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, u
|
|||
return (FDBFuture*)(TXN(tr)->getEstimatedRangeSizeBytes(range).extractPtr());
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT
|
||||
FDBFuture* fdb_transaction_get_range_split_points( FDBTransaction* tr, uint8_t const* begin_key_name,
|
||||
int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length, int64_t chunkSize) {
|
||||
KeyRangeRef range(KeyRef(begin_key_name, begin_key_name_length), KeyRef(end_key_name, end_key_name_length));
|
||||
return (FDBFuture*)(TXN(tr)->getRangeSplitPoints(range, chunkSize).extractPtr());
|
||||
}
|
||||
|
||||
#include "fdb_c_function_pointers.g.h"
|
||||
|
||||
#define FDB_API_CHANGED(func, ver) if (header_version < ver) fdb_api_ptr_##func = (void*)&(func##_v##ver##_PREV); else if (fdb_api_ptr_##func == (void*)&fdb_api_ptr_unimpl) fdb_api_ptr_##func = (void*)&(func##_impl);
|
||||
|
|
|
@ -91,6 +91,10 @@ extern "C" {
|
|||
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_add_network_thread_completion_hook(void (*hook)(void*), void *hook_parameter);
|
||||
|
||||
#pragma pack(push, 4)
|
||||
typedef struct key {
|
||||
const uint8_t* key;
|
||||
int key_length;
|
||||
} FDBKey;
|
||||
#if FDB_API_VERSION >= 700
|
||||
typedef struct keyvalue {
|
||||
const uint8_t* key;
|
||||
|
@ -143,9 +147,12 @@ extern "C" {
|
|||
|
||||
#if FDB_API_VERSION >= 14
|
||||
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
|
||||
fdb_future_get_keyvalue_array( FDBFuture* f, FDBKeyValue const** out_kv,
|
||||
fdb_future_get_keyvalue_array( FDBFuture* f, FDBKeyValue const** out_key_array,
|
||||
int* out_count, fdb_bool_t* out_more );
|
||||
#endif
|
||||
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
|
||||
fdb_future_get_key_array( FDBFuture* f, FDBKey const** out_k,
|
||||
int* out_count);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_string_array(FDBFuture* f,
|
||||
const char*** out_strings, int* out_count);
|
||||
|
@ -259,6 +266,10 @@ extern "C" {
|
|||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
|
||||
fdb_transaction_get_estimated_range_size_bytes( FDBTransaction* tr, uint8_t const* begin_key_name,
|
||||
int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
|
||||
fdb_transaction_get_range_split_points( FDBTransaction* tr, uint8_t const* begin_key_name,
|
||||
int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length, int64_t chunkSize);
|
||||
|
||||
#define FDB_KEYSEL_LAST_LESS_THAN(k, l) k, l, 0, 0
|
||||
#define FDB_KEYSEL_LAST_LESS_OR_EQUAL(k, l) k, l, 1, 0
|
||||
|
|
|
@ -134,6 +134,7 @@ namespace FDB {
|
|||
FDBStreamingMode streamingMode = FDB_STREAMING_MODE_SERIAL) override;
|
||||
|
||||
Future<int64_t> getEstimatedRangeSizeBytes(const KeyRange& keys) override;
|
||||
Future<FDBStandalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRange& range, int64_t chunkSize) override;
|
||||
|
||||
void addReadConflictRange(KeyRangeRef const& keys) override;
|
||||
void addReadConflictKey(KeyRef const& key) override;
|
||||
|
@ -356,6 +357,16 @@ namespace FDB {
|
|||
});
|
||||
}
|
||||
|
||||
Future<FDBStandalone<VectorRef<KeyRef>>> TransactionImpl::getRangeSplitPoints(const KeyRange& range, int64_t chunkSize) {
|
||||
return backToFuture<FDBStandalone<VectorRef<KeyRef>>>(fdb_transaction_get_range_split_points(tr, range.begin.begin(), range.begin.size(), range.end.begin(), range.end.size(), chunkSize), [](Reference<CFuture> f) {
|
||||
FDBKey const* ks;
|
||||
int count;
|
||||
throw_on_error(fdb_future_get_key_array(f->f, &ks, &count));
|
||||
|
||||
return FDBStandalone<VectorRef<KeyRef>>(f, VectorRef<KeyRef>((KeyRef*)ks, count));
|
||||
});
|
||||
}
|
||||
|
||||
void TransactionImpl::addReadConflictRange(KeyRangeRef const& keys) {
|
||||
throw_on_error( fdb_transaction_add_conflict_range( tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_READ ) );
|
||||
}
|
||||
|
|
|
@ -90,6 +90,7 @@ namespace FDB {
|
|||
}
|
||||
|
||||
virtual Future<int64_t> getEstimatedRangeSizeBytes(const KeyRange& keys) = 0;
|
||||
virtual Future<FDBStandalone<VectorRef<KeyRef>>> getRangeSplitPoints(const KeyRange& range, int64_t chunkSize) = 0;
|
||||
|
||||
virtual void addReadConflictRange(KeyRangeRef const& keys) = 0;
|
||||
virtual void addReadConflictKey(KeyRef const& key) = 0;
|
||||
|
|
|
@ -3,4 +3,4 @@ module github.com/apple/foundationdb/bindings/go
|
|||
// The FoundationDB go bindings currently have no external golang dependencies outside of
|
||||
// the go standard library.
|
||||
|
||||
require golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
|
||||
require golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
|
||||
|
|
|
@ -306,6 +306,57 @@ func (f *futureKeyValueArray) Get() ([]KeyValue, bool, error) {
|
|||
return ret, (more != 0), nil
|
||||
}
|
||||
|
||||
// FutureKeyArray represents the asynchronous result of a function
|
||||
// that returns an array of keys. FutureKeyArray is a lightweight object
|
||||
// that may be efficiently copied, and is safe for concurrent use by multiple goroutines.
|
||||
type FutureKeyArray interface {
|
||||
|
||||
// Get returns an array of keys or an error if the asynchronous operation
|
||||
// associated with this future did not successfully complete. The current
|
||||
// goroutine will be blocked until the future is ready.
|
||||
Get() ([]Key, error)
|
||||
|
||||
// MustGet returns an array of keys, or panics if the asynchronous operations
|
||||
// associated with this future did not successfully complete. The current goroutine
|
||||
// will be blocked until the future is ready.
|
||||
MustGet() []Key
|
||||
}
|
||||
|
||||
type futureKeyArray struct {
|
||||
*future
|
||||
}
|
||||
|
||||
func (f *futureKeyArray) Get() ([]Key, error) {
|
||||
defer runtime.KeepAlive(f.future)
|
||||
|
||||
f.BlockUntilReady()
|
||||
|
||||
var ks *C.FDBKey
|
||||
var count C.int
|
||||
|
||||
if err:= C.fdb_future_get_key_array(f.ptr, &ks, &count); err != 0 {
|
||||
return nil, Error{int(err)}
|
||||
}
|
||||
|
||||
ret := make([]Key, int(count))
|
||||
|
||||
for i:= 0; i<int(count); i++ {
|
||||
kptr := unsafe.Pointer(uintptr(unsafe.Pointer(ks)) + uintptr(i*24))
|
||||
|
||||
ret[i] = stringRefToSlice(kptr)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (f *futureKeyArray) MustGet() ([]Key) {
|
||||
val, err := f.Get()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return val
|
||||
}
|
||||
|
||||
// FutureInt64 represents the asynchronous result of a function that returns a
|
||||
// database version. FutureInt64 is a lightweight object that may be efficiently
|
||||
// copied, and is safe for concurrent use by multiple goroutines.
|
||||
|
|
|
@ -40,6 +40,7 @@ type ReadTransaction interface {
|
|||
GetDatabase() Database
|
||||
Snapshot() Snapshot
|
||||
GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64
|
||||
GetRangeSplitPoints(r ExactRange, chunkSize int64) FutureKeyArray
|
||||
|
||||
ReadTransactor
|
||||
}
|
||||
|
@ -328,6 +329,30 @@ func (t Transaction) GetEstimatedRangeSizeBytes(r ExactRange) FutureInt64 {
|
|||
)
|
||||
}
|
||||
|
||||
func (t *transaction) getRangeSplitPoints(beginKey Key, endKey Key, chunkSize int64) FutureKeyArray {
|
||||
return &futureKeyArray{
|
||||
future: newFuture(C.fdb_transaction_get_range_split_points(
|
||||
t.ptr,
|
||||
byteSliceToPtr(beginKey),
|
||||
C.int(len(beginKey)),
|
||||
byteSliceToPtr(endKey),
|
||||
C.int(len(endKey)),
|
||||
chunkSize,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
// GetRangeSplitPoints will return a list of keys that can devide the given range into
|
||||
// chunks based on the chunk size provided.
|
||||
func (t Transaction) GetRangeSplitPoints(r ExactRange, chunkSize int64) FutureKeyArray {
|
||||
beginKey, endKey := r.FDBRangeKeys()
|
||||
return t.getRangeSplitPoints(
|
||||
beginKey.FDBKey(),
|
||||
endKey.FDBKey(),
|
||||
chunkSize,
|
||||
)
|
||||
}
|
||||
|
||||
func (t *transaction) getReadVersion() FutureInt64 {
|
||||
return &futureInt64{
|
||||
future: newFuture(C.fdb_transaction_get_read_version(t.ptr)),
|
||||
|
|
|
@ -27,6 +27,7 @@ set(JAVA_BINDING_SRCS
|
|||
src/main/com/apple/foundationdb/FDBTransaction.java
|
||||
src/main/com/apple/foundationdb/FutureInt64.java
|
||||
src/main/com/apple/foundationdb/FutureKey.java
|
||||
src/main/com/apple/foundationdb/FutureKeyArray.java
|
||||
src/main/com/apple/foundationdb/FutureResult.java
|
||||
src/main/com/apple/foundationdb/FutureResults.java
|
||||
src/main/com/apple/foundationdb/FutureStrings.java
|
||||
|
@ -42,6 +43,7 @@ set(JAVA_BINDING_SRCS
|
|||
src/main/com/apple/foundationdb/package-info.java
|
||||
src/main/com/apple/foundationdb/Range.java
|
||||
src/main/com/apple/foundationdb/RangeQuery.java
|
||||
src/main/com/apple/foundationdb/KeyArrayResult.java
|
||||
src/main/com/apple/foundationdb/RangeResult.java
|
||||
src/main/com/apple/foundationdb/RangeResultInfo.java
|
||||
src/main/com/apple/foundationdb/RangeResultSummary.java
|
||||
|
|
|
@ -39,6 +39,8 @@ static thread_local bool is_external = false;
|
|||
static jclass range_result_summary_class;
|
||||
static jclass range_result_class;
|
||||
static jclass string_class;
|
||||
static jclass key_array_result_class;
|
||||
static jmethodID key_array_result_init;
|
||||
static jmethodID range_result_init;
|
||||
static jmethodID range_result_summary_init;
|
||||
|
||||
|
@ -305,6 +307,75 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureStrings_FutureString
|
|||
return arr;
|
||||
}
|
||||
|
||||
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureKeyArray_FutureKeyArray_1get(JNIEnv *jenv, jobject, jlong future) {
|
||||
if( !future ) {
|
||||
throwParamNotNull(jenv);
|
||||
return JNI_NULL;
|
||||
}
|
||||
|
||||
FDBFuture *f = (FDBFuture *)future;
|
||||
|
||||
const FDBKey *ks;
|
||||
int count;
|
||||
fdb_error_t err = fdb_future_get_key_array( f, &ks, &count );
|
||||
if( err ) {
|
||||
safeThrow( jenv, getThrowable( jenv, err ) );
|
||||
return JNI_NULL;
|
||||
}
|
||||
|
||||
int totalKeySize = 0;
|
||||
for(int i = 0; i < count; i++) {
|
||||
totalKeySize += ks[i].key_length;
|
||||
}
|
||||
|
||||
jbyteArray keyArray = jenv->NewByteArray(totalKeySize);
|
||||
if( !keyArray ) {
|
||||
if( !jenv->ExceptionOccurred() )
|
||||
throwOutOfMem(jenv);
|
||||
return JNI_NULL;
|
||||
}
|
||||
uint8_t *keys_barr = (uint8_t *)jenv->GetByteArrayElements(keyArray, JNI_NULL);
|
||||
if (!keys_barr) {
|
||||
throwRuntimeEx( jenv, "Error getting handle to native resources" );
|
||||
return JNI_NULL;
|
||||
}
|
||||
|
||||
jintArray lengthArray = jenv->NewIntArray(count);
|
||||
if( !lengthArray ) {
|
||||
if( !jenv->ExceptionOccurred() )
|
||||
throwOutOfMem(jenv);
|
||||
|
||||
jenv->ReleaseByteArrayElements(keyArray, (jbyte *)keys_barr, 0);
|
||||
return JNI_NULL;
|
||||
}
|
||||
|
||||
jint *length_barr = jenv->GetIntArrayElements(lengthArray, JNI_NULL);
|
||||
if( !length_barr ) {
|
||||
if( !jenv->ExceptionOccurred() )
|
||||
throwOutOfMem(jenv);
|
||||
|
||||
jenv->ReleaseByteArrayElements(keyArray, (jbyte *)keys_barr, 0);
|
||||
return JNI_NULL;
|
||||
}
|
||||
|
||||
int offset = 0;
|
||||
for(int i = 0; i < count; i++) {
|
||||
memcpy(keys_barr + offset, ks[i].key, ks[i].key_length);
|
||||
length_barr[i] = ks[i].key_length;
|
||||
offset += ks[i].key_length;
|
||||
}
|
||||
|
||||
jenv->ReleaseByteArrayElements(keyArray, (jbyte *)keys_barr, 0);
|
||||
jenv->ReleaseIntArrayElements(lengthArray, length_barr, 0);
|
||||
|
||||
jobject result = jenv->NewObject(key_array_result_class, key_array_result_init, keyArray, lengthArray);
|
||||
if( jenv->ExceptionOccurred() )
|
||||
return JNI_NULL;
|
||||
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getSummary(JNIEnv *jenv, jobject, jlong future) {
|
||||
if( !future ) {
|
||||
throwParamNotNull(jenv);
|
||||
|
@ -669,6 +740,35 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getRangeSplitPoints(JNIEnv *jenv, jobject, jlong tPtr,
|
||||
jbyteArray beginKeyBytes, jbyteArray endKeyBytes, jlong chunkSize) {
|
||||
if( !tPtr || !beginKeyBytes || !endKeyBytes) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTransaction *tr = (FDBTransaction *)tPtr;
|
||||
|
||||
uint8_t *startKey = (uint8_t *)jenv->GetByteArrayElements( beginKeyBytes, JNI_NULL );
|
||||
if(!startKey) {
|
||||
if( !jenv->ExceptionOccurred() )
|
||||
throwRuntimeEx( jenv, "Error getting handle to native resources" );
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t *endKey = (uint8_t *)jenv->GetByteArrayElements(endKeyBytes, JNI_NULL);
|
||||
if (!endKey) {
|
||||
jenv->ReleaseByteArrayElements( beginKeyBytes, (jbyte *)startKey, JNI_ABORT );
|
||||
if( !jenv->ExceptionOccurred() )
|
||||
throwRuntimeEx( jenv, "Error getting handle to native resources" );
|
||||
return 0;
|
||||
}
|
||||
|
||||
FDBFuture *f = fdb_transaction_get_range_split_points( tr, startKey, jenv->GetArrayLength( beginKeyBytes ), endKey, jenv->GetArrayLength( endKeyBytes ), chunkSize );
|
||||
jenv->ReleaseByteArrayElements( beginKeyBytes, (jbyte *)startKey, JNI_ABORT );
|
||||
jenv->ReleaseByteArrayElements( endKeyBytes, (jbyte *)endKey, JNI_ABORT );
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1set(JNIEnv *jenv, jobject, jlong tPtr, jbyteArray keyBytes, jbyteArray valueBytes) {
|
||||
if( !tPtr || !keyBytes || !valueBytes ) {
|
||||
throwParamNotNull(jenv);
|
||||
|
@ -1045,6 +1145,10 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
|
|||
range_result_init = env->GetMethodID(local_range_result_class, "<init>", "([B[IZ)V");
|
||||
range_result_class = (jclass) (env)->NewGlobalRef(local_range_result_class);
|
||||
|
||||
jclass local_key_array_result_class = env->FindClass("com/apple/foundationdb/KeyArrayResult");
|
||||
key_array_result_init = env->GetMethodID(local_key_array_result_class, "<init>", "([B[I)V");
|
||||
key_array_result_class = (jclass) (env)->NewGlobalRef(local_key_array_result_class);
|
||||
|
||||
jclass local_range_result_summary_class = env->FindClass("com/apple/foundationdb/RangeResultSummary");
|
||||
range_result_summary_init = env->GetMethodID(local_range_result_summary_class, "<init>", "([BIZ)V");
|
||||
range_result_summary_class = (jclass) (env)->NewGlobalRef(local_range_result_summary_class);
|
||||
|
|
|
@ -80,6 +80,16 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
return FDBTransaction.this.getEstimatedRangeSizeBytes(range);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeyArrayResult> getRangeSplitPoints(byte[] begin, byte[] end, long chunkSize) {
|
||||
return FDBTransaction.this.getRangeSplitPoints(begin, end, chunkSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeyArrayResult> getRangeSplitPoints(Range range, long chunkSize) {
|
||||
return FDBTransaction.this.getRangeSplitPoints(range, chunkSize);
|
||||
}
|
||||
|
||||
///////////////////
|
||||
// getRange -> KeySelectors
|
||||
///////////////////
|
||||
|
@ -282,6 +292,21 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
return this.getEstimatedRangeSizeBytes(range.begin, range.end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeyArrayResult> getRangeSplitPoints(byte[] begin, byte[] end, long chunkSize) {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureKeyArray(Transaction_getRangeSplitPoints(getPtr(), begin, end, chunkSize), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeyArrayResult> getRangeSplitPoints(Range range, long chunkSize) {
|
||||
return this.getRangeSplitPoints(range.begin, range.end, chunkSize);
|
||||
}
|
||||
|
||||
///////////////////
|
||||
// getRange -> KeySelectors
|
||||
///////////////////
|
||||
|
@ -685,4 +710,5 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
private native void Transaction_cancel(long cPtr);
|
||||
private native long Transaction_getKeyLocations(long cPtr, byte[] key);
|
||||
private native long Transaction_getEstimatedRangeSizeBytes(long cPtr, byte[] keyBegin, byte[] keyEnd);
|
||||
private native long Transaction_getRangeSplitPoints(long cPtr, byte[] keyBegin, byte[] keyEnd, long chunkSize);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* FutureKeyArray.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.apple.foundationdb;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
class FutureKeyArray extends NativeFuture<KeyArrayResult> {
|
||||
FutureKeyArray(long cPtr, Executor executor) {
|
||||
super(cPtr);
|
||||
registerMarshalCallback(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KeyArrayResult getIfDone_internal(long cPtr) throws FDBException {
|
||||
return FutureKeyArray_get(cPtr);
|
||||
}
|
||||
|
||||
private native KeyArrayResult FutureKeyArray_get(long cPtr) throws FDBException;
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* KeyArrayResult.java
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.apple.foundationdb;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
class KeyArrayResult {
|
||||
final List<byte[]> keys;
|
||||
|
||||
KeyArrayResult(byte[] keyBytes, int[] keyLengths) {
|
||||
int count = keyLengths.length;
|
||||
keys = new ArrayList<byte[]>(count);
|
||||
|
||||
int offset = 0;
|
||||
for(int i = 0; i < count; i++) {
|
||||
int keyLength = keyLengths[i];
|
||||
|
||||
byte[] key = new byte[keyLength];
|
||||
System.arraycopy(keyBytes, offset, key, 0, keyLength);
|
||||
|
||||
offset += keyLength;
|
||||
keys.add(key);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -444,6 +444,26 @@ public interface ReadTransaction extends ReadTransactionContext {
|
|||
*/
|
||||
CompletableFuture<Long> getEstimatedRangeSizeBytes(Range range);
|
||||
|
||||
/**
|
||||
* Gets a list of keys that can split the given range into similar sized chunks based on chunkSize
|
||||
*
|
||||
* @param begin the beginning of the range (inclusive)
|
||||
* @param end the end of the range (exclusive)
|
||||
*
|
||||
* @return a handle to access the results of the asynchronous call
|
||||
*/
|
||||
CompletableFuture<KeyArrayResult> getRangeSplitPoints(byte[] begin, byte[] end, long chunkSize);
|
||||
|
||||
/**
|
||||
* Gets a list of keys that can split the given range into similar sized chunks based on chunkSize
|
||||
*
|
||||
* @param range the range of the keys
|
||||
*
|
||||
* @return a handle to access the results of the asynchronous call
|
||||
*/
|
||||
CompletableFuture<KeyArrayResult> getRangeSplitPoints(Range range, long chunkSize);
|
||||
|
||||
|
||||
/**
|
||||
* Returns a set of options that can be set on a {@code Transaction}
|
||||
*
|
||||
|
|
|
@ -471,7 +471,16 @@ class TransactionRead(_FDBBase):
|
|||
begin_key, len(begin_key),
|
||||
end_key, len(end_key)
|
||||
))
|
||||
|
||||
|
||||
def get_range_split_points(self, begin_key, end_key, chunkSize):
|
||||
if begin_key is None or end_key is None:
|
||||
raise Exception('Invalid begin key or end key')
|
||||
return FutureKeyArray(self.capi.fdb_transaction_get_range_split_points(
|
||||
self.tpointer,
|
||||
begin_key, len(begin_key),
|
||||
end_key, len(end_key),
|
||||
chunkSize
|
||||
))
|
||||
|
||||
class Transaction(TransactionRead):
|
||||
"""A modifiable snapshot of a Database.
|
||||
|
@ -736,6 +745,14 @@ class FutureKeyValueArray(Future):
|
|||
# the KVs on the python side and in most cases we are about to
|
||||
# destroy the future anyway
|
||||
|
||||
class FutureKeyArray(Future):
|
||||
def wait(self):
|
||||
self.block_until_ready()
|
||||
ks = ctypes.pointer(KeyStruct())
|
||||
count = ctypes.c_int()
|
||||
self.capi.fdb_future_get_key_array(self.fpointer, ctypes.byref(ks), ctypes.byref(count))
|
||||
return ([ctypes.string_at(x.key, x.key_length) for x in ks[0:count.value]], count.value)
|
||||
|
||||
|
||||
class FutureStringArray(Future):
|
||||
def wait(self):
|
||||
|
@ -1217,6 +1234,11 @@ class KeyValueStruct(ctypes.Structure):
|
|||
('value_length', ctypes.c_int)]
|
||||
_pack_ = 4
|
||||
|
||||
class KeyStruct(ctypes.Structure):
|
||||
_fields_ = [('key', ctypes.POINTER(ctypes.c_byte)),
|
||||
('key_length', ctypes.c_int)]
|
||||
_pack_ = 4
|
||||
|
||||
|
||||
class KeyValue(object):
|
||||
def __init__(self, key, value):
|
||||
|
@ -1406,6 +1428,11 @@ def init_c_api():
|
|||
_capi.fdb_future_get_keyvalue_array.restype = int
|
||||
_capi.fdb_future_get_keyvalue_array.errcheck = check_error_code
|
||||
|
||||
_capi.fdb_future_get_key_array.argtypes = [ctypes.c_void_p, ctypes.POINTER(
|
||||
ctypes.POINTER(KeyStruct)), ctypes.POINTER(ctypes.c_int)]
|
||||
_capi.fdb_future_get_key_array.restype = int
|
||||
_capi.fdb_future_get_key_array.errcheck = check_error_code
|
||||
|
||||
_capi.fdb_future_get_string_array.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.POINTER(ctypes.c_char_p)), ctypes.POINTER(ctypes.c_int)]
|
||||
_capi.fdb_future_get_string_array.restype = int
|
||||
_capi.fdb_future_get_string_array.errcheck = check_error_code
|
||||
|
@ -1451,6 +1478,9 @@ def init_c_api():
|
|||
_capi.fdb_transaction_get_estimated_range_size_bytes.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int]
|
||||
_capi.fdb_transaction_get_estimated_range_size_bytes.restype = ctypes.c_void_p
|
||||
|
||||
_capi.fdb_transaction_get_range_split_points.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_int]
|
||||
_capi.fdb_transaction_get_range_split_points.restype = ctypes.c_void_p
|
||||
|
||||
_capi.fdb_transaction_add_conflict_range.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_int]
|
||||
_capi.fdb_transaction_add_conflict_range.restype = ctypes.c_int
|
||||
_capi.fdb_transaction_add_conflict_range.errcheck = check_error_code
|
||||
|
|
|
@ -109,6 +109,7 @@ module FDB
|
|||
attach_function :fdb_transaction_get_key, [ :pointer, :pointer, :int, :int, :int, :int ], :pointer
|
||||
attach_function :fdb_transaction_get_range, [ :pointer, :pointer, :int, :int, :int, :pointer, :int, :int, :int, :int, :int, :int, :int, :int, :int ], :pointer
|
||||
attach_function :fdb_transaction_get_estimated_range_size_bytes, [ :pointer, :pointer, :int, :pointer, :int ], :pointer
|
||||
attach_function :fdb_transaction_get_range_split_points, [ :pointer, :pointer, :int, :pointer, :int, :int64 ], :pointer
|
||||
attach_function :fdb_transaction_set, [ :pointer, :pointer, :int, :pointer, :int ], :void
|
||||
attach_function :fdb_transaction_clear, [ :pointer, :pointer, :int ], :void
|
||||
attach_function :fdb_transaction_clear_range, [ :pointer, :pointer, :int, :pointer, :int ], :void
|
||||
|
@ -129,6 +130,12 @@ module FDB
|
|||
:value_length, :int
|
||||
end
|
||||
|
||||
class KeyStruct < FFI::Struct
|
||||
pack 4
|
||||
layout :key, :pointer,
|
||||
:key_length, :int
|
||||
end
|
||||
|
||||
def self.check_error(code)
|
||||
raise Error.new(code) if code.nonzero?
|
||||
nil
|
||||
|
@ -472,6 +479,22 @@ module FDB
|
|||
end
|
||||
end
|
||||
|
||||
class FutureKeyArray < Future
|
||||
def wait
|
||||
block_until_ready
|
||||
|
||||
ks = FFI::MemoryPointer.new :pointer
|
||||
count = FFI::MemoryPointer.new :int
|
||||
FDBC.check_error FDBC.fdb_future_get_key_array(@fpointer, kvs, count)
|
||||
ks = ks.read_pointer
|
||||
|
||||
[(0..count.read_int-1).map{|i|
|
||||
x = FDBC::KeyStruct.new(ks + (i * FDBC::KeyStruct.size))
|
||||
x[:key].read_bytes(x[:key_length])
|
||||
}, count.read_int]
|
||||
end
|
||||
end
|
||||
|
||||
class FutureStringArray < LazyFuture
|
||||
def getter
|
||||
strings = FFI::MemoryPointer.new :pointer
|
||||
|
@ -825,6 +848,12 @@ module FDB
|
|||
Int64Future.new(FDBC.fdb_transaction_get_estimated_range_size_bytes(@tpointer, bkey, bkey.bytesize, ekey, ekey.bytesize))
|
||||
end
|
||||
|
||||
def get_range_split_points(begin_key, end_key, chunkSize)
|
||||
bkey = FDB.key_to_bytes(begin_key)
|
||||
ekey = FDB.key_to_bytes(end_key)
|
||||
FutureKeyArray.new(FDBC.fdb_transaction_get_range_split_points(@tpointer, bkey, bkey.bytesize, ekey, ekey.bytesize, chunkSize))
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
TransactionRead.class_variable_set("@@StreamingMode", @@StreamingMode)
|
||||
|
|
Loading…
Reference in New Issue