diff --git a/bindings/bindingtester/spec/bindingApiTester.md b/bindings/bindingtester/spec/bindingApiTester.md index 872710a671..2f67c9dc1f 100644 --- a/bindings/bindingtester/spec/bindingApiTester.md +++ b/bindings/bindingtester/spec/bindingApiTester.md @@ -277,6 +277,12 @@ futures must apply the following rules to the result: internal stack machine state as the last seen version. Pushes the byte string "GOT_COMMITTED_VERSION" onto the stack. +#### GET_APPROXIMATE_SIZE + + Calls get_approximate_size and pushes the byte string "GOT_APPROXIMATE_SIZE" + onto the stack. Note bindings may issue GET_RANGE calls with different + limits, so these bindings can obtain different sizes back. + #### WAIT_FUTURE Pops the top item off the stack and pushes it back on. If the top item on diff --git a/bindings/bindingtester/tests/api.py b/bindings/bindingtester/tests/api.py index ccad9d39b3..be6350ece6 100644 --- a/bindings/bindingtester/tests/api.py +++ b/bindings/bindingtester/tests/api.py @@ -156,6 +156,7 @@ class ApiTest(Test): resets = ['ON_ERROR', 'RESET', 'CANCEL'] read_conflicts = ['READ_CONFLICT_RANGE', 'READ_CONFLICT_KEY'] write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT'] + txn_sizes = ['GET_APPROXIMATE_SIZE'] op_choices += reads op_choices += mutations @@ -168,6 +169,7 @@ class ApiTest(Test): op_choices += read_conflicts op_choices += write_conflicts op_choices += resets + op_choices += txn_sizes idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX'] atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR', u'APPEND_IF_FITS'] @@ -434,6 +436,10 @@ class ApiTest(Test): self.can_set_version = True self.can_use_key_selectors = True + elif op == 'GET_APPROXIMATE_SIZE': + instructions.append(op) + self.add_strings(1) + elif op == 'TUPLE_PACK' or op == 'TUPLE_RANGE': tup = self.random.random_tuple(10) instructions.push_args(len(tup), *tup) diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index a5897a08d4..b30a78bd84 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -60,16 +60,20 @@ if(NOT WIN32) if(OPEN_FOR_IDE) add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h) add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h) + add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h) add_library(mako OBJECT ${MAKO_SRCS}) else() add_executable(fdb_c_performance_test test/performance_test.c test/test.h) add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h) + add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h) add_executable(mako ${MAKO_SRCS}) strip_debug_symbols(fdb_c_performance_test) strip_debug_symbols(fdb_c_ryw_benchmark) + strip_debug_symbols(fdb_c_txn_size_test) endif() target_link_libraries(fdb_c_performance_test PRIVATE fdb_c) target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c) + target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c) # do not set RPATH for mako set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE) target_link_libraries(mako PRIVATE fdb_c) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index a1475e8bb0..f0ea313b7b 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -215,10 +215,15 @@ fdb_error_t fdb_future_get_error_v22( FDBFuture* f, const char** description ) { } extern "C" DLLEXPORT -fdb_error_t fdb_future_get_version( FDBFuture* f, int64_t* out_version ) { +fdb_error_t fdb_future_get_version_v619( FDBFuture* f, int64_t* out_version ) { CATCH_AND_RETURN( *out_version = TSAV(Version, f)->get(); ); } +extern "C" DLLEXPORT +fdb_error_t fdb_future_get_int64( FDBFuture* f, int64_t* out_value ) { + CATCH_AND_RETURN( *out_value = TSAV(int64_t, f)->get(); ); +} + extern "C" DLLEXPORT fdb_error_t fdb_future_get_key( FDBFuture* f, uint8_t const** out_key, int* out_key_length ) { @@ -584,6 +589,11 @@ fdb_error_t fdb_transaction_get_committed_version( FDBTransaction* tr, *out_version = TXN(tr)->getCommittedVersion(); ); } +extern "C" DLLEXPORT +FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) { + return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr(); +} + extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr ) { @@ -670,6 +680,7 @@ fdb_error_t fdb_select_api_version_impl( int runtime_version, int header_version // Versioned API changes -- descending order by version (new changes at top) // FDB_API_CHANGED( function, ver ) means there is a new implementation as of ver, and a function function_(ver-1) is the old implementation // FDB_API_REMOVED( function, ver ) means the function was removed as of ver, and function_(ver-1) is the old implementation + FDB_API_REMOVED( fdb_future_get_version, 620 ); FDB_API_REMOVED( fdb_create_cluster, 610 ); FDB_API_REMOVED( fdb_cluster_create_database, 610 ); FDB_API_REMOVED( fdb_cluster_set_option, 610 ); diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index 5122488b33..0e049e4119 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -120,8 +120,13 @@ extern "C" { fdb_future_get_error( FDBFuture* f ); #endif +#if FDB_API_VERSION < 620 DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_version( FDBFuture* f, int64_t* out_version ); +#endif + + DLLEXPORT WARN_UNUSED_RESULT fdb_error_t + fdb_future_get_int64( FDBFuture* f, int64_t* out ); DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_future_get_key( FDBFuture* f, uint8_t const** out_key, @@ -225,6 +230,13 @@ extern "C" { fdb_transaction_get_committed_version( FDBTransaction* tr, int64_t* out_version ); + // This function intentionally returns an FDBFuture instead of an integer directly, + // so that calling this API can see the effect of previous mutations on the transaction. + // Specifically, mutations are applied asynchronously by the main thread. In order to + // see them, this call has to be serviced by the main thread too. + DLLEXPORT WARN_UNUSED_RESULT FDBFuture* + fdb_transaction_get_approximate_size(FDBTransaction* tr); + DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr ); DLLEXPORT WARN_UNUSED_RESULT FDBFuture* diff --git a/bindings/c/test/mako/mako.c b/bindings/c/test/mako/mako.c index bfccdb9594..9b49e57fc3 100755 --- a/bindings/c/test/mako/mako.c +++ b/bindings/c/test/mako/mako.c @@ -290,13 +290,22 @@ int64_t run_op_getreadversion(FDBTransaction *transaction) { } while (err && retry--); if (err) { - fprintf(stderr, "ERROR: fdb_transaction_get_version: %s\n", fdb_get_error(err)); + fprintf(stderr, "ERROR: fdb_transaction_get_read_version: %s\n", fdb_get_error(err)); return -1; } +#if FDB_API_VERSION < 620 err = fdb_future_get_version(f, &rv); +#else + err = fdb_future_get_int64(f, &rv); +#endif + if (err) { +#if FDB_API_VERSION < 620 fprintf(stderr, "ERROR: fdb_future_get_version: %s\n", fdb_get_error(err)); +#else + fprintf(stderr, "ERROR: fdb_future_get_int64: %s\n", fdb_get_error(err)); +#endif } fdb_future_destroy(f); return rv; diff --git a/bindings/c/test/ryw_benchmark.c b/bindings/c/test/ryw_benchmark.c index 167d3723e1..1777150894 100644 --- a/bindings/c/test/ryw_benchmark.c +++ b/bindings/c/test/ryw_benchmark.c @@ -224,7 +224,7 @@ void runTests(struct ResultSet *rs) { checkError(fdb_future_block_until_ready(f), "block for read version", rs); int64_t version; - checkError(fdb_future_get_version(f, &version), "get version", rs); + checkError(fdb_future_get_int64(f, &version), "get version", rs); fdb_future_destroy(f); insertData(tr); diff --git a/bindings/c/test/txn_size_test.c b/bindings/c/test/txn_size_test.c new file mode 100644 index 0000000000..73ee9a82e6 --- /dev/null +++ b/bindings/c/test/txn_size_test.c @@ -0,0 +1,110 @@ +/* + * txn_size_test.c + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "test.h" + +#include +#include +#include + +#include +#include + +pthread_t netThread; +const int numKeys = 100; +uint8_t** keys = NULL; + +#define KEY_SIZE 16 +#define VALUE_SIZE 100 +uint8_t valueStr[VALUE_SIZE]; + +fdb_error_t getSize(struct ResultSet* rs, FDBTransaction* tr, int64_t* out_size) { + fdb_error_t e; + FDBFuture* future = fdb_transaction_get_approximate_size(tr); + + e = maybeLogError(fdb_future_block_until_ready(future), "waiting for get future", rs); + if (e) { + fdb_future_destroy(future); + return e; + } + + e = maybeLogError(fdb_future_get_int64(future, out_size), "getting future value", rs); + if (e) { + fdb_future_destroy(future); + return e; + } + + fdb_future_destroy(future); + return 0; +} + +void runTests(struct ResultSet *rs) { + int64_t sizes[numKeys]; + int i = 0, j = 0; + FDBDatabase *db = openDatabase(rs, &netThread); + FDBTransaction *tr = NULL; + fdb_error_t e = fdb_database_create_transaction(db, &tr); + checkError(e, "create transaction", rs); + memset(sizes, 0, numKeys * sizeof(uint32_t)); + + fdb_transaction_set(tr, keys[i], KEY_SIZE, valueStr, VALUE_SIZE); + e = getSize(rs, tr, sizes + i); + checkError(e, "transaction get size", rs); + printf("size %d: %u\n", i, sizes[i]); + i++; + + fdb_transaction_set(tr, keys[i], KEY_SIZE, valueStr, VALUE_SIZE); + e = getSize(rs, tr, sizes + i); + checkError(e, "transaction get size", rs); + printf("size %d: %u\n", i, sizes[i]); + i++; + + fdb_transaction_clear(tr, keys[i], KEY_SIZE); + e = getSize(rs, tr, sizes + i); + checkError(e, "transaction get size", rs); + printf("size %d: %u\n", i, sizes[i]); + i++; + + fdb_transaction_clear_range(tr, keys[i], KEY_SIZE, keys[i+1], KEY_SIZE); + e = getSize(rs, tr, sizes + i); + checkError(e, "transaction get size", rs); + printf("size %d: %u\n", i, sizes[i]); + i++; + + for (j = 0; j + 1 < i; j++) { + assert(sizes[j] < sizes[j + 1]); + } + printf("Test passed!\n"); +} + +int main(int argc, char **argv) { + srand(time(NULL)); + struct ResultSet *rs = newResultSet(); + checkError(fdb_select_api_version(620), "select API version", rs); + printf("Running performance test at client version: %s\n", fdb_get_client_version()); + + keys = generateKeys(numKeys, KEY_SIZE); + runTests(rs); + + freeResultSet(rs); + freeKeys(keys, numKeys); + + return 0; +} diff --git a/bindings/flow/fdb_flow.actor.cpp b/bindings/flow/fdb_flow.actor.cpp index 4554ce716c..810f088e1c 100644 --- a/bindings/flow/fdb_flow.actor.cpp +++ b/bindings/flow/fdb_flow.actor.cpp @@ -20,12 +20,13 @@ #include "fdb_flow.h" -#include "flow/DeterministicRandom.h" -#include "flow/SystemMonitor.h" - #include #include +#include "flow/DeterministicRandom.h" +#include "flow/SystemMonitor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + using namespace FDB; THREAD_FUNC networkThread(void* fdb) { @@ -147,6 +148,7 @@ namespace FDB { void setOption(FDBTransactionOption option, Optional value = Optional()) override; + Future getApproximateSize() override; Future onError(Error const& e) override; void cancel() override; @@ -290,7 +292,7 @@ namespace FDB { return backToFuture( fdb_transaction_get_read_version( tr ), [](Reference f){ Version value; - throw_on_error( fdb_future_get_version( f->f, &value ) ); + throw_on_error( fdb_future_get_int64( f->f, &value ) ); return value; } ); @@ -408,6 +410,14 @@ namespace FDB { } } + Future TransactionImpl::getApproximateSize() { + return backToFuture(fdb_transaction_get_approximate_size(tr), [](Reference f) { + int64_t size = 0; + throw_on_error(fdb_future_get_int64(f->f, &size)); + return size; + }); + } + Future TransactionImpl::onError(Error const& e) { return backToFuture< Void >( fdb_transaction_on_error( tr, e.code() ), [](Reference f) { throw_on_error( fdb_future_get_error( f->f ) ); @@ -422,4 +432,5 @@ namespace FDB { void TransactionImpl::reset() { fdb_transaction_reset( tr ); } -} + +} // namespace FDB diff --git a/bindings/flow/fdb_flow.h b/bindings/flow/fdb_flow.h index 3db5d97e0b..90f77e67cc 100644 --- a/bindings/flow/fdb_flow.h +++ b/bindings/flow/fdb_flow.h @@ -112,6 +112,7 @@ namespace FDB { virtual Future commit() = 0; virtual Version getCommittedVersion() = 0; + virtual Future getApproximateSize() = 0; virtual Future> getVersionstamp() = 0; }; diff --git a/bindings/flow/tester/DirectoryTester.actor.cpp b/bindings/flow/tester/DirectoryTester.actor.cpp index 60c6cb88e2..a04317792b 100644 --- a/bindings/flow/tester/DirectoryTester.actor.cpp +++ b/bindings/flow/tester/DirectoryTester.actor.cpp @@ -19,6 +19,7 @@ */ #include "Tester.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. using namespace FDB; diff --git a/bindings/flow/tester/Tester.actor.cpp b/bindings/flow/tester/Tester.actor.cpp index 09fa520a55..69feab7b0d 100644 --- a/bindings/flow/tester/Tester.actor.cpp +++ b/bindings/flow/tester/Tester.actor.cpp @@ -18,16 +18,18 @@ * limitations under the License. */ -#include "fdbrpc/fdbrpc.h" -#include "flow/DeterministicRandom.h" -#include "bindings/flow/Tuple.h" -#include "bindings/flow/FDBLoanerTypes.h" - #include "Tester.actor.h" +#include #ifdef __linux__ #include #endif +#include "bindings/flow/Tuple.h" +#include "bindings/flow/FDBLoanerTypes.h" +#include "fdbrpc/fdbrpc.h" +#include "flow/DeterministicRandom.h" +#include "flow/actorcompiler.h" // This must be the last #include. + // Otherwise we have to type setupNetwork(), FDB::open(), etc. using namespace FDB; @@ -292,7 +294,7 @@ ACTOR Future printFlowTesterStack(FlowTesterStack* stack) { state int idx; for (idx = stack->data.size() - 1; idx >= 0; --idx) { Standalone value = wait(stack->data[idx].value); - // printf("==========stack item:%d, index:%d, value:%s\n", idx, stack->data[idx].index, printable(value).c_str()); + // printf("==========stack item:%d, index:%d, value:%s\n", idx, stack->data[idx].index, value.printable().c_str()); } return Void(); } @@ -703,6 +705,20 @@ struct GetCommittedVersionFunc : InstructionFunc { const char* GetCommittedVersionFunc::name = "GET_COMMITTED_VERSION"; REGISTER_INSTRUCTION_FUNC(GetCommittedVersionFunc); +// GET_APPROXIMATE_SIZE +struct GetApproximateSizeFunc : InstructionFunc { + static const char* name; + + ACTOR static Future call(Reference data, Reference instruction) { + int64_t _ = wait(instruction->tr->getApproximateSize()); + (void) _; // disable unused variable warning + data->stack.pushTuple(LiteralStringRef("GOT_APPROXIMATE_SIZE")); + return Void(); + } +}; +const char* GetApproximateSizeFunc::name = "GET_APPROXIMATE_SIZE"; +REGISTER_INSTRUCTION_FUNC(GetApproximateSizeFunc); + // GET_VERSIONSTAMP struct GetVersionstampFunc : InstructionFunc { static const char* name; @@ -1638,7 +1654,7 @@ ACTOR static Future doInstructions(Reference data) { op = op.substr(0, op.size() - 9); // printf("[==========]%ld/%ld:%s:%s: isDatabase:%d, isSnapshot:%d, stack count:%ld\n", - // idx, data->instructions.size(), printable(StringRef(data->instructions[idx].key)).c_str(), printable(StringRef(data->instructions[idx].value)).c_str(), + // idx, data->instructions.size(), StringRef(data->instructions[idx].key).printable().c_str(), StringRef(data->instructions[idx].value).printable().c_str(), // isDatabase, isSnapshot, data->stack.data.size()); //wait(printFlowTesterStack(&(data->stack))); diff --git a/bindings/go/src/_stacktester/stacktester.go b/bindings/go/src/_stacktester/stacktester.go index aef3f5ceda..0530c2e146 100644 --- a/bindings/go/src/_stacktester/stacktester.go +++ b/bindings/go/src/_stacktester/stacktester.go @@ -590,6 +590,9 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { panic(e) } sm.store(idx, []byte("GOT_COMMITTED_VERSION")) + case op == "GET_APPROXIMATE_SIZE": + _ = sm.currentTransaction().GetApproximateSize().MustGet() + sm.store(idx, []byte("GOT_APPROXIMATE_SIZE")) case op == "GET_VERSIONSTAMP": sm.store(idx, sm.currentTransaction().GetVersionstamp()) case op == "GET_KEY": diff --git a/bindings/go/src/fdb/futures.go b/bindings/go/src/fdb/futures.go index 9c34903436..4894ee40ea 100644 --- a/bindings/go/src/fdb/futures.go +++ b/bindings/go/src/fdb/futures.go @@ -331,7 +331,7 @@ func (f *futureInt64) Get() (int64, error) { f.BlockUntilReady() var ver C.int64_t - if err := C.fdb_future_get_version(f.ptr, &ver); err != 0 { + if err := C.fdb_future_get_int64(f.ptr, &ver); err != 0 { return 0, Error{int(err)} } diff --git a/bindings/go/src/fdb/transaction.go b/bindings/go/src/fdb/transaction.go index 13f170f36b..274aeee867 100644 --- a/bindings/go/src/fdb/transaction.go +++ b/bindings/go/src/fdb/transaction.go @@ -372,6 +372,16 @@ func (t Transaction) GetVersionstamp() FutureKey { return &futureKey{future: newFuture(C.fdb_transaction_get_versionstamp(t.ptr))} } +func (t *transaction) getApproximateSize() FutureInt64 { + return &futureInt64{ + future: newFuture(C.fdb_transaction_get_approximate_size(t.ptr)), + } +} + +func (t Transaction) GetApproximateSize() FutureInt64 { + return t.getApproximateSize() +} + // Reset rolls back a transaction, completely resetting it to its initial // state. This is logically equivalent to destroying the transaction and // creating a new one. diff --git a/bindings/java/CMakeLists.txt b/bindings/java/CMakeLists.txt index dca5990ad2..42c96c93a5 100644 --- a/bindings/java/CMakeLists.txt +++ b/bindings/java/CMakeLists.txt @@ -25,11 +25,11 @@ set(JAVA_BINDING_SRCS src/main/com/apple/foundationdb/FDB.java src/main/com/apple/foundationdb/FDBDatabase.java src/main/com/apple/foundationdb/FDBTransaction.java + src/main/com/apple/foundationdb/FutureInt64.java src/main/com/apple/foundationdb/FutureKey.java src/main/com/apple/foundationdb/FutureResult.java src/main/com/apple/foundationdb/FutureResults.java src/main/com/apple/foundationdb/FutureStrings.java - src/main/com/apple/foundationdb/FutureVersion.java src/main/com/apple/foundationdb/FutureVoid.java src/main/com/apple/foundationdb/JNIUtil.java src/main/com/apple/foundationdb/KeySelector.java diff --git a/bindings/java/fdbJNI.cpp b/bindings/java/fdbJNI.cpp index 9f08f135aa..5a49987a85 100644 --- a/bindings/java/fdbJNI.cpp +++ b/bindings/java/fdbJNI.cpp @@ -243,21 +243,21 @@ JNIEXPORT void JNICALL Java_com_apple_foundationdb_NativeFuture_Future_1releaseM fdb_future_release_memory(var); } -JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureVersion_FutureVersion_1get(JNIEnv *jenv, jobject, jlong future) { - if( !future ) { +JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureInt64_FutureInt64_1get(JNIEnv *jenv, jobject, jlong future) { + if (!future) { throwParamNotNull(jenv); return 0; } FDBFuture *f = (FDBFuture *)future; - int64_t version = 0; - fdb_error_t err = fdb_future_get_version(f, &version); - if( err ) { - safeThrow( jenv, getThrowable( jenv, err ) ); + int64_t value = 0; + fdb_error_t err = fdb_future_get_int64(f, &value); + if (err) { + safeThrow(jenv, getThrowable(jenv, err)); return 0; } - return (jlong)version; + return (jlong)value; } JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureStrings_FutureStrings_1get(JNIEnv *jenv, jobject, jlong future) { @@ -805,6 +805,15 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1 return (jlong)version; } +JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getApproximateSize(JNIEnv *jenv, jobject, jlong tPtr) { + if (!tPtr) { + throwParamNotNull(jenv); + return 0; + } + FDBFuture* f = fdb_transaction_get_approximate_size((FDBTransaction*)tPtr); + return (jlong)f; +} + JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getVersionstamp(JNIEnv *jenv, jobject, jlong tPtr) { if (!tPtr) { throwParamNotNull(jenv); diff --git a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java index 0e507c914d..d6f1e4f935 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java @@ -216,7 +216,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC public CompletableFuture getReadVersion() { pointerReadLock.lock(); try { - return new FutureVersion(Transaction_getReadVersion(getPtr()), executor); + return new FutureInt64(Transaction_getReadVersion(getPtr()), executor); } finally { pointerReadLock.unlock(); } @@ -514,6 +514,16 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC } } + @Override + public CompletableFuture getApproximateSize() { + pointerReadLock.lock(); + try { + return new FutureInt64(Transaction_getApproximateSize(getPtr()), executor); + } finally { + pointerReadLock.unlock(); + } + } + @Override public CompletableFuture watch(byte[] key) throws FDBException { pointerReadLock.lock(); @@ -642,6 +652,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC private native long Transaction_commit(long cPtr); private native long Transaction_getCommittedVersion(long cPtr); private native long Transaction_getVersionstamp(long cPtr); + private native long Transaction_getApproximateSize(long cPtr); private native long Transaction_onError(long cPtr, int errorCode); private native void Transaction_dispose(long cPtr); private native void Transaction_reset(long cPtr); diff --git a/bindings/java/src/main/com/apple/foundationdb/FutureVersion.java b/bindings/java/src/main/com/apple/foundationdb/FutureInt64.java similarity index 74% rename from bindings/java/src/main/com/apple/foundationdb/FutureVersion.java rename to bindings/java/src/main/com/apple/foundationdb/FutureInt64.java index c5f1322094..0e1b280b71 100644 --- a/bindings/java/src/main/com/apple/foundationdb/FutureVersion.java +++ b/bindings/java/src/main/com/apple/foundationdb/FutureInt64.java @@ -1,9 +1,9 @@ /* - * FutureVersion.java + * FutureInt64.java * * This source file is part of the FoundationDB open source project * - * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,16 +22,16 @@ package com.apple.foundationdb; import java.util.concurrent.Executor; -class FutureVersion extends NativeFuture { - FutureVersion(long cPtr, Executor executor) { +class FutureInt64 extends NativeFuture { + FutureInt64(long cPtr, Executor executor) { super(cPtr); registerMarshalCallback(executor); } @Override protected Long getIfDone_internal(long cPtr) throws FDBException { - return FutureVersion_get(cPtr); + return FutureInt64_get(cPtr); } - private native long FutureVersion_get(long cPtr) throws FDBException; + private native long FutureInt64_get(long cPtr) throws FDBException; } diff --git a/bindings/java/src/main/com/apple/foundationdb/Transaction.java b/bindings/java/src/main/com/apple/foundationdb/Transaction.java index c3a8c6b671..f82c68e790 100644 --- a/bindings/java/src/main/com/apple/foundationdb/Transaction.java +++ b/bindings/java/src/main/com/apple/foundationdb/Transaction.java @@ -260,6 +260,15 @@ public interface Transaction extends AutoCloseable, ReadTransaction, Transaction */ CompletableFuture getVersionstamp(); + /** + * Returns a future that will contain the approximated size of the commit, which is the + * summation of mutations, read conflict ranges, and write conflict ranges. This can be + * called multiple times before transaction commit. + * + * @return a future that will contain the approximated size of the commit. + */ + CompletableFuture getApproximateSize(); + /** * Resets a transaction and returns a delayed signal for error recovery. If the error * encountered by the {@code Transaction} could not be recovered from, the returned diff --git a/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java b/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java index 7920d9d9dc..57278728dc 100644 --- a/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java +++ b/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java @@ -282,6 +282,11 @@ public class AsyncStackTester { return AsyncUtil.DONE; } + else if(op == StackOperation.GET_APPROXIMATE_SIZE) { + return inst.tr.getApproximateSize().thenAcceptAsync(size -> { + inst.push("GOT_APPROXIMATE_SIZE".getBytes()); + }, FDB.DEFAULT_EXECUTOR); + } else if(op == StackOperation.GET_VERSIONSTAMP) { try { inst.push(inst.tr.getVersionstamp()); diff --git a/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java b/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java index 7fdc7b9ff5..8d13aadde1 100644 --- a/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java +++ b/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java @@ -54,6 +54,7 @@ enum StackOperation { GET_KEY, GET_READ_VERSION, GET_COMMITTED_VERSION, + GET_APPROXIMATE_SIZE, GET_VERSIONSTAMP, SET_READ_VERSION, ON_ERROR, diff --git a/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java b/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java index a9cf47320f..96eac7a843 100644 --- a/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java +++ b/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java @@ -261,6 +261,10 @@ public class StackTester { inst.context.lastVersion = inst.tr.getCommittedVersion(); inst.push("GOT_COMMITTED_VERSION".getBytes()); } + else if(op == StackOperation.GET_APPROXIMATE_SIZE) { + Long size = inst.tr.getApproximateSize().join(); + inst.push("GOT_APPROXIMATE_SIZE".getBytes()); + } else if(op == StackOperation.GET_VERSIONSTAMP) { inst.push(inst.tr.getVersionstamp()); } diff --git a/bindings/python/fdb/impl.py b/bindings/python/fdb/impl.py index 77c7a74d13..c7d33f0fe9 100644 --- a/bindings/python/fdb/impl.py +++ b/bindings/python/fdb/impl.py @@ -405,7 +405,7 @@ class TransactionRead(_FDBBase): def get_read_version(self): """Get the read version of the transaction.""" - return FutureVersion(self.capi.fdb_transaction_get_read_version(self.tpointer)) + return FutureInt64(self.capi.fdb_transaction_get_read_version(self.tpointer)) def get(self, key): key = keyToBytes(key) @@ -541,6 +541,10 @@ class Transaction(TransactionRead): self.capi.fdb_transaction_get_committed_version(self.tpointer, ctypes.byref(version)) return version.value + def get_approximate_size(self): + """Get the approximate commit size of the transaction.""" + return FutureInt64(self.capi.fdb_transaction_get_approximate_size(self.tpointer)) + def get_versionstamp(self): return Key(self.capi.fdb_transaction_get_versionstamp(self.tpointer)) @@ -687,12 +691,12 @@ class FutureVoid(Future): return None -class FutureVersion(Future): +class FutureInt64(Future): def wait(self): self.block_until_ready() - version = ctypes.c_int64() - self.capi.fdb_future_get_version(self.fpointer, ctypes.byref(version)) - return version.value + value = ctypes.c_int64() + self.capi.fdb_future_get_int64(self.fpointer, ctypes.byref(value)) + return value.value class FutureKeyValueArray(Future): @@ -1359,9 +1363,9 @@ def init_c_api(): _capi.fdb_future_get_error.restype = int _capi.fdb_future_get_error.errcheck = check_error_code - _capi.fdb_future_get_version.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int64)] - _capi.fdb_future_get_version.restype = ctypes.c_int - _capi.fdb_future_get_version.errcheck = check_error_code + _capi.fdb_future_get_int64.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int64)] + _capi.fdb_future_get_int64.restype = ctypes.c_int + _capi.fdb_future_get_int64.errcheck = check_error_code _capi.fdb_future_get_key.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.POINTER(ctypes.c_byte)), ctypes.POINTER(ctypes.c_int)] @@ -1453,6 +1457,9 @@ def init_c_api(): _capi.fdb_transaction_get_committed_version.restype = ctypes.c_int _capi.fdb_transaction_get_committed_version.errcheck = check_error_code + _capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p] + _capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p + _capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p] _capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p diff --git a/bindings/python/tests/size_limit_tests.py b/bindings/python/tests/size_limit_tests.py index 37ccf98ccd..7036eb27a4 100644 --- a/bindings/python/tests/size_limit_tests.py +++ b/bindings/python/tests/size_limit_tests.py @@ -68,9 +68,31 @@ def test_size_limit_option(db): except fdb.FDBError as e: assert(e.code == 2101) # Transaction exceeds byte limit (2101) +@fdb.transactional +def test_get_approximate_size(tr): + tr[b'key1'] = b'value1' + s1 = tr.get_approximate_size().wait() + + tr[b'key2'] = b'value2' + s2 = tr.get_approximate_size().wait() + assert(s1 < s2) + + tr.clear(b'key3') + s3 = tr.get_approximate_size().wait() + assert(s2 < s3) + + tr.add_read_conflict_key(b'key3+') + s4 = tr.get_approximate_size().wait() + assert(s3 < s4) + + tr.add_write_conflict_key(b'key4') + s5 = tr.get_approximate_size().wait() + assert(s4 < s5) + # Expect a cluster file as input. This test will write to the FDB cluster, so # be aware of potential side effects. if __name__ == '__main__': clusterFile = sys.argv[1] db = fdb.open(clusterFile) test_size_limit_option(db) + test_get_approximate_size(db) diff --git a/bindings/python/tests/tester.py b/bindings/python/tests/tester.py index b5dc84dbd3..9fb1829ce7 100644 --- a/bindings/python/tests/tester.py +++ b/bindings/python/tests/tester.py @@ -48,7 +48,7 @@ from cancellation_timeout_tests import test_retry_limits from cancellation_timeout_tests import test_db_retry_limits from cancellation_timeout_tests import test_combinations -from size_limit_tests import test_size_limit_option +from size_limit_tests import test_size_limit_option, test_get_approximate_size random.seed(0) @@ -478,6 +478,9 @@ class Tester: elif inst.op == six.u("GET_COMMITTED_VERSION"): self.last_version = inst.tr.get_committed_version() inst.push(b"GOT_COMMITTED_VERSION") + elif inst.op == six.u("GET_APPROXIMATE_SIZE"): + approximate_size = inst.tr.get_approximate_size().wait() + inst.push(b"GOT_APPROXIMATE_SIZE") elif inst.op == six.u("GET_VERSIONSTAMP"): inst.push(inst.tr.get_versionstamp()) elif inst.op == six.u("TUPLE_PACK"): @@ -561,6 +564,7 @@ class Tester: test_predicates() test_size_limit_option(db) + test_get_approximate_size(db) except fdb.FDBError as e: print("Unit tests failed: %s" % e.description) diff --git a/bindings/ruby/lib/fdbimpl.rb b/bindings/ruby/lib/fdbimpl.rb index dcbe246f89..b1deb1123c 100644 --- a/bindings/ruby/lib/fdbimpl.rb +++ b/bindings/ruby/lib/fdbimpl.rb @@ -85,7 +85,7 @@ module FDB attach_function :fdb_future_set_callback, [ :pointer, :fdb_future_callback, :pointer ], :fdb_error attach_function :fdb_future_get_error, [ :pointer ], :fdb_error - attach_function :fdb_future_get_version, [ :pointer, :pointer ], :fdb_error + attach_function :fdb_future_get_int64, [ :pointer, :pointer ], :fdb_error attach_function :fdb_future_get_key, [ :pointer, :pointer, :pointer ], :fdb_error attach_function :fdb_future_get_value, [ :pointer, :pointer, :pointer, :pointer ], :fdb_error attach_function :fdb_future_get_keyvalue_array, [ :pointer, :pointer, :pointer, :pointer ], :fdb_error @@ -114,6 +114,7 @@ module FDB attach_function :fdb_transaction_watch, [ :pointer, :pointer, :int ], :pointer attach_function :fdb_transaction_commit, [ :pointer ], :pointer attach_function :fdb_transaction_get_committed_version, [ :pointer, :pointer ], :fdb_error + attach_function :fdb_transaction_get_approximate_size, [ :pointer ], :pointer attach_function :fdb_transaction_get_versionstamp, [ :pointer ], :pointer attach_function :fdb_transaction_on_error, [ :pointer, :fdb_error ], :pointer attach_function :fdb_transaction_reset, [ :pointer ], :void @@ -443,11 +444,11 @@ module FDB end end - class Version < LazyFuture + class Int64Future < LazyFuture def getter - version = FFI::MemoryPointer.new :int64 - FDBC.check_error FDBC.fdb_future_get_version(@fpointer, version) - @value = version.read_long_long + val = FFI::MemoryPointer.new :int64 + FDBC.check_error FDBC.fdb_future_get_int64(@fpointer, val) + @value = val.read_long_long end private :getter end @@ -687,7 +688,7 @@ module FDB end def get_read_version - Version.new(FDBC.fdb_transaction_get_read_version @tpointer) + Int64Future.new(FDBC.fdb_transaction_get_read_version @tpointer) end def get(key) @@ -904,6 +905,10 @@ module FDB version.read_long_long end + def get_approximate_size + Int64Future.new(FDBC.fdb_transaction_get_approximate_size @tpointer) + end + def get_versionstamp Key.new(FDBC.fdb_transaction_get_versionstamp(@tpointer)) end diff --git a/bindings/ruby/tests/tester.rb b/bindings/ruby/tests/tester.rb index c199eddc09..ccf863797d 100755 --- a/bindings/ruby/tests/tester.rb +++ b/bindings/ruby/tests/tester.rb @@ -381,6 +381,9 @@ class Tester when "GET_COMMITTED_VERSION" @last_version = inst.tr.get_committed_version inst.push("GOT_COMMITTED_VERSION") + when "GET_APPROXIMATE_SIZE" + size = inst.tr.get_approximate_size.to_i + inst.push("GOT_APPROXIMATE_SIZE") when "GET_VERSIONSTAMP" inst.push(inst.tr.get_versionstamp) when "TUPLE_PACK" diff --git a/documentation/sphinx/source/api-c.rst b/documentation/sphinx/source/api-c.rst index 672656a908..bd8eb33189 100644 --- a/documentation/sphinx/source/api-c.rst +++ b/documentation/sphinx/source/api-c.rst @@ -291,9 +291,9 @@ See :ref:`developer-guide-programming-with-futures` for further (language-indepe |future-get-return1|. -.. function:: fdb_error_t fdb_future_get_version(FDBFuture* future, int64_t* out_version) +.. function:: fdb_error_t fdb_future_get_int64(FDBFuture* future, int64_t* out) - Extracts a version from an :type:`FDBFuture` into a caller-provided variable of type ``int64_t``. |future-warning| + Extracts a 64-bit integer from an :type:`FDBFuture*` into a caller-provided variable of type ``int64_t``. |future-warning| |future-get-return1| |future-get-return2|. @@ -447,7 +447,7 @@ Applications must provide error handling and an appropriate retry loop around th .. function:: FDBFuture* fdb_transaction_get_read_version(FDBTransaction* transaction) - |future-return0| the transaction snapshot read version. |future-return1| call :func:`fdb_future_get_version()` to extract the version into an int64_t that you provide, |future-return2| + |future-return0| the transaction snapshot read version. |future-return1| call :func:`fdb_future_get_int64()` to extract the version into an int64_t that you provide, |future-return2| The transaction obtains a snapshot read version automatically at the time of the first call to :func:`fdb_transaction_get_*()` (including this one) and (unless causal consistency has been deliberately compromised by transaction options) is guaranteed to represent all transactions which were reported committed before that call. @@ -720,6 +720,12 @@ Applications must provide error handling and an appropriate retry loop around th Most applications will not call this function. +.. function:: FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* transaction) + + |future-return0| the approximate transaction size so far in the returned future, which is the summation of the estimated size of mutations, read conflict ranges, and write conflict ranges. |future-return1| call :func:`fdb_future_get_int64()` to extract the size, |future-return2| + + This can be called multiple times before the transaction is committed. + .. function:: FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* transaction) |future-return0| the versionstamp which was used by any versionstamp operations in this transaction. |future-return1| call :func:`fdb_future_get_key()` to extract the key, |future-return2| diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 68315c2603..8f774bb049 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -28,6 +28,8 @@ Status Bindings -------- +* Added a new API to get the approximated transaction size before commit, e.g., ``fdb_transaction_get_approximate_size`` in the C binding. `(PR #1756) `_. +* C: ``fdb_future_get_version`` has been renamed to ``fdb_future_get_int64``. `(PR #1756) `_. * Go: The Go bindings now require Go version 1.11 or later. * Go: Fix issue with finalizers running too early that could lead to undefined behavior. `(PR #1451) `_. * Added transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) `_. diff --git a/fdbclient/BackupAgent.actor.h b/fdbclient/BackupAgent.actor.h index a02166262e..00e1241b70 100644 --- a/fdbclient/BackupAgent.actor.h +++ b/fdbclient/BackupAgent.actor.h @@ -843,4 +843,6 @@ public: return updateErrorInfo(cx, e, details); } }; + +#include "flow/unactorcompiler.h" #endif diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index e1d04e5a9f..b3e6217054 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -61,6 +61,7 @@ public: virtual ThreadFuture commit() = 0; virtual Version getCommittedVersion() = 0; + virtual ThreadFuture getApproximateSize() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value=Optional()) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index eb973b2659..ce7e6a02d7 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -47,7 +47,7 @@ ThreadFuture DLTransaction::getReadVersion() { return toThreadFuture(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) { int64_t version; - FdbCApi::fdb_error_t error = api->futureGetVersion(f, &version); + FdbCApi::fdb_error_t error = api->futureGetInt64(f, &version); ASSERT(!error); return version; }); @@ -195,6 +195,20 @@ Version DLTransaction::getCommittedVersion() { return version; } +ThreadFuture DLTransaction::getApproximateSize() { + if(!api->transactionGetApproximateSize) { + return unsupported_operation(); + } + + FdbCApi::FDBFuture *f = api->transactionGetApproximateSize(tr); + return toThreadFuture(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) { + int64_t size = 0; + FdbCApi::fdb_error_t error = api->futureGetInt64(f, &size); + ASSERT(!error); + return size; + }); +} + void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional value) { throwIfError(api->transactionSetOption(tr, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0)); } @@ -287,6 +301,7 @@ void DLApi::init() { loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op"); loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit"); loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version"); + loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size", headerVersion >= 620); loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch"); loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error"); loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset"); @@ -294,7 +309,7 @@ void DLApi::init() { loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range"); loadClientFunction(&api->futureGetDatabase, lib, fdbCPath, "fdb_future_get_database"); - loadClientFunction(&api->futureGetVersion, lib, fdbCPath, "fdb_future_get_version"); + loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version"); loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error"); loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key"); loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value"); @@ -595,6 +610,12 @@ Version MultiVersionTransaction::getCommittedVersion() { return invalidVersion; } +ThreadFuture MultiVersionTransaction::getApproximateSize() { + auto tr = getTransaction(); + auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture(Never()); + return abortableFuture(f, tr.onChange); +} + void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional value) { auto itr = FDBTransactionOptions::optionInfo.find(option); if(itr == FDBTransactionOptions::optionInfo.end()) { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index b1a1c3372a..e71f6e895f 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -84,6 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { FDBFuture* (*transactionCommit)(FDBTransaction *tr); fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion); + FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr); FDBFuture* (*transactionWatch)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength); FDBFuture* (*transactionOnError)(FDBTransaction *tr, fdb_error_t error); void (*transactionReset)(FDBTransaction *tr); @@ -94,7 +95,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { //Future fdb_error_t (*futureGetDatabase)(FDBFuture *f, FDBDatabase **outDb); - fdb_error_t (*futureGetVersion)(FDBFuture *f, int64_t *outVersion); + fdb_error_t (*futureGetInt64)(FDBFuture *f, int64_t *outValue); fdb_error_t (*futureGetError)(FDBFuture *f); fdb_error_t (*futureGetKey)(FDBFuture *f, uint8_t const **outKey, int *outKeyLength); fdb_error_t (*futureGetValue)(FDBFuture *f, fdb_bool_t *outPresent, uint8_t const **outValue, int *outValueLength); @@ -116,41 +117,42 @@ public: DLTransaction(Reference api, FdbCApi::FDBTransaction *tr) : api(api), tr(tr) {} ~DLTransaction() { api->transactionDestroy(tr); } - void cancel(); - void setVersion(Version v); - ThreadFuture getReadVersion(); + void cancel() override; + void setVersion(Version v) override; + ThreadFuture getReadVersion() override; - ThreadFuture> get(const KeyRef& key, bool snapshot=false); - ThreadFuture getKey(const KeySelectorRef& key, bool snapshot=false); - ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false); - ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false); - ThreadFuture> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false); - ThreadFuture> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false); - ThreadFuture>> getAddressesForKey(const KeyRef& key); - ThreadFuture> getVersionstamp(); + ThreadFuture> get(const KeyRef& key, bool snapshot=false) override; + ThreadFuture getKey(const KeySelectorRef& key, bool snapshot=false) override; + ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override; + ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override; + ThreadFuture> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override; + ThreadFuture> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override; + ThreadFuture>> getAddressesForKey(const KeyRef& key) override; + ThreadFuture> getVersionstamp() override; - void addReadConflictRange(const KeyRangeRef& keys); + void addReadConflictRange(const KeyRangeRef& keys) override; - void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType); - void set(const KeyRef& key, const ValueRef& value); - void clear(const KeyRef& begin, const KeyRef& end); - void clear(const KeyRangeRef& range); - void clear(const KeyRef& key); + void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override; + void set(const KeyRef& key, const ValueRef& value) override; + void clear(const KeyRef& begin, const KeyRef& end) override; + void clear(const KeyRangeRef& range) override; + void clear(const KeyRef& key) override; - ThreadFuture watch(const KeyRef& key); + ThreadFuture watch(const KeyRef& key) override; - void addWriteConflictRange(const KeyRangeRef& keys); + void addWriteConflictRange(const KeyRangeRef& keys) override; - ThreadFuture commit(); - Version getCommittedVersion(); + ThreadFuture commit() override; + Version getCommittedVersion() override; + ThreadFuture getApproximateSize() override; - void setOption(FDBTransactionOptions::Option option, Optional value=Optional()); + void setOption(FDBTransactionOptions::Option option, Optional value=Optional()) override; - ThreadFuture onError(Error const& e); - void reset(); + ThreadFuture onError(Error const& e) override; + void reset() override; - void addref() { ThreadSafeReferenceCounted::addref(); } - void delref() { ThreadSafeReferenceCounted::delref(); } + void addref() override { ThreadSafeReferenceCounted::addref(); } + void delref() override { ThreadSafeReferenceCounted::delref(); } private: const Reference api; @@ -165,11 +167,11 @@ public: ThreadFuture onReady(); - Reference createTransaction(); - void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()); + Reference createTransaction() override; + void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override; - void addref() { ThreadSafeReferenceCounted::addref(); } - void delref() { ThreadSafeReferenceCounted::delref(); } + void addref() override { ThreadSafeReferenceCounted::addref(); } + void delref() override { ThreadSafeReferenceCounted::delref(); } private: const Reference api; @@ -181,18 +183,18 @@ class DLApi : public IClientApi { public: DLApi(std::string fdbCPath); - void selectApiVersion(int apiVersion); - const char* getClientVersion(); + void selectApiVersion(int apiVersion) override; + const char* getClientVersion() override; - void setNetworkOption(FDBNetworkOptions::Option option, Optional value = Optional()); - void setupNetwork(); - void runNetwork(); - void stopNetwork(); + void setNetworkOption(FDBNetworkOptions::Option option, Optional value = Optional()) override; + void setupNetwork() override; + void runNetwork() override; + void stopNetwork() override; - Reference createDatabase(const char *clusterFilePath); + Reference createDatabase(const char *clusterFilePath) override; Reference createDatabase609(const char *clusterFilePath); // legacy database creation - void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter); + void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override; private: const std::string fdbCPath; @@ -212,41 +214,42 @@ class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted< public: MultiVersionTransaction(Reference db, UniqueOrderedOptionList defaultOptions); - void cancel(); - void setVersion(Version v); - ThreadFuture getReadVersion(); + void cancel() override; + void setVersion(Version v) override; + ThreadFuture getReadVersion() override; - ThreadFuture> get(const KeyRef& key, bool snapshot=false); - ThreadFuture getKey(const KeySelectorRef& key, bool snapshot=false); - ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false); - ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false); - ThreadFuture> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false); - ThreadFuture> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false); - ThreadFuture>> getAddressesForKey(const KeyRef& key); - ThreadFuture> getVersionstamp(); + ThreadFuture> get(const KeyRef& key, bool snapshot=false) override; + ThreadFuture getKey(const KeySelectorRef& key, bool snapshot=false) override; + ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override; + ThreadFuture> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override; + ThreadFuture> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override; + ThreadFuture> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override; + ThreadFuture>> getAddressesForKey(const KeyRef& key) override; + ThreadFuture> getVersionstamp() override; - void addReadConflictRange(const KeyRangeRef& keys); + void addReadConflictRange(const KeyRangeRef& keys) override; - void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType); - void set(const KeyRef& key, const ValueRef& value); - void clear(const KeyRef& begin, const KeyRef& end); - void clear(const KeyRangeRef& range); - void clear(const KeyRef& key); + void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override; + void set(const KeyRef& key, const ValueRef& value) override; + void clear(const KeyRef& begin, const KeyRef& end) override; + void clear(const KeyRangeRef& range) override; + void clear(const KeyRef& key) override; - ThreadFuture watch(const KeyRef& key); + ThreadFuture watch(const KeyRef& key) override; - void addWriteConflictRange(const KeyRangeRef& keys); + void addWriteConflictRange(const KeyRangeRef& keys) override; - ThreadFuture commit(); - Version getCommittedVersion(); + ThreadFuture commit() override; + Version getCommittedVersion() override; + ThreadFuture getApproximateSize() override; - void setOption(FDBTransactionOptions::Option option, Optional value=Optional()); + void setOption(FDBTransactionOptions::Option option, Optional value=Optional()) override; - ThreadFuture onError(Error const& e); - void reset(); + ThreadFuture onError(Error const& e) override; + void reset() override; - void addref() { ThreadSafeReferenceCounted::addref(); } - void delref() { ThreadSafeReferenceCounted::delref(); } + void addref() override { ThreadSafeReferenceCounted::addref(); } + void delref() override { ThreadSafeReferenceCounted::delref(); } private: const Reference db; @@ -289,11 +292,11 @@ public: MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference db, bool openConnectors=true); ~MultiVersionDatabase(); - Reference createTransaction(); - void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()); + Reference createTransaction() override; + void setOption(FDBDatabaseOptions::Option option, Optional value = Optional()) override; - void addref() { ThreadSafeReferenceCounted::addref(); } - void delref() { ThreadSafeReferenceCounted::delref(); } + void addref() override { ThreadSafeReferenceCounted::addref(); } + void delref() override { ThreadSafeReferenceCounted::delref(); } static Reference debugCreateFromExistingDatabase(Reference db); @@ -354,16 +357,16 @@ private: class MultiVersionApi : public IClientApi { public: - void selectApiVersion(int apiVersion); - const char* getClientVersion(); + void selectApiVersion(int apiVersion) override; + const char* getClientVersion() override; - void setNetworkOption(FDBNetworkOptions::Option option, Optional value = Optional()); - void setupNetwork(); - void runNetwork(); - void stopNetwork(); - void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter); + void setNetworkOption(FDBNetworkOptions::Option option, Optional value = Optional()) override; + void setupNetwork() override; + void runNetwork() override; + void stopNetwork() override; + void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override; - Reference createDatabase(const char *clusterFilePath); + Reference createDatabase(const char *clusterFilePath) override; static MultiVersionApi* api; Reference getLocalClient(); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index ea9923a462..700274a5bb 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2824,7 +2824,7 @@ Future Transaction::commitMutations() { cx->mutationsPerCommit.addSample(tr.transaction.mutations.size()); cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize()); - size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize(); + size_t transactionSize = getSize(); if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) { TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction") .suppressFor(1.0) @@ -3191,6 +3191,12 @@ Future> Transaction::getVersionstamp() { return versionstampPromise.getFuture(); } +uint32_t Transaction::getSize() { + auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + + tr.transaction.write_conflict_ranges.expectedSize(); + return s; +} + Future Transaction::onError( Error const& e ) { if (e.code() == error_code_success) { return client_invalid_operation(); diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 1338ffafca..49dbc3e557 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -287,6 +287,7 @@ public: Promise> versionstampPromise; + uint32_t getSize(); Future onError( Error const& e ); void flushTrLogsIfEnabled(); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 2f862cb338..c41739d907 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -990,9 +990,10 @@ public: if(!ryw->options.readYourWritesDisabled) { ryw->watchMap[key].push_back(watch); val = readWithConflictRange( ryw, GetValueReq(key), false ); - } - else + } else { + ryw->approximateSize += 2 * key.expectedSize() + 1; val = ryw->tr.get(key); + } try { wait(ryw->resetPromise.getFuture() || success(val) || watch->onChangeTrigger.getFuture()); @@ -1045,7 +1046,7 @@ public: return Void(); } - + ryw->writeRangeToNativeTransaction(KeyRangeRef(StringRef(), allKeys.end)); auto conflictRanges = ryw->readConflicts.ranges(); @@ -1123,8 +1124,11 @@ public: } }; -ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx ) : cache(&arena), writes(&arena), tr(cx), retries(0), creationTime(now()), commitStarted(false), options(tr), deferredError(cx->deferredError) { - std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions)); +ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx) + : cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false), + options(tr), deferredError(cx->deferredError) { + std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), + std::back_inserter(persistentOptions)); applyPersistentOptions(); } @@ -1367,6 +1371,7 @@ void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) } if(options.readYourWritesDisabled) { + approximateSize += r.expectedSize() + sizeof(KeyRangeRef); tr.addReadConflictRange(r); return; } @@ -1381,6 +1386,7 @@ void ReadYourWritesTransaction::updateConflictMap( KeyRef const& key, WriteMap:: //it.skip( key ); //ASSERT( it.beginKey() <= key && key < it.endKey() ); if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) { + approximateSize += 2 * key.expectedSize() + 1 + sizeof(KeyRangeRef); readConflicts.insert( singleKeyRange( key, arena ), true ); } } @@ -1391,13 +1397,15 @@ void ReadYourWritesTransaction::updateConflictMap( KeyRangeRef const& keys, Writ for(; it.beginKey() < keys.end; ++it ) { if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) { KeyRangeRef insert_range = KeyRangeRef( std::max( keys.begin, it.beginKey().toArenaOrRef( arena ) ), std::min( keys.end, it.endKey().toArenaOrRef( arena ) ) ); - if( !insert_range.empty() ) + if (!insert_range.empty()) { + approximateSize += keys.expectedSize() + sizeof(KeyRangeRef); readConflicts.insert( insert_range, true ); + } } } } -void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const& keys ) { +void ReadYourWritesTransaction::writeRangeToNativeTransaction(KeyRangeRef const& keys) { WriteMap::iterator it( &writes ); it.skip(keys.begin); @@ -1410,12 +1418,12 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const clearBegin = std::max(ExtStringRef(keys.begin), it.beginKey()); inClearRange = true; } else if( !it.is_cleared_range() && inClearRange ) { - tr.clear( KeyRangeRef( clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), false ); + tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena)), false); inClearRange = false; } } - if( inClearRange ) { + if (inClearRange) { tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), keys.end), false); } @@ -1429,7 +1437,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const conflictBegin = std::max(ExtStringRef(keys.begin), it.beginKey()); inConflictRange = true; } else if( !it.is_conflict_range() && inConflictRange ) { - tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ) ); + tr.addWriteConflictRange(KeyRangeRef(conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena))); inConflictRange = false; } @@ -1440,9 +1448,9 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const switch(op[i].type) { case MutationRef::SetValue: if (op[i].value.present()) { - tr.set( it.beginKey().assertRef(), op[i].value.get(), false ); + tr.set(it.beginKey().assertRef(), op[i].value.get(), false); } else { - tr.clear( it.beginKey().assertRef(), false ); + tr.clear(it.beginKey().assertRef(), false); } break; case MutationRef::AddValue: @@ -1469,7 +1477,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const } if( inConflictRange ) { - tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), keys.end ) ); + tr.addWriteConflictRange(KeyRangeRef(conflictBegin.toArenaOrRef(arena), keys.end)); } } @@ -1574,7 +1582,9 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope throw client_invalid_operation(); } - if(options.readYourWritesDisabled) { + approximateSize += k.expectedSize() + v.expectedSize() + sizeof(MutationRef) + + (addWriteConflict ? sizeof(KeyRangeRef) + 2 * key.expectedSize() + 1 : 0); + if (options.readYourWritesDisabled) { return tr.atomicOp(k, v, (MutationRef::Type) operationType, addWriteConflict); } @@ -1604,7 +1614,9 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) if(key >= getMaxWriteKey()) throw key_outside_legal_range(); - if(options.readYourWritesDisabled ) { + approximateSize += key.expectedSize() + value.expectedSize() + sizeof(MutationRef) + + (addWriteConflict ? sizeof(KeyRangeRef) + 2 * key.expectedSize() + 1 : 0); + if (options.readYourWritesDisabled) { return tr.set(key, value, addWriteConflict); } @@ -1632,10 +1644,12 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) { if(range.begin > maxKey || range.end > maxKey) throw key_outside_legal_range(); - if( options.readYourWritesDisabled ) { + approximateSize += range.expectedSize() + sizeof(MutationRef) + + (addWriteConflict ? sizeof(KeyRangeRef) + range.expectedSize() : 0); + if (options.readYourWritesDisabled) { return tr.clear(range, addWriteConflict); } - + //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys //we can translate it to an equivalent one with smaller keys KeyRef begin = range.begin; @@ -1676,6 +1690,8 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) { } KeyRangeRef r = singleKeyRange( key, arena ); + approximateSize += r.expectedSize() + sizeof(KeyRangeRef) + + (addWriteConflict ? sizeof(KeyRangeRef) + r.expectedSize() : 0); //SOMEDAY: add an optimized single key clear to write map writes.clear(r, addWriteConflict); @@ -1703,7 +1719,7 @@ Future ReadYourWritesTransaction::watch(const Key& key) { return RYWImpl::watch(this, key); } -void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) { +void ReadYourWritesTransaction::addWriteConflictRange(KeyRangeRef const& keys) { if(checkUsedDuringCommit()) { throw used_during_commit(); } @@ -1730,6 +1746,7 @@ void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) return; } + approximateSize += r.expectedSize() + sizeof(KeyRangeRef); if(options.readYourWritesDisabled) { tr.addWriteConflictRange(r); return; @@ -1854,6 +1871,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N r.resetPromise = Promise(); deferredError = std::move( r.deferredError ); retries = r.retries; + approximateSize = r.approximateSize; timeoutActor = r.timeoutActor; creationTime = r.creationTime; commitStarted = r.commitStarted; @@ -1870,6 +1888,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& arena( std::move(r.arena) ), reading( std::move(r.reading) ), retries( r.retries ), + approximateSize(r.approximateSize), creationTime( r.creationTime ), deferredError( std::move(r.deferredError) ), timeoutActor( std::move(r.timeoutActor) ), @@ -1921,6 +1940,7 @@ void ReadYourWritesTransaction::resetRyow() { readConflicts = CoalescedKeyRefRangeMap(); watchMap.clear(); reading = AndFuture(); + approximateSize = 0; commitStarted = false; deferredError = Error(); @@ -1941,6 +1961,7 @@ void ReadYourWritesTransaction::cancel() { void ReadYourWritesTransaction::reset() { retries = 0; + approximateSize = 0; creationTime = now(); timeoutActor.cancel(); persistentOptions.clear(); diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index f4b93eabcc..c26c7cbbe7 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -98,6 +98,7 @@ public: Future commit(); Version getCommittedVersion() { return tr.getCommittedVersion(); } + int64_t getApproximateSize() { return approximateSize; } Future> getVersionstamp(); void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ); @@ -140,6 +141,7 @@ private: Promise resetPromise; AndFuture reading; int retries; + int64_t approximateSize; Future timeoutActor; double creationTime; bool commitStarted; @@ -149,7 +151,7 @@ private: void resetTimeout(); void updateConflictMap( KeyRef const& key, WriteMap::iterator& it ); // pre: it.segmentContains(key) void updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ); // pre: it.segmentContains(keys.begin), keys are already inside this->arena - void writeRangeToNativeTransaction( KeyRangeRef const& keys ); + void writeRangeToNativeTransaction(KeyRangeRef const& keys); void resetRyow(); // doesn't reset the encapsulated transaction, or creation time/retry state KeyRef getMaxReadKey(); diff --git a/fdbclient/ThreadSafeTransaction.actor.cpp b/fdbclient/ThreadSafeTransaction.actor.cpp index c26170c8c8..7fd897b98c 100644 --- a/fdbclient/ThreadSafeTransaction.actor.cpp +++ b/fdbclient/ThreadSafeTransaction.actor.cpp @@ -271,8 +271,12 @@ ThreadFuture< Void > ThreadSafeTransaction::commit() { Version ThreadSafeTransaction::getCommittedVersion() { // This should be thread safe when called legally, but it is fragile - Version v = tr->getCommittedVersion(); - return v; + return tr->getCommittedVersion(); +} + +ThreadFuture ThreadSafeTransaction::getApproximateSize() { + ReadYourWritesTransaction *tr = this->tr; + return onMainThread([tr]() -> Future { return tr->getApproximateSize(); }); } ThreadFuture> ThreadSafeTransaction::getVersionstamp() { diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index 21c8de199a..c5832cec45 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -55,54 +55,54 @@ public: explicit ThreadSafeTransaction(DatabaseContext* cx); ~ThreadSafeTransaction(); - void cancel(); - void setVersion( Version v ); - ThreadFuture getReadVersion(); + void cancel() override; + void setVersion( Version v ) override; + ThreadFuture getReadVersion() override; - ThreadFuture< Optional > get( const KeyRef& key, bool snapshot = false ); - ThreadFuture< Key > getKey( const KeySelectorRef& key, bool snapshot = false ); - ThreadFuture< Standalone > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot = false, bool reverse = false ); - ThreadFuture< Standalone > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false ); - ThreadFuture< Standalone > getRange( const KeyRangeRef& keys, int limit, bool snapshot = false, bool reverse = false ) { + ThreadFuture< Optional > get( const KeyRef& key, bool snapshot = false ) override; + ThreadFuture< Key > getKey( const KeySelectorRef& key, bool snapshot = false ) override; + ThreadFuture< Standalone > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot = false, bool reverse = false ) override; + ThreadFuture< Standalone > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) override; + ThreadFuture< Standalone > getRange( const KeyRangeRef& keys, int limit, bool snapshot = false, bool reverse = false ) override { return getRange( firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limit, snapshot, reverse ); } - ThreadFuture< Standalone > getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) { + ThreadFuture< Standalone > getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) override { return getRange( firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse ); } + ThreadFuture>> getAddressesForKey(const KeyRef& key) override; + ThreadFuture> getVersionstamp() override; - ThreadFuture>> getAddressesForKey(const KeyRef& key); - - void addReadConflictRange( const KeyRangeRef& keys ); + void addReadConflictRange( const KeyRangeRef& keys ) override; void makeSelfConflicting(); - void atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType ); - void set( const KeyRef& key, const ValueRef& value ); - void clear( const KeyRef& begin, const KeyRef& end); - void clear( const KeyRangeRef& range ); - void clear( const KeyRef& key ); + void atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType ) override; + void set( const KeyRef& key, const ValueRef& value ) override; + void clear( const KeyRef& begin, const KeyRef& end) override; + void clear( const KeyRangeRef& range ) override; + void clear( const KeyRef& key ) override; - ThreadFuture< Void > watch( const KeyRef& key ); + ThreadFuture< Void > watch( const KeyRef& key ) override; - void addWriteConflictRange( const KeyRangeRef& keys ); + void addWriteConflictRange( const KeyRangeRef& keys ) override; - ThreadFuture commit(); - Version getCommittedVersion(); - ThreadFuture> getVersionstamp(); + ThreadFuture commit() override; + Version getCommittedVersion() override; + ThreadFuture getApproximateSize() override; - void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ); + void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ) override; ThreadFuture checkDeferredError(); - ThreadFuture onError( Error const& e ); + ThreadFuture onError( Error const& e ) override; // These are to permit use as state variables in actors: ThreadSafeTransaction() : tr(NULL) {} void operator=(ThreadSafeTransaction&& r) BOOST_NOEXCEPT; ThreadSafeTransaction(ThreadSafeTransaction&& r) BOOST_NOEXCEPT; - void reset(); + void reset() override; - void addref() { ThreadSafeReferenceCounted::addref(); } - void delref() { ThreadSafeReferenceCounted::delref(); } + void addref() override { ThreadSafeReferenceCounted::addref(); } + void delref() override { ThreadSafeReferenceCounted::delref(); } private: ReadYourWritesTransaction *tr; diff --git a/fdbserver/FDBExecHelper.actor.h b/fdbserver/FDBExecHelper.actor.h index 23e290f90e..6a847c912a 100644 --- a/fdbserver/FDBExecHelper.actor.h +++ b/fdbserver/FDBExecHelper.actor.h @@ -67,4 +67,5 @@ void unregisterTLog(UID uid); // checks if there is any non-stopped TLog instance bool isTLogInSameNode(); +#include "flow/unactorcompiler.h" #endif