Add binding test for getApproximateSize API
This commit is contained in:
parent
cc47641bd3
commit
0ad2d2d16e
|
@ -277,6 +277,10 @@ futures must apply the following rules to the result:
|
|||
internal stack machine state as the last seen version. Pushes the byte
|
||||
string "GOT_COMMITTED_VERSION" onto the stack.
|
||||
|
||||
#### GET_APPROXIMATE_SIZE
|
||||
|
||||
Calls get_approximate_size and pushes the resulting future onto the stack.
|
||||
|
||||
#### WAIT_FUTURE
|
||||
|
||||
Pops the top item off the stack and pushes it back on. If the top item on
|
||||
|
|
|
@ -156,6 +156,7 @@ class ApiTest(Test):
|
|||
resets = ['ON_ERROR', 'RESET', 'CANCEL']
|
||||
read_conflicts = ['READ_CONFLICT_RANGE', 'READ_CONFLICT_KEY']
|
||||
write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT']
|
||||
txn_sizes = ['GET_APPROXIMATE_SIZE']
|
||||
|
||||
op_choices += reads
|
||||
op_choices += mutations
|
||||
|
@ -168,6 +169,7 @@ class ApiTest(Test):
|
|||
op_choices += read_conflicts
|
||||
op_choices += write_conflicts
|
||||
op_choices += resets
|
||||
op_choices += txn_sizes
|
||||
|
||||
idempotent_atomic_ops = [u'BIT_AND', u'BIT_OR', u'MAX', u'MIN', u'BYTE_MIN', u'BYTE_MAX']
|
||||
atomic_ops = idempotent_atomic_ops + [u'ADD', u'BIT_XOR', u'APPEND_IF_FITS']
|
||||
|
@ -434,6 +436,10 @@ class ApiTest(Test):
|
|||
self.can_set_version = True
|
||||
self.can_use_key_selectors = True
|
||||
|
||||
elif op == 'GET_APPROXIMATE_SIZE':
|
||||
instructions.append(op)
|
||||
self.add_strings(1)
|
||||
|
||||
elif op == 'TUPLE_PACK' or op == 'TUPLE_RANGE':
|
||||
tup = self.random.random_tuple(10)
|
||||
instructions.push_args(len(tup), *tup)
|
||||
|
|
|
@ -704,6 +704,19 @@ struct GetCommittedVersionFunc : InstructionFunc {
|
|||
const char* GetCommittedVersionFunc::name = "GET_COMMITTED_VERSION";
|
||||
REGISTER_INSTRUCTION_FUNC(GetCommittedVersionFunc);
|
||||
|
||||
// GET_APPROXIMATE_SIZE
|
||||
struct GetApproximateSizeFunc : InstructionFunc {
|
||||
static const char* name;
|
||||
|
||||
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
|
||||
int64_t size = wait(instruction->tr->getApproximateSize());
|
||||
data->stack.pushTuple(LiteralStringRef("GOT_APPROXIMATE_SIZE"));
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
const char* GetApproximateSizeFunc::name = "GET_APPROXIMATE_SIZE";
|
||||
REGISTER_INSTRUCTION_FUNC(GetApproximateSizeFunc);
|
||||
|
||||
// GET_VERSIONSTAMP
|
||||
struct GetVersionstampFunc : InstructionFunc {
|
||||
static const char* name;
|
||||
|
|
|
@ -75,13 +75,14 @@ type stackEntry struct {
|
|||
}
|
||||
|
||||
type StackMachine struct {
|
||||
prefix []byte
|
||||
trName string
|
||||
stack []stackEntry
|
||||
lastVersion int64
|
||||
threads sync.WaitGroup
|
||||
verbose bool
|
||||
de *DirectoryExtension
|
||||
prefix []byte
|
||||
trName string
|
||||
stack []stackEntry
|
||||
lastVersion int64
|
||||
threads sync.WaitGroup
|
||||
verbose bool
|
||||
de *DirectoryExtension
|
||||
approximateSize int64
|
||||
}
|
||||
|
||||
func newStackMachine(prefix []byte, verbose bool) *StackMachine {
|
||||
|
@ -590,6 +591,9 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
|
|||
panic(e)
|
||||
}
|
||||
sm.store(idx, []byte("GOT_COMMITTED_VERSION"))
|
||||
case op == "GET_APPROXIMATE_SIZE":
|
||||
sm.approximateSize = sm.currentTransaction().GetApproximateSize().MustGet()
|
||||
sm.store(idx, []byte("GOT_APPROXIMATE_SIZE"))
|
||||
case op == "GET_VERSIONSTAMP":
|
||||
sm.store(idx, sm.currentTransaction().GetVersionstamp())
|
||||
case op == "GET_KEY":
|
||||
|
|
|
@ -372,6 +372,16 @@ func (t Transaction) GetVersionstamp() FutureKey {
|
|||
return &futureKey{future: newFuture(C.fdb_transaction_get_versionstamp(t.ptr))}
|
||||
}
|
||||
|
||||
func (t *transaction) getApproximateSize() FutureInt64 {
|
||||
return &futureInt64{
|
||||
future: newFuture(C.fdb_transaction_get_approximate_size(t.ptr)),
|
||||
}
|
||||
}
|
||||
|
||||
func (t Transaction) GetApproximateSize() FutureInt64 {
|
||||
return t.getApproximateSize()
|
||||
}
|
||||
|
||||
// Reset rolls back a transaction, completely resetting it to its initial
|
||||
// state. This is logically equivalent to destroying the transaction and
|
||||
// creating a new one.
|
||||
|
|
|
@ -805,19 +805,13 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
|||
return (jlong)version;
|
||||
}
|
||||
|
||||
JNIEXPORT uint32_t JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getApproximateSize(JNIEnv *jenv, jobject, jlong tPtr) {
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getApproximateSize(JNIEnv *jenv, jobject, jlong tPtr) {
|
||||
if (!tPtr) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTransaction* tr = (FDBTransaction*)tPtr;
|
||||
uint32_t size;
|
||||
fdb_error_t err = fdb_transaction_get_approximate_size(tr, &size);
|
||||
if (err) {
|
||||
safeThrow(jenv, getThrowable(jenv, err));
|
||||
return 0;
|
||||
}
|
||||
return size;
|
||||
FDBFuture* f = fdb_transaction_get_approximate_size((FDBTransaction*)tPtr);
|
||||
return (jlong)f;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getVersionstamp(JNIEnv *jenv, jobject, jlong tPtr) {
|
||||
|
|
|
@ -514,6 +514,16 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Long> getApproximateSize() {
|
||||
pointerReadLock.lock();
|
||||
try {
|
||||
return new FutureVersion(Transaction_getApproximateSize(getPtr()), executor);
|
||||
} finally {
|
||||
pointerReadLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> watch(byte[] key) throws FDBException {
|
||||
pointerReadLock.lock();
|
||||
|
@ -642,6 +652,7 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
|
|||
private native long Transaction_commit(long cPtr);
|
||||
private native long Transaction_getCommittedVersion(long cPtr);
|
||||
private native long Transaction_getVersionstamp(long cPtr);
|
||||
private native long Transaction_getApproximateSize(long cPtr);
|
||||
private native long Transaction_onError(long cPtr, int errorCode);
|
||||
private native void Transaction_dispose(long cPtr);
|
||||
private native void Transaction_reset(long cPtr);
|
||||
|
|
|
@ -260,6 +260,15 @@ public interface Transaction extends AutoCloseable, ReadTransaction, Transaction
|
|||
*/
|
||||
CompletableFuture<byte[]> getVersionstamp();
|
||||
|
||||
/**
|
||||
* Returns a future that will contain the approximated size of the commit, which is the
|
||||
* summation of mutations, read conflict ranges, and write conflict ranges. This can be
|
||||
* called multiple times before transaction commit.
|
||||
*
|
||||
* @return a future that will contain the approximated size of the commit.
|
||||
*/
|
||||
CompletableFuture<Long> getApproximateSize();
|
||||
|
||||
/**
|
||||
* Resets a transaction and returns a delayed signal for error recovery. If the error
|
||||
* encountered by the {@code Transaction} could not be recovered from, the returned
|
||||
|
|
|
@ -282,6 +282,12 @@ public class AsyncStackTester {
|
|||
|
||||
return AsyncUtil.DONE;
|
||||
}
|
||||
else if(op == StackOperation.GET_APPROXIMATE_SIZE) {
|
||||
return inst.tr.getApproximateSize().thenAcceptAsync(size -> {
|
||||
inst.context.approximateSize = size;
|
||||
inst.push("GOT_APPROXIMATE_SIZE".getBytes());
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
}
|
||||
else if(op == StackOperation.GET_VERSIONSTAMP) {
|
||||
try {
|
||||
inst.push(inst.tr.getVersionstamp());
|
||||
|
|
|
@ -46,6 +46,7 @@ abstract class Context implements Runnable, AutoCloseable {
|
|||
int instructionIndex = 0;
|
||||
KeySelector nextKey, endKey;
|
||||
Long lastVersion = null;
|
||||
Long approximateSize = null;
|
||||
|
||||
private String trName;
|
||||
private List<Thread> children = new LinkedList<>();
|
||||
|
|
|
@ -54,6 +54,7 @@ enum StackOperation {
|
|||
GET_KEY,
|
||||
GET_READ_VERSION,
|
||||
GET_COMMITTED_VERSION,
|
||||
GET_APPROXIMATE_SIZE,
|
||||
GET_VERSIONSTAMP,
|
||||
SET_READ_VERSION,
|
||||
ON_ERROR,
|
||||
|
|
|
@ -261,6 +261,10 @@ public class StackTester {
|
|||
inst.context.lastVersion = inst.tr.getCommittedVersion();
|
||||
inst.push("GOT_COMMITTED_VERSION".getBytes());
|
||||
}
|
||||
else if(op == StackOperation.GET_APPROXIMATE_SIZE) {
|
||||
inst.context.approximateSize = inst.tr.getApproximateSize().join();
|
||||
inst.push("GOT_APPROXIMATE_SIZE".getBytes());
|
||||
}
|
||||
else if(op == StackOperation.GET_VERSIONSTAMP) {
|
||||
inst.push(inst.tr.getVersionstamp());
|
||||
}
|
||||
|
|
|
@ -255,6 +255,7 @@ class Tester:
|
|||
self.tr_name = prefix
|
||||
Tester.tr_map[self.tr_name] = None
|
||||
self.last_version = 0
|
||||
self.approximate_size = 0
|
||||
|
||||
self.threads = []
|
||||
self.directory_extension = DirectoryExtension()
|
||||
|
@ -475,6 +476,9 @@ class Tester:
|
|||
elif inst.op == six.u("GET_COMMITTED_VERSION"):
|
||||
self.last_version = inst.tr.get_committed_version()
|
||||
inst.push(b"GOT_COMMITTED_VERSION")
|
||||
elif inst.op == six.u("GET_APPROXIMATE_SIZE"):
|
||||
self.approximate_size = inst.tr.get_approximate_size().wait()
|
||||
inst.push(b"GOT_APPROXIMATE_SIZE")
|
||||
elif inst.op == six.u("GET_VERSIONSTAMP"):
|
||||
inst.push(inst.tr.get_versionstamp())
|
||||
elif inst.op == six.u("TUPLE_PACK"):
|
||||
|
|
|
@ -114,6 +114,7 @@ module FDB
|
|||
attach_function :fdb_transaction_watch, [ :pointer, :pointer, :int ], :pointer
|
||||
attach_function :fdb_transaction_commit, [ :pointer ], :pointer
|
||||
attach_function :fdb_transaction_get_committed_version, [ :pointer, :pointer ], :fdb_error
|
||||
attach_function :fdb_transaction_get_approximate_size, [ :pointer ], :pointer
|
||||
attach_function :fdb_transaction_get_versionstamp, [ :pointer ], :pointer
|
||||
attach_function :fdb_transaction_on_error, [ :pointer, :fdb_error ], :pointer
|
||||
attach_function :fdb_transaction_reset, [ :pointer ], :void
|
||||
|
@ -904,6 +905,10 @@ module FDB
|
|||
version.read_long_long
|
||||
end
|
||||
|
||||
def get_approximate_size
|
||||
Version.new(FDBC.fdb_transaction_get_approximate_size @tpointer)
|
||||
end
|
||||
|
||||
def get_versionstamp
|
||||
Key.new(FDBC.fdb_transaction_get_versionstamp(@tpointer))
|
||||
end
|
||||
|
|
|
@ -138,6 +138,7 @@ class Tester
|
|||
@stack = Stack.new
|
||||
@tr_name = prefix
|
||||
@last_version = nil
|
||||
@approximate_size = nil
|
||||
|
||||
@threads = []
|
||||
@directory_extension = DirectoryExtension::DirectoryTester.new
|
||||
|
@ -381,6 +382,9 @@ class Tester
|
|||
when "GET_COMMITTED_VERSION"
|
||||
@last_version = inst.tr.get_committed_version
|
||||
inst.push("GOT_COMMITTED_VERSION")
|
||||
when "GET_APPROXIMATE_SIZE"
|
||||
@approximate_size = inst.tr.get_approximate_size.to_i
|
||||
inst.push("GOT_APPROXIMATE_SIZE")
|
||||
when "GET_VERSIONSTAMP"
|
||||
inst.push(inst.tr.get_versionstamp)
|
||||
when "TUPLE_PACK"
|
||||
|
|
|
@ -718,6 +718,10 @@ Applications must provide error handling and an appropriate retry loop around th
|
|||
|
||||
Most applications will not call this function.
|
||||
|
||||
.. function:: FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr)
|
||||
|
||||
Retrieves the approximate transaction size so far in the returned future, which is the summation of the estimated size of mutations, read conflict ranges, and write conflict ranges. This can be called multiple times before transaction is committed.
|
||||
|
||||
.. function:: FDBFuture* fdb_transaction_get_versionstamp(FDBTransaction* transaction)
|
||||
|
||||
|future-return0| the versionstamp which was used by any versionstamp operations in this transaction. |future-return1| call :func:`fdb_future_get_key()` to extract the key, |future-return2|
|
||||
|
|
|
@ -2314,7 +2314,6 @@ void Transaction::addReadConflictRange( KeyRangeRef const& keys ) {
|
|||
}
|
||||
|
||||
tr.transaction.read_conflict_ranges.push_back_deep( tr.arena, r );
|
||||
printf("%x read_conflict_ranges size: %d, %d\n", this, tr.transaction.read_conflict_ranges.size(), tr.transaction.read_conflict_ranges.expectedSize());
|
||||
}
|
||||
|
||||
void Transaction::makeSelfConflicting() {
|
||||
|
@ -2338,7 +2337,6 @@ void Transaction::set( const KeyRef& key, const ValueRef& value, bool addConflic
|
|||
auto r = singleKeyRange( key, req.arena );
|
||||
auto v = ValueRef( req.arena, value );
|
||||
t.mutations.push_back( req.arena, MutationRef( MutationRef::SetValue, r.begin, v ) );
|
||||
printf("%x set mutation size: %d, %d\n", this, t.mutations.size(), t.mutations.expectedSize());
|
||||
|
||||
if( addConflictRange ) {
|
||||
t.write_conflict_ranges.push_back( req.arena, r );
|
||||
|
@ -2472,7 +2470,6 @@ void Transaction::addWriteConflictRange( const KeyRangeRef& keys ) {
|
|||
}
|
||||
|
||||
t.write_conflict_ranges.push_back_deep( req.arena, r );
|
||||
printf("%x write_conflict_ranges size: %d, %d\n", this, t.write_conflict_ranges.size(), t.write_conflict_ranges.expectedSize());
|
||||
}
|
||||
|
||||
double Transaction::getBackoff(int errCode) {
|
||||
|
@ -3182,7 +3179,6 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
|
|||
uint32_t Transaction::getSize() {
|
||||
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
|
||||
tr.transaction.write_conflict_ranges.expectedSize();
|
||||
printf("%x approximate size: %d, mutation size: %d\n", this, s, tr.transaction.mutations.size());
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue