Extend ApiTester with support for testing database operations (#8221)

* ApiTester: Refactoring removing unnecessary transaction actor classes

* ApiTester: enable executing non-transactional database operations

* ApiTester: a test for fdb_database_list_blobbified_ranges

* ApiTester: Fix memory ownership of the end key in randomGetRangeOp

* ApiTester: reuse get range result validation in blob granule tests
This commit is contained in:
Vaidas Gasiunas 2022-09-20 16:21:38 +02:00 committed by GitHub
parent 2e2dd119fe
commit 97eddbc06a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 205 additions and 166 deletions

View File

@ -36,7 +36,16 @@ public:
private:
// FIXME: use other new blob granule apis!
enum OpType { OP_INSERT, OP_CLEAR, OP_CLEAR_RANGE, OP_READ, OP_GET_RANGES, OP_SUMMARIZE, OP_LAST = OP_SUMMARIZE };
enum OpType {
OP_INSERT,
OP_CLEAR,
OP_CLEAR_RANGE,
OP_READ,
OP_GET_GRANULES,
OP_SUMMARIZE,
OP_GET_BLOB_RANGES,
OP_LAST = OP_GET_BLOB_RANGES
};
std::vector<OpType> excludedOpTypes;
// Allow reads at the start to get blob_granule_transaction_too_old if BG data isn't initialized yet
@ -120,7 +129,7 @@ private:
getTenant(tenantId));
}
void randomGetRangesOp(TTaskFct cont, std::optional<int> tenantId) {
void randomGetGranulesOp(TTaskFct cont, std::optional<int> tenantId) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
@ -140,36 +149,7 @@ private:
true);
},
[this, begin, end, results, cont]() {
if (seenReadSuccess) {
ASSERT(results->size() > 0);
ASSERT(results->front().beginKey <= begin);
ASSERT(results->back().endKey >= end);
}
for (int i = 0; i < results->size(); i++) {
// no empty or inverted ranges
if ((*results)[i].beginKey >= (*results)[i].endKey) {
error(fmt::format("Empty/inverted range [{0} - {1}) for getBlobGranuleRanges({2} - {3})",
fdb::toCharsRef((*results)[i].beginKey),
fdb::toCharsRef((*results)[i].endKey),
fdb::toCharsRef(begin),
fdb::toCharsRef(end)));
}
ASSERT((*results)[i].beginKey < (*results)[i].endKey);
}
for (int i = 1; i < results->size(); i++) {
// ranges contain entire requested key range
if ((*results)[i].beginKey != (*results)[i].endKey) {
error(fmt::format("Non-contiguous range [{0} - {1}) for getBlobGranuleRanges({2} - {3})",
fdb::toCharsRef((*results)[i].beginKey),
fdb::toCharsRef((*results)[i].endKey),
fdb::toCharsRef(begin),
fdb::toCharsRef(end)));
}
ASSERT((*results)[i].beginKey == (*results)[i - 1].endKey);
}
this->validateRanges(results, begin, end, seenReadSuccess);
schedule(cont);
},
getTenant(tenantId));
@ -218,6 +198,62 @@ private:
getTenant(tenantId));
}
void validateRanges(std::shared_ptr<std::vector<fdb::KeyRange>> results,
fdb::Key begin,
fdb::Key end,
bool shouldBeRanges) {
if (shouldBeRanges) {
ASSERT(results->size() > 0);
ASSERT(results->front().beginKey <= begin);
ASSERT(results->back().endKey >= end);
}
for (int i = 0; i < results->size(); i++) {
// no empty or inverted ranges
if ((*results)[i].beginKey >= (*results)[i].endKey) {
error(fmt::format("Empty/inverted range [{0} - {1}) for getBlobGranuleRanges({2} - {3})",
fdb::toCharsRef((*results)[i].beginKey),
fdb::toCharsRef((*results)[i].endKey),
fdb::toCharsRef(begin),
fdb::toCharsRef(end)));
}
ASSERT((*results)[i].beginKey < (*results)[i].endKey);
}
for (int i = 1; i < results->size(); i++) {
// ranges contain entire requested key range
if ((*results)[i].beginKey != (*results)[i].endKey) {
error(fmt::format("Non-contiguous range [{0} - {1}) for getBlobGranuleRanges({2} - {3})",
fdb::toCharsRef((*results)[i].beginKey),
fdb::toCharsRef((*results)[i].endKey),
fdb::toCharsRef(begin),
fdb::toCharsRef(end)));
}
ASSERT((*results)[i].beginKey == (*results)[i - 1].endKey);
}
}
void randomGetBlobRangesOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
auto results = std::make_shared<std::vector<fdb::KeyRange>>();
if (begin > end) {
std::swap(begin, end);
}
execOperation(
[begin, end, results](auto ctx) {
fdb::Future f = ctx->db().listBlobbifiedRanges(begin, end, 1000).eraseType();
ctx->continueAfter(f, [ctx, f, results]() {
*results = copyKeyRangeArray(f.get<fdb::future_var::KeyRangeRefArray>());
ctx->done();
});
},
[this, begin, end, results, cont]() {
this->validateRanges(results, begin, end, seenReadSuccess);
schedule(cont);
},
/* failOnError = */ false);
}
void randomOperation(TTaskFct cont) {
std::optional<int> tenantId = randomTenant();
@ -239,12 +275,15 @@ private:
case OP_READ:
randomReadOp(cont, tenantId);
break;
case OP_GET_RANGES:
randomGetRangesOp(cont, tenantId);
case OP_GET_GRANULES:
randomGetGranulesOp(cont, tenantId);
break;
case OP_SUMMARIZE:
randomSummarizeOp(cont, tenantId);
break;
case OP_GET_BLOB_RANGES:
randomGetBlobRangesOp(cont);
break;
}
}
};

View File

@ -204,23 +204,23 @@ private:
void getRangeLoop(std::shared_ptr<ITransactionContext> ctx,
fdb::KeySelector begin,
fdb::KeySelector end,
fdb::Key endKey,
std::shared_ptr<std::vector<fdb::KeyValue>> results) {
auto f = ctx->tx().getRange(begin,
end,
fdb::key_select::firstGreaterOrEqual(endKey),
0 /*limit*/,
0 /*target_bytes*/,
FDB_STREAMING_MODE_WANT_ALL,
0 /*iteration*/,
false /*snapshot*/,
false /*reverse*/);
ctx->continueAfter(f, [this, ctx, f, end, results]() {
ctx->continueAfter(f, [this, ctx, f, endKey, results]() {
auto out = copyKeyValueArray(f.get());
results->insert(results->end(), out.first.begin(), out.first.end());
const bool more = out.second;
if (more) {
// Fetch the remaining results.
getRangeLoop(ctx, fdb::key_select::firstGreaterThan(results->back().key), end, results);
getRangeLoop(ctx, fdb::key_select::firstGreaterThan(results->back().key), endKey, results);
} else {
ctx->done();
}
@ -237,10 +237,7 @@ private:
// Clear the results vector, in case the transaction is retried.
results->clear();
getRangeLoop(ctx,
fdb::key_select::firstGreaterOrEqual(begin),
fdb::key_select::firstGreaterOrEqual(end),
results);
getRangeLoop(ctx, fdb::key_select::firstGreaterOrEqual(begin), end, results);
},
[this, begin, end, results, cont, tenantId]() {
auto expected = stores[tenantId].getRange(begin, end, results->size() + 10, false);

View File

@ -40,11 +40,6 @@ namespace FdbApiTester {
constexpr int LONG_WAIT_TIME_US = 2000000;
constexpr int LARGE_NUMBER_OF_RETRIES = 10;
void TransactionActorBase::complete(fdb::Error err) {
error = err;
context = {};
}
void ITransactionContext::continueAfterAll(std::vector<fdb::Future> futures, TTaskFct cont) {
auto counter = std::make_shared<std::atomic<int>>(futures.size());
auto errorCode = std::make_shared<std::atomic<fdb::Error>>(fdb::Error::success());
@ -76,28 +71,31 @@ void ITransactionContext::continueAfterAll(std::vector<fdb::Future> futures, TTa
class TransactionContextBase : public ITransactionContext {
public:
TransactionContextBase(ITransactionExecutor* executor,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
TOpStartFct startFct,
TOpContFct cont,
IScheduler* scheduler,
int retryLimit,
std::string bgBasePath,
std::optional<fdb::BytesRef> tenantName)
: executor(executor), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
txState(TxState::IN_PROGRESS), commitCalled(false), bgBasePath(bgBasePath), tenantName(tenantName) {
std::optional<fdb::BytesRef> tenantName,
bool transactional)
: executor(executor), startFct(startFct), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
txState(TxState::IN_PROGRESS), commitCalled(false), bgBasePath(bgBasePath), tenantName(tenantName),
transactional(transactional) {
databaseCreateErrorInjected = executor->getOptions().injectDatabaseCreateErrors &&
Random::get().randomBool(executor->getOptions().databaseCreateErrorRatio);
fdb::Database db;
if (databaseCreateErrorInjected) {
db = fdb::Database(executor->getClusterFileForErrorInjection());
fdbDb = fdb::Database(executor->getClusterFileForErrorInjection());
} else {
db = executor->selectDatabase();
fdbDb = executor->selectDatabase();
}
if (tenantName) {
fdb::Tenant tenant = db.openTenant(*tenantName);
fdbTx = tenant.createTransaction();
} else {
fdbTx = db.createTransaction();
if (transactional) {
if (tenantName) {
fdb::Tenant tenant = fdbDb.openTenant(*tenantName);
fdbTx = tenant.createTransaction();
} else {
fdbTx = fdbDb.createTransaction();
}
}
}
@ -107,6 +105,8 @@ public:
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
enum class TxState { IN_PROGRESS, ON_ERROR, DONE };
fdb::Database db() override { return fdbDb.atomic_load(); }
fdb::Transaction tx() override { return fdbTx.atomic_load(); }
// Set a continuation to be executed when a future gets ready
@ -116,6 +116,7 @@ public:
// Complete the transaction with a commit
void commit() override {
ASSERT(transactional);
std::unique_lock<std::mutex> lock(mutex);
if (txState != TxState::IN_PROGRESS) {
return;
@ -146,14 +147,14 @@ public:
fmt::join(retriedErrorCodes(), ", "));
}
// cancel transaction so that any pending operations on it
// fail gracefully
fdbTx.cancel();
txActor->complete(fdb::Error::success());
cleanUp();
if (transactional) {
// cancel transaction so that any pending operations on it
// fail gracefully
fdbTx.cancel();
cleanUp();
}
ASSERT(txState == TxState::DONE);
contAfterDone();
contAfterDone(fdb::Error::success());
}
std::string getBGBasePath() override { return bgBasePath; }
@ -179,20 +180,26 @@ public:
if (databaseCreateErrorInjected && canBeInjectedDatabaseCreateError(err.code())) {
// Failed to create a database because of failure injection
// Restart by recreating the transaction in a valid database
scheduler->schedule([this]() {
fdb::Database db = executor->selectDatabase();
if (tenantName) {
fdb::Tenant tenant = db.openTenant(*tenantName);
fdbTx.atomic_store(tenant.createTransaction());
} else {
fdbTx.atomic_store(db.createTransaction());
auto thisRef = std::static_pointer_cast<TransactionContextBase>(shared_from_this());
scheduler->schedule([thisRef]() {
fdb::Database db = thisRef->executor->selectDatabase();
thisRef->fdbDb.atomic_store(db);
if (thisRef->transactional) {
if (thisRef->tenantName) {
fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
thisRef->fdbTx.atomic_store(tenant.createTransaction());
} else {
thisRef->fdbTx.atomic_store(db.createTransaction());
}
}
restartTransaction();
thisRef->restartTransaction();
});
} else {
} else if (transactional) {
onErrorArg = err;
onErrorFuture = tx().onError(err);
handleOnErrorFuture();
} else {
transactionFailed(err);
}
}
@ -207,7 +214,6 @@ protected:
void cleanUp() {
ASSERT(txState == TxState::DONE);
ASSERT(!onErrorFuture);
txActor = {};
cancelPendingFutures();
}
@ -230,9 +236,8 @@ protected:
// No need for lock from here on, because only one thread
// can enter DONE state and handle it
txActor->complete(err);
cleanUp();
contAfterDone();
contAfterDone(err);
}
// Handle result of an a transaction onError call
@ -254,7 +259,7 @@ protected:
txState = TxState::IN_PROGRESS;
commitCalled = false;
lock.unlock();
txActor->start();
startFct(shared_from_this());
}
// Checks if a transaction can be retried. Fails the transaction if the check fails
@ -286,13 +291,17 @@ protected:
// Set in contructor, stays immutable
ITransactionExecutor* const executor;
// FDB database
// Provides a thread safe interface by itself (no need for mutex)
fdb::Database fdbDb;
// FDB transaction
// Provides a thread safe interface by itself (no need for mutex)
fdb::Transaction fdbTx;
// Actor implementing the transaction worklflow
// The function implementing the starting point of the transaction
// Set in constructor and reset on cleanup (no need for mutex)
std::shared_ptr<ITransactionActor> txActor;
TOpStartFct startFct;
// Mutex protecting access to shared mutable state
// Only the state that is accessible unter IN_PROGRESS state
@ -301,7 +310,7 @@ protected:
// Continuation to be called after completion of the transaction
// Set in contructor, stays immutable
const TTaskFct contAfterDone;
const TOpContFct contAfterDone;
// Reference to the scheduler
// Set in contructor, stays immutable
@ -346,6 +355,9 @@ protected:
// The tenant that we will run this transaction in
const std::optional<fdb::BytesRef> tenantName;
// Specifies whether the operation is transactional
const bool transactional;
};
/**
@ -354,13 +366,15 @@ protected:
class BlockingTransactionContext : public TransactionContextBase {
public:
BlockingTransactionContext(ITransactionExecutor* executor,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
TOpStartFct startFct,
TOpContFct cont,
IScheduler* scheduler,
int retryLimit,
std::string bgBasePath,
std::optional<fdb::BytesRef> tenantName)
: TransactionContextBase(executor, txActor, cont, scheduler, retryLimit, bgBasePath, tenantName) {}
std::optional<fdb::BytesRef> tenantName,
bool transactional)
: TransactionContextBase(executor, startFct, cont, scheduler, retryLimit, bgBasePath, tenantName, transactional) {
}
protected:
void doContinueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) override {
@ -430,13 +444,15 @@ protected:
class AsyncTransactionContext : public TransactionContextBase {
public:
AsyncTransactionContext(ITransactionExecutor* executor,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
TOpStartFct startFct,
TOpContFct cont,
IScheduler* scheduler,
int retryLimit,
std::string bgBasePath,
std::optional<fdb::BytesRef> tenantName)
: TransactionContextBase(executor, txActor, cont, scheduler, retryLimit, bgBasePath, tenantName) {}
std::optional<fdb::BytesRef> tenantName,
bool transactional)
: TransactionContextBase(executor, startFct, cont, scheduler, retryLimit, bgBasePath, tenantName, transactional) {
}
protected:
void doContinueAfter(fdb::Future f, TTaskFct cont, bool retryOnError) override {
@ -648,23 +664,22 @@ public:
const TransactionExecutorOptions& getOptions() override { return options; }
void execute(std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
std::optional<fdb::BytesRef> tenantName = {}) override {
void execute(TOpStartFct startFct,
TOpContFct cont,
std::optional<fdb::BytesRef> tenantName,
bool transactional) override {
try {
std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>(
this, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName);
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true);
} else {
ctx = std::make_shared<AsyncTransactionContext>(
this, txActor, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName);
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true);
}
txActor->init(ctx);
txActor->start();
startFct(ctx);
} catch (...) {
txActor->complete(fdb::Error(error_code_operation_failed));
cont();
cont(fdb::Error(error_code_operation_failed));
}
}

View File

@ -38,6 +38,9 @@ class ITransactionContext : public std::enable_shared_from_this<ITransactionCont
public:
virtual ~ITransactionContext() {}
// Current FDB database
virtual fdb::Database db() = 0;
// Current FDB transaction
virtual fdb::Transaction tx() = 0;
@ -62,57 +65,11 @@ public:
virtual void continueAfterAll(std::vector<fdb::Future> futures, TTaskFct cont);
};
/**
* Interface of an actor object implementing a concrete transaction
*/
class ITransactionActor {
public:
virtual ~ITransactionActor() {}
// Type of the lambda functions implementing a database operation
using TOpStartFct = std::function<void(std::shared_ptr<ITransactionContext>)>;
// Initialize with the given transaction context
virtual void init(std::shared_ptr<ITransactionContext> ctx) = 0;
// Start execution of the transaction, also called on retries
virtual void start() = 0;
// Transaction completion result (error_code_success in case of success)
virtual fdb::Error getError() = 0;
// Notification about the completion of the transaction
virtual void complete(fdb::Error err) = 0;
};
/**
* A helper base class for transaction actors
*/
class TransactionActorBase : public ITransactionActor {
public:
void init(std::shared_ptr<ITransactionContext> ctx) override { context = ctx; }
fdb::Error getError() override { return error; }
void complete(fdb::Error err) override;
protected:
std::shared_ptr<ITransactionContext> ctx() { return context; }
private:
std::shared_ptr<ITransactionContext> context;
fdb::Error error = fdb::Error::success();
};
// Type of the lambda functions implementing a transaction
using TTxStartFct = std::function<void(std::shared_ptr<ITransactionContext>)>;
/**
* A wrapper class for transactions implemented by lambda functions
*/
class TransactionFct : public TransactionActorBase {
public:
TransactionFct(TTxStartFct startFct) : startFct(startFct) {}
void start() override { startFct(this->ctx()); }
private:
TTxStartFct startFct;
};
// Type of the lambda functions implementing a database operation
using TOpContFct = std::function<void(fdb::Error)>;
/**
* Configuration of transaction execution mode
@ -156,9 +113,10 @@ class ITransactionExecutor {
public:
virtual ~ITransactionExecutor() {}
virtual void init(IScheduler* sched, const char* clusterFile, const std::string& bgBasePath) = 0;
virtual void execute(std::shared_ptr<ITransactionActor> tx,
TTaskFct cont,
std::optional<fdb::BytesRef> tenantName = {}) = 0;
virtual void execute(TOpStartFct start,
TOpContFct cont,
std::optional<fdb::BytesRef> tenantName,
bool transactional) = 0;
virtual fdb::Database selectDatabase() = 0;
virtual std::string getClusterFileForErrorInjection() = 0;
virtual const TransactionExecutorOptions& getOptions() = 0;

View File

@ -106,10 +106,23 @@ void WorkloadBase::schedule(TTaskFct task) {
});
}
void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx,
void WorkloadBase::execTransaction(TOpStartFct startFct,
TTaskFct cont,
std::optional<fdb::BytesRef> tenant,
bool failOnError) {
doExecute(startFct, cont, tenant, failOnError, true);
}
// Execute a non-transactional database operation within the workload
void WorkloadBase::execOperation(TOpStartFct startFct, TTaskFct cont, bool failOnError) {
doExecute(startFct, cont, {}, failOnError, false);
}
void WorkloadBase::doExecute(TOpStartFct startFct,
TTaskFct cont,
std::optional<fdb::BytesRef> tenant,
bool failOnError,
bool transactional) {
ASSERT(inProgress);
if (failed) {
return;
@ -117,10 +130,9 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx,
tasksScheduled++;
numTxStarted++;
manager->txExecutor->execute(
tx,
[this, tx, cont, failOnError]() {
startFct,
[this, startFct, cont, failOnError](fdb::Error err) {
numTxCompleted++;
fdb::Error err = tx->getError();
if (err.code() == error_code_success) {
cont();
} else {
@ -135,7 +147,8 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx,
}
scheduledTaskDone();
},
tenant);
tenant,
transactional);
}
void WorkloadBase::info(const std::string& msg) {

View File

@ -119,18 +119,13 @@ protected:
void schedule(TTaskFct task);
// Execute a transaction within the workload
void execTransaction(std::shared_ptr<ITransactionActor> tx,
void execTransaction(TOpStartFct startFct,
TTaskFct cont,
std::optional<fdb::BytesRef> tenant = std::optional<fdb::BytesRef>(),
bool failOnError = true);
// Execute a transaction within the workload, a convenience method for a tranasaction defined by a lambda function
void execTransaction(TTxStartFct start,
TTaskFct cont,
std::optional<fdb::BytesRef> tenant = std::optional<fdb::BytesRef>(),
bool failOnError = true) {
execTransaction(std::make_shared<TransactionFct>(start), cont, tenant, failOnError);
}
// Execute a non-transactional database operation within the workload
void execOperation(TOpStartFct startFct, TTaskFct cont, bool failOnError = true);
// Log an error message, increase error counter
void error(const std::string& msg);
@ -144,6 +139,12 @@ protected:
private:
WorkloadManager* manager;
void doExecute(TOpStartFct startFct,
TTaskFct cont,
std::optional<fdb::BytesRef> tenant,
bool failOnError,
bool transactional);
// Decrease scheduled task counter, notify the workload manager
// that the task is done if no more tasks schedule
void scheduledTaskDone();

View File

@ -349,6 +349,7 @@ public:
class Future {
protected:
friend class Transaction;
friend class Database;
friend std::hash<Future>;
std::shared_ptr<native::FDBFuture> f;
@ -718,6 +719,14 @@ public:
}
Database() noexcept : db(nullptr) {}
void atomic_store(Database other) { std::atomic_store(&db, other.db); }
Database atomic_load() {
Database retVal;
retVal.db = std::atomic_load(&db);
return retVal;
}
Error setOptionNothrow(FDBDatabaseOption option, int64_t value) noexcept {
return Error(native::fdb_database_set_option(
db.get(), option, reinterpret_cast<const uint8_t*>(&value), static_cast<int>(sizeof(value))));
@ -763,6 +772,13 @@ public:
throwError("Failed to create transaction: ", err);
return Transaction(tx_native);
}
TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) {
if (!db)
throw std::runtime_error("list_blobbified_ranges from null database");
return native::fdb_database_list_blobbified_ranges(
db.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit);
}
};
inline Error selectApiVersionNothrow(int version) {