ApiTester: Fix handling retries of DB operations
This commit is contained in:
parent
b50276ff5a
commit
5d6cb4d3dd
|
@ -180,24 +180,13 @@ public:
|
|||
if (databaseCreateErrorInjected && canBeInjectedDatabaseCreateError(err.code())) {
|
||||
// Failed to create a database because of failure injection
|
||||
// Restart by recreating the transaction in a valid database
|
||||
auto thisRef = std::static_pointer_cast<TransactionContextBase>(shared_from_this());
|
||||
scheduler->schedule([thisRef]() {
|
||||
fdb::Database db = thisRef->executor->selectDatabase();
|
||||
thisRef->fdbDb.atomic_store(db);
|
||||
if (thisRef->transactional) {
|
||||
if (thisRef->tenantName) {
|
||||
fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
|
||||
thisRef->fdbTx.atomic_store(tenant.createTransaction());
|
||||
} else {
|
||||
thisRef->fdbTx.atomic_store(db.createTransaction());
|
||||
}
|
||||
}
|
||||
thisRef->restartTransaction();
|
||||
});
|
||||
recreateAndRestartTransaction();
|
||||
} else if (transactional) {
|
||||
onErrorArg = err;
|
||||
onErrorFuture = tx().onError(err);
|
||||
handleOnErrorFuture();
|
||||
} else if (isRetriableDBOpError(err.code())) {
|
||||
restartTransaction();
|
||||
} else {
|
||||
transactionFailed(err);
|
||||
}
|
||||
|
@ -223,6 +212,8 @@ protected:
|
|||
return errCode == error_code_no_cluster_file_found || errCode == error_code_connection_string_invalid;
|
||||
}
|
||||
|
||||
bool isRetriableDBOpError(fdb::Error::CodeType errCode) { return errCode == error_code_cluster_version_changed; }
|
||||
|
||||
// Complete the transaction with an (unretriable) error
|
||||
void transactionFailed(fdb::Error err) {
|
||||
ASSERT(err);
|
||||
|
@ -262,6 +253,23 @@ protected:
|
|||
startFct(shared_from_this());
|
||||
}
|
||||
|
||||
void recreateAndRestartTransaction() {
|
||||
auto thisRef = std::static_pointer_cast<TransactionContextBase>(shared_from_this());
|
||||
scheduler->schedule([thisRef]() {
|
||||
fdb::Database db = thisRef->executor->selectDatabase();
|
||||
thisRef->fdbDb.atomic_store(db);
|
||||
if (thisRef->transactional) {
|
||||
if (thisRef->tenantName) {
|
||||
fdb::Tenant tenant = db.openTenant(*thisRef->tenantName);
|
||||
thisRef->fdbTx.atomic_store(tenant.createTransaction());
|
||||
} else {
|
||||
thisRef->fdbTx.atomic_store(db.createTransaction());
|
||||
}
|
||||
}
|
||||
thisRef->restartTransaction();
|
||||
});
|
||||
}
|
||||
|
||||
// Checks if a transaction can be retried. Fails the transaction if the check fails
|
||||
bool canRetry(fdb::Error lastErr) {
|
||||
ASSERT(txState == TxState::ON_ERROR);
|
||||
|
@ -671,11 +679,23 @@ public:
|
|||
try {
|
||||
std::shared_ptr<ITransactionContext> ctx;
|
||||
if (options.blockOnFutures) {
|
||||
ctx = std::make_shared<BlockingTransactionContext>(
|
||||
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true);
|
||||
ctx = std::make_shared<BlockingTransactionContext>(this,
|
||||
startFct,
|
||||
cont,
|
||||
scheduler,
|
||||
options.transactionRetryLimit,
|
||||
bgBasePath,
|
||||
tenantName,
|
||||
transactional);
|
||||
} else {
|
||||
ctx = std::make_shared<AsyncTransactionContext>(
|
||||
this, startFct, cont, scheduler, options.transactionRetryLimit, bgBasePath, tenantName, true);
|
||||
ctx = std::make_shared<AsyncTransactionContext>(this,
|
||||
startFct,
|
||||
cont,
|
||||
scheduler,
|
||||
options.transactionRetryLimit,
|
||||
bgBasePath,
|
||||
tenantName,
|
||||
transactional);
|
||||
}
|
||||
startFct(ctx);
|
||||
} catch (...) {
|
||||
|
|
Loading…
Reference in New Issue