Break mako.cpp into separate files

- future wait and common error handling
- operations and sub-op steps
- blob granules API invocation prep
- asynchronous benchmark execution
This commit is contained in:
Junhyun Shim 2022-04-01 09:25:43 -07:00
parent 077d82c01c
commit d0b7eddbcc
10 changed files with 946 additions and 846 deletions

View File

@ -81,8 +81,20 @@ endif()
# check later whether this works
if(NOT WIN32 AND NOT IS_ARM_MAC)
set(MAKO_SRCS
test/mako/async.hpp
test/mako/async.cpp
test/mako/blob_granules.hpp
test/mako/blob_granules.cpp
test/mako/future.hpp
test/mako/logger.hpp
test/mako/mako.cpp
test/mako/mako.hpp
test/mako/operations.hpp
test/mako/operations.cpp
test/mako/process.hpp
test/mako/shm.hpp
test/mako/stats.hpp
test/mako/time.hpp
test/mako/utils.cpp
test/mako/utils.hpp)
add_subdirectory(test/unit/third_party)

View File

@ -0,0 +1,274 @@
#include <boost/asio.hpp>
#include "async.hpp"
#include "future.hpp"
#include "logger.hpp"
#include "operations.hpp"
#include "stats.hpp"
#include "time.hpp"
#include "utils.hpp"
extern thread_local mako::Logger logr;
using namespace fdb;
namespace mako {
void ResumableStateForPopulate::postNextTick() {
boost::asio::post(io_context, [this, state=shared_from_this()]() { runOneTick(); });
}
void ResumableStateForPopulate::runOneTick() {
const auto num_commit_every = args.txnspec.ops[OP_INSERT][OP_COUNT];
for (auto i = key_checkpoint; i <= key_end; i++) {
genKey(keystr, KEY_PREFIX, args, i);
randomString(valstr, args.value_length);
tx.set(keystr, valstr);
stats.incrOpCount(OP_INSERT);
if (i == key_end || (i - key_begin + 1) % num_commit_every == 0) {
watch_commit.start();
auto f = tx.commit();
f.then([this, state=shared_from_this(), i](Future f) {
if (auto err = f.error()) {
logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
"ERROR",
"commit for populate returned '{}'",
err.what());
auto err_f = tx.onError(err);
err_f.then([this, state=shared_from_this()](Future f) {
const auto f_rc = handleForOnError(tx, f, "ON_ERROR_FOR_POPULATE");
if (f_rc == FutureRC::ABORT) {
signalEnd();
return;
} else {
postNextTick();
}
});
} else {
// successfully committed
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
stats.addLatency(OP_TASK, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
sample_bins[OP_TASK].put(tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
stats.incrOpCount(OP_TASK);
tx.reset();
watch_tx.startFromStop();
key_checkpoint = i + 1;
if (i != key_end) {
postNextTick();
} else {
logr.debug("Populated {} rows [{}, {}]: {:6.3f} sec",
key_end - key_begin + 1,
key_begin,
key_end,
toDoubleSeconds(watch_total.stop().diff()));
signalEnd();
return;
}
}
});
break;
}
}
}
void ResumableStateForRunWorkload::postNextTick() {
boost::asio::post(io_context, [this, state=shared_from_this()]() { runOneTick(); });
}
#define UNPACK_OP_INFO() \
[[maybe_unused]] auto& [op, count, step] = op_iter; \
[[maybe_unused]] const auto step_kind = opTable[op].stepKind(step)
void ResumableStateForRunWorkload::runOneTick() {
UNPACK_OP_INFO();
assert(op_iter != OpEnd);
watch_step.start();
if (step == 0 /* first step */) {
watch_per_op[op] = Stopwatch(watch_step.getStart());
}
auto f = opTable[op].stepFunction(step)(tx, args, key1, key2, val);
if (!f) {
// immediately completed client-side ops: e.g. set, setrange, clear, clearrange, ...
onStepSuccess();
} else {
f.then([this, state=shared_from_this()](Future f) {
UNPACK_OP_INFO();
if (step_kind != StepKind::ON_ERROR) {
if (auto err = f.error()) {
logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
"ERROR",
"step {} of op {} returned '{}'",
step,
opTable[op].name(),
err.what());
tx.onError(err).then([this, state=shared_from_this()](Future f) {
UNPACK_OP_INFO();
const auto rc = handleForOnError(tx, f, fmt::format("{}_STEP_{}", opTable[op].name(), step));
if (rc == FutureRC::RETRY) {
stats.incrErrorCount(op);
} else if (rc == FutureRC::CONFLICT) {
stats.incrConflictCount();
} else if (rc == FutureRC::ABORT) {
tx.reset();
stopcount.fetch_add(1);
return;
}
op_iter = getOpBegin(args);
needs_commit = false;
// restart this iteration from beginning
postNextTick();
});
return;
}
} else {
auto rc = handleForOnError(tx, f, "BG_ON_ERROR");
if (rc == FutureRC::RETRY) {
stats.incrErrorCount(op);
} else if (rc == FutureRC::CONFLICT) {
stats.incrConflictCount();
} else if (rc == FutureRC::ABORT) {
tx.reset();
stopcount.fetch_add(1);
return;
}
op_iter = getOpBegin(args);
needs_commit = false;
// restart this iteration from beginning
postNextTick();
return;
}
onStepSuccess();
});
}
}
void ResumableStateForRunWorkload::onStepSuccess() {
UNPACK_OP_INFO();
logr.debug("Step {}:{} succeeded", getOpName(op), step);
// step successful
watch_step.stop();
const auto do_sample = stats.getOpCount(OP_TASK) % args.sampling == 0;
if (step_kind == StepKind::COMMIT) {
// reset transaction boundary
const auto step_latency = watch_step.diff();
watch_tx.setStop(watch_step.getStop());
if (do_sample) {
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, step_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(step_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
tx.reset();
watch_tx.startFromStop();
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
needs_commit = false;
}
// op completed successfully
if (step + 1 == opTable[op].steps()) {
if (opTable[op].needsCommit())
needs_commit = true;
watch_per_op[op].setStop(watch_step.getStop());
if (do_sample) {
const auto op_latency = watch_per_op[op].diff();
stats.addLatency(op, op_latency);
sample_bins[op].put(op_latency);
}
stats.incrOpCount(op);
}
op_iter = getOpNext(args, op_iter);
if (op_iter == OpEnd) {
if (needs_commit || args.commit_get) {
// task completed, need to commit before finish
watch_commit.start();
auto f = tx.commit().eraseType();
f.then([this, state=shared_from_this()](Future f) {
UNPACK_OP_INFO();
if (auto err = f.error()) {
// commit had errors
logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
"ERROR",
"Post-iteration commit returned error: {}",
err.what());
tx.onError(err).then([this, state=shared_from_this()](Future f) {
const auto rc = handleForOnError(tx, f, "ON_ERROR");
if (rc == FutureRC::CONFLICT)
stats.incrConflictCount();
else
stats.incrErrorCount(OP_COMMIT);
if (rc == FutureRC::ABORT) {
signalEnd();
return;
}
op_iter = getOpBegin(args);
needs_commit = false;
postNextTick();
});
} else {
// commit successful
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
watch_task.setStop(watch_commit.getStop());
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
const auto task_duration = watch_task.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, commit_latency);
stats.addLatency(OP_TASK, task_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
sample_bins[OP_TASK].put(task_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
stats.incrOpCount(OP_TASK);
tx.reset();
watch_tx.startFromStop();
watch_task.startFromStop();
if (ended()) {
signalEnd();
return;
}
// start next iteration
op_iter = getOpBegin(args);
postNextTick();
}
});
} else {
// task completed but no need to commit
watch_task.stop();
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
const auto task_duration = watch_task.diff();
stats.addLatency(OP_TASK, task_duration);
sample_bins[OP_TASK].put(task_duration);
}
stats.incrOpCount(OP_TASK);
watch_tx.startFromStop();
watch_task.startFromStop();
tx.reset();
if (ended()) {
signalEnd();
return;
}
op_iter = getOpBegin(args);
// run next iteration
postNextTick();
}
} else {
postNextTick();
}
}
} // namespace mako

View File

@ -0,0 +1,105 @@
#ifndef MAKO_ASYNC_HPP
#define MAKO_ASYNC_HPP
#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include "logger.hpp"
#include "mako.hpp"
#include "stats.hpp"
#include "time.hpp"
namespace mako {
// as we don't have coroutines yet, we need to store in heap the complete state of execution,
// such that we can resume exactly where we were from last database op.
struct ResumableStateForPopulate : std::enable_shared_from_this<ResumableStateForPopulate> {
Logger logr;
fdb::Database db;
fdb::Transaction tx;
boost::asio::io_context& io_context;
Arguments const& args;
ThreadStatistics& stats;
std::atomic<int>& stopcount;
LatencySampleBinArray sample_bins;
int key_begin;
int key_end;
int key_checkpoint;
fdb::ByteString keystr;
fdb::ByteString valstr;
Stopwatch watch_tx;
Stopwatch watch_commit;
Stopwatch watch_total;
ResumableStateForPopulate(
Logger logr,
fdb::Database db, fdb::Transaction tx,
boost::asio::io_context& io_context,
Arguments const& args,
ThreadStatistics& stats,
std::atomic<int>& stopcount, int key_begin, int key_end) :
logr(logr), db(db), tx(tx),
io_context(io_context), args(args), stats(stats), stopcount(stopcount),
key_begin(key_begin), key_end(key_end), key_checkpoint(key_begin)
{
keystr.reserve(args.key_length);
valstr.reserve(args.value_length);
}
void runOneTick();
void postNextTick();
void signalEnd() { stopcount.fetch_add(1); }
};
using PopulateStateHandle = std::shared_ptr<ResumableStateForPopulate>;
struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStateForRunWorkload> {
Logger logr;
fdb::Database db;
fdb::Transaction tx;
boost::asio::io_context& io_context;
Arguments const& args;
ThreadStatistics& stats;
std::atomic<int>& stopcount;
std::atomic<int> const& signal;
int max_iters;
OpIterator op_iter;
LatencySampleBinArray sample_bins;
fdb::ByteString key1;
fdb::ByteString key2;
fdb::ByteString val;
std::array<Stopwatch, MAX_OP> watch_per_op;
Stopwatch watch_step;
Stopwatch watch_commit;
Stopwatch watch_tx;
Stopwatch watch_task;
bool needs_commit;
ResumableStateForRunWorkload(
Logger logr,
fdb::Database db, fdb::Transaction tx,
boost::asio::io_context& io_context,
Arguments const& args,
ThreadStatistics& stats,
std::atomic<int>& stopcount, std::atomic<int> const& signal,
int max_iters, OpIterator op_iter) :
logr(logr), db(db), tx(tx), io_context(io_context), args(args), stats(stats),
stopcount(stopcount), signal(signal), max_iters(max_iters), op_iter(op_iter), needs_commit(false)
{
key1.reserve(args.key_length);
key2.reserve(args.key_length);
val.reserve(args.value_length);
}
void signalEnd() noexcept { stopcount.fetch_add(1); }
bool ended() noexcept {
return (max_iters != -1 && max_iters >= stats.getOpCount(OP_TASK)) || signal.load() == SIGNAL_RED;
}
void postNextTick();
void runOneTick();
void onStepSuccess();
};
using RunWorkloadStateHandle = std::shared_ptr<ResumableStateForRunWorkload>;
} // namespace mako
#endif /*MAKO_ASYNC_HPP*/

View File

@ -0,0 +1,93 @@
#include "blob_granules.hpp"
#include "logger.hpp"
#include <cstdio>
#include <fdb.hpp>
extern thread_local mako::Logger logr;
namespace mako::blob_granules::local_file {
int64_t startLoad(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
void* userContext) {
FILE* fp;
char full_fname[PATH_MAX]{
0,
};
int loadId;
uint8_t* data;
size_t readSize;
auto context = static_cast<UserContext*>(userContext);
loadId = context->nextId;
if (context->dataById[loadId] != 0) {
logr.error("too many granule file loads at once: {}", MAX_BG_IDS);
return -1;
}
context->nextId = (context->nextId + 1) % MAX_BG_IDS;
int ret = snprintf(full_fname, PATH_MAX, "%s%s", context->bgFilePath, filename);
if (ret < 0 || ret >= PATH_MAX) {
logr.error("BG filename too long: {}{}", context->bgFilePath, filename);
return -1;
}
fp = fopen(full_fname, "r");
if (!fp) {
logr.error("BG could not open file: {}", full_fname);
return -1;
}
// don't seek if offset == 0
if (offset && fseek(fp, offset, SEEK_SET)) {
// if fseek was non-zero, it failed
logr.error("BG could not seek to %{} in file {}", offset, full_fname);
fclose(fp);
return -1;
}
data = new uint8_t[length];
readSize = fread(data, sizeof(uint8_t), length, fp);
fclose(fp);
if (readSize != length) {
logr.error("BG could not read {} bytes from file: {}", length, full_fname);
return -1;
}
context->dataById[loadId] = data;
return loadId;
}
uint8_t* getLoad(int64_t loadId, void* userContext) {
auto context = static_cast<UserContext*>(userContext);
if (context->dataById[loadId] == 0) {
logr.error("BG loadId invalid for get_load: {}", loadId);
return 0;
}
return context->dataById[loadId];
}
void freeLoad(int64_t loadId, void* userContext) {
auto context = static_cast<UserContext*>(userContext);
if (context->dataById[loadId] == 0) {
logr.error("BG loadId invalid for free_load: {}", loadId);
}
delete[] context->dataById[loadId];
context->dataById[loadId] = 0;
}
fdb::native::FDBReadBlobGranuleContext createApiContext(UserContext& ctx, bool materialize_files) {
auto ret = fdb::native::FDBReadBlobGranuleContext{};
ret.userContext = &ctx;
ret.start_load_f = &startLoad;
ret.get_load_f = &getLoad;
ret.free_load_f = &freeLoad;
ret.debugNoMaterialize = !materialize_files;
return ret;
}
} // namespace mako::blob_granules::local_file

View File

@ -0,0 +1,29 @@
#ifndef MAKO_BLOB_GRANULES_HPP
#define MAKO_BLOB_GRANULES_HPP
#include <cstdint>
#include <memory>
#include <fdb.hpp>
namespace mako::blob_granules::local_file {
constexpr const int MAX_BG_IDS = 1000;
// TODO: could always abstract this into something more generically usable by something other than mako.
// But outside of testing there are likely few use cases for local granules
struct UserContext {
char const* bgFilePath;
int nextId;
std::unique_ptr<uint8_t*[]> dataByIdMem;
uint8_t** dataById;
UserContext(char const* filePath) : bgFilePath(filePath), nextId(0), dataByIdMem(new uint8_t*[MAX_BG_IDS]()), dataById(dataByIdMem.get()) {}
void clear() { dataByIdMem.reset(); }
};
fdb::native::FDBReadBlobGranuleContext createApiContext(UserContext& ctx, bool materialize_files);
} // namespace mako::blob_granules::local_file
#endif /*MAKO_BLOB_GRANULES_HPP*/

View File

@ -0,0 +1,68 @@
#ifndef MAKO_FUTURE_HPP
#define MAKO_FUTURE_HPP
#include <fdb.hpp>
#include <cassert>
#include <string_view>
#include "logger.hpp"
extern thread_local mako::Logger logr;
namespace mako {
enum class FutureRC { OK, RETRY, CONFLICT, ABORT };
template <class FutureType>
FutureRC handleForOnError(fdb::Transaction tx, FutureType f, std::string_view step) {
if (auto err = f.error()) {
if (err.is(1020 /*not_committed*/)) {
return FutureRC::CONFLICT;
} else if (err.retryable()) {
logr.warn("Retryable error '{}' found at on_error(), step: {}", err.what(), step);
return FutureRC::RETRY;
} else {
logr.error("Unretryable error '{}' found at on_error(), step: {}", err.what(), step);
tx.reset();
return FutureRC::ABORT;
}
} else {
return FutureRC::RETRY;
}
}
template <class FutureType>
FutureRC waitAndHandleForOnError(fdb::Transaction tx, FutureType f, std::string_view step) {
assert(f);
if (auto err = f.blockUntilReady()) {
logr.error("'{}' found while waiting for on_error() future, step: {}", err.what(), step);
return FutureRC::ABORT;
}
return handleForOnError(tx, f, step);
}
// wait on any non-immediate tx-related step to complete. Follow up with on_error().
template <class FutureType>
FutureRC waitAndHandleError(fdb::Transaction tx, FutureType f, std::string_view step) {
assert(f);
auto err = fdb::Error{};
if ((err = f.blockUntilReady())) {
const auto retry = err.retryable();
logr.error("{} error '{}' found during step: {}", (retry ? "Retryable" : "Unretryable"), err.what(), step);
return retry ? FutureRC::RETRY : FutureRC::ABORT;
}
err = f.error();
if (!err)
return FutureRC::OK;
if (err.retryable()) {
logr.warn("step {} returned '{}'", step, err.what());
} else {
logr.error("step {} returned '{}'", step, err.what());
}
// implicit backoff
auto follow_up = tx.onError(err);
return waitAndHandleForOnError(tx, f, step);
}
} // namespace mako
#endif /*MAKO_FUTURE_HPP*/

View File

@ -39,66 +39,14 @@
#include "mako.hpp"
#include "process.hpp"
#include "utils.hpp"
#include "future.hpp"
#include "async.hpp"
using namespace fdb;
using namespace mako;
thread_local Logger logr = Logger(MainProcess{}, VERBOSE_DEFAULT);
enum class FutureRC { OK, RETRY, CONFLICT, ABORT };
template <class FutureType>
FutureRC handleForOnError(Transaction tx, FutureType f, std::string_view step) {
if (auto err = f.error()) {
if (err.is(1020 /*not_committed*/)) {
return FutureRC::CONFLICT;
} else if (err.retryable()) {
logr.warn("Retryable error '{}' found at on_error(), step: {}", err.what(), step);
return FutureRC::RETRY;
} else {
logr.error("Unretryable error '{}' found at on_error(), step: {}", err.what(), step);
tx.reset();
return FutureRC::ABORT;
}
} else {
return FutureRC::RETRY;
}
}
template <class FutureType>
FutureRC waitAndHandleForOnError(Transaction tx, FutureType f, std::string_view step) {
assert(f);
auto err = Error{};
if ((err = f.blockUntilReady())) {
logr.error("'{}' found while waiting for on_error() future, step: {}", err.what(), step);
return FutureRC::ABORT;
}
return handleForOnError(tx, f, step);
}
// wait on any non-immediate tx-related step to complete. Follow up with on_error().
template <class FutureType>
FutureRC waitAndHandleError(Transaction tx, FutureType f, std::string_view step) {
assert(f);
auto err = Error{};
if ((err = f.blockUntilReady())) {
const auto retry = err.retryable();
logr.error("{} error '{}' found during step: {}", (retry ? "Retryable" : "Unretryable"), err.what(), step);
return retry ? FutureRC::RETRY : FutureRC::ABORT;
}
err = f.error();
if (!err)
return FutureRC::OK;
if (err.retryable()) {
logr.warn("step {} returned '{}'", step, err.what());
} else {
logr.error("step {} returned '{}'", step, err.what());
}
// implicit backoff
auto follow_up = tx.onError(err);
return waitAndHandleForOnError(tx, f, step);
}
/* cleanup database */
int cleanup(Transaction tx, Arguments const& args) {
auto beginstr = ByteString{};
@ -233,378 +181,6 @@ int populate(Transaction tx,
return 0;
}
// TODO: could always abstract this into something more generically usable by something other than mako.
// But outside of testing there are likely few use cases for local granules
typedef struct {
char const* bgFilePath;
int nextId;
uint8_t** data_by_id;
} BGLocalFileContext;
int64_t granule_start_load(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
void* userContext) {
FILE* fp;
char full_fname[PATH_MAX]{
0,
};
int loadId;
uint8_t* data;
size_t readSize;
BGLocalFileContext* context = (BGLocalFileContext*)userContext;
loadId = context->nextId;
if (context->data_by_id[loadId] != 0) {
logr.error("too many granule file loads at once: {}", MAX_BG_IDS);
return -1;
}
context->nextId = (context->nextId + 1) % MAX_BG_IDS;
int ret = snprintf(full_fname, PATH_MAX, "%s%s", context->bgFilePath, filename);
if (ret < 0 || ret >= PATH_MAX) {
logr.error("BG filename too long: {}{}", context->bgFilePath, filename);
return -1;
}
fp = fopen(full_fname, "r");
if (!fp) {
logr.error("BG could not open file: {}", full_fname);
return -1;
}
// don't seek if offset == 0
if (offset && fseek(fp, offset, SEEK_SET)) {
// if fseek was non-zero, it failed
logr.error("BG could not seek to %{} in file {}", offset, full_fname);
fclose(fp);
return -1;
}
data = new uint8_t[length];
readSize = fread(data, sizeof(uint8_t), length, fp);
fclose(fp);
if (readSize != length) {
logr.error("BG could not read {} bytes from file: {}", length, full_fname);
return -1;
}
context->data_by_id[loadId] = data;
return loadId;
}
uint8_t* granule_get_load(int64_t loadId, void* userContext) {
BGLocalFileContext* context = (BGLocalFileContext*)userContext;
if (context->data_by_id[loadId] == 0) {
logr.error("BG loadId invalid for get_load: {}", loadId);
return 0;
}
return context->data_by_id[loadId];
}
void granule_free_load(int64_t loadId, void* userContext) {
BGLocalFileContext* context = (BGLocalFileContext*)userContext;
if (context->data_by_id[loadId] == 0) {
logr.error("BG loadId invalid for free_load: {}", loadId);
}
delete[] context->data_by_id[loadId];
context->data_by_id[loadId] = 0;
}
inline int nextKey(Arguments const& args) {
if (args.zipf)
return zipfian_next();
return urand(0, args.rows - 1);
}
const std::array<Operation, MAX_OP> opTable{
{ { "GRV",
{ { StepKind::READ,
[](Transaction tx, Arguments const&, ByteString&, ByteString&, ByteString&) {
return tx.getReadVersion().eraseType();
} } },
false },
{ "GET",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, false /*snapshot*/).eraseType();
} } },
false },
{ "GETRANGE",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
auto num_end = num_begin + args.txnspec.ops[OP_GETRANGE][OP_RANGE] - 1;
if (num_end > args.rows - 1)
num_end = args.rows - 1;
genKey(end, KEY_PREFIX, args, num_end);
return tx
.getRange<key_select::Inclusive, key_select::Inclusive>(begin,
end,
0 /*limit*/,
0 /*target_bytes*/,
args.streaming_mode,
0 /*iteration*/,
false /*snapshot*/,
args.txnspec.ops[OP_GETRANGE][OP_REVERSE])
.eraseType();
} } },
false },
{ "SGET",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, true /*snapshot*/).eraseType();
} } },
false },
{ "SGETRANGE",
{ {
StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
auto num_end = num_begin + args.txnspec.ops[OP_SGETRANGE][OP_RANGE] - 1;
if (num_end > args.rows - 1)
num_end = args.rows - 1;
genKey(end, KEY_PREFIX, args, num_end);
return tx
.getRange<key_select::Inclusive, key_select::Inclusive>(begin,
end,
0 /*limit*/,
0 /*target_bytes*/,
args.streaming_mode,
0 /*iteration*/,
true /*snapshot*/,
args.txnspec.ops[OP_SGETRANGE][OP_REVERSE])
.eraseType();
}
} },
false },
{ "UPDATE",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, false /*snapshot*/).eraseType();
} },
{ StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
randomString(value, args.value_length);
tx.set(key, value);
return Future();
} } },
true },
{ "INSERT",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
// concat([padding], key_prefix, random_string): reasonably unique
randomString<false /*clear-before-append*/>(key, args.key_length - static_cast<int>(key.size()));
randomString(value, args.value_length);
tx.set(key, value);
return Future();
} } },
true },
{ "INSERTRANGE",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
const auto prefix_len = static_cast<int>(key.size());
const auto range = args.txnspec.ops[OP_INSERTRANGE][OP_RANGE];
assert(range > 0);
const auto range_digits = digits(range);
assert(args.key_length - prefix_len >= range_digits);
const auto rand_len = args.key_length - prefix_len - range_digits;
// concat([padding], prefix, random_string, range_digits)
randomString<false /*clear-before-append*/>(key, rand_len);
randomString(value, args.value_length);
for (auto i = 0; i < range; i++) {
fmt::format_to(std::back_inserter(key), "{0:0{1}d}", i, range_digits);
tx.set(key, value);
key.resize(key.size() - static_cast<size_t>(range_digits));
}
return Future();
} } },
true },
{ "OVERWRITE",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKey(key, KEY_PREFIX, args, nextKey(args));
randomString(value, args.value_length);
tx.set(key, value);
return Future();
} } },
true },
{ "CLEAR",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
genKey(key, KEY_PREFIX, args, nextKey(args));
tx.clear(key);
return Future();
} } },
true },
{ "SETCLEAR",
{ { StepKind::COMMIT,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
const auto prefix_len = static_cast<int>(key.size());
randomString<false /*append-after-clear*/>(key, args.key_length - prefix_len);
randomString(value, args.value_length);
tx.set(key, value);
return tx.commit().eraseType();
} },
{ StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
tx.reset(); // assuming commit from step 0 worked.
tx.clear(key); // key should forward unchanged from step 0
return Future();
} } },
true },
{ "CLEARRANGE",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
const auto range = args.txnspec.ops[OP_CLEARRANGE][OP_RANGE];
assert(range > 0);
genKey(end, KEY_PREFIX, args, std::min(args.rows - 1, num_begin + range - 1));
tx.clearRange(begin, end);
return Future();
} } },
true },
{ "SETCLEARRANGE",
{ { StepKind::COMMIT,
[](Transaction tx, Arguments const& args, ByteString& key_begin, ByteString& key, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
const auto prefix_len = static_cast<int>(key.size());
const auto range = args.txnspec.ops[OP_SETCLEARRANGE][OP_RANGE];
assert(range > 0);
const auto range_digits = digits(range);
assert(args.key_length - prefix_len >= range_digits);
const auto rand_len = args.key_length - prefix_len - range_digits;
// concat([padding], prefix, random_string, range_digits)
randomString<false /*clear-before-append*/>(key, rand_len);
randomString(value, args.value_length);
for (auto i = 0; i <= range; i++) {
fmt::format_to(std::back_inserter(key), "{0:0{1}d}", i, range_digits);
if (i == range)
break; // preserve "exclusive last"
// preserve first key for step 1
if (i == 0)
key_begin = key;
tx.set(key, value);
// preserve last key for step 1
key.resize(key.size() - static_cast<size_t>(range_digits));
}
return tx.commit().eraseType();
} },
{ StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
tx.reset();
tx.clearRange(begin, end);
return Future();
} } },
true },
{ "COMMIT", { { StepKind::NONE, nullptr } }, false },
{ "TRANSACTION", { { StepKind::NONE, nullptr } }, false },
{ "TASK", { { StepKind::NONE, nullptr } }, false },
{ "READBLOBGRANULE",
{ { StepKind::ON_ERROR,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
const auto range = args.txnspec.ops[OP_READ_BG][OP_RANGE];
assert(range > 0);
genKey(end, KEY_PREFIX, args, std::min(args.rows - 1, num_begin + range - 1));
auto err = Error{};
err = tx.setOptionNothrow(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, BytesRef());
if (err) {
// Issuing read/writes before disabling RYW results in error.
// Possible malformed workload?
// As workloads execute in sequence, retrying would likely repeat this error.
fmt::print(stderr, "ERROR: TR_OPTION_READ_YOUR_WRITES_DISABLE: {}", err.what());
return Future();
}
auto mem = std::unique_ptr<uint8_t*[]>(new uint8_t*[MAX_BG_IDS]);
// Allocate a separate context per call to avoid multiple threads accessing
BGLocalFileContext fileContext;
fileContext.bgFilePath = args.bg_file_path;
fileContext.nextId = 0;
fileContext.data_by_id = mem.get();
memset(fileContext.data_by_id, 0, MAX_BG_IDS * sizeof(uint8_t*));
native::FDBReadBlobGranuleContext granuleContext;
granuleContext.userContext = &fileContext;
granuleContext.start_load_f = &granule_start_load;
granuleContext.get_load_f = &granule_get_load;
granuleContext.free_load_f = &granule_free_load;
granuleContext.debugNoMaterialize = !args.bg_materialize_files;
auto r =
tx.readBlobGranules(begin, end, 0 /*begin_version*/, -1 /*end_version, use txn's*/, granuleContext);
mem.reset();
auto out = Result::KeyValueArray{};
err = r.getKeyValueArrayNothrow(out);
if (!err || err.is(2037 /*blob_granule_not_materialized*/))
return Future();
const auto level = (err.is(1020 /*not_committed*/) || err.is(1021 /*commit_unknown_result*/) ||
err.is(1213 /*tag_throttled*/))
? VERBOSE_WARN
: VERBOSE_NONE;
logr.printWithLogLevel(level, "ERROR", "get_keyvalue_array() after readBlobGranules(): {}", err.what());
return tx.onError(err).eraseType();
} } },
false } }
};
char const* getOpName(int ops_code) {
if (ops_code >= 0 && ops_code < MAX_OP)
return opTable[ops_code].name().data();
return "";
}
using OpIterator = std::tuple<int /*op*/, int /*count*/, int /*step*/>;
constexpr const OpIterator OpEnd = OpIterator(MAX_OP, -1, -1);
OpIterator getOpBegin(Arguments const& args) noexcept {
for (auto op = 0; op < MAX_OP; op++) {
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] == 0)
continue;
return OpIterator(op, 0, 0);
}
return OpEnd;
}
OpIterator getOpNext(Arguments const& args, OpIterator current) noexcept {
if (OpEnd == current)
return OpEnd;
auto [op, count, step] = current;
assert(op < MAX_OP && !isAbstractOp(op));
if (opTable[op].steps() > step + 1)
return OpIterator(op, count, step + 1);
count++;
for (; op < MAX_OP; op++, count = 0) {
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] <= count)
continue;
return OpIterator(op, count, 0);
}
return OpEnd;
}
/* run one transaction */
int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, LatencySampleBinArray& sample_bins) {
// reuse memory for keys to avoid realloc overhead
@ -867,353 +443,6 @@ void dumpThreadSamples(Arguments const& args,
}
}
// as we don't have coroutines yet, we need to store in heap the complete state of execution,
// such that we can reconstruct and resume exactly where we were from last database op.
struct ResumableStateForPopulate {
Logger logr;
Database db;
Transaction tx;
boost::asio::io_context& io_context;
Arguments const& args;
ThreadStatistics& stats;
std::atomic<int>& stopcount;
LatencySampleBinArray sample_bins;
int key_begin;
int key_end;
int key_checkpoint;
ByteString keystr;
ByteString valstr;
Stopwatch watch_tx;
Stopwatch watch_commit;
Stopwatch watch_total;
};
using PopulateStateHandle = std::shared_ptr<ResumableStateForPopulate>;
// avoid redeclaring each variable for each continuation after async op
#define UNPACK_RESUMABLE_STATE_FOR_POPULATE(p_state) \
[[maybe_unused]] auto& [logr, \
db, \
tx, \
io_context, \
args, \
stats, \
stopcount, \
sample_bins, \
key_begin, \
key_end, \
key_checkpoint, \
keystr, \
valstr, \
watch_tx, \
watch_commit, \
watch_total] = *p_state
void resumablePopulate(PopulateStateHandle state) {
UNPACK_RESUMABLE_STATE_FOR_POPULATE(state);
const auto num_commit_every = args.txnspec.ops[OP_INSERT][OP_COUNT];
for (auto i = key_checkpoint; i <= key_end; i++) {
genKey(keystr, KEY_PREFIX, args, i);
randomString(valstr, args.value_length);
tx.set(keystr, valstr);
stats.incrOpCount(OP_INSERT);
if (i == key_end || (i - key_begin + 1) % num_commit_every == 0) {
watch_commit.start();
auto f = tx.commit();
f.then([state, i](Future f) {
UNPACK_RESUMABLE_STATE_FOR_POPULATE(state);
if (auto err = f.error()) {
logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
"ERROR",
"commit for populate returned '{}'",
err.what());
auto err_f = tx.onError(err);
err_f.then([state](Future f) {
UNPACK_RESUMABLE_STATE_FOR_POPULATE(state);
const auto f_rc = handleForOnError(tx, f, "ON_ERROR_FOR_POPULATE");
if (f_rc == FutureRC::ABORT) {
stopcount.fetch_add(1);
return;
} else {
boost::asio::post(io_context, [state]() { resumablePopulate(state); });
}
});
} else {
// successfully committed
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
stats.addLatency(OP_TASK, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
sample_bins[OP_TASK].put(tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
stats.incrOpCount(OP_TASK);
tx.reset();
watch_tx.startFromStop();
key_checkpoint = i + 1;
if (i != key_end) {
boost::asio::post(io_context, [state]() { resumablePopulate(state); });
} else {
logr.debug("Populated {} rows [{}, {}]: {:6.3f} sec",
key_end - key_begin + 1,
key_begin,
key_end,
toDoubleSeconds(watch_total.stop().diff()));
stopcount.fetch_add(1);
return;
}
}
});
break;
}
}
}
struct ResumableStateForRunWorkload {
Logger logr;
Database db;
Transaction tx;
boost::asio::io_context& io_context;
Arguments const& args;
ThreadStatistics& stats;
std::atomic<int>& stopcount;
std::atomic<int> const& signal;
int max_iters;
OpIterator op_iter;
LatencySampleBinArray sample_bins;
ByteString key1;
ByteString key2;
ByteString val;
std::array<Stopwatch, MAX_OP> watch_per_op;
Stopwatch watch_step;
Stopwatch watch_commit;
Stopwatch watch_tx;
Stopwatch watch_task;
bool needs_commit;
void signalEnd() noexcept { stopcount.fetch_add(1); }
bool ended() noexcept {
return (max_iters != -1 && max_iters >= stats.getOpCount(OP_TASK)) || signal.load() == SIGNAL_RED;
}
};
using RunWorkloadStateHandle = std::shared_ptr<ResumableStateForRunWorkload>;
#define UNPACK_RESUMABLE_STATE_FOR_RUN_WORKLOAD(p_state) \
[[maybe_unused]] auto& [logr, \
db, \
tx, \
io_context, \
args, \
stats, \
stopcount, \
signal, \
max_iters, \
op_iter, \
sample_bins, \
key1, \
key2, \
val, \
watch_per_op, \
watch_step, \
watch_commit, \
watch_tx, \
watch_task, \
needs_commit] = *p_state; \
[[maybe_unused]] auto& [op, count, step] = op_iter; \
[[maybe_unused]] const auto step_kind = opTable[op].stepKind(step)
void resumableRunWorkload(RunWorkloadStateHandle state);
void onStepSuccess(RunWorkloadStateHandle state) {
UNPACK_RESUMABLE_STATE_FOR_RUN_WORKLOAD(state);
logr.debug("Step {}:{} succeeded", getOpName(op), step);
// step successful
watch_step.stop();
const auto do_sample = stats.getOpCount(OP_TASK) % args.sampling == 0;
if (step_kind == StepKind::COMMIT) {
// reset transaction boundary
const auto step_latency = watch_step.diff();
watch_tx.setStop(watch_step.getStop());
if (do_sample) {
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, step_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(step_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
tx.reset();
watch_tx.startFromStop();
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
needs_commit = false;
}
// op completed successfully
if (step + 1 == opTable[op].steps()) {
if (opTable[op].needsCommit())
needs_commit = true;
watch_per_op[op].setStop(watch_step.getStop());
if (do_sample) {
const auto op_latency = watch_per_op[op].diff();
stats.addLatency(op, op_latency);
sample_bins[op].put(op_latency);
}
stats.incrOpCount(op);
}
op_iter = getOpNext(args, op_iter);
if (op_iter == OpEnd) {
if (needs_commit || args.commit_get) {
// task completed, need to commit before finish
watch_commit.start();
auto f = tx.commit().eraseType();
f.then([state](Future f) {
UNPACK_RESUMABLE_STATE_FOR_RUN_WORKLOAD(state);
if (auto err = f.error()) {
// commit had errors
logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
"ERROR",
"Post-iteration commit returned error: {}",
err.what());
tx.onError(err).then([state](Future f) {
UNPACK_RESUMABLE_STATE_FOR_RUN_WORKLOAD(state);
const auto rc = handleForOnError(tx, f, "ON_ERROR");
if (rc == FutureRC::CONFLICT)
stats.incrConflictCount();
else
stats.incrErrorCount(OP_COMMIT);
if (rc == FutureRC::ABORT) {
state->signalEnd();
return;
}
op_iter = getOpBegin(args);
needs_commit = false;
boost::asio::post(io_context, [state]() { resumableRunWorkload(state); });
});
} else {
// commit successful
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
watch_task.setStop(watch_commit.getStop());
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
const auto task_duration = watch_task.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, commit_latency);
stats.addLatency(OP_TASK, task_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
sample_bins[OP_TASK].put(task_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
stats.incrOpCount(OP_TASK);
tx.reset();
watch_tx.startFromStop();
watch_task.startFromStop();
if (state->ended()) {
state->signalEnd();
return;
}
// start next iteration
op_iter = getOpBegin(args);
boost::asio::post(io_context, [state]() { resumableRunWorkload(state); });
}
});
} else {
// task completed but no need to commit
watch_task.stop();
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
const auto task_duration = watch_task.diff();
stats.addLatency(OP_TASK, task_duration);
sample_bins[OP_TASK].put(task_duration);
}
stats.incrOpCount(OP_TASK);
watch_tx.startFromStop();
watch_task.startFromStop();
tx.reset();
if (state->ended()) {
state->signalEnd();
return;
}
op_iter = getOpBegin(args);
// run next iteration
boost::asio::post(io_context, [state]() { resumableRunWorkload(state); });
}
} else {
boost::asio::post(io_context, [state]() { resumableRunWorkload(state); });
}
}
void resumableRunWorkload(RunWorkloadStateHandle state) {
UNPACK_RESUMABLE_STATE_FOR_RUN_WORKLOAD(state);
assert(op_iter != OpEnd);
watch_step.start();
if (step == 0 /* first step */) {
watch_per_op[op] = Stopwatch(watch_step.getStart());
}
auto f = opTable[op].stepFunction(step)(tx, args, key1, key2, val);
if (!f) {
// immediately completed client-side ops: e.g. set, setrange, clear, clearrange, ...
onStepSuccess(state);
} else {
f.then([state](Future f) {
UNPACK_RESUMABLE_STATE_FOR_RUN_WORKLOAD(state);
if (step_kind != StepKind::ON_ERROR) {
if (auto err = f.error()) {
logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
"ERROR",
"step {} of op {} returned '{}'",
step,
opTable[op].name(),
err.what());
tx.onError(err).then([state](Future f) {
UNPACK_RESUMABLE_STATE_FOR_RUN_WORKLOAD(state);
const auto rc = handleForOnError(tx, f, fmt::format("{}_STEP_{}", opTable[op].name(), step));
if (rc == FutureRC::RETRY) {
stats.incrErrorCount(op);
} else if (rc == FutureRC::CONFLICT) {
stats.incrConflictCount();
} else if (rc == FutureRC::ABORT) {
tx.reset();
stopcount.fetch_add(1);
return;
}
op_iter = getOpBegin(args);
needs_commit = false;
// restart this iteration from beginning
boost::asio::post(io_context, [state]() { resumableRunWorkload(state); });
});
return;
}
} else {
auto rc = handleForOnError(tx, f, "BG_ON_ERROR");
if (rc == FutureRC::RETRY) {
stats.incrErrorCount(op);
} else if (rc == FutureRC::CONFLICT) {
stats.incrConflictCount();
} else if (rc == FutureRC::ABORT) {
tx.reset();
stopcount.fetch_add(1);
return;
}
op_iter = getOpBegin(args);
needs_commit = false;
// restart this iteration from beginning
boost::asio::post(io_context, [state]() { resumableRunWorkload(state); });
return;
}
onStepSuccess(state);
});
}
}
void runAsyncWorkload(Arguments const& args,
pid_t pid_main,
int worker_id,
@ -1227,39 +456,31 @@ void runAsyncWorkload(Arguments const& args,
overwrite = false;
}
};
std::atomic<int> stopcount = ATOMIC_VAR_INIT(0);
auto stopcount = std::atomic<int>{};
if (args.mode == MODE_BUILD) {
auto states = std::vector<PopulateStateHandle>(args.async_xacts);
for (auto i = 0; i < args.async_xacts; i++) {
const auto key_begin = insertBegin(args.rows, worker_id, i, args.num_processes, args.async_xacts);
const auto key_end = insertEnd(args.rows, worker_id, i, args.num_processes, args.async_xacts);
auto db = databases[i % args.num_databases];
auto state =
PopulateStateHandle(new ResumableStateForPopulate{ Logger(WorkerProcess{}, args.verbose, worker_id, i),
db,
db.createTransaction(),
io_context,
args,
shm.statsSlot(worker_id, i),
stopcount,
LatencySampleBinArray(),
key_begin,
key_end,
key_begin,
ByteString{},
ByteString{},
Stopwatch(StartAtCtor{}),
Stopwatch(),
Stopwatch(StartAtCtor{}) });
auto state = std::make_shared<ResumableStateForPopulate>(
Logger(WorkerProcess{}, args.verbose, worker_id, i),
db,
db.createTransaction(),
io_context,
args,
shm.statsSlot(worker_id, i),
stopcount,
key_begin,
key_end);
state->watch_tx.start();
state->watch_total.start();
states[i] = state;
state->keystr.reserve(args.key_length);
state->valstr.reserve(args.value_length);
}
while (shm.headerConst().signal.load() != SIGNAL_GREEN)
usleep(1000);
// launch [async_xacts] concurrent transactions
for (auto state : states)
boost::asio::post(io_context, [state]() { resumablePopulate(state); });
for (auto state : states) state->postNextTick();
while (stopcount.load() != args.async_xacts)
usleep(1000);
dump_samples(states);
@ -1271,8 +492,8 @@ void runAsyncWorkload(Arguments const& args,
args.iteration == 0
? -1
: computeThreadIters(args.iteration, worker_id, i, args.num_processes, args.async_xacts);
auto state = RunWorkloadStateHandle(
new ResumableStateForRunWorkload{ Logger(WorkerProcess{}, args.verbose, worker_id, i),
auto state = std::make_shared<ResumableStateForRunWorkload>(
Logger(WorkerProcess{}, args.verbose, worker_id, i),
db,
db.createTransaction(),
io_context,
@ -1281,26 +502,14 @@ void runAsyncWorkload(Arguments const& args,
stopcount,
shm.headerConst().signal,
max_iters,
getOpBegin(args),
LatencySampleBinArray(),
ByteString{},
ByteString{},
ByteString{},
std::array<Stopwatch, MAX_OP>{},
Stopwatch() /*watch_step*/,
Stopwatch() /*watch_commit*/,
Stopwatch(StartAtCtor{}) /*watch_tx*/,
Stopwatch(StartAtCtor{}) /*watch_task*/,
false /*needs_commit*/ });
getOpBegin(args));
states[i] = state;
state->key1.reserve(args.key_length);
state->key2.reserve(args.key_length);
state->val.reserve(args.value_length);
state->watch_task.start();
state->watch_tx.start();
}
while (shm.headerConst().signal.load() != SIGNAL_GREEN)
usleep(1000);
for (auto state : states)
boost::asio::post(io_context, [state]() { resumableRunWorkload(state); });
for (auto state : states) state->postNextTick();
logr.debug("Launched {} concurrent transactions", states.size());
while (stopcount.load() != args.async_xacts)
usleep(1000);

View File

@ -81,7 +81,6 @@ constexpr const int KNOB_MAX = 256;
constexpr const int TAGPREFIXLENGTH_MAX = 8;
constexpr const int NUM_CLUSTERS_MAX = 3;
constexpr const int NUM_DATABASES_MAX = 10;
constexpr const int MAX_BG_IDS = 1000;
constexpr const std::string_view KEY_PREFIX{ "mako" };
constexpr const std::string_view TEMP_DATA_STORE{ "/tmp/makoTemp" };
@ -145,37 +144,6 @@ struct alignas(64) ThreadArgs {
fdb::Database database; // database to work with
};
using StepFunction = fdb::Future (*)(fdb::Transaction tx,
Arguments const&,
fdb::ByteString& /*key1*/,
fdb::ByteString& /*key2*/,
fdb::ByteString& /*value*/);
class Operation {
using Step = std::pair<StepKind, StepFunction>;
std::string_view name_;
std::vector<Step> steps_;
bool needs_commit_;
public:
Operation(std::string_view name, std::vector<Step>&& steps, bool needs_commit)
: name_(name), steps_(std::move(steps)), needs_commit_(needs_commit) {}
std::string_view name() const noexcept { return name_; }
StepKind stepKind(int step) const noexcept {
assert(step < steps());
return steps_[step].first;
}
StepFunction stepFunction(int step) const noexcept { return steps_[step].second; }
// how many steps in this op?
int steps() const noexcept { return static_cast<int>(steps_.size()); }
// does the op needs to commit some time after its final step?
bool needsCommit() const noexcept { return needs_commit_; }
};
} // namespace mako
#endif /* MAKO_HPP */

View File

@ -0,0 +1,292 @@
#include "blob_granules.hpp"
#include "operations.hpp"
#include "mako.hpp"
#include "logger.hpp"
#include "utils.hpp"
#include "fdbclient/zipf.h"
#include <array>
extern thread_local mako::Logger logr;
namespace mako {
OpIterator getOpBegin(Arguments const& args) noexcept {
for (auto op = 0; op < MAX_OP; op++) {
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] == 0)
continue;
return OpIterator(op, 0, 0);
}
return OpEnd;
}
OpIterator getOpNext(Arguments const& args, OpIterator current) noexcept {
if (OpEnd == current)
return OpEnd;
auto [op, count, step] = current;
assert(op < MAX_OP && !isAbstractOp(op));
if (opTable[op].steps() > step + 1)
return OpIterator(op, count, step + 1);
count++;
for (; op < MAX_OP; op++, count = 0) {
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] <= count)
continue;
return OpIterator(op, count, 0);
}
return OpEnd;
}
using namespace fdb;
inline int nextKey(Arguments const& args) {
if (args.zipf)
return zipfian_next();
return urand(0, args.rows - 1);
}
char const* getOpName(int ops_code) {
if (ops_code >= 0 && ops_code < MAX_OP)
return opTable[ops_code].name().data();
return "";
}
const std::array<Operation, MAX_OP> opTable{
{ { "GRV",
{ { StepKind::READ,
[](Transaction tx, Arguments const&, ByteString&, ByteString&, ByteString&) {
return tx.getReadVersion().eraseType();
} } },
false },
{ "GET",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, false /*snapshot*/).eraseType();
} } },
false },
{ "GETRANGE",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
auto num_end = num_begin + args.txnspec.ops[OP_GETRANGE][OP_RANGE] - 1;
if (num_end > args.rows - 1)
num_end = args.rows - 1;
genKey(end, KEY_PREFIX, args, num_end);
return tx
.getRange<key_select::Inclusive, key_select::Inclusive>(begin,
end,
0 /*limit*/,
0 /*target_bytes*/,
args.streaming_mode,
0 /*iteration*/,
false /*snapshot*/,
args.txnspec.ops[OP_GETRANGE][OP_REVERSE])
.eraseType();
} } },
false },
{ "SGET",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, true /*snapshot*/).eraseType();
} } },
false },
{ "SGETRANGE",
{ {
StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
auto num_end = num_begin + args.txnspec.ops[OP_SGETRANGE][OP_RANGE] - 1;
if (num_end > args.rows - 1)
num_end = args.rows - 1;
genKey(end, KEY_PREFIX, args, num_end);
return tx
.getRange<key_select::Inclusive, key_select::Inclusive>(begin,
end,
0 /*limit*/,
0 /*target_bytes*/,
args.streaming_mode,
0 /*iteration*/,
true /*snapshot*/,
args.txnspec.ops[OP_SGETRANGE][OP_REVERSE])
.eraseType();
}
} },
false },
{ "UPDATE",
{ { StepKind::READ,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, false /*snapshot*/).eraseType();
} },
{ StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
randomString(value, args.value_length);
tx.set(key, value);
return Future();
} } },
true },
{ "INSERT",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
// concat([padding], key_prefix, random_string): reasonably unique
randomString<false /*clear-before-append*/>(key, args.key_length - static_cast<int>(key.size()));
randomString(value, args.value_length);
tx.set(key, value);
return Future();
} } },
true },
{ "INSERTRANGE",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
const auto prefix_len = static_cast<int>(key.size());
const auto range = args.txnspec.ops[OP_INSERTRANGE][OP_RANGE];
assert(range > 0);
const auto range_digits = digits(range);
assert(args.key_length - prefix_len >= range_digits);
const auto rand_len = args.key_length - prefix_len - range_digits;
// concat([padding], prefix, random_string, range_digits)
randomString<false /*clear-before-append*/>(key, rand_len);
randomString(value, args.value_length);
for (auto i = 0; i < range; i++) {
fmt::format_to(std::back_inserter(key), "{0:0{1}d}", i, range_digits);
tx.set(key, value);
key.resize(key.size() - static_cast<size_t>(range_digits));
}
return Future();
} } },
true },
{ "OVERWRITE",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKey(key, KEY_PREFIX, args, nextKey(args));
randomString(value, args.value_length);
tx.set(key, value);
return Future();
} } },
true },
{ "CLEAR",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
genKey(key, KEY_PREFIX, args, nextKey(args));
tx.clear(key);
return Future();
} } },
true },
{ "SETCLEAR",
{ { StepKind::COMMIT,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
const auto prefix_len = static_cast<int>(key.size());
randomString<false /*append-after-clear*/>(key, args.key_length - prefix_len);
randomString(value, args.value_length);
tx.set(key, value);
return tx.commit().eraseType();
} },
{ StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
tx.reset(); // assuming commit from step 0 worked.
tx.clear(key); // key should forward unchanged from step 0
return Future();
} } },
true },
{ "CLEARRANGE",
{ { StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
const auto range = args.txnspec.ops[OP_CLEARRANGE][OP_RANGE];
assert(range > 0);
genKey(end, KEY_PREFIX, args, std::min(args.rows - 1, num_begin + range - 1));
tx.clearRange(begin, end);
return Future();
} } },
true },
{ "SETCLEARRANGE",
{ { StepKind::COMMIT,
[](Transaction tx, Arguments const& args, ByteString& key_begin, ByteString& key, ByteString& value) {
genKeyPrefix(key, KEY_PREFIX, args);
const auto prefix_len = static_cast<int>(key.size());
const auto range = args.txnspec.ops[OP_SETCLEARRANGE][OP_RANGE];
assert(range > 0);
const auto range_digits = digits(range);
assert(args.key_length - prefix_len >= range_digits);
const auto rand_len = args.key_length - prefix_len - range_digits;
// concat([padding], prefix, random_string, range_digits)
randomString<false /*clear-before-append*/>(key, rand_len);
randomString(value, args.value_length);
for (auto i = 0; i <= range; i++) {
fmt::format_to(std::back_inserter(key), "{0:0{1}d}", i, range_digits);
if (i == range)
break; // preserve "exclusive last"
// preserve first key for step 1
if (i == 0)
key_begin = key;
tx.set(key, value);
// preserve last key for step 1
key.resize(key.size() - static_cast<size_t>(range_digits));
}
return tx.commit().eraseType();
} },
{ StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
tx.reset();
tx.clearRange(begin, end);
return Future();
} } },
true },
{ "COMMIT", { { StepKind::NONE, nullptr } }, false },
{ "TRANSACTION", { { StepKind::NONE, nullptr } }, false },
{ "TASK", { { StepKind::NONE, nullptr } }, false },
{ "READBLOBGRANULE",
{ { StepKind::ON_ERROR,
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
const auto num_begin = nextKey(args);
genKey(begin, KEY_PREFIX, args, num_begin);
const auto range = args.txnspec.ops[OP_READ_BG][OP_RANGE];
assert(range > 0);
genKey(end, KEY_PREFIX, args, std::min(args.rows - 1, num_begin + range - 1));
auto err = Error{};
err = tx.setOptionNothrow(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, BytesRef());
if (err) {
// Issuing read/writes before disabling RYW results in error.
// Possible malformed workload?
// As workloads execute in sequence, retrying would likely repeat this error.
fmt::print(stderr, "ERROR: TR_OPTION_READ_YOUR_WRITES_DISABLE: {}", err.what());
return Future();
}
// Allocate a separate context per call to avoid multiple threads accessing
auto user_context = blob_granules::local_file::UserContext(args.bg_file_path);
auto api_context = blob_granules::local_file::createApiContext(user_context, args.bg_materialize_files);
auto r =
tx.readBlobGranules(begin, end, 0 /*begin_version*/, -1 /*end_version, use txn's*/, api_context);
user_context.clear();
auto out = Result::KeyValueArray{};
err = r.getKeyValueArrayNothrow(out);
if (!err || err.is(2037 /*blob_granule_not_materialized*/))
return Future();
const auto level = (err.is(1020 /*not_committed*/) || err.is(1021 /*commit_unknown_result*/) ||
err.is(1213 /*tag_throttled*/))
? VERBOSE_WARN
: VERBOSE_NONE;
logr.printWithLogLevel(level, "ERROR", "get_keyvalue_array() after readBlobGranules(): {}", err.what());
return tx.onError(err).eraseType();
} } },
false } }
};
} // namespace mako

View File

@ -1,11 +1,18 @@
#ifndef MAKO_OPERATIONS_HPP
#define MAKO_OPERATIONS_HPP
#include <vector>
#include <fdb.hpp>
#include <array>
#include <cassert>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>
namespace mako {
struct Arguments;
/* transaction specification */
enum OpKind {
OP_GETREADVERSION,
@ -46,6 +53,49 @@ inline bool isAbstractOp(int op) noexcept {
return op == OP_COMMIT || op == OP_TRANSACTION; // || op == OP_TASK;
}
using StepFunction = fdb::Future (*)(fdb::Transaction tx,
Arguments const&,
fdb::ByteString& /*key1*/,
fdb::ByteString& /*key2*/,
fdb::ByteString& /*value*/);
class Operation {
using Step = std::pair<StepKind, StepFunction>;
std::string_view name_;
std::vector<Step> steps_;
bool needs_commit_;
public:
Operation(std::string_view name, std::vector<Step>&& steps, bool needs_commit)
: name_(name), steps_(std::move(steps)), needs_commit_(needs_commit) {}
std::string_view name() const noexcept { return name_; }
StepKind stepKind(int step) const noexcept {
assert(step < steps());
return steps_[step].first;
}
StepFunction stepFunction(int step) const noexcept { return steps_[step].second; }
// how many steps in this op?
int steps() const noexcept { return static_cast<int>(steps_.size()); }
// does the op needs to commit some time after its final step?
bool needsCommit() const noexcept { return needs_commit_; }
};
char const* getOpName(int ops_code);
extern const std::array<Operation, MAX_OP> opTable;
using OpIterator = std::tuple<int /*op*/, int /*count*/, int /*step*/>;
constexpr const OpIterator OpEnd = OpIterator(MAX_OP, -1, -1);
OpIterator getOpBegin(Arguments const& args) noexcept;
OpIterator getOpNext(Arguments const& args, OpIterator current) noexcept;
} // namespace mako
#endif /* MAKO_OPERATIONS_HPP */