Merge pull request #1756 from jzhou77/db-option

Add transaction getApproximateSize() API
This commit is contained in:
A.J. Beamon 2019-07-19 08:33:24 -07:00 committed by GitHub
commit bc5c65e5ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 527 additions and 177 deletions

View File

@ -277,6 +277,12 @@ futures must apply the following rules to the result:
internal stack machine state as the last seen version. Pushes the byte
string "GOT_COMMITTED_VERSION" onto the stack.
#### GET_APPROXIMATE_SIZE
Calls get_approximate_size and pushes the byte string "GOT_APPROXIMATE_SIZE"
onto the stack. Note bindings may issue GET_RANGE calls with different
limits, so these bindings can obtain different sizes back.
#### WAIT_FUTURE
Pops the top item off the stack and pushes it back on. If the top item on

View File

@ -156,6 +156,7 @@ class ApiTest(Test):
resets = ['ON_ERROR', 'RESET', 'CANCEL']
read_conflicts = ['READ_CONFLICT_RANGE', 'READ_CONFLICT_KEY']
write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT']
txn_sizes = ['GET_APPROXIMATE_SIZE']
op_choices += reads
op_choices += mutations
@ -168,6 +169,7 @@ class ApiTest(Test):
op_choices += read_conflicts
op_choices += write_conflicts
op_choices += resets
op_choices += txn_sizes
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX']
atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR', u'APPEND_IF_FITS']
@ -434,6 +436,10 @@ class ApiTest(Test):
self.can_set_version = True
self.can_use_key_selectors = True
elif op == 'GET_APPROXIMATE_SIZE':
instructions.append(op)
self.add_strings(1)
elif op == 'TUPLE_PACK' or op == 'TUPLE_RANGE':
tup = self.random.random_tuple(10)
instructions.push_args(len(tup), *tup)

View File

@ -60,16 +60,20 @@ if(NOT WIN32)
if(OPEN_FOR_IDE)
add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)
add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h)
add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h)
add_library(mako OBJECT ${MAKO_SRCS})
else()
add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)
add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h)
add_executable(mako ${MAKO_SRCS})
strip_debug_symbols(fdb_c_performance_test)
strip_debug_symbols(fdb_c_ryw_benchmark)
strip_debug_symbols(fdb_c_txn_size_test)
endif()
target_link_libraries(fdb_c_performance_test PRIVATE fdb_c)
target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c)
target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c)
# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
target_link_libraries(mako PRIVATE fdb_c)

View File

@ -215,10 +215,15 @@ fdb_error_t fdb_future_get_error_v22( FDBFuture* f, const char** description ) {
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_version( FDBFuture* f, int64_t* out_version ) {
fdb_error_t fdb_future_get_version_v619( FDBFuture* f, int64_t* out_version ) {
CATCH_AND_RETURN( *out_version = TSAV(Version, f)->get(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_int64( FDBFuture* f, int64_t* out_value ) {
CATCH_AND_RETURN( *out_value = TSAV(int64_t, f)->get(); );
}
extern "C" DLLEXPORT
fdb_error_t fdb_future_get_key( FDBFuture* f, uint8_t const** out_key,
int* out_key_length ) {
@ -584,6 +589,11 @@ fdb_error_t fdb_transaction_get_committed_version( FDBTransaction* tr,
*out_version = TXN(tr)->getCommittedVersion(); );
}
extern "C" DLLEXPORT
FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
}
extern "C" DLLEXPORT
FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr )
{
@ -670,6 +680,7 @@ fdb_error_t fdb_select_api_version_impl( int runtime_version, int header_version
// Versioned API changes -- descending order by version (new changes at top)
// FDB_API_CHANGED( function, ver ) means there is a new implementation as of ver, and a function function_(ver-1) is the old implementation
// FDB_API_REMOVED( function, ver ) means the function was removed as of ver, and function_(ver-1) is the old implementation
FDB_API_REMOVED( fdb_future_get_version, 620 );
FDB_API_REMOVED( fdb_create_cluster, 610 );
FDB_API_REMOVED( fdb_cluster_create_database, 610 );
FDB_API_REMOVED( fdb_cluster_set_option, 610 );

View File

@ -120,8 +120,13 @@ extern "C" {
fdb_future_get_error( FDBFuture* f );
#endif
#if FDB_API_VERSION < 620
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_version( FDBFuture* f, int64_t* out_version );
#endif
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_int64( FDBFuture* f, int64_t* out );
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_future_get_key( FDBFuture* f, uint8_t const** out_key,
@ -225,6 +230,13 @@ extern "C" {
fdb_transaction_get_committed_version( FDBTransaction* tr,
int64_t* out_version );
// This function intentionally returns an FDBFuture instead of an integer directly,
// so that calling this API can see the effect of previous mutations on the transaction.
// Specifically, mutations are applied asynchronously by the main thread. In order to
// see them, this call has to be serviced by the main thread too.
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
fdb_transaction_get_approximate_size(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr );
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*

View File

@ -290,13 +290,22 @@ int64_t run_op_getreadversion(FDBTransaction *transaction) {
} while (err && retry--);
if (err) {
fprintf(stderr, "ERROR: fdb_transaction_get_version: %s\n", fdb_get_error(err));
fprintf(stderr, "ERROR: fdb_transaction_get_read_version: %s\n", fdb_get_error(err));
return -1;
}
#if FDB_API_VERSION < 620
err = fdb_future_get_version(f, &rv);
#else
err = fdb_future_get_int64(f, &rv);
#endif
if (err) {
#if FDB_API_VERSION < 620
fprintf(stderr, "ERROR: fdb_future_get_version: %s\n", fdb_get_error(err));
#else
fprintf(stderr, "ERROR: fdb_future_get_int64: %s\n", fdb_get_error(err));
#endif
}
fdb_future_destroy(f);
return rv;

View File

@ -224,7 +224,7 @@ void runTests(struct ResultSet *rs) {
checkError(fdb_future_block_until_ready(f), "block for read version", rs);
int64_t version;
checkError(fdb_future_get_version(f, &version), "get version", rs);
checkError(fdb_future_get_int64(f, &version), "get version", rs);
fdb_future_destroy(f);
insertData(tr);

View File

@ -0,0 +1,110 @@
/*
* txn_size_test.c
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "test.h"
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <foundationdb/fdb_c.h>
#include <foundationdb/fdb_c_options.g.h>
pthread_t netThread;
const int numKeys = 100;
uint8_t** keys = NULL;
#define KEY_SIZE 16
#define VALUE_SIZE 100
uint8_t valueStr[VALUE_SIZE];
fdb_error_t getSize(struct ResultSet* rs, FDBTransaction* tr, int64_t* out_size) {
fdb_error_t e;
FDBFuture* future = fdb_transaction_get_approximate_size(tr);
e = maybeLogError(fdb_future_block_until_ready(future), "waiting for get future", rs);
if (e) {
fdb_future_destroy(future);
return e;
}
e = maybeLogError(fdb_future_get_int64(future, out_size), "getting future value", rs);
if (e) {
fdb_future_destroy(future);
return e;
}
fdb_future_destroy(future);
return 0;
}
void runTests(struct ResultSet *rs) {
int64_t sizes[numKeys];
int i = 0, j = 0;
FDBDatabase *db = openDatabase(rs, &netThread);
FDBTransaction *tr = NULL;
fdb_error_t e = fdb_database_create_transaction(db, &tr);
checkError(e, "create transaction", rs);
memset(sizes, 0, numKeys * sizeof(uint32_t));
fdb_transaction_set(tr, keys[i], KEY_SIZE, valueStr, VALUE_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
fdb_transaction_set(tr, keys[i], KEY_SIZE, valueStr, VALUE_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
fdb_transaction_clear(tr, keys[i], KEY_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
fdb_transaction_clear_range(tr, keys[i], KEY_SIZE, keys[i+1], KEY_SIZE);
e = getSize(rs, tr, sizes + i);
checkError(e, "transaction get size", rs);
printf("size %d: %u\n", i, sizes[i]);
i++;
for (j = 0; j + 1 < i; j++) {
assert(sizes[j] < sizes[j + 1]);
}
printf("Test passed!\n");
}
int main(int argc, char **argv) {
srand(time(NULL));
struct ResultSet *rs = newResultSet();
checkError(fdb_select_api_version(620), "select API version", rs);
printf("Running performance test at client version: %s\n", fdb_get_client_version());
keys = generateKeys(numKeys, KEY_SIZE);
runTests(rs);
freeResultSet(rs);
freeKeys(keys, numKeys);
return 0;
}

View File

@ -20,12 +20,13 @@
#include "fdb_flow.h"
#include "flow/DeterministicRandom.h"
#include "flow/SystemMonitor.h"
#include <stdio.h>
#include <cinttypes>
#include "flow/DeterministicRandom.h"
#include "flow/SystemMonitor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace FDB;
THREAD_FUNC networkThread(void* fdb) {
@ -147,6 +148,7 @@ namespace FDB {
void setOption(FDBTransactionOption option, Optional<StringRef> value = Optional<StringRef>()) override;
Future<int64_t> getApproximateSize() override;
Future<Void> onError(Error const& e) override;
void cancel() override;
@ -290,7 +292,7 @@ namespace FDB {
return backToFuture<Version>( fdb_transaction_get_read_version( tr ), [](Reference<CFuture> f){
Version value;
throw_on_error( fdb_future_get_version( f->f, &value ) );
throw_on_error( fdb_future_get_int64( f->f, &value ) );
return value;
} );
@ -408,6 +410,14 @@ namespace FDB {
}
}
Future<int64_t> TransactionImpl::getApproximateSize() {
return backToFuture<int64_t>(fdb_transaction_get_approximate_size(tr), [](Reference<CFuture> f) {
int64_t size = 0;
throw_on_error(fdb_future_get_int64(f->f, &size));
return size;
});
}
Future<Void> TransactionImpl::onError(Error const& e) {
return backToFuture< Void >( fdb_transaction_on_error( tr, e.code() ), [](Reference<CFuture> f) {
throw_on_error( fdb_future_get_error( f->f ) );
@ -422,4 +432,5 @@ namespace FDB {
void TransactionImpl::reset() {
fdb_transaction_reset( tr );
}
}
} // namespace FDB

View File

@ -112,6 +112,7 @@ namespace FDB {
virtual Future<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
virtual Future<int64_t> getApproximateSize() = 0;
virtual Future<FDBStandalone<StringRef>> getVersionstamp() = 0;
};

View File

@ -19,6 +19,7 @@
*/
#include "Tester.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace FDB;

View File

@ -18,16 +18,18 @@
* limitations under the License.
*/
#include "fdbrpc/fdbrpc.h"
#include "flow/DeterministicRandom.h"
#include "bindings/flow/Tuple.h"
#include "bindings/flow/FDBLoanerTypes.h"
#include "Tester.actor.h"
#include <cinttypes>
#ifdef __linux__
#include <string.h>
#endif
#include "bindings/flow/Tuple.h"
#include "bindings/flow/FDBLoanerTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "flow/DeterministicRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Otherwise we have to type setupNetwork(), FDB::open(), etc.
using namespace FDB;
@ -292,7 +294,7 @@ ACTOR Future<Void> printFlowTesterStack(FlowTesterStack* stack) {
state int idx;
for (idx = stack->data.size() - 1; idx >= 0; --idx) {
Standalone<StringRef> value = wait(stack->data[idx].value);
// printf("==========stack item:%d, index:%d, value:%s\n", idx, stack->data[idx].index, printable(value).c_str());
// printf("==========stack item:%d, index:%d, value:%s\n", idx, stack->data[idx].index, value.printable().c_str());
}
return Void();
}
@ -703,6 +705,20 @@ struct GetCommittedVersionFunc : InstructionFunc {
const char* GetCommittedVersionFunc::name = "GET_COMMITTED_VERSION";
REGISTER_INSTRUCTION_FUNC(GetCommittedVersionFunc);
// GET_APPROXIMATE_SIZE
struct GetApproximateSizeFunc : InstructionFunc {
static const char* name;
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
int64_t _ = wait(instruction->tr->getApproximateSize());
(void) _; // disable unused variable warning
data->stack.pushTuple(LiteralStringRef("GOT_APPROXIMATE_SIZE"));
return Void();
}
};
const char* GetApproximateSizeFunc::name = "GET_APPROXIMATE_SIZE";
REGISTER_INSTRUCTION_FUNC(GetApproximateSizeFunc);
// GET_VERSIONSTAMP
struct GetVersionstampFunc : InstructionFunc {
static const char* name;
@ -1638,7 +1654,7 @@ ACTOR static Future<Void> doInstructions(Reference<FlowTesterData> data) {
op = op.substr(0, op.size() - 9);
// printf("[==========]%ld/%ld:%s:%s: isDatabase:%d, isSnapshot:%d, stack count:%ld\n",
// idx, data->instructions.size(), printable(StringRef(data->instructions[idx].key)).c_str(), printable(StringRef(data->instructions[idx].value)).c_str(),
// idx, data->instructions.size(), StringRef(data->instructions[idx].key).printable().c_str(), StringRef(data->instructions[idx].value).printable().c_str(),
// isDatabase, isSnapshot, data->stack.data.size());
//wait(printFlowTesterStack(&(data->stack)));

View File

@ -590,6 +590,9 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
panic(e)
}
sm.store(idx, []byte("GOT_COMMITTED_VERSION"))
case op == "GET_APPROXIMATE_SIZE":
_ = sm.currentTransaction().GetApproximateSize().MustGet()
sm.store(idx, []byte("GOT_APPROXIMATE_SIZE"))
case op == "GET_VERSIONSTAMP":
sm.store(idx, sm.currentTransaction().GetVersionstamp())
case op == "GET_KEY":

View File

@ -331,7 +331,7 @@ func (f *futureInt64) Get() (int64, error) {
f.BlockUntilReady()
var ver C.int64_t
if err := C.fdb_future_get_version(f.ptr, &ver); err != 0 {
if err := C.fdb_future_get_int64(f.ptr, &ver); err != 0 {
return 0, Error{int(err)}
}

View File

@ -372,6 +372,16 @@ func (t Transaction) GetVersionstamp() FutureKey {
return &futureKey{future: newFuture(C.fdb_transaction_get_versionstamp(t.ptr))}
}
func (t *transaction) getApproximateSize() FutureInt64 {
return &futureInt64{
future: newFuture(C.fdb_transaction_get_approximate_size(t.ptr)),
}
}
func (t Transaction) GetApproximateSize() FutureInt64 {
return t.getApproximateSize()
}
// Reset rolls back a transaction, completely resetting it to its initial
// state. This is logically equivalent to destroying the transaction and
// creating a new one.

View File

@ -25,11 +25,11 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/FDB.java
src/main/com/apple/foundationdb/FDBDatabase.java
src/main/com/apple/foundationdb/FDBTransaction.java
src/main/com/apple/foundationdb/FutureInt64.java
src/main/com/apple/foundationdb/FutureKey.java
src/main/com/apple/foundationdb/FutureResult.java
src/main/com/apple/foundationdb/FutureResults.java
src/main/com/apple/foundationdb/FutureStrings.java
src/main/com/apple/foundationdb/FutureVersion.java
src/main/com/apple/foundationdb/FutureVoid.java
src/main/com/apple/foundationdb/JNIUtil.java
src/main/com/apple/foundationdb/KeySelector.java

View File

@ -243,21 +243,21 @@ JNIEXPORT void JNICALL Java_com_apple_foundationdb_NativeFuture_Future_1releaseM
fdb_future_release_memory(var);
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureVersion_FutureVersion_1get(JNIEnv *jenv, jobject, jlong future) {
if( !future ) {
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FutureInt64_FutureInt64_1get(JNIEnv *jenv, jobject, jlong future) {
if (!future) {
throwParamNotNull(jenv);
return 0;
}
FDBFuture *f = (FDBFuture *)future;
int64_t version = 0;
fdb_error_t err = fdb_future_get_version(f, &version);
if( err ) {
safeThrow( jenv, getThrowable( jenv, err ) );
int64_t value = 0;
fdb_error_t err = fdb_future_get_int64(f, &value);
if (err) {
safeThrow(jenv, getThrowable(jenv, err));
return 0;
}
return (jlong)version;
return (jlong)value;
}
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureStrings_FutureStrings_1get(JNIEnv *jenv, jobject, jlong future) {
@ -805,6 +805,15 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
return (jlong)version;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getApproximateSize(JNIEnv *jenv, jobject, jlong tPtr) {
if (!tPtr) {
throwParamNotNull(jenv);
return 0;
}
FDBFuture* f = fdb_transaction_get_approximate_size((FDBTransaction*)tPtr);
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getVersionstamp(JNIEnv *jenv, jobject, jlong tPtr) {
if (!tPtr) {
throwParamNotNull(jenv);

View File

@ -216,7 +216,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
public CompletableFuture<Long> getReadVersion() {
pointerReadLock.lock();
try {
return new FutureVersion(Transaction_getReadVersion(getPtr()), executor);
return new FutureInt64(Transaction_getReadVersion(getPtr()), executor);
} finally {
pointerReadLock.unlock();
}
@ -514,6 +514,16 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
}
}
@Override
public CompletableFuture<Long> getApproximateSize() {
pointerReadLock.lock();
try {
return new FutureInt64(Transaction_getApproximateSize(getPtr()), executor);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<Void> watch(byte[] key) throws FDBException {
pointerReadLock.lock();
@ -642,6 +652,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
private native long Transaction_commit(long cPtr);
private native long Transaction_getCommittedVersion(long cPtr);
private native long Transaction_getVersionstamp(long cPtr);
private native long Transaction_getApproximateSize(long cPtr);
private native long Transaction_onError(long cPtr, int errorCode);
private native void Transaction_dispose(long cPtr);
private native void Transaction_reset(long cPtr);

View File

@ -1,9 +1,9 @@
/*
* FutureVersion.java
* FutureInt64.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,16 +22,16 @@ package com.apple.foundationdb;
import java.util.concurrent.Executor;
class FutureVersion extends NativeFuture<Long> {
FutureVersion(long cPtr, Executor executor) {
class FutureInt64 extends NativeFuture<Long> {
FutureInt64(long cPtr, Executor executor) {
super(cPtr);
registerMarshalCallback(executor);
}
@Override
protected Long getIfDone_internal(long cPtr) throws FDBException {
return FutureVersion_get(cPtr);
return FutureInt64_get(cPtr);
}
private native long FutureVersion_get(long cPtr) throws FDBException;
private native long FutureInt64_get(long cPtr) throws FDBException;
}

View File

@ -260,6 +260,15 @@ public interface Transaction extends AutoCloseable, ReadTransaction, Transaction
*/
CompletableFuture<byte[]> getVersionstamp();
/**
* Returns a future that will contain the approximated size of the commit, which is the
* summation of mutations, read conflict ranges, and write conflict ranges. This can be
* called multiple times before transaction commit.
*
* @return a future that will contain the approximated size of the commit.
*/
CompletableFuture<Long> getApproximateSize();
/**
* Resets a transaction and returns a delayed signal for error recovery. If the error
* encountered by the {@code Transaction} could not be recovered from, the returned

View File

@ -282,6 +282,11 @@ public class AsyncStackTester {
return AsyncUtil.DONE;
}
else if(op == StackOperation.GET_APPROXIMATE_SIZE) {
return inst.tr.getApproximateSize().thenAcceptAsync(size -> {
inst.push("GOT_APPROXIMATE_SIZE".getBytes());
}, FDB.DEFAULT_EXECUTOR);
}
else if(op == StackOperation.GET_VERSIONSTAMP) {
try {
inst.push(inst.tr.getVersionstamp());

View File

@ -54,6 +54,7 @@ enum StackOperation {
GET_KEY,
GET_READ_VERSION,
GET_COMMITTED_VERSION,
GET_APPROXIMATE_SIZE,
GET_VERSIONSTAMP,
SET_READ_VERSION,
ON_ERROR,

View File

@ -261,6 +261,10 @@ public class StackTester {
inst.context.lastVersion = inst.tr.getCommittedVersion();
inst.push("GOT_COMMITTED_VERSION".getBytes());
}
else if(op == StackOperation.GET_APPROXIMATE_SIZE) {
Long size = inst.tr.getApproximateSize().join();
inst.push("GOT_APPROXIMATE_SIZE".getBytes());
}
else if(op == StackOperation.GET_VERSIONSTAMP) {
inst.push(inst.tr.getVersionstamp());
}

View File

@ -405,7 +405,7 @@ class TransactionRead(_FDBBase):
def get_read_version(self):
"""Get the read version of the transaction."""
return FutureVersion(self.capi.fdb_transaction_get_read_version(self.tpointer))
return FutureInt64(self.capi.fdb_transaction_get_read_version(self.tpointer))
def get(self, key):
key = keyToBytes(key)
@ -541,6 +541,10 @@ class Transaction(TransactionRead):
self.capi.fdb_transaction_get_committed_version(self.tpointer, ctypes.byref(version))
return version.value
def get_approximate_size(self):
"""Get the approximate commit size of the transaction."""
return FutureInt64(self.capi.fdb_transaction_get_approximate_size(self.tpointer))
def get_versionstamp(self):
return Key(self.capi.fdb_transaction_get_versionstamp(self.tpointer))
@ -687,12 +691,12 @@ class FutureVoid(Future):
return None
class FutureVersion(Future):
class FutureInt64(Future):
def wait(self):
self.block_until_ready()
version = ctypes.c_int64()
self.capi.fdb_future_get_version(self.fpointer, ctypes.byref(version))
return version.value
value = ctypes.c_int64()
self.capi.fdb_future_get_int64(self.fpointer, ctypes.byref(value))
return value.value
class FutureKeyValueArray(Future):
@ -1359,9 +1363,9 @@ def init_c_api():
_capi.fdb_future_get_error.restype = int
_capi.fdb_future_get_error.errcheck = check_error_code
_capi.fdb_future_get_version.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int64)]
_capi.fdb_future_get_version.restype = ctypes.c_int
_capi.fdb_future_get_version.errcheck = check_error_code
_capi.fdb_future_get_int64.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int64)]
_capi.fdb_future_get_int64.restype = ctypes.c_int
_capi.fdb_future_get_int64.errcheck = check_error_code
_capi.fdb_future_get_key.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.POINTER(ctypes.c_byte)),
ctypes.POINTER(ctypes.c_int)]
@ -1453,6 +1457,9 @@ def init_c_api():
_capi.fdb_transaction_get_committed_version.restype = ctypes.c_int
_capi.fdb_transaction_get_committed_version.errcheck = check_error_code
_capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p
_capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p

View File

@ -68,9 +68,31 @@ def test_size_limit_option(db):
except fdb.FDBError as e:
assert(e.code == 2101) # Transaction exceeds byte limit (2101)
@fdb.transactional
def test_get_approximate_size(tr):
tr[b'key1'] = b'value1'
s1 = tr.get_approximate_size().wait()
tr[b'key2'] = b'value2'
s2 = tr.get_approximate_size().wait()
assert(s1 < s2)
tr.clear(b'key3')
s3 = tr.get_approximate_size().wait()
assert(s2 < s3)
tr.add_read_conflict_key(b'key3+')
s4 = tr.get_approximate_size().wait()
assert(s3 < s4)
tr.add_write_conflict_key(b'key4')
s5 = tr.get_approximate_size().wait()
assert(s4 < s5)
# Expect a cluster file as input. This test will write to the FDB cluster, so
# be aware of potential side effects.
if __name__ == '__main__':
clusterFile = sys.argv[1]
db = fdb.open(clusterFile)
test_size_limit_option(db)
test_get_approximate_size(db)

View File

@ -48,7 +48,7 @@ from cancellation_timeout_tests import test_retry_limits
from cancellation_timeout_tests import test_db_retry_limits
from cancellation_timeout_tests import test_combinations
from size_limit_tests import test_size_limit_option
from size_limit_tests import test_size_limit_option, test_get_approximate_size
random.seed(0)
@ -478,6 +478,9 @@ class Tester:
elif inst.op == six.u("GET_COMMITTED_VERSION"):
self.last_version = inst.tr.get_committed_version()
inst.push(b"GOT_COMMITTED_VERSION")
elif inst.op == six.u("GET_APPROXIMATE_SIZE"):
approximate_size = inst.tr.get_approximate_size().wait()
inst.push(b"GOT_APPROXIMATE_SIZE")
elif inst.op == six.u("GET_VERSIONSTAMP"):
inst.push(inst.tr.get_versionstamp())
elif inst.op == six.u("TUPLE_PACK"):
@ -561,6 +564,7 @@ class Tester:
test_predicates()
test_size_limit_option(db)
test_get_approximate_size(db)
except fdb.FDBError as e:
print("Unit tests failed: %s" % e.description)

View File

@ -85,7 +85,7 @@ module FDB
attach_function :fdb_future_set_callback, [ :pointer, :fdb_future_callback, :pointer ], :fdb_error
attach_function :fdb_future_get_error, [ :pointer ], :fdb_error
attach_function :fdb_future_get_version, [ :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_int64, [ :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_key, [ :pointer, :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_value, [ :pointer, :pointer, :pointer, :pointer ], :fdb_error
attach_function :fdb_future_get_keyvalue_array, [ :pointer, :pointer, :pointer, :pointer ], :fdb_error
@ -114,6 +114,7 @@ module FDB
attach_function :fdb_transaction_watch, [ :pointer, :pointer, :int ], :pointer
attach_function :fdb_transaction_commit, [ :pointer ], :pointer
attach_function :fdb_transaction_get_committed_version, [ :pointer, :pointer ], :fdb_error
attach_function :fdb_transaction_get_approximate_size, [ :pointer ], :pointer
attach_function :fdb_transaction_get_versionstamp, [ :pointer ], :pointer
attach_function :fdb_transaction_on_error, [ :pointer, :fdb_error ], :pointer
attach_function :fdb_transaction_reset, [ :pointer ], :void
@ -443,11 +444,11 @@ module FDB
end
end
class Version < LazyFuture
class Int64Future < LazyFuture
def getter
version = FFI::MemoryPointer.new :int64
FDBC.check_error FDBC.fdb_future_get_version(@fpointer, version)
@value = version.read_long_long
val = FFI::MemoryPointer.new :int64
FDBC.check_error FDBC.fdb_future_get_int64(@fpointer, val)
@value = val.read_long_long
end
private :getter
end
@ -687,7 +688,7 @@ module FDB
end
def get_read_version
Version.new(FDBC.fdb_transaction_get_read_version @tpointer)
Int64Future.new(FDBC.fdb_transaction_get_read_version @tpointer)
end
def get(key)
@ -904,6 +905,10 @@ module FDB
version.read_long_long
end
def get_approximate_size
Int64Future.new(FDBC.fdb_transaction_get_approximate_size @tpointer)
end
def get_versionstamp
Key.new(FDBC.fdb_transaction_get_versionstamp(@tpointer))
end

View File

@ -381,6 +381,9 @@ class Tester
when "GET_COMMITTED_VERSION"
@last_version = inst.tr.get_committed_version
inst.push("GOT_COMMITTED_VERSION")
when "GET_APPROXIMATE_SIZE"
size = inst.tr.get_approximate_size.to_i
inst.push("GOT_APPROXIMATE_SIZE")
when "GET_VERSIONSTAMP"
inst.push(inst.tr.get_versionstamp)
when "TUPLE_PACK"

View File

@ -291,9 +291,9 @@ See :ref:`developer-guide-programming-with-futures` for further (language-indepe
|future-get-return1|.
.. function:: fdb_error_t fdb_future_get_version(FDBFuture* future, int64_t* out_version)
.. function:: fdb_error_t fdb_future_get_int64(FDBFuture* future, int64_t* out)
Extracts a version from an :type:`FDBFuture` into a caller-provided variable of type ``int64_t``. |future-warning|
Extracts a 64-bit integer from an :type:`FDBFuture*` into a caller-provided variable of type ``int64_t``. |future-warning|
|future-get-return1| |future-get-return2|.
@ -447,7 +447,7 @@ Applications must provide error handling and an appropriate retry loop around th
.. function:: FDBFuture* fdb_transaction_get_read_version(FDBTransaction* transaction)
|future-return0| the transaction snapshot read version. |future-return1| call :func:`fdb_future_get_version()` to extract the version into an int64_t that you provide, |future-return2|
|future-return0| the transaction snapshot read version. |future-return1| call :func:`fdb_future_get_int64()` to extract the version into an int64_t that you provide, |future-return2|
The transaction obtains a snapshot read version automatically at the time of the first call to :func:`fdb_transaction_get_*()` (including this one) and (unless causal consistency has been deliberately compromised by transaction options) is guaranteed to represent all transactions which were reported committed before that call.
@ -720,6 +720,12 @@ Applications must provide error handling and an appropriate retry loop around th
Most applications will not call this function.
.. function:: FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* transaction)
|future-return0| the approximate transaction size so far in the returned future, which is the summation of the estimated size of mutations, read conflict ranges, and write conflict ranges. |future-return1| call :func:`fdb_future_get_int64()` to extract the size, |future-return2|
This can be called multiple times before the transaction is committed.
.. function:: FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* transaction)
|future-return0| the versionstamp which was used by any versionstamp operations in this transaction. |future-return1| call :func:`fdb_future_get_key()` to extract the key, |future-return2|

View File

@ -28,6 +28,8 @@ Status
Bindings
--------
* Added a new API to get the approximated transaction size before commit, e.g., ``fdb_transaction_get_approximate_size`` in the C binding. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
* C: ``fdb_future_get_version`` has been renamed to ``fdb_future_get_int64``. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
* Go: The Go bindings now require Go version 1.11 or later.
* Go: Fix issue with finalizers running too early that could lead to undefined behavior. `(PR #1451) <https://github.com/apple/foundationdb/pull/1451>`_.
* Added transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) <https://github.com/apple/foundationdb/pull/1844>`_.

View File

@ -843,4 +843,6 @@ public:
return updateErrorInfo(cx, e, details);
}
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -61,6 +61,7 @@ public:
virtual ThreadFuture<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) = 0;

View File

@ -47,7 +47,7 @@ ThreadFuture<Version> DLTransaction::getReadVersion() {
return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
int64_t version;
FdbCApi::fdb_error_t error = api->futureGetVersion(f, &version);
FdbCApi::fdb_error_t error = api->futureGetInt64(f, &version);
ASSERT(!error);
return version;
});
@ -195,6 +195,20 @@ Version DLTransaction::getCommittedVersion() {
return version;
}
ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
if(!api->transactionGetApproximateSize) {
return unsupported_operation();
}
FdbCApi::FDBFuture *f = api->transactionGetApproximateSize(tr);
return toThreadFuture<int64_t>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
int64_t size = 0;
FdbCApi::fdb_error_t error = api->futureGetInt64(f, &size);
ASSERT(!error);
return size;
});
}
void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
throwIfError(api->transactionSetOption(tr, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
}
@ -287,6 +301,7 @@ void DLApi::init() {
loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op");
loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit");
loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version");
loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size", headerVersion >= 620);
loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch");
loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error");
loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
@ -294,7 +309,7 @@ void DLApi::init() {
loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range");
loadClientFunction(&api->futureGetDatabase, lib, fdbCPath, "fdb_future_get_database");
loadClientFunction(&api->futureGetVersion, lib, fdbCPath, "fdb_future_get_version");
loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version");
loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
@ -595,6 +610,12 @@ Version MultiVersionTransaction::getCommittedVersion() {
return invalidVersion;
}
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture<int64_t>(Never());
return abortableFuture(f, tr.onChange);
}
void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
auto itr = FDBTransactionOptions::optionInfo.find(option);
if(itr == FDBTransactionOptions::optionInfo.end()) {

View File

@ -84,6 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
FDBFuture* (*transactionCommit)(FDBTransaction *tr);
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion);
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr);
FDBFuture* (*transactionWatch)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength);
FDBFuture* (*transactionOnError)(FDBTransaction *tr, fdb_error_t error);
void (*transactionReset)(FDBTransaction *tr);
@ -94,7 +95,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
//Future
fdb_error_t (*futureGetDatabase)(FDBFuture *f, FDBDatabase **outDb);
fdb_error_t (*futureGetVersion)(FDBFuture *f, int64_t *outVersion);
fdb_error_t (*futureGetInt64)(FDBFuture *f, int64_t *outValue);
fdb_error_t (*futureGetError)(FDBFuture *f);
fdb_error_t (*futureGetKey)(FDBFuture *f, uint8_t const **outKey, int *outKeyLength);
fdb_error_t (*futureGetValue)(FDBFuture *f, fdb_bool_t *outPresent, uint8_t const **outValue, int *outValueLength);
@ -116,41 +117,42 @@ public:
DLTransaction(Reference<FdbCApi> api, FdbCApi::FDBTransaction *tr) : api(api), tr(tr) {}
~DLTransaction() { api->transactionDestroy(tr); }
void cancel();
void setVersion(Version v);
ThreadFuture<Version> getReadVersion();
void cancel() override;
void setVersion(Version v) override;
ThreadFuture<Version> getReadVersion() override;
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false);
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
ThreadFuture<Standalone<StringRef>> getVersionstamp();
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false) override;
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
void addReadConflictRange(const KeyRangeRef& keys);
void addReadConflictRange(const KeyRangeRef& keys) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType);
void set(const KeyRef& key, const ValueRef& value);
void clear(const KeyRef& begin, const KeyRef& end);
void clear(const KeyRangeRef& range);
void clear(const KeyRef& key);
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
void set(const KeyRef& key, const ValueRef& value) override;
void clear(const KeyRef& begin, const KeyRef& end) override;
void clear(const KeyRangeRef& range) override;
void clear(const KeyRef& key) override;
ThreadFuture<Void> watch(const KeyRef& key);
ThreadFuture<Void> watch(const KeyRef& key) override;
void addWriteConflictRange(const KeyRangeRef& keys);
void addWriteConflictRange(const KeyRangeRef& keys) override;
ThreadFuture<Void> commit();
Version getCommittedVersion();
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>());
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
ThreadFuture<Void> onError(Error const& e);
void reset();
ThreadFuture<Void> onError(Error const& e) override;
void reset() override;
void addref() { ThreadSafeReferenceCounted<DLTransaction>::addref(); }
void delref() { ThreadSafeReferenceCounted<DLTransaction>::delref(); }
void addref() override { ThreadSafeReferenceCounted<DLTransaction>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLTransaction>::delref(); }
private:
const Reference<FdbCApi> api;
@ -165,11 +167,11 @@ public:
ThreadFuture<Void> onReady();
Reference<ITransaction> createTransaction();
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void addref() { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
private:
const Reference<FdbCApi> api;
@ -181,18 +183,18 @@ class DLApi : public IClientApi {
public:
DLApi(std::string fdbCPath);
void selectApiVersion(int apiVersion);
const char* getClientVersion();
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void setupNetwork();
void runNetwork();
void stopNetwork();
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;
void runNetwork() override;
void stopNetwork() override;
Reference<IDatabase> createDatabase(const char *clusterFilePath);
Reference<IDatabase> createDatabase(const char *clusterFilePath) override;
Reference<IDatabase> createDatabase609(const char *clusterFilePath); // legacy database creation
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override;
private:
const std::string fdbCPath;
@ -212,41 +214,42 @@ class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<
public:
MultiVersionTransaction(Reference<MultiVersionDatabase> db, UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions);
void cancel();
void setVersion(Version v);
ThreadFuture<Version> getReadVersion();
void cancel() override;
void setVersion(Version v) override;
ThreadFuture<Version> getReadVersion() override;
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false);
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
ThreadFuture<Standalone<StringRef>> getVersionstamp();
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false) override;
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
void addReadConflictRange(const KeyRangeRef& keys);
void addReadConflictRange(const KeyRangeRef& keys) override;
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType);
void set(const KeyRef& key, const ValueRef& value);
void clear(const KeyRef& begin, const KeyRef& end);
void clear(const KeyRangeRef& range);
void clear(const KeyRef& key);
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
void set(const KeyRef& key, const ValueRef& value) override;
void clear(const KeyRef& begin, const KeyRef& end) override;
void clear(const KeyRangeRef& range) override;
void clear(const KeyRef& key) override;
ThreadFuture<Void> watch(const KeyRef& key);
ThreadFuture<Void> watch(const KeyRef& key) override;
void addWriteConflictRange(const KeyRangeRef& keys);
void addWriteConflictRange(const KeyRangeRef& keys) override;
ThreadFuture<Void> commit();
Version getCommittedVersion();
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>());
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
ThreadFuture<Void> onError(Error const& e);
void reset();
ThreadFuture<Void> onError(Error const& e) override;
void reset() override;
void addref() { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
void delref() { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
void addref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
private:
const Reference<MultiVersionDatabase> db;
@ -289,11 +292,11 @@ public:
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
~MultiVersionDatabase();
Reference<ITransaction> createTransaction();
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void addref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
@ -354,16 +357,16 @@ private:
class MultiVersionApi : public IClientApi {
public:
void selectApiVersion(int apiVersion);
const char* getClientVersion();
void selectApiVersion(int apiVersion) override;
const char* getClientVersion() override;
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void setupNetwork();
void runNetwork();
void stopNetwork();
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
void setupNetwork() override;
void runNetwork() override;
void stopNetwork() override;
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override;
Reference<IDatabase> createDatabase(const char *clusterFilePath);
Reference<IDatabase> createDatabase(const char *clusterFilePath) override;
static MultiVersionApi* api;
Reference<ClientInfo> getLocalClient();

View File

@ -2824,7 +2824,7 @@ Future<Void> Transaction::commitMutations() {
cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());
size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize();
size_t transactionSize = getSize();
if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction")
.suppressFor(1.0)
@ -3191,6 +3191,12 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
return versionstampPromise.getFuture();
}
uint32_t Transaction::getSize() {
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
tr.transaction.write_conflict_ranges.expectedSize();
return s;
}
Future<Void> Transaction::onError( Error const& e ) {
if (e.code() == error_code_success) {
return client_invalid_operation();

View File

@ -287,6 +287,7 @@ public:
Promise<Standalone<StringRef>> versionstampPromise;
uint32_t getSize();
Future<Void> onError( Error const& e );
void flushTrLogsIfEnabled();

View File

@ -990,9 +990,10 @@ public:
if(!ryw->options.readYourWritesDisabled) {
ryw->watchMap[key].push_back(watch);
val = readWithConflictRange( ryw, GetValueReq(key), false );
}
else
} else {
ryw->approximateSize += 2 * key.expectedSize() + 1;
val = ryw->tr.get(key);
}
try {
wait(ryw->resetPromise.getFuture() || success(val) || watch->onChangeTrigger.getFuture());
@ -1045,7 +1046,7 @@ public:
return Void();
}
ryw->writeRangeToNativeTransaction(KeyRangeRef(StringRef(), allKeys.end));
auto conflictRanges = ryw->readConflicts.ranges();
@ -1123,8 +1124,11 @@ public:
}
};
ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx ) : cache(&arena), writes(&arena), tr(cx), retries(0), creationTime(now()), commitStarted(false), options(tr), deferredError(cx->deferredError) {
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions));
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
: cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false),
options(tr), deferredError(cx->deferredError) {
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
std::back_inserter(persistentOptions));
applyPersistentOptions();
}
@ -1367,6 +1371,7 @@ void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys )
}
if(options.readYourWritesDisabled) {
approximateSize += r.expectedSize() + sizeof(KeyRangeRef);
tr.addReadConflictRange(r);
return;
}
@ -1381,6 +1386,7 @@ void ReadYourWritesTransaction::updateConflictMap( KeyRef const& key, WriteMap::
//it.skip( key );
//ASSERT( it.beginKey() <= key && key < it.endKey() );
if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
approximateSize += 2 * key.expectedSize() + 1 + sizeof(KeyRangeRef);
readConflicts.insert( singleKeyRange( key, arena ), true );
}
}
@ -1391,13 +1397,15 @@ void ReadYourWritesTransaction::updateConflictMap( KeyRangeRef const& keys, Writ
for(; it.beginKey() < keys.end; ++it ) {
if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
KeyRangeRef insert_range = KeyRangeRef( std::max( keys.begin, it.beginKey().toArenaOrRef( arena ) ), std::min( keys.end, it.endKey().toArenaOrRef( arena ) ) );
if( !insert_range.empty() )
if (!insert_range.empty()) {
approximateSize += keys.expectedSize() + sizeof(KeyRangeRef);
readConflicts.insert( insert_range, true );
}
}
}
}
void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const& keys ) {
void ReadYourWritesTransaction::writeRangeToNativeTransaction(KeyRangeRef const& keys) {
WriteMap::iterator it( &writes );
it.skip(keys.begin);
@ -1410,12 +1418,12 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
clearBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
inClearRange = true;
} else if( !it.is_cleared_range() && inClearRange ) {
tr.clear( KeyRangeRef( clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), false );
tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena)), false);
inClearRange = false;
}
}
if( inClearRange ) {
if (inClearRange) {
tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), keys.end), false);
}
@ -1429,7 +1437,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
conflictBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
inConflictRange = true;
} else if( !it.is_conflict_range() && inConflictRange ) {
tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ) );
tr.addWriteConflictRange(KeyRangeRef(conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena)));
inConflictRange = false;
}
@ -1440,9 +1448,9 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
switch(op[i].type) {
case MutationRef::SetValue:
if (op[i].value.present()) {
tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
tr.set(it.beginKey().assertRef(), op[i].value.get(), false);
} else {
tr.clear( it.beginKey().assertRef(), false );
tr.clear(it.beginKey().assertRef(), false);
}
break;
case MutationRef::AddValue:
@ -1469,7 +1477,7 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
}
if( inConflictRange ) {
tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), keys.end ) );
tr.addWriteConflictRange(KeyRangeRef(conflictBegin.toArenaOrRef(arena), keys.end));
}
}
@ -1574,7 +1582,9 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
throw client_invalid_operation();
}
if(options.readYourWritesDisabled) {
approximateSize += k.expectedSize() + v.expectedSize() + sizeof(MutationRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + 2 * key.expectedSize() + 1 : 0);
if (options.readYourWritesDisabled) {
return tr.atomicOp(k, v, (MutationRef::Type) operationType, addWriteConflict);
}
@ -1604,7 +1614,9 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value )
if(key >= getMaxWriteKey())
throw key_outside_legal_range();
if(options.readYourWritesDisabled ) {
approximateSize += key.expectedSize() + value.expectedSize() + sizeof(MutationRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + 2 * key.expectedSize() + 1 : 0);
if (options.readYourWritesDisabled) {
return tr.set(key, value, addWriteConflict);
}
@ -1632,10 +1644,12 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
if(range.begin > maxKey || range.end > maxKey)
throw key_outside_legal_range();
if( options.readYourWritesDisabled ) {
approximateSize += range.expectedSize() + sizeof(MutationRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + range.expectedSize() : 0);
if (options.readYourWritesDisabled) {
return tr.clear(range, addWriteConflict);
}
//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
//we can translate it to an equivalent one with smaller keys
KeyRef begin = range.begin;
@ -1676,6 +1690,8 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) {
}
KeyRangeRef r = singleKeyRange( key, arena );
approximateSize += r.expectedSize() + sizeof(KeyRangeRef) +
(addWriteConflict ? sizeof(KeyRangeRef) + r.expectedSize() : 0);
//SOMEDAY: add an optimized single key clear to write map
writes.clear(r, addWriteConflict);
@ -1703,7 +1719,7 @@ Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
return RYWImpl::watch(this, key);
}
void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) {
void ReadYourWritesTransaction::addWriteConflictRange(KeyRangeRef const& keys) {
if(checkUsedDuringCommit()) {
throw used_during_commit();
}
@ -1730,6 +1746,7 @@ void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys )
return;
}
approximateSize += r.expectedSize() + sizeof(KeyRangeRef);
if(options.readYourWritesDisabled) {
tr.addWriteConflictRange(r);
return;
@ -1854,6 +1871,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N
r.resetPromise = Promise<Void>();
deferredError = std::move( r.deferredError );
retries = r.retries;
approximateSize = r.approximateSize;
timeoutActor = r.timeoutActor;
creationTime = r.creationTime;
commitStarted = r.commitStarted;
@ -1870,6 +1888,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
arena( std::move(r.arena) ),
reading( std::move(r.reading) ),
retries( r.retries ),
approximateSize(r.approximateSize),
creationTime( r.creationTime ),
deferredError( std::move(r.deferredError) ),
timeoutActor( std::move(r.timeoutActor) ),
@ -1921,6 +1940,7 @@ void ReadYourWritesTransaction::resetRyow() {
readConflicts = CoalescedKeyRefRangeMap<bool>();
watchMap.clear();
reading = AndFuture();
approximateSize = 0;
commitStarted = false;
deferredError = Error();
@ -1941,6 +1961,7 @@ void ReadYourWritesTransaction::cancel() {
void ReadYourWritesTransaction::reset() {
retries = 0;
approximateSize = 0;
creationTime = now();
timeoutActor.cancel();
persistentOptions.clear();

View File

@ -98,6 +98,7 @@ public:
Future<Void> commit();
Version getCommittedVersion() { return tr.getCommittedVersion(); }
int64_t getApproximateSize() { return approximateSize; }
Future<Standalone<StringRef>> getVersionstamp();
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
@ -140,6 +141,7 @@ private:
Promise<Void> resetPromise;
AndFuture reading;
int retries;
int64_t approximateSize;
Future<Void> timeoutActor;
double creationTime;
bool commitStarted;
@ -149,7 +151,7 @@ private:
void resetTimeout();
void updateConflictMap( KeyRef const& key, WriteMap::iterator& it ); // pre: it.segmentContains(key)
void updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ); // pre: it.segmentContains(keys.begin), keys are already inside this->arena
void writeRangeToNativeTransaction( KeyRangeRef const& keys );
void writeRangeToNativeTransaction(KeyRangeRef const& keys);
void resetRyow(); // doesn't reset the encapsulated transaction, or creation time/retry state
KeyRef getMaxReadKey();

View File

@ -271,8 +271,12 @@ ThreadFuture< Void > ThreadSafeTransaction::commit() {
Version ThreadSafeTransaction::getCommittedVersion() {
// This should be thread safe when called legally, but it is fragile
Version v = tr->getCommittedVersion();
return v;
return tr->getCommittedVersion();
}
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
ReadYourWritesTransaction *tr = this->tr;
return onMainThread([tr]() -> Future<int64_t> { return tr->getApproximateSize(); });
}
ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {

View File

@ -55,54 +55,54 @@ public:
explicit ThreadSafeTransaction(DatabaseContext* cx);
~ThreadSafeTransaction();
void cancel();
void setVersion( Version v );
ThreadFuture<Version> getReadVersion();
void cancel() override;
void setVersion( Version v ) override;
ThreadFuture<Version> getReadVersion() override;
ThreadFuture< Optional<Value> > get( const KeyRef& key, bool snapshot = false );
ThreadFuture< Key > getKey( const KeySelectorRef& key, bool snapshot = false );
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot = false, bool reverse = false );
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false );
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, int limit, bool snapshot = false, bool reverse = false ) {
ThreadFuture< Optional<Value> > get( const KeyRef& key, bool snapshot = false ) override;
ThreadFuture< Key > getKey( const KeySelectorRef& key, bool snapshot = false ) override;
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot = false, bool reverse = false ) override;
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) override;
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, int limit, bool snapshot = false, bool reverse = false ) override {
return getRange( firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limit, snapshot, reverse );
}
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) {
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) override {
return getRange( firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse );
}
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
void addReadConflictRange( const KeyRangeRef& keys );
void addReadConflictRange( const KeyRangeRef& keys ) override;
void makeSelfConflicting();
void atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType );
void set( const KeyRef& key, const ValueRef& value );
void clear( const KeyRef& begin, const KeyRef& end);
void clear( const KeyRangeRef& range );
void clear( const KeyRef& key );
void atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType ) override;
void set( const KeyRef& key, const ValueRef& value ) override;
void clear( const KeyRef& begin, const KeyRef& end) override;
void clear( const KeyRangeRef& range ) override;
void clear( const KeyRef& key ) override;
ThreadFuture< Void > watch( const KeyRef& key );
ThreadFuture< Void > watch( const KeyRef& key ) override;
void addWriteConflictRange( const KeyRangeRef& keys );
void addWriteConflictRange( const KeyRangeRef& keys ) override;
ThreadFuture<Void> commit();
Version getCommittedVersion();
ThreadFuture<Standalone<StringRef>> getVersionstamp();
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ) override;
ThreadFuture<Void> checkDeferredError();
ThreadFuture<Void> onError( Error const& e );
ThreadFuture<Void> onError( Error const& e ) override;
// These are to permit use as state variables in actors:
ThreadSafeTransaction() : tr(NULL) {}
void operator=(ThreadSafeTransaction&& r) BOOST_NOEXCEPT;
ThreadSafeTransaction(ThreadSafeTransaction&& r) BOOST_NOEXCEPT;
void reset();
void reset() override;
void addref() { ThreadSafeReferenceCounted<ThreadSafeTransaction>::addref(); }
void delref() { ThreadSafeReferenceCounted<ThreadSafeTransaction>::delref(); }
void addref() override { ThreadSafeReferenceCounted<ThreadSafeTransaction>::addref(); }
void delref() override { ThreadSafeReferenceCounted<ThreadSafeTransaction>::delref(); }
private:
ReadYourWritesTransaction *tr;

View File

@ -67,4 +67,5 @@ void unregisterTLog(UID uid);
// checks if there is any non-stopped TLog instance
bool isTLogInSameNode();
#include "flow/unactorcompiler.h"
#endif