Upgrade Tests: Introduce transaction retry limit; Trace long retry chains; An option to run the test binary with gdb

This commit is contained in:
Vaidas Gasiunas 2022-04-13 16:50:02 +02:00
parent 871cdece0e
commit a03ba928e4
7 changed files with 112 additions and 27 deletions

View File

@ -28,7 +28,7 @@ namespace {
// Aborts the process when `e` is a non-zero FDB error code.
// Before aborting, the text returned by fdb_get_error(e) is written to stderr.
// fmt uses `{}` replacement fields; a printf-style `%s` would be printed
// literally, so only the `{}` form is used here.
void fdb_check(fdb_error_t e) {
    if (e) {
        fmt::print(stderr, "Unexpected error: {}\n", fdb_get_error(e));
        std::abort();
    }
}

View File

@ -42,6 +42,7 @@ public:
std::string testFile;
std::string inputPipeName;
std::string outputPipeName;
int transactionRetryLimit = 0;
int numFdbThreads;
int numClientThreads;
int numDatabases;

View File

@ -23,6 +23,7 @@
#include "foundationdb/fdb_c_types.h"
#include "test/apitester/TesterScheduler.h"
#include <memory>
#include <stdexcept>
#include <unordered_map>
#include <mutex>
#include <atomic>
@ -33,6 +34,7 @@
namespace FdbApiTester {
constexpr int LONG_WAIT_TIME_US = 1000000;
constexpr int LARGE_NUMBER_OF_RETRIES = 5;
void TransactionActorBase::complete(fdb_error_t err) {
error = err;
@ -72,9 +74,10 @@ public:
TransactionContextBase(FDBTransaction* tx,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler)
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), txState(TxState::IN_PROGRESS),
commitCalled(false) {}
IScheduler* scheduler,
int retryLimit)
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
txState(TxState::IN_PROGRESS), commitCalled(false) {}
// A state machine:
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
@ -107,6 +110,11 @@ public:
}
txState = TxState::DONE;
lock.unlock();
if (retriedErrors.size() >= LARGE_NUMBER_OF_RETRIES) {
fmt::print("Transaction succeeded after {} retries on errors: {}\n",
retriedErrors.size(),
fmt::join(retriedErrors, ", "));
}
// cancel transaction so that any pending operations on it
// fail gracefully
fdbTx.cancel();
@ -151,12 +159,29 @@ protected:
} else {
std::unique_lock<std::mutex> lock(mutex);
txState = TxState::IN_PROGRESS;
lock.unlock();
commitCalled = false;
lock.unlock();
txActor->start();
}
}
// Records lastErr in the retry history and decides whether the transaction
// may be retried once more. A retryLimit of 0 means "no limit".
// Returns true if a retry is allowed; otherwise marks the transaction as
// failed with lastErr and returns false.
bool canRetry(fdb_error_t lastErr) {
    ASSERT(txState == TxState::ON_ERROR);
    retriedErrors.push_back(lastErr);

    const auto retryCount = retriedErrors.size();
    const bool limitExceeded = retryLimit != 0 && retryCount > retryLimit;
    if (limitExceeded) {
        fmt::print("Transaction retry limit reached. Retried on errors: {}\n", fmt::join(retriedErrors, ", "));
        transactionFailed(lastErr);
        return false;
    }

    // Trace the retry chain once, at the moment it becomes suspiciously long
    if (retryCount == LARGE_NUMBER_OF_RETRIES) {
        fmt::print("Transaction already retried {} times, on errors: {}\n",
                   retryCount,
                   fmt::join(retriedErrors, ", "));
    }
    return true;
}
// FDB transaction
Transaction fdbTx;
@ -172,6 +197,9 @@ protected:
// Reference to the scheduler
IScheduler* scheduler;
// Retry limit
int retryLimit;
// Transaction execution state
TxState txState;
@ -186,6 +214,9 @@ protected:
// Transaction is committed or being committed
bool commitCalled;
// A history of errors on which the transaction was retried
std::vector<fdb_error_t> retriedErrors;
};
/**
@ -196,8 +227,9 @@ public:
BlockingTransactionContext(FDBTransaction* tx,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler)
: TransactionContextBase(tx, txActor, cont, scheduler) {}
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -247,8 +279,13 @@ protected:
txState = TxState::ON_ERROR;
lock.unlock();
if (!canRetry(err)) {
return;
}
ASSERT(!onErrorFuture);
onErrorFuture = fdbTx.onError(err);
onErrorArg = err;
auto start = timeNow();
fdb_error_t err2 = fdb_future_block_until_ready(onErrorFuture.fdbFuture());
@ -260,7 +297,7 @@ protected:
if (waitTimeUs > LONG_WAIT_TIME_US) {
fdb_error_t err3 = onErrorFuture.getError();
fmt::print("Long waiting time on onError({}) future: {:.3f}s, return code {} ({})\n",
err,
onErrorArg,
microsecToSec(waitTimeUs),
err3,
fdb_get_error(err3));
@ -278,8 +315,9 @@ public:
AsyncTransactionContext(FDBTransaction* tx,
std::shared_ptr<ITransactionActor> txActor,
TTaskFct cont,
IScheduler* scheduler)
: TransactionContextBase(tx, txActor, cont, scheduler) {}
IScheduler* scheduler,
int retryLimit)
: TransactionContextBase(tx, txActor, cont, scheduler, retryLimit) {}
protected:
void doContinueAfter(Future f, TTaskFct cont, bool retryOnError) override {
@ -299,8 +337,16 @@ protected:
}
// Static trampoline with a C-style callback signature: `param` carries the
// AsyncTransactionContext whose onFutureReady(f) is to be invoked.
// NOTE(review): presumably registered as an FDB future callback and invoked
// from C client code - confirm against the registration site.
// No exception is allowed to escape: anything thrown by onFutureReady is
// reported and the process is aborted.
static void futureReadyCallback(FDBFuture* f, void* param) {
    try {
        auto* txCtx = static_cast<AsyncTransactionContext*>(param);
        txCtx->onFutureReady(f);
    } catch (const std::exception& err) {
        // Report on stderr: abort() is not required to flush buffered stdout,
        // so a message printed there could be lost.
        fmt::print(stderr, "Unexpected exception in callback {}\n", err.what());
        abort();
    } catch (...) {
        fmt::print(stderr, "Unknown error in callback\n");
        abort();
    }
}
void onFutureReady(FDBFuture* f) {
@ -345,6 +391,10 @@ protected:
txState = TxState::ON_ERROR;
lock.unlock();
if (!canRetry(err)) {
return;
}
ASSERT(!onErrorFuture);
onErrorArg = err;
onErrorFuture = tx()->onError(err);
@ -358,8 +408,16 @@ protected:
}
// Static trampoline with a C-style callback signature: `param` carries the
// AsyncTransactionContext whose onErrorReady(f) is to be invoked.
// NOTE(review): presumably registered as the callback of the onError future
// and invoked from C client code - confirm against the registration site.
// No exception is allowed to escape: anything thrown by onErrorReady is
// reported and the process is aborted.
static void onErrorReadyCallback(FDBFuture* f, void* param) {
    try {
        auto* txCtx = static_cast<AsyncTransactionContext*>(param);
        txCtx->onErrorReady(f);
    } catch (const std::exception& err) {
        // Report on stderr: abort() is not required to flush buffered stdout,
        // so a message printed there could be lost.
        fmt::print(stderr, "Unexpected exception in callback {}\n", err.what());
        abort();
    } catch (...) {
        fmt::print(stderr, "Unknown error in callback\n");
        abort();
    }
}
void onErrorReady(FDBFuture* f) {
@ -440,9 +498,11 @@ protected:
} else {
std::shared_ptr<ITransactionContext> ctx;
if (options.blockOnFutures) {
ctx = std::make_shared<BlockingTransactionContext>(tx, txActor, cont, scheduler);
ctx = std::make_shared<BlockingTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
} else {
ctx = std::make_shared<AsyncTransactionContext>(tx, txActor, cont, scheduler);
ctx = std::make_shared<AsyncTransactionContext>(
tx, txActor, cont, scheduler, options.transactionRetryLimit);
}
txActor->init(ctx);
txActor->start();

View File

@ -123,6 +123,9 @@ struct TransactionExecutorOptions {
// The size of the database instance pool
int numDatabases = 1;
// Maximum number of retries per transaction (0 - unlimited)
int transactionRetryLimit = 0;
};
/**

View File

@ -110,7 +110,7 @@ void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskF
if (tx->getErrorCode() == error_code_success) {
cont();
} else {
std::string msg = fmt::format("Transaction failed with error: {} ({}})", err, fdb_get_error(err));
std::string msg = fmt::format("Transaction failed with error: {} ({})", err, fdb_get_error(err));
if (failOnError) {
error(msg);
failed = true;

View File

@ -50,7 +50,8 @@ enum TesterOptionId {
OPT_TEST_FILE,
OPT_INPUT_PIPE,
OPT_OUTPUT_PIPE,
OPT_FDB_API_VERSION
OPT_FDB_API_VERSION,
OPT_TRANSACTION_RETRY_LIMIT
};
CSimpleOpt::SOption TesterOptionDefs[] = //
@ -71,6 +72,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
{ OPT_INPUT_PIPE, "--input-pipe", SO_REQ_SEP },
{ OPT_OUTPUT_PIPE, "--output-pipe", SO_REQ_SEP },
{ OPT_FDB_API_VERSION, "--api-version", SO_REQ_SEP },
{ OPT_TRANSACTION_RETRY_LIMIT, "--transaction-retry-limit", SO_REQ_SEP },
SO_END_OF_OPTIONS };
void printProgramUsage(const char* execName) {
@ -104,6 +106,8 @@ void printProgramUsage(const char* execName) {
" Name of the output pipe for communication with the test controller.\n"
" --api-version VERSION\n"
" Required FDB API version (default %d).\n"
" --transaction-retry-limit NUMBER\n"
" Maximum number of retries per tranaction (default: 0 - unlimited)\n"
" -f, --test-file FILE\n"
" Test file to run.\n"
" -h, --help Display this help and exit.\n",
@ -129,15 +133,14 @@ bool validateTraceFormat(std::string_view format) {
const int MIN_TESTABLE_API_VERSION = 400;
void processApiVersionOption(const std::string& optionName, const std::string& value, int& res) {
void processIntOption(const std::string& optionName, const std::string& value, int minValue, int maxValue, int& res) {
char* endptr;
res = strtol(value.c_str(), &endptr, 10);
if (*endptr != '\0') {
throw TesterError(fmt::format("Invalid value {} for {}", value, optionName));
}
if (res < MIN_TESTABLE_API_VERSION || res > FDB_API_VERSION) {
throw TesterError(fmt::format(
"Value for {} must be between {} and {}", optionName, MIN_TESTABLE_API_VERSION, FDB_API_VERSION));
if (res < minValue || res > maxValue) {
throw TesterError(fmt::format("Value for {} must be between {} and {}", optionName, minValue, maxValue));
}
}
@ -191,7 +194,11 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
options.outputPipeName = args.OptionArg();
break;
case OPT_FDB_API_VERSION:
processApiVersionOption(args.OptionText(), args.OptionArg(), options.apiVersion);
processIntOption(
args.OptionText(), args.OptionArg(), MIN_TESTABLE_API_VERSION, FDB_API_VERSION, options.apiVersion);
break;
case OPT_TRANSACTION_RETRY_LIMIT:
processIntOption(args.OptionText(), args.OptionArg(), 0, 1000, options.transactionRetryLimit);
break;
}
return true;
@ -284,6 +291,7 @@ bool runWorkloads(TesterOptions& options) {
txExecOptions.blockOnFutures = options.testSpec.blockOnFutures;
txExecOptions.numDatabases = options.numDatabases;
txExecOptions.databasePerTransaction = options.testSpec.databasePerTransaction;
txExecOptions.transactionRetryLimit = options.transactionRetryLimit;
std::unique_ptr<IScheduler> scheduler = createScheduler(options.numClientThreads);
std::unique_ptr<ITransactionExecutor> txExecutor = createTransactionExecutor(txExecOptions);

View File

@ -29,6 +29,8 @@ FDB_DOWNLOAD_ROOT = "https://github.com/apple/foundationdb/releases/download/"
CURRENT_VERSION = "7.2.0"
HEALTH_CHECK_TIMEOUT_SEC = 5
PROGRESS_CHECK_TIMEOUT_SEC = 30
TRANSACTION_RETRY_LIMIT = 100
RUN_WITH_GDB = False
def make_executable(path):
@ -243,7 +245,10 @@ class UpgradeTest:
'--output-pipe', self.output_pipe_path,
'--api-version', str(self.api_version),
'--log',
'--log-dir', self.log]
'--log-dir', self.log,
'--transaction-retry-limit', str(TRANSACTION_RETRY_LIMIT)]
if (RUN_WITH_GDB):
cmd_args = ['gdb', '-ex', 'run', '--args'] + cmd_args
print("Executing test command: {}".format(
" ".join([str(c) for c in cmd_args])))
@ -264,7 +269,8 @@ class UpgradeTest:
def progress_check(self, ctrl_pipe):
self.progress_event.clear()
os.write(ctrl_pipe, b"CHECK\n")
self.progress_event.wait(PROGRESS_CHECK_TIMEOUT_SEC)
self.progress_event.wait(
None if RUN_WITH_GDB else PROGRESS_CHECK_TIMEOUT_SEC)
if (self.progress_event.is_set()):
print("Progress check: OK")
else:
@ -327,7 +333,7 @@ class UpgradeTest:
test_retcode = 1
try:
workload_thread = Thread(
target=self.exec_workload, args=(args.test_file))
target=self.exec_workload, args=(args.test_file,))
workload_thread.start()
reader_thread = Thread(target=self.output_pipe_reader)
@ -414,7 +420,6 @@ if __name__ == "__main__":
)
parser.add_argument(
'--test-file',
nargs='+',
help='A .toml file describing a test workload to be generated with fdb_c_api_tester',
required=True,
)
@ -430,11 +435,19 @@ if __name__ == "__main__":
help='Do not dump cluster log on error',
action="store_true"
)
parser.add_argument(
'--run-with-gdb',
help='Execute the tester binary from gdb',
action="store_true"
)
args = parser.parse_args()
if (args.process_number == 0):
args.process_number = random.randint(1, 5)
print("Testing with {} processes".format(args.process_number))
if (args.run_with_gdb):
RUN_WITH_GDB = True
errcode = 1
with UpgradeTest(args.build_dir, args.upgrade_path, args.process_number) as test:
print("log-dir: {}".format(test.log))