Added a test in API tests for the new API(getEstimagedRangeSize).
This commit is contained in:
parent
980037f3a8
commit
527686ffe0
|
@ -157,6 +157,7 @@ class ApiTest(Test):
|
|||
read_conflicts = ['READ_CONFLICT_RANGE', 'READ_CONFLICT_KEY']
|
||||
write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT']
|
||||
txn_sizes = ['GET_APPROXIMATE_SIZE']
|
||||
storage_metrics = ['GET_ESTIMATED_RANGE_SIZE']
|
||||
|
||||
op_choices += reads
|
||||
op_choices += mutations
|
||||
|
@ -170,6 +171,7 @@ class ApiTest(Test):
|
|||
op_choices += write_conflicts
|
||||
op_choices += resets
|
||||
op_choices += txn_sizes
|
||||
op_choices += storage_metrics
|
||||
|
||||
idempotent_atomic_ops = ['BIT_AND', 'BIT_OR', 'MAX', 'MIN', 'BYTE_MIN', 'BYTE_MAX']
|
||||
atomic_ops = idempotent_atomic_ops + ['ADD', 'BIT_XOR', 'APPEND_IF_FITS']
|
||||
|
@ -536,6 +538,21 @@ class ApiTest(Test):
|
|||
instructions.push_args(d)
|
||||
instructions.append(op)
|
||||
self.add_strings(1)
|
||||
elif op == 'GET_ESTIMATED_RANGE_SIZE':
|
||||
# Protect against inverted range and identical keys
|
||||
key1 = self.workspace.pack(self.random.random_tuple(1))
|
||||
key2 = self.workspace.pack(self.random.random_tuple(1))
|
||||
|
||||
while key1 == key2:
|
||||
key1 = self.workspace.pack(self.random.random_tuple(1))
|
||||
key2 = self.workspace.pack(self.random.random_tuple(1))
|
||||
|
||||
if key1 > key2:
|
||||
key1, key2 = key2, key1
|
||||
|
||||
instructions.push_args(key1, key2)
|
||||
instructions.append(op)
|
||||
self.add_strings(1)
|
||||
|
||||
else:
|
||||
assert False, 'Unknown operation: ' + op
|
||||
|
|
|
@ -430,9 +430,8 @@ struct LogStackFunc : InstructionFunc {
|
|||
wait(logStack(data, entries, prefix));
|
||||
entries.clear();
|
||||
}
|
||||
|
||||
wait(logStack(data, entries, prefix));
|
||||
}
|
||||
wait(logStack(data, entries, prefix));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -1615,6 +1614,29 @@ struct UnitTestsFunc : InstructionFunc {
|
|||
const char* UnitTestsFunc::name = "UNIT_TESTS";
|
||||
REGISTER_INSTRUCTION_FUNC(UnitTestsFunc);
|
||||
|
||||
struct GetEstimatedRangeSize : InstructionFunc {
|
||||
static const char* name;
|
||||
|
||||
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
|
||||
state std::vector<StackItem> items = data->stack.pop(2);
|
||||
if (items.size() != 2)
|
||||
return Void();
|
||||
|
||||
Standalone<StringRef> s1 = wait(items[0].value);
|
||||
state Standalone<StringRef> beginKey = Tuple::unpack(s1).getString(0);
|
||||
|
||||
Standalone<StringRef> s2 = wait(items[1].value);
|
||||
state Standalone<StringRef> endKey = Tuple::unpack(s2).getString(0);
|
||||
Future<int64_t> fsize = instruction->tr->getEstimatedRangeSizeBytes(KeyRangeRef(beginKey, endKey));
|
||||
int64_t size = wait(fsize);
|
||||
data->stack.pushTuple(LiteralStringRef("GOT_ESTIMATED_RANGE_SIZE"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
const char* GetEstimatedRangeSize::name = "GET_ESTIMATED_RANGE_SIZE";
|
||||
REGISTER_INSTRUCTION_FUNC(GetEstimatedRangeSize);
|
||||
|
||||
ACTOR static Future<Void> getInstructions(Reference<FlowTesterData> data, StringRef prefix) {
|
||||
state Reference<Transaction> tr = data->db->createTransaction();
|
||||
|
||||
|
|
|
@ -865,6 +865,16 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
|
|||
|
||||
case strings.HasPrefix(op, "DIRECTORY_"):
|
||||
sm.de.processOp(sm, op[10:], isDB, idx, t, rt)
|
||||
case strings.HasPrefix(op, "GET_ESTIMATED_RANGE_SIZE"):
|
||||
r := sm.popKeyRange()
|
||||
_, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
|
||||
_ = rtr.GetEstimatedRangeSizeBytes(r).MustGet()
|
||||
sm.store(idx, []byte("GOT_ESTIMATED_RANGE_SIZE"))
|
||||
return nil, nil
|
||||
})
|
||||
if e != nil {
|
||||
panic(e)
|
||||
}
|
||||
default:
|
||||
log.Fatalf("Unhandled operation %s\n", string(inst[0].([]byte)))
|
||||
}
|
||||
|
|
|
@ -330,7 +330,7 @@ func (o DatabaseOptions) SetTransactionCausalReadRisky() error {
|
|||
return o.setOpt(504, nil)
|
||||
}
|
||||
|
||||
// Addresses returned by get_addresses_for_key include the port when enabled. This will be enabled by default in api version 700, and this option will be deprecated.
|
||||
// Addresses returned by get_addresses_for_key include the port when enabled. As of api version 700, this option is enabled by default and setting this has no effect.
|
||||
func (o DatabaseOptions) SetTransactionIncludePortInAddress() error {
|
||||
return o.setOpt(505, nil)
|
||||
}
|
||||
|
@ -350,7 +350,7 @@ func (o TransactionOptions) SetCausalReadDisable() error {
|
|||
return o.setOpt(21, nil)
|
||||
}
|
||||
|
||||
// Addresses returned by get_addresses_for_key include the port when enabled. This will be enabled by default in api version 700, and this option will be deprecated.
|
||||
// Addresses returned by get_addresses_for_key include the port when enabled. As of api version 700, this option is enabled by default and setting this has no effect.
|
||||
func (o TransactionOptions) SetIncludePortInAddress() error {
|
||||
return o.setOpt(23, nil)
|
||||
}
|
||||
|
@ -512,13 +512,14 @@ const (
|
|||
// small portion of data is transferred to the client initially (in order to
|
||||
// minimize costs if the client doesn't read the entire range), and as the
|
||||
// caller iterates over more items in the range larger batches will be
|
||||
// transferred in order to minimize latency.
|
||||
// transferred in order to minimize latency. After enough iterations, the
|
||||
// iterator mode will eventually reach the same byte limit as ``WANT_ALL``
|
||||
StreamingModeIterator StreamingMode = 0
|
||||
|
||||
// Infrequently used. The client has passed a specific row limit and wants
|
||||
// that many rows delivered in a single batch. Because of iterator operation
|
||||
// in client drivers make request batches transparent to the user, consider
|
||||
// “WANT_ALL“ StreamingMode instead. A row limit must be specified if this
|
||||
// ``WANT_ALL`` StreamingMode instead. A row limit must be specified if this
|
||||
// mode is used.
|
||||
StreamingModeExact StreamingMode = 1
|
||||
|
||||
|
@ -635,15 +636,15 @@ type ErrorPredicate int
|
|||
|
||||
const (
|
||||
|
||||
// Returns “true“ if the error indicates the operations in the transactions
|
||||
// should be retried because of transient error.
|
||||
// Returns ``true`` if the error indicates the operations in the
|
||||
// transactions should be retried because of transient error.
|
||||
ErrorPredicateRetryable ErrorPredicate = 50000
|
||||
|
||||
// Returns “true“ if the error indicates the transaction may have succeeded,
|
||||
// though not in a way the system can verify.
|
||||
// Returns ``true`` if the error indicates the transaction may have
|
||||
// succeeded, though not in a way the system can verify.
|
||||
ErrorPredicateMaybeCommitted ErrorPredicate = 50001
|
||||
|
||||
// Returns “true“ if the error indicates the transaction has not committed,
|
||||
// though in a way that can be retried.
|
||||
// Returns ``true`` if the error indicates the transaction has not
|
||||
// committed, though in a way that can be retried.
|
||||
ErrorPredicateRetryableNotCommitted ErrorPredicate = 50002
|
||||
)
|
||||
|
|
|
@ -527,6 +527,11 @@ public class AsyncStackTester {
|
|||
}
|
||||
else if(op == StackOperation.LOG_STACK) {
|
||||
return inst.popParam().thenComposeAsync(prefix -> doLogStack(inst, (byte[])prefix), FDB.DEFAULT_EXECUTOR);
|
||||
} else if (op == StackOperation.GET_ESTIMATED_RANGE_SIZE) {
|
||||
List<Object> params = inst.popParams(2).join();
|
||||
return inst.readTr.getEstimatedRangeSizeBytes((byte[])params.get(0), (byte[])params.get(1)).thenAcceptAsync(size -> {
|
||||
inst.push("GOT_ESTIMATED_RANGE_SIZE".getBytes());
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Unrecognized (or unimplemented) operation");
|
||||
|
|
|
@ -56,6 +56,7 @@ enum StackOperation {
|
|||
GET_COMMITTED_VERSION,
|
||||
GET_APPROXIMATE_SIZE,
|
||||
GET_VERSIONSTAMP,
|
||||
GET_ESTIMATED_RANGE_SIZE,
|
||||
SET_READ_VERSION,
|
||||
ON_ERROR,
|
||||
SUB,
|
||||
|
|
|
@ -499,6 +499,11 @@ public class StackTester {
|
|||
|
||||
logStack(inst.context.db, entries, prefix);
|
||||
}
|
||||
else if (op == StackOperation.GET_ESTIMATED_RANGE_SIZE) {
|
||||
List<Object> params = inst.popParams(2).join();
|
||||
Long size = inst.readTr.getEstimatedRangeSizeBytes((byte[])params.get(0), (byte[])params.get(1)).join();
|
||||
inst.push("GOT_ESTIMATED_RANGE_SIZE".getBytes());
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Unrecognized (or unimplemented) operation");
|
||||
}
|
||||
|
|
|
@ -576,6 +576,10 @@ class Tester:
|
|||
raise Exception("Unit tests failed: %s" % e.description)
|
||||
elif inst.op.startswith(six.u('DIRECTORY_')):
|
||||
self.directory_extension.process_instruction(inst)
|
||||
elif inst.op == six.u("GET_ESTIMATED_RANGE_SIZE"):
|
||||
begin, end = inst.pop(2)
|
||||
estimatedSize = obj.get_estimated_range_size_bytes(begin, end).wait()
|
||||
inst.push(b"GOT_ESTIMATED_RANGE_SIZE")
|
||||
else:
|
||||
raise Exception("Unknown op %s" % inst.op)
|
||||
except fdb.FDBError as e:
|
||||
|
|
|
@ -510,6 +510,9 @@ class Tester
|
|||
end
|
||||
|
||||
log_stack(entries, prefix)
|
||||
when "GET_ESTIMATED_RANGE_SIZE"
|
||||
inst.tr.get_estimated_range_size_bytes(inst.wait_and_pop, inst.wait_and_pop).to_i
|
||||
inst.push("GOT_ESTIMATED_RANGE_SIZE")
|
||||
else
|
||||
if op.start_with?('DIRECTORY_')
|
||||
@directory_extension.process_instruction(inst)
|
||||
|
|
Loading…
Reference in New Issue