Merge pull request #6489 from sfc-gh-jshim/mako-cpp-async

Refactor Mako for C++ and add asynchronous execution support
This commit is contained in:
Junhyun Shim 2022-04-27 10:49:21 +02:00 committed by GitHub
commit 1ee7702779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 4785 additions and 3536 deletions

View File

@ -80,10 +80,23 @@ endif()
# The tests don't build on windows
if(NOT WIN32)
set(MAKO_SRCS
test/mako/mako.c
test/mako/mako.h
test/mako/utils.c
test/mako/utils.h)
test/mako/async.hpp
test/mako/async.cpp
test/mako/blob_granules.hpp
test/mako/blob_granules.cpp
test/mako/future.hpp
test/mako/limit.hpp
test/mako/logger.hpp
test/mako/mako.cpp
test/mako/mako.hpp
test/mako/operations.hpp
test/mako/operations.cpp
test/mako/process.hpp
test/mako/shm.hpp
test/mako/stats.hpp
test/mako/time.hpp
test/mako/utils.cpp
test/mako/utils.hpp)
add_subdirectory(test/unit/third_party)
find_package(Threads REQUIRED)
set(UNIT_TEST_SRCS
@ -98,6 +111,11 @@ if(NOT WIN32)
test/unit/fdb_api.cpp
test/unit/fdb_api.hpp)
add_library(fdb_cpp INTERFACE)
target_sources(fdb_cpp INTERFACE test/fdb_api.hpp)
target_include_directories(fdb_cpp INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/test)
target_link_libraries(fdb_cpp INTERFACE fmt::fmt)
set(API_TESTER_SRCS
test/apitester/fdb_c_api_tester.cpp
test/apitester/TesterApiWorkload.cpp
@ -179,7 +197,11 @@ endif()
# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
target_link_libraries(mako PRIVATE fdb_c fdbclient)
if (USE_SANITIZER)
target_link_libraries(mako PRIVATE fdb_c fdbclient fmt::fmt Threads::Threads fdb_cpp boost_asan)
else()
target_link_libraries(mako PRIVATE fdb_c fdbclient fmt::fmt Threads::Threads fdb_cpp boost_target)
endif()
if(NOT OPEN_FOR_IDE)
# Make sure that fdb_c.h is compatible with c90

561
bindings/c/test/fdb_api.hpp Normal file
View File

@ -0,0 +1,561 @@
/*
* fdb_api.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDB_API_HPP
#define FDB_API_HPP
#pragma once
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 720
#endif
#include <cassert>
#include <cstdint>
#include <limits>      // std::numeric_limits (intSize overflow check)
#include <memory>
#include <stdexcept>
#include <string>
#include <string_view>
#include <tuple>       // future_var::Value / KeyValueArray payload types
#include <type_traits> // std::underlying_type_t in setOption error paths
#include <utility>     // std::pair payload types
#include <fmt/format.h>
// introduce the option enums
#include <fdb_c_options.g.h>
namespace fdb {
// hide C API to discourage mixing C/C++ API
namespace native {
#include <foundationdb/fdb_c.h>
}
// Byte-oriented aliases: FDB keys and values are raw byte strings, not
// NUL-terminated text, so std::basic_string{,_view}<uint8_t> is used throughout.
using ByteString = std::basic_string<uint8_t>;
using BytesRef = std::basic_string_view<uint8_t>;
using CharsRef = std::string_view;
using KeyRef = BytesRef;
using ValueRef = BytesRef;

// Reinterpret a char pointer as a byte pointer (no copy, same address).
inline uint8_t const* toBytePtr(char const* ptr) noexcept {
	return reinterpret_cast<uint8_t const*>(ptr);
}

// get bytestring view from charstring: e.g. std::basic_string{_view}<char>
// The view aliases s's storage; s must outlive the returned view.
template <template <class...> class StringLike, class Char>
BytesRef toBytesRef(const StringLike<Char>& s) noexcept {
	static_assert(sizeof(Char) == 1);
	return BytesRef(reinterpret_cast<uint8_t const*>(s.data()), s.size());
}

// get charstring view from bytestring: e.g. std::basic_string{_view}<uint8_t>
// The view aliases s's storage; s must outlive the returned view.
template <template <class...> class StringLike, class Char>
CharsRef toCharsRef(const StringLike<Char>& s) noexcept {
	static_assert(sizeof(Char) == 1);
	return CharsRef(reinterpret_cast<char const*>(s.data()), s.size());
}

// Compile-time switch for the size_t -> int narrowing check in intSize().
// NOTE: the check uses std::numeric_limits, which requires <limits>;
// previously this was only available transitively.
[[maybe_unused]] constexpr const bool OverflowCheck = false;

// Narrow a byte-string length to the int expected by the fdb_c API.
// Throws std::overflow_error only when OverflowCheck is enabled.
inline int intSize(BytesRef b) {
	if constexpr (OverflowCheck) {
		if (b.size() > static_cast<size_t>(std::numeric_limits<int>::max()))
			throw std::overflow_error("byte strlen goes beyond int bounds");
	}
	return static_cast<int>(b.size());
}
class Error {
public:
using CodeType = native::fdb_error_t;
Error() noexcept : err(0) {}
explicit Error(CodeType err) noexcept : err(err) {}
char const* what() noexcept { return native::fdb_get_error(err); }
explicit operator bool() const noexcept { return err != 0; }
bool is(CodeType other) const noexcept { return err == other; }
CodeType code() const noexcept { return err; }
bool retryable() const noexcept { return native::fdb_error_predicate(FDB_ERROR_PREDICATE_RETRYABLE, err) != 0; }
private:
CodeType err;
};
/* Traits of value types held by ready futures.
   Holds type and value extraction function. */
namespace future_var {
// Future with no payload (e.g. commit, on_error): extraction is a no-op success.
struct None {
	struct Type {};
	static Error extract(native::FDBFuture*, Type&) noexcept { return Error(0); }
};
// Future resolving to a single int64 (e.g. read version).
struct Int64 {
	using Type = int64_t;
	static Error extract(native::FDBFuture* f, Type& out) noexcept {
		return Error(native::fdb_future_get_int64(f, &out));
	}
};
// Future resolving to a key: (pointer, length). Pointer is owned by the future.
struct Key {
	using Type = std::pair<uint8_t const*, int>;
	static Error extract(native::FDBFuture* f, Type& out) noexcept {
		auto& [out_key, out_key_length] = out;
		return Error(native::fdb_future_get_key(f, &out_key, &out_key_length));
	}
};
// Future resolving to an optional value: (present, pointer, length).
// The native fdb_bool_t is converted to a C++ bool.
struct Value {
	using Type = std::tuple<bool, uint8_t const*, int>;
	static Error extract(native::FDBFuture* f, Type& out) noexcept {
		auto& [out_present, out_value, out_value_length] = out;
		auto out_present_native = native::fdb_bool_t{};
		auto err = native::fdb_future_get_value(f, &out_present_native, &out_value, &out_value_length);
		out_present = (out_present_native != 0);
		return Error(err);
	}
};
// Future resolving to an array of C strings: (strings, count).
struct StringArray {
	using Type = std::pair<const char**, int>;
	static Error extract(native::FDBFuture* f, Type& out) noexcept {
		auto& [out_strings, out_count] = out;
		return Error(native::fdb_future_get_string_array(f, &out_strings, &out_count));
	}
};
// Future resolving to a range-read result: (kv array, count, more-to-fetch flag).
struct KeyValueArray {
	using Type = std::tuple<native::FDBKeyValue const*, int, bool>;
	static Error extract(native::FDBFuture* f, Type& out) noexcept {
		auto& [out_kv, out_count, out_more] = out;
		auto out_more_native = native::fdb_bool_t{};
		auto err = native::fdb_future_get_keyvalue_array(f, &out_kv, &out_count, &out_more_native);
		out_more = (out_more_native != 0);
		return Error(err);
	}
};
} // namespace future_var
// Compose "preamble + error message" and throw it as std::runtime_error.
[[noreturn]] inline void throwError(std::string_view preamble, Error err) {
	auto msg = std::string(preamble);
	msg.append(err.what());
	throw std::runtime_error(msg);
}

// Highest API version supported by the loaded fdb_c client library.
inline int maxApiVersion() {
	return native::fdb_get_max_api_version();
}

// Select the API version; returns the error instead of throwing.
inline Error selectApiVersionNothrow(int version) {
	return Error(native::fdb_select_api_version(version));
}

// Select the API version; throws std::runtime_error on failure
// (e.g. version not supported by the loaded library).
inline void selectApiVersion(int version) {
	if (auto err = selectApiVersionNothrow(version)) {
		throwError(fmt::format("ERROR: fdb_select_api_version({}): ", version), err);
	}
}
// Wrappers over the global fdb_c network: option setting, setup, and the
// blocking run/stop pair. setup() must precede run(); run() blocks the calling
// thread until stop() is invoked from another thread.
namespace network {
// Set a byte-string-valued network option; returns the error instead of throwing.
inline Error setOptionNothrow(FDBNetworkOption option, BytesRef str) noexcept {
	return Error(native::fdb_network_set_option(option, str.data(), intSize(str)));
}
// Set an integer-valued network option; the value is passed to the C API as its
// raw 8-byte little-endian representation, as fdb_c expects.
inline Error setOptionNothrow(FDBNetworkOption option, int64_t value) noexcept {
	return Error(native::fdb_network_set_option(
	    option, reinterpret_cast<const uint8_t*>(&value), static_cast<int>(sizeof(value))));
}
// Throwing variant; the message includes the numeric option id.
inline void setOption(FDBNetworkOption option, BytesRef str) {
	if (auto err = setOptionNothrow(option, str)) {
		throwError(fmt::format("ERROR: fdb_network_set_option({}): ",
		                       static_cast<std::underlying_type_t<FDBNetworkOption>>(option)),
		           err);
	}
}
// Throwing variant; the message includes the numeric option id and value.
inline void setOption(FDBNetworkOption option, int64_t value) {
	if (auto err = setOptionNothrow(option, value)) {
		throwError(fmt::format("ERROR: fdb_network_set_option({}, {}): ",
		                       static_cast<std::underlying_type_t<FDBNetworkOption>>(option),
		                       value),
		           err);
	}
}
// One-time network initialization; returns the error instead of throwing.
inline Error setupNothrow() noexcept {
	return Error(native::fdb_setup_network());
}
// One-time network initialization; throws on failure.
inline void setup() {
	if (auto err = setupNothrow())
		throwError("ERROR: fdb_network_setup(): ", err);
}
// Blocks until stop() is called; typically run on a dedicated thread.
inline Error run() {
	return Error(native::fdb_run_network());
}
inline Error stop() {
	return Error(native::fdb_stop_network());
}
} // namespace network
class Transaction;
class Database;

// Shared-ownership RAII wrapper around FDBResult (synchronous results such as
// those from readBlobGranules); destroys via fdb_result_destroy.
class Result {
	friend class Transaction;
	std::shared_ptr<native::FDBResult> r;

	// Takes ownership of a raw result; a null pointer yields an invalid Result.
	Result(native::FDBResult* result) {
		if (result)
			r = std::shared_ptr<native::FDBResult>(result, &native::fdb_result_destroy);
	}

public:
	using KeyValueArray = future_var::KeyValueArray::Type;

	// Extract (kv array, count, more flag); returns the error instead of
	// throwing. The kv pointer is owned by (and valid as long as) this Result.
	Error getKeyValueArrayNothrow(KeyValueArray& out) const noexcept {
		auto out_more_native = native::fdb_bool_t{};
		auto& [out_kv, out_count, out_more] = out;
		auto err_raw = native::fdb_result_get_keyvalue_array(r.get(), &out_kv, &out_count, &out_more_native);
		out_more = out_more_native != 0;
		return Error(err_raw);
	}
	// Throwing variant of getKeyValueArrayNothrow().
	KeyValueArray getKeyValueArray() const {
		auto ret = KeyValueArray{};
		if (auto err = getKeyValueArrayNothrow(ret))
			throwError("ERROR: result_get_keyvalue_array(): ", err);
		return ret;
	}
};
// Shared-ownership RAII wrapper around FDBFuture. Type-erased base; use
// TypedFuture<VarTraits> for type-safe value extraction.
class Future {
protected:
	friend class Transaction;
	std::shared_ptr<native::FDBFuture> f;

	// Takes ownership of a raw future; null yields an invalid Future.
	Future(native::FDBFuture* future) {
		if (future)
			f = std::shared_ptr<native::FDBFuture>(future, &native::fdb_future_destroy);
	}

	// wrap any capturing lambda as callback passable to fdb_future_set_callback().
	// destroy after invocation.
	template <class Fn>
	static void callback(native::FDBFuture*, void* param) {
		auto fp = static_cast<Fn*>(param);
		try {
			(*fp)();
		} catch (const std::exception& e) {
			// Exceptions must not escape into the C client's network thread.
			fmt::print(stderr, "ERROR: Exception thrown in user callback: {}", e.what());
		}
		delete fp;
	}

	// set as callback user-defined completion handler of signature void(Future)
	// The closure (holding a FutureType copy of *this) is heap-allocated and
	// freed by callback() after it fires.
	template <class FutureType, class UserFunc>
	void then(UserFunc&& fn) {
		auto cb = [fut = FutureType(*this), fn = std::forward<UserFunc>(fn)]() { fn(fut); };
		using cb_type = std::decay_t<decltype(cb)>;
		auto fp = new cb_type(std::move(cb));
		if (auto err = Error(native::fdb_future_set_callback(f.get(), &callback<cb_type>, fp))) {
			throwError("ERROR: future_set_callback: ", err);
		}
	}

public:
	Future() noexcept : Future(nullptr) {}
	Future(const Future&) noexcept = default;
	Future& operator=(const Future&) noexcept = default;

	bool valid() const noexcept { return f != nullptr; }
	explicit operator bool() const noexcept { return valid(); }

	// True if the future has resolved (successfully or with an error).
	bool ready() const noexcept {
		assert(valid());
		return native::fdb_future_is_ready(f.get()) != 0;
	}
	// Synchronously wait for resolution; must not be called on the network thread.
	Error blockUntilReady() const noexcept {
		assert(valid());
		return Error(native::fdb_future_block_until_ready(f.get()));
	}
	// Error the future resolved with (code 0 if it succeeded).
	Error error() const noexcept {
		assert(valid());
		return Error(native::fdb_future_get_error(f.get()));
	}
	void cancel() noexcept { native::fdb_future_cancel(f.get()); }

	// Extract the resolved value per VarTraits; throws on extraction failure.
	// Precondition: future is ready and not in an error state.
	template <class VarTraits>
	typename VarTraits::Type get() const {
		assert(valid());
		assert(!error());
		auto out = typename VarTraits::Type{};
		if (auto err = VarTraits::extract(f.get(), out)) {
			throwError("future_get: ", err);
		}
		return out;
	}
	// Non-throwing extraction into 'var'.
	// BUGFIX: previously extracted into a discarded local, leaving 'var' untouched.
	template <class VarTraits>
	Error getNothrow(typename VarTraits::Type& var) const noexcept {
		assert(valid());
		assert(!error());
		return VarTraits::extract(f.get(), var);
	}

	// Register a completion handler of signature void(Future).
	template <class UserFunc>
	void then(UserFunc&& fn) {
		then<Future>(std::forward<UserFunc>(fn));
	}
};
// Type-safe view over Future: the payload type is fixed by VarTraits
// (one of the future_var::* trait structs).
template <typename VarTraits>
class TypedFuture : public Future {
	friend class Future;
	friend class Transaction;
	using SelfType = TypedFuture<VarTraits>;
	using Future::Future;
	// hide type-unsafe inherited functions
	using Future::get;
	using Future::getNothrow;
	using Future::then;
	// Only Future/Transaction may re-type an untyped future.
	TypedFuture(const Future& f) noexcept : Future(f) {}

public:
	using ContainedType = typename VarTraits::Type;

	// Downcast to the type-erased base (e.g. for heterogeneous handling).
	Future eraseType() const noexcept { return static_cast<Future const&>(*this); }

	// Typed extraction; throws on failure. Precondition: ready, no error.
	ContainedType get() const { return get<VarTraits>(); }
	// Typed non-throwing extraction into 'out'.
	Error getNothrow(ContainedType& out) const noexcept { return getNothrow<VarTraits>(out); }

	// Register a completion handler of signature void(TypedFuture<VarTraits>).
	template <class UserFunc>
	void then(UserFunc&& fn) {
		Future::then<SelfType>(std::forward<UserFunc>(fn));
	}
};
// Mirrors the (key, key_length, or_equal, offset) quadruple the fdb_c API
// takes for key-selector parameters.
struct KeySelector {
	const uint8_t* key;
	int keyLength;
	bool orEqual;
	int offset;
};

namespace key_select {
// Each helper expands an FDB_KEYSEL_* macro — which yields the first three
// fields plus a base offset — into a KeySelector; 'offset' shifts the selector
// by that many keys. NOTE: the view into 'key' must outlive the selector's use.
inline KeySelector firstGreaterThan(KeyRef key, int offset = 0) {
	return KeySelector{ FDB_KEYSEL_FIRST_GREATER_THAN(key.data(), intSize(key)) + offset };
}
inline KeySelector firstGreaterOrEqual(KeyRef key, int offset = 0) {
	return KeySelector{ FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(key.data(), intSize(key)) + offset };
}
inline KeySelector lastLessThan(KeyRef key, int offset = 0) {
	return KeySelector{ FDB_KEYSEL_LAST_LESS_THAN(key.data(), intSize(key)) + offset };
}
inline KeySelector lastLessOrEqual(KeyRef key, int offset = 0) {
	return KeySelector{ FDB_KEYSEL_LAST_LESS_OR_EQUAL(key.data(), intSize(key)) + offset };
}
} // namespace key_select
// Shared-ownership RAII wrapper around FDBTransaction; created via
// Database::createTransaction(). Copies share the same underlying transaction.
class Transaction {
	friend class Database;
	std::shared_ptr<native::FDBTransaction> tr;

	explicit Transaction(native::FDBTransaction* tr_raw) {
		if (tr_raw)
			tr = std::shared_ptr<native::FDBTransaction>(tr_raw, &native::fdb_transaction_destroy);
	}

public:
	Transaction() noexcept : Transaction(nullptr) {}
	Transaction(const Transaction&) noexcept = default;
	Transaction& operator=(const Transaction&) noexcept = default;

	bool valid() const noexcept { return tr != nullptr; }
	explicit operator bool() const noexcept { return valid(); }

	// Set an integer-valued transaction option; value is passed as its raw
	// 8-byte representation. Returns the error instead of throwing.
	Error setOptionNothrow(FDBTransactionOption option, int64_t value) noexcept {
		return Error(native::fdb_transaction_set_option(
		    tr.get(), option, reinterpret_cast<const uint8_t*>(&value), static_cast<int>(sizeof(value))));
	}
	// Set a byte-string-valued transaction option; returns the error.
	Error setOptionNothrow(FDBTransactionOption option, BytesRef str) noexcept {
		return Error(native::fdb_transaction_set_option(tr.get(), option, str.data(), intSize(str)));
	}
	// Throwing variants; messages include the numeric option id (and value).
	void setOption(FDBTransactionOption option, int64_t value) {
		if (auto err = setOptionNothrow(option, value)) {
			throwError(fmt::format("transaction_set_option({}, {}) returned error: ",
			                       static_cast<std::underlying_type_t<FDBTransactionOption>>(option),
			                       value),
			           err);
		}
	}
	void setOption(FDBTransactionOption option, BytesRef str) {
		if (auto err = setOptionNothrow(option, str)) {
			throwError(fmt::format("transaction_set_option({}) returned error: ",
			                       static_cast<std::underlying_type_t<FDBTransactionOption>>(option)),
			           err);
		}
	}

	TypedFuture<future_var::Int64> getReadVersion() { return native::fdb_transaction_get_read_version(tr.get()); }

	// Committed version; only valid after commit() succeeded. Non-throwing.
	Error getCommittedVersionNothrow(int64_t& out) {
		return Error(native::fdb_transaction_get_committed_version(tr.get(), &out));
	}
	// Throwing variant of getCommittedVersionNothrow().
	int64_t getCommittedVersion() {
		auto out = int64_t{};
		if (auto err = getCommittedVersionNothrow(out)) {
			throwError("get_committed_version: ", err);
		}
		return out;
	}

	// Resolve a key selector to an actual key.
	TypedFuture<future_var::Key> getKey(KeySelector sel, bool snapshot) {
		return native::fdb_transaction_get_key(tr.get(), sel.key, sel.keyLength, sel.orEqual, sel.offset, snapshot);
	}
	// Point read; snapshot reads do not add read conflict ranges.
	TypedFuture<future_var::Value> get(KeyRef key, bool snapshot) {
		return native::fdb_transaction_get(tr.get(), key.data(), intSize(key), snapshot);
	}
	// Usage: tx.getRange(key_select::firstGreaterOrEqual(firstKey), key_select::lastLessThan(lastKey), ...)
	// gets key-value pairs in key range [begin, end)
	TypedFuture<future_var::KeyValueArray> getRange(KeySelector first,
	                                                KeySelector last,
	                                                int limit,
	                                                int target_bytes,
	                                                FDBStreamingMode mode,
	                                                int iteration,
	                                                bool snapshot,
	                                                bool reverse) {
		return native::fdb_transaction_get_range(tr.get(),
		                                         first.key,
		                                         first.keyLength,
		                                         first.orEqual,
		                                         first.offset,
		                                         last.key,
		                                         last.keyLength,
		                                         last.orEqual,
		                                         last.offset,
		                                         limit,
		                                         target_bytes,
		                                         mode,
		                                         iteration,
		                                         snapshot,
		                                         reverse);
	}

	// Synchronous blob-granule read over [begin, end); returns a Result
	// rather than a future. 'context' supplies the file-loading callbacks.
	Result readBlobGranules(KeyRef begin,
	                        KeyRef end,
	                        int64_t begin_version,
	                        int64_t read_version,
	                        native::FDBReadBlobGranuleContext context) {
		return Result(native::fdb_transaction_read_blob_granules(
		    tr.get(), begin.data(), intSize(begin), end.data(), intSize(end), begin_version, read_version, context));
	}

	TypedFuture<future_var::None> commit() { return native::fdb_transaction_commit(tr.get()); }
	// Standard retry hook: resolves when the transaction may be retried.
	TypedFuture<future_var::None> onError(Error err) { return native::fdb_transaction_on_error(tr.get(), err.code()); }
	// Reset to a freshly-created state (keeps the same handle).
	void reset() { return native::fdb_transaction_reset(tr.get()); }

	// Buffered mutations; applied atomically on commit().
	void set(KeyRef key, ValueRef value) {
		native::fdb_transaction_set(tr.get(), key.data(), intSize(key), value.data(), intSize(value));
	}
	void clear(KeyRef key) { native::fdb_transaction_clear(tr.get(), key.data(), intSize(key)); }
	void clearRange(KeyRef begin, KeyRef end) {
		native::fdb_transaction_clear_range(tr.get(), begin.data(), intSize(begin), end.data(), intSize(end));
	}
};
// Shared-ownership RAII wrapper around FDBDatabase; destroys via
// fdb_database_destroy. Copies share the same underlying database.
class Database {
	std::shared_ptr<native::FDBDatabase> db;

public:
	Database(const Database&) noexcept = default;
	Database& operator=(const Database&) noexcept = default;

	// Open a database from a cluster file path; throws on failure.
	Database(const std::string& cluster_file_path) : db(nullptr) {
		auto db_raw = static_cast<native::FDBDatabase*>(nullptr);
		if (auto err = Error(native::fdb_create_database(cluster_file_path.c_str(), &db_raw)))
			throwError(fmt::format("Failed to create database with '{}': ", cluster_file_path), err);
		db = std::shared_ptr<native::FDBDatabase>(db_raw, &native::fdb_database_destroy);
	}
	// Invalid (null) database handle.
	Database() noexcept : db(nullptr) {}

	// Set an integer-valued database option; value is passed as its raw
	// 8-byte representation. Returns the error instead of throwing.
	Error setOptionNothrow(FDBDatabaseOption option, int64_t value) noexcept {
		return Error(native::fdb_database_set_option(
		    db.get(), option, reinterpret_cast<const uint8_t*>(&value), static_cast<int>(sizeof(value))));
	}
	// Set a byte-string-valued database option; returns the error.
	Error setOptionNothrow(FDBDatabaseOption option, BytesRef str) noexcept {
		return Error(native::fdb_database_set_option(db.get(), option, str.data(), intSize(str)));
	}
	// Throwing variants; messages include the numeric option id (and value).
	void setOption(FDBDatabaseOption option, int64_t value) {
		if (auto err = setOptionNothrow(option, value)) {
			throwError(fmt::format("database_set_option({}, {}) returned error: ",
			                       static_cast<std::underlying_type_t<FDBDatabaseOption>>(option),
			                       value),
			           err);
		}
	}
	void setOption(FDBDatabaseOption option, BytesRef str) {
		if (auto err = setOptionNothrow(option, str)) {
			throwError(fmt::format("database_set_option({}) returned error: ",
			                       static_cast<std::underlying_type_t<FDBDatabaseOption>>(option)),
			           err);
		}
	}

	// Create a new transaction on this database; throws if the handle is null
	// or the C API reports an error.
	Transaction createTransaction() {
		if (!db)
			throw std::runtime_error("create_transaction from null database");
		auto tx_native = static_cast<native::FDBTransaction*>(nullptr);
		auto err = Error(native::fdb_database_create_transaction(db.get(), &tx_native));
		if (err)
			throwError("Failed to create transaction: ", err);
		return Transaction(tx_native);
	}
};
} // namespace fdb
#endif /*FDB_API_HPP*/

View File

@ -0,0 +1,288 @@
/*
* async.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <boost/asio.hpp>
#include "async.hpp"
#include "future.hpp"
#include "logger.hpp"
#include "operations.hpp"
#include "stats.hpp"
#include "time.hpp"
#include "utils.hpp"
extern thread_local mako::Logger logr;
using namespace fdb;
namespace mako {
void ResumableStateForPopulate::postNextTick() {
boost::asio::post(io_context, [this, state = shared_from_this()]() { runOneTick(); });
}
// One populate step: buffer SETs from the last checkpoint until the next
// commit boundary, then issue an async commit whose continuation either
// retries (via onError), finishes, or posts the next tick.
void ResumableStateForPopulate::runOneTick() {
	const auto num_commit_every = args.txnspec.ops[OP_INSERT][OP_COUNT];
	for (auto i = key_checkpoint; i <= key_end; i++) {
		genKey(keystr.data(), KEY_PREFIX, args, i);
		randomString(valstr.data(), args.value_length);
		tx.set(keystr, valstr);
		stats.incrOpCount(OP_INSERT);
		// Commit when the batch is full or we just wrote the last key.
		if (i == key_end || (i - key_begin + 1) % num_commit_every == 0) {
			watch_commit.start();
			// 'state' keeps this object alive across the async boundary.
			tx.commit().then([this, state = shared_from_this(), i](Future f) {
				if (auto err = f.error()) {
					logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
					                       "ERROR",
					                       "commit for populate returned '{}'",
					                       err.what());
					// Let the client decide retryability; ABORT ends this state.
					tx.onError(err).then([this, state = shared_from_this()](Future f) {
						const auto f_rc = handleForOnError(tx, f, "ON_ERROR_FOR_POPULATE");
						if (f_rc == FutureRC::ABORT) {
							signalEnd();
							return;
						} else {
							// Retry the same checkpoint on the next tick.
							postNextTick();
						}
					});
				} else {
					// successfully committed
					watch_commit.stop();
					watch_tx.setStop(watch_commit.getStop());
					// Record latencies only for every args.sampling-th transaction.
					if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) {
						const auto commit_latency = watch_commit.diff();
						const auto tx_duration = watch_tx.diff();
						stats.addLatency(OP_COMMIT, commit_latency);
						stats.addLatency(OP_TRANSACTION, tx_duration);
						sample_bins[OP_COMMIT].put(commit_latency);
						sample_bins[OP_TRANSACTION].put(tx_duration);
					}
					stats.incrOpCount(OP_COMMIT);
					stats.incrOpCount(OP_TRANSACTION);
					tx.reset();
					watch_tx.startFromStop();
					// Advance the checkpoint past the committed batch.
					key_checkpoint = i + 1;
					if (i != key_end) {
						postNextTick();
					} else {
						// All keys written and committed: report and finish.
						logr.debug("Populated {} rows [{}, {}]: {:6.3f} sec",
						           key_end - key_begin + 1,
						           key_begin,
						           key_end,
						           toDoubleSeconds(watch_total.stop().diff()));
						signalEnd();
						return;
					}
				}
			});
			// The continuation drives further progress; stop the sync loop here.
			break;
		}
	}
}
void ResumableStateForRunWorkload::postNextTick() {
boost::asio::post(io_context, [this, state = shared_from_this()]() { runOneTick(); });
}
// Execute workload op-steps starting at 'iter'. Synchronous (client-side)
// steps run back-to-back in a loop; the first blocking step registers an async
// continuation and returns.
void ResumableStateForRunWorkload::runOneTick() {
	assert(iter != OpEnd);
	if (iter.step == 0 /* first step */)
		prepareKeys(iter.op, key1, key2, args);
	watch_step.start();
	if (iter.step == 0)
		watch_op = Stopwatch(watch_step.getStart());
	auto f = Future{};
	// to minimize context switch overhead, repeat immediately completed ops
	// in a loop, not an async continuation.
repeat_immediate_steps:
	f = opTable[iter.op].stepFunction(iter.step)(tx, args, key1, key2, val);
	if (!f) {
		// immediately completed client-side ops: e.g. set, setrange, clear, clearrange, ...
		updateStepStats();
		iter = getOpNext(args, iter);
		if (iter == OpEnd)
			onTransactionSuccess();
		else
			goto repeat_immediate_steps;
	} else {
		// step is blocking. register a continuation and return
		// 'state' keeps this object alive across the async boundary.
		f.then([this, state = shared_from_this()](Future f) {
			// Optional post-processing hook for this step (e.g. result capture).
			if (auto postStepFn = opTable[iter.op].postStepFunction(iter.step))
				postStepFn(f, tx, args, key1, key2, val);
			if (iter.stepKind() != StepKind::ON_ERROR) {
				if (auto err = f.error()) {
					logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
					                       "ERROR",
					                       "{}:{} returned '{}'",
					                       iter.opName(),
					                       iter.step,
					                       err.what());
					// Route through onError to decide retry vs. abort.
					tx.onError(err).then([this, state = shared_from_this()](Future f) {
						const auto rc = handleForOnError(tx, f, fmt::format("{}:{}", iter.opName(), iter.step));
						if (rc == FutureRC::RETRY) {
							stats.incrErrorCount(iter.op);
						} else if (rc == FutureRC::CONFLICT) {
							stats.incrConflictCount();
						} else if (rc == FutureRC::ABORT) {
							tx.reset();
							signalEnd();
							return;
						}
						// restart this iteration from beginning
						iter = getOpBegin(args);
						needs_commit = false;
						postNextTick();
					});
				} else {
					// async step succeeded
					updateStepStats();
					iter = getOpNext(args, iter);
					if (iter == OpEnd) {
						onTransactionSuccess();
					} else {
						postNextTick();
					}
				}
			} else {
				// blob granules op error
				auto rc = handleForOnError(tx, f, "BG_ON_ERROR");
				if (rc == FutureRC::RETRY) {
					stats.incrErrorCount(iter.op);
				} else if (rc == FutureRC::CONFLICT) {
					stats.incrConflictCount();
				} else if (rc == FutureRC::ABORT) {
					tx.reset();
					// NOTE(review): bumps stopcount directly instead of signalEnd()
					// as other abort paths do — presumably equivalent; verify.
					stopcount.fetch_add(1);
					return;
				}
				iter = getOpBegin(args);
				needs_commit = false;
				// restart this iteration from beginning
				postNextTick();
			}
		});
	}
}
// Book-keeping after a step completes successfully: commit-step latency,
// per-op latency on the op's final step, and op counters.
void ResumableStateForRunWorkload::updateStepStats() {
	logr.debug("Step {}:{} succeeded", iter.opName(), iter.step);
	// step successful
	watch_step.stop();
	// Sample latencies only for every args.sampling-th transaction.
	const auto do_sample = stats.getOpCount(OP_TRANSACTION) % args.sampling == 0;
	if (iter.stepKind() == StepKind::COMMIT) {
		// reset transaction boundary
		const auto step_latency = watch_step.diff();
		if (do_sample) {
			stats.addLatency(OP_COMMIT, step_latency);
			sample_bins[OP_COMMIT].put(step_latency);
		}
		tx.reset();
		stats.incrOpCount(OP_COMMIT);
		needs_commit = false;
	}
	// op completed successfully
	if (iter.step + 1 == opTable[iter.op].steps()) {
		// Ops that wrote data require a commit before the transaction ends.
		if (opTable[iter.op].needsCommit())
			needs_commit = true;
		watch_op.setStop(watch_step.getStop());
		if (do_sample) {
			const auto op_latency = watch_op.diff();
			stats.addLatency(iter.op, op_latency);
			sample_bins[iter.op].put(op_latency);
		}
		stats.incrOpCount(iter.op);
	}
}
// Called when all steps of the current iteration succeeded. Commits if any op
// buffered writes (or commit_get is set), records transaction stats, then
// either ends the workload or posts the next iteration.
void ResumableStateForRunWorkload::onTransactionSuccess() {
	if (needs_commit || args.commit_get) {
		// task completed, need to commit before finish
		watch_commit.start();
		tx.commit().then([this, state = shared_from_this()](Future f) {
			if (auto err = f.error()) {
				// commit had errors
				logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,
				                       "ERROR",
				                       "Post-iteration commit returned error: {}",
				                       err.what());
				// Route through onError to decide retry vs. abort.
				tx.onError(err).then([this, state = shared_from_this()](Future f) {
					const auto rc = handleForOnError(tx, f, "ON_ERROR");
					if (rc == FutureRC::CONFLICT)
						stats.incrConflictCount();
					else
						stats.incrErrorCount(OP_COMMIT);
					if (rc == FutureRC::ABORT) {
						signalEnd();
						return;
					}
					if (ended()) {
						signalEnd();
					} else {
						// retry the whole iteration from its first op
						iter = getOpBegin(args);
						needs_commit = false;
						postNextTick();
					}
				});
			} else {
				// commit successful
				watch_commit.stop();
				watch_tx.setStop(watch_commit.getStop());
				// Sample latencies only for every args.sampling-th transaction.
				if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) {
					const auto commit_latency = watch_commit.diff();
					const auto tx_duration = watch_tx.diff();
					stats.addLatency(OP_COMMIT, commit_latency);
					// BUGFIX: OP_TRANSACTION latency was recorded as commit_latency;
					// it must be the full transaction duration (cf. the sample_bins
					// line below and the populate path).
					stats.addLatency(OP_TRANSACTION, tx_duration);
					sample_bins[OP_COMMIT].put(commit_latency);
					sample_bins[OP_TRANSACTION].put(tx_duration);
				}
				stats.incrOpCount(OP_COMMIT);
				stats.incrOpCount(OP_TRANSACTION);
				tx.reset();
				watch_tx.startFromStop();
				if (ended()) {
					signalEnd();
				} else {
					// start next iteration
					iter = getOpBegin(args);
					postNextTick();
				}
			}
		});
	} else {
		// transaction completed but no need to commit
		watch_tx.stop();
		if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) {
			const auto tx_duration = watch_tx.diff();
			stats.addLatency(OP_TRANSACTION, tx_duration);
			sample_bins[OP_TRANSACTION].put(tx_duration);
		}
		stats.incrOpCount(OP_TRANSACTION);
		watch_tx.startFromStop();
		tx.reset();
		if (ended()) {
			signalEnd();
		} else {
			iter = getOpBegin(args);
			// start next iteration
			postNextTick();
		}
	}
}
} // namespace mako

View File

@ -0,0 +1,127 @@
/*
* async.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_ASYNC_HPP
#define MAKO_ASYNC_HPP
#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include "logger.hpp"
#include "mako.hpp"
#include "shm.hpp"
#include "stats.hpp"
#include "time.hpp"
namespace mako {
// as we don't have coroutines yet, we need to store in heap the complete state of execution,
// such that we can resume exactly where we were from last database op.
// as we don't have coroutines yet, we need to store in heap the complete state of execution,
// such that we can resume exactly where we were from last database op.
//
// Populate state: writes keys [key_begin, key_end], committing in batches;
// key_checkpoint tracks the first not-yet-committed key so retries resume there.
struct ResumableStateForPopulate : std::enable_shared_from_this<ResumableStateForPopulate> {
	Logger logr;
	fdb::Database db;
	fdb::Transaction tx;
	boost::asio::io_context& io_context; // executor that postNextTick() posts to
	Arguments const& args;
	ThreadStatistics& stats;
	std::atomic<int>& stopcount; // shared finished-state counter; bumped by signalEnd()
	LatencySampleBinArray sample_bins;
	int key_begin;
	int key_end;
	int key_checkpoint; // first key of the current (uncommitted) batch
	fdb::ByteString keystr; // reusable key buffer (sized to args.key_length)
	fdb::ByteString valstr; // reusable value buffer (sized to args.value_length)
	Stopwatch watch_tx;
	Stopwatch watch_commit;
	Stopwatch watch_total;
	ResumableStateForPopulate(Logger logr,
	                          fdb::Database db,
	                          fdb::Transaction tx,
	                          boost::asio::io_context& io_context,
	                          Arguments const& args,
	                          ThreadStatistics& stats,
	                          std::atomic<int>& stopcount,
	                          int key_begin,
	                          int key_end)
	  : logr(logr), db(db), tx(tx), io_context(io_context), args(args), stats(stats), stopcount(stopcount),
	    key_begin(key_begin), key_end(key_end), key_checkpoint(key_begin) {
		keystr.resize(args.key_length);
		valstr.resize(args.value_length);
	}
	void runOneTick();
	void postNextTick();
	// Mark this state finished (observed by the owning thread via stopcount).
	void signalEnd() { stopcount.fetch_add(1); }
};
using PopulateStateHandle = std::shared_ptr<ResumableStateForPopulate>;
// Workload state: runs op iterations (see async.cpp) until the iteration
// budget is exhausted or the controller raises SIGNAL_RED.
struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStateForRunWorkload> {
	Logger logr;
	fdb::Database db;
	fdb::Transaction tx;
	boost::asio::io_context& io_context; // executor that postNextTick() posts to
	Arguments const& args;
	ThreadStatistics& stats;
	std::atomic<int>& stopcount; // shared finished-state counter; bumped by signalEnd()
	std::atomic<int> const& signal; // controller's stop signal (SIGNAL_RED => stop)
	int max_iters; // transaction budget; -1 means unlimited
	OpIterator iter; // current (op, count, step) position within the txn spec
	LatencySampleBinArray sample_bins;
	fdb::ByteString key1; // reusable key buffers (sized to args.key_length)
	fdb::ByteString key2;
	fdb::ByteString val; // reusable value buffer (sized to args.value_length)
	Stopwatch watch_step;
	Stopwatch watch_op;
	Stopwatch watch_commit;
	Stopwatch watch_tx;
	bool needs_commit; // set when a completed op buffered writes
	ResumableStateForRunWorkload(Logger logr,
	                             fdb::Database db,
	                             fdb::Transaction tx,
	                             boost::asio::io_context& io_context,
	                             Arguments const& args,
	                             ThreadStatistics& stats,
	                             std::atomic<int>& stopcount,
	                             std::atomic<int> const& signal,
	                             int max_iters,
	                             OpIterator iter)
	  : logr(logr), db(db), tx(tx), io_context(io_context), args(args), stats(stats), stopcount(stopcount),
	    signal(signal), max_iters(max_iters), iter(iter), needs_commit(false) {
		key1.resize(args.key_length);
		key2.resize(args.key_length);
		val.resize(args.value_length);
	}
	// Mark this state finished (observed by the owning thread via stopcount).
	void signalEnd() noexcept { stopcount.fetch_add(1); }
	// Whether the workload should stop: iteration budget reached or stop signal.
	// BUGFIX: the comparison was inverted (max_iters >= opcount), which reported
	// "ended" from the very first transaction whenever a finite budget was set.
	bool ended() noexcept {
		return (max_iters != -1 && stats.getOpCount(OP_TRANSACTION) >= max_iters) || signal.load() == SIGNAL_RED;
	}
	void postNextTick();
	void runOneTick();
	void updateStepStats();
	void onTransactionSuccess();
};
using RunWorkloadStateHandle = std::shared_ptr<ResumableStateForRunWorkload>;
} // namespace mako
#endif /*MAKO_ASYNC_HPP*/

View File

@ -0,0 +1,116 @@
/*
* blob_granules.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "blob_granules.hpp"
#include "limit.hpp"
#include "logger.hpp"
#include <cstdio>
#include <fdb_api.hpp>
extern thread_local mako::Logger logr;
namespace mako::blob_granules::local_file {
int64_t startLoad(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* userContext) {
FILE* fp;
char full_fname[PATH_MAX]{
0,
};
int loadId;
uint8_t* data;
size_t readSize;
auto context = static_cast<UserContext*>(userContext);
loadId = context->nextId;
if (context->dataById[loadId] != 0) {
logr.error("too many granule file loads at once: {}", MAX_BG_IDS);
return -1;
}
context->nextId = (context->nextId + 1) % MAX_BG_IDS;
int ret = snprintf(full_fname, PATH_MAX, "%s%s", context->bgFilePath, filename);
if (ret < 0 || ret >= PATH_MAX) {
logr.error("BG filename too long: {}{}", context->bgFilePath, filename);
return -1;
}
fp = fopen(full_fname, "r");
if (!fp) {
logr.error("BG could not open file: {}", full_fname);
return -1;
}
// don't seek if offset == 0
if (offset && fseek(fp, offset, SEEK_SET)) {
// if fseek was non-zero, it failed
logr.error("BG could not seek to %{} in file {}", offset, full_fname);
fclose(fp);
return -1;
}
data = new uint8_t[length];
readSize = fread(data, sizeof(uint8_t), length, fp);
fclose(fp);
if (readSize != length) {
logr.error("BG could not read {} bytes from file: {}", length, full_fname);
return -1;
}
context->dataById[loadId] = data;
return loadId;
}
// fdb_c blob-granule callback: fetch the buffer previously registered by
// startLoad() under loadId. Returns null (and logs) for an empty slot.
uint8_t* getLoad(int64_t loadId, void* userContext) {
	auto* ctx = static_cast<UserContext*>(userContext);
	auto* data = ctx->dataById[loadId];
	if (!data) {
		logr.error("BG loadId invalid for get_load: {}", loadId);
		return 0;
	}
	return data;
}
void freeLoad(int64_t loadId, void* userContext) {
auto context = static_cast<UserContext*>(userContext);
if (context->dataById[loadId] == 0) {
logr.error("BG loadId invalid for free_load: {}", loadId);
}
delete[] context->dataById[loadId];
context->dataById[loadId] = 0;
}
// Builds the C-API callback context wired to the local-file loader trio
// (startLoad/getLoad/freeLoad) above.  'ctx' must outlive the returned struct.
fdb::native::FDBReadBlobGranuleContext createApiContext(UserContext& ctx, bool materialize_files) {
	fdb::native::FDBReadBlobGranuleContext api_ctx{};
	api_ctx.userContext = &ctx;
	api_ctx.start_load_f = &startLoad;
	api_ctx.get_load_f = &getLoad;
	api_ctx.free_load_f = &freeLoad;
	api_ctx.debugNoMaterialize = !materialize_files;
	api_ctx.granuleParallelism = 2; // TODO make knob or setting for changing this?
	return api_ctx;
}
} // namespace mako::blob_granules::local_file

View File

@ -0,0 +1,50 @@
/*
* blob_granules.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_BLOB_GRANULES_HPP
#define MAKO_BLOB_GRANULES_HPP
#include <cstdint>
#include <memory>
#include <fdb_api.hpp>
namespace mako::blob_granules::local_file {
constexpr const int MAX_BG_IDS = 1000;
// TODO: could always abstract this into something more generically usable by something other than mako.
// But outside of testing there are likely few use cases for local granules
// Per-call state handed to the blob granule C callbacks: a fixed table of
// MAX_BG_IDS buffer slots, indexed by the load ids returned from startLoad().
struct UserContext {
char const* bgFilePath; // directory/prefix where granule files live (not owned)
int nextId; // next slot startLoad() will try; wraps modulo MAX_BG_IDS
std::unique_ptr<uint8_t*[]> dataByIdMem; // owning storage for the slot table (zero-initialized)
uint8_t** dataById; // raw view of dataByIdMem, passed through the C API
// NOTE(review): clear() releases only the slot table itself; any buffers still
// registered in it leak — presumably callers freeLoad() everything first. Verify.
UserContext(char const* filePath)
: bgFilePath(filePath), nextId(0), dataByIdMem(new uint8_t*[MAX_BG_IDS]()), dataById(dataByIdMem.get()) {}
void clear() { dataByIdMem.reset(); }
};
fdb::native::FDBReadBlobGranuleContext createApiContext(UserContext& ctx, bool materialize_files);
} // namespace mako::blob_granules::local_file
#endif /*MAKO_BLOB_GRANULES_HPP*/

View File

@ -0,0 +1,89 @@
/*
* future.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_FUTURE_HPP
#define MAKO_FUTURE_HPP
#include <fdb_api.hpp>
#include <cassert>
#include <string_view>
#include "logger.hpp"
#include "macro.hpp"
extern thread_local mako::Logger logr;
namespace mako {
enum class FutureRC { OK, RETRY, CONFLICT, ABORT };
// Classify the outcome of a completed tx.onError() future:
// no error -> safe to RETRY; not_committed -> CONFLICT; other retryable ->
// RETRY (logged); anything else -> reset the transaction and ABORT.
template <class FutureType>
force_inline FutureRC handleForOnError(fdb::Transaction& tx, FutureType& f, std::string_view step) {
	auto err = f.error();
	if (!err)
		return FutureRC::RETRY;
	if (err.is(1020 /*not_committed*/))
		return FutureRC::CONFLICT;
	if (err.retryable()) {
		logr.warn("Retryable error '{}' found at on_error(), step: {}", err.what(), step);
		return FutureRC::RETRY;
	}
	logr.error("Unretryable error '{}' found at on_error(), step: {}", err.what(), step);
	tx.reset();
	return FutureRC::ABORT;
}
// Block until an on_error() future is ready, then classify it via
// handleForOnError().  A wait failure itself is unconditionally fatal.
template <class FutureType>
force_inline FutureRC waitAndHandleForOnError(fdb::Transaction& tx, FutureType& f, std::string_view step) {
	assert(f);
	const auto wait_err = f.blockUntilReady();
	if (wait_err) {
		logr.error("'{}' found while waiting for on_error() future, step: {}", wait_err.what(), step);
		return FutureRC::ABORT;
	}
	return handleForOnError(tx, f, step);
}
// wait on any non-immediate tx-related step to complete. Follow up with on_error().
// Returns OK on success; RETRY/CONFLICT/ABORT otherwise, after routing the
// error through tx.onError() so the client applies its implicit backoff.
template <class FutureType>
force_inline FutureRC waitAndHandleError(fdb::Transaction& tx, FutureType& f, std::string_view step) {
	assert(f);
	auto err = fdb::Error{};
	if ((err = f.blockUntilReady())) {
		const auto retry = err.retryable();
		logr.error("{} error '{}' found during step: {}", (retry ? "Retryable" : "Unretryable"), err.what(), step);
		return retry ? FutureRC::RETRY : FutureRC::ABORT;
	}
	err = f.error();
	if (!err)
		return FutureRC::OK;
	if (err.retryable()) {
		logr.warn("step {} returned '{}'", step, err.what());
	} else {
		logr.error("step {} returned '{}'", step, err.what());
	}
	// implicit backoff
	auto follow_up = tx.onError(err);
	// fix: wait on the on_error() follow-up future, not the original (already
	// ready) 'f' — otherwise the backoff is never awaited and 'follow_up' is
	// destroyed without inspection.
	return waitAndHandleForOnError(tx, follow_up, step);
}
} // namespace mako
#endif /*MAKO_FUTURE_HPP*/

View File

@ -0,0 +1,32 @@
/*
* limit.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIMIT_HPP
#define LIMIT_HPP
#if defined(__linux__)
#include <linux/limits.h>
#elif defined(__APPLE__)
#include <sys/syslimits.h>
#else
#include <limits.h>
#endif
#endif

View File

@ -0,0 +1,117 @@
/*
* logger.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_LOGGER_HPP
#define MAKO_LOGGER_HPP
#include <fmt/format.h>
#include <cassert>
#include <cstdio>
#include <iterator>
#include <string_view>
#include "process.hpp"
namespace mako {
constexpr const int VERBOSE_NONE = 0; // will still print errors
constexpr const int VERBOSE_DEFAULT = 1; // will print info and work stats
constexpr const int VERBOSE_WARN = 2; // will print expected errors
constexpr const int VERBOSE_DEBUG = 3; // will print everything
template <ProcKind P>
using ProcKindConstant = std::integral_constant<ProcKind, P>;
using MainProcess = ProcKindConstant<ProcKind::MAIN>;
using StatsProcess = ProcKindConstant<ProcKind::STATS>;
using WorkerProcess = ProcKindConstant<ProcKind::WORKER>;
// Lightweight leveled logger.  Each process/thread owns its own instance
// (see the thread_local 'logr'); output is a single fmt-formatted line with
// a "[WHO] CATEGORY: " prefix identifying the emitting process and thread.
class Logger {
	ProcKind proc;
	int verbosity{ VERBOSE_DEFAULT };
	int process_id{ -1 };
	int thread_id{ -1 };

	// Append the "[WHO] CATEGORY: " prefix for this logger's process kind.
	void putHeader(fmt::memory_buffer& buf, std::string_view category) {
		auto out = std::back_inserter(buf);
		switch (proc) {
		case ProcKind::MAIN:
			fmt::format_to(out, "[MAIN] {}: ", category);
			break;
		case ProcKind::STATS:
			fmt::format_to(out, "[STATS] {}: ", category);
			break;
		default: // worker process; thread id is optional
			if (thread_id == -1) {
				fmt::format_to(out, "[WORKER{:3d}] {}: ", process_id + 1, category);
			} else {
				fmt::format_to(out, "[WORKER{:3d}:{:3d}] {}: ", process_id + 1, thread_id + 1, category);
			}
			break;
		}
	}

public:
	Logger(MainProcess, int verbosity) noexcept : proc(MainProcess::value), verbosity(verbosity) {}
	Logger(StatsProcess, int verbosity) noexcept : proc(StatsProcess::value), verbosity(verbosity) {}
	Logger(WorkerProcess, int verbosity, int process_id, int thread_id = -1) noexcept
	  : proc(WorkerProcess::value), verbosity(verbosity), process_id(process_id), thread_id(thread_id) {}
	Logger(const Logger&) noexcept = default;
	Logger& operator=(const Logger&) noexcept = default;

	void setVerbosity(int value) noexcept {
		assert(value >= VERBOSE_NONE && value <= VERBOSE_DEBUG);
		verbosity = value;
	}

	// Format and emit one line if log_level passes the verbosity filter.
	// VERBOSE_NONE (errors) goes to stderr, everything else to stdout.
	template <typename... Args>
	void printWithLogLevel(int log_level, std::string_view header, Args&&... args) {
		assert(log_level >= VERBOSE_NONE && log_level <= VERBOSE_DEBUG);
		if (log_level > verbosity)
			return;
		auto const fp = (log_level == VERBOSE_NONE) ? stderr : stdout;
		// fmt::memory_buffer keeps short lines in its inline storage (no heap)
		auto buf = fmt::memory_buffer{};
		putHeader(buf, header);
		fmt::format_to(std::back_inserter(buf), std::forward<Args>(args)...);
		fmt::print(fp, "{}\n", std::string_view(buf.data(), buf.size()));
	}
	template <typename... Args>
	void error(Args&&... args) {
		printWithLogLevel(VERBOSE_NONE, "ERROR", std::forward<Args>(args)...);
	}
	template <typename... Args>
	void info(Args&&... args) {
		printWithLogLevel(VERBOSE_DEFAULT, "INFO", std::forward<Args>(args)...);
	}
	template <typename... Args>
	void warn(Args&&... args) {
		printWithLogLevel(VERBOSE_WARN, "WARNING", std::forward<Args>(args)...);
	}
	template <typename... Args>
	void debug(Args&&... args) {
		printWithLogLevel(VERBOSE_DEBUG, "DEBUG", std::forward<Args>(args)...);
	}
};
} // namespace mako
#endif /*MAKO_LOGGER_HPP*/

View File

@ -0,0 +1,32 @@
/*
* macro.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_MACRO_HPP
#define MAKO_MACRO_HPP
#if defined(__GNUG__)
#define force_inline inline __attribute__((__always_inline__))
#elif defined(_MSC_VER)
#define force_inline __forceinline
#else
#error Missing force inline
#endif
#endif /*MAKO_MACRO_HPP*/

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,209 +0,0 @@
#ifndef MAKO_H
#define MAKO_H
#pragma once
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 720
#endif
#include <foundationdb/fdb_c.h>
#include <pthread.h>
#include <sys/types.h>
#include <stdbool.h>
#if defined(__linux__)
#include <linux/limits.h>
#elif defined(__APPLE__)
#include <sys/syslimits.h>
#else
#include <limits.h>
#endif
#define VERBOSE_NONE 0
#define VERBOSE_DEFAULT 1
#define VERBOSE_ANNOYING 2
#define VERBOSE_DEBUG 3
#define MODE_INVALID -1
#define MODE_CLEAN 0
#define MODE_BUILD 1
#define MODE_RUN 2
#define FDB_SUCCESS 0
#define FDB_ERROR_RETRY -1
#define FDB_ERROR_ABORT -2
#define FDB_ERROR_CONFLICT -3
#define LAT_BLOCK_SIZE 511 /* size of each block to get detailed latency for each operation */
/* transaction specification */
enum Operations {
OP_GETREADVERSION,
OP_GET,
OP_GETRANGE,
OP_SGET,
OP_SGETRANGE,
OP_UPDATE,
OP_INSERT,
OP_INSERTRANGE,
OP_OVERWRITE,
OP_CLEAR,
OP_SETCLEAR,
OP_CLEARRANGE,
OP_SETCLEARRANGE,
OP_COMMIT,
OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
OP_READ_BG,
MAX_OP /* must be the last item */
};
#define OP_COUNT 0
#define OP_RANGE 1
#define OP_REVERSE 2
/* for long arguments */
enum Arguments {
ARG_KEYLEN,
ARG_VALLEN,
ARG_TPS,
ARG_COMMITGET,
ARG_SAMPLING,
ARG_VERSION,
ARG_KNOBS,
ARG_FLATBUFFERS,
ARG_LOGGROUP,
ARG_PREFIXPADDING,
ARG_TRACE,
ARG_TRACEPATH,
ARG_TRACEFORMAT,
ARG_TPSMAX,
ARG_TPSMIN,
ARG_TPSINTERVAL,
ARG_TPSCHANGE,
ARG_TXNTRACE,
ARG_TXNTAGGING,
ARG_TXNTAGGINGPREFIX,
ARG_STREAMING_MODE,
ARG_DISABLE_RYW,
ARG_CLIENT_THREADS_PER_VERSION,
ARG_JSON_REPORT,
ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
};
enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };
#define KEYPREFIX "mako"
#define KEYPREFIXLEN 4
#define TEMP_DATA_STORE "/tmp/makoTemp"
/* we set mako_txnspec_t and mako_args_t only once in the master process,
* and won't be touched by child processes.
*/
typedef struct {
/* for each operation, it stores "count", "range" and "reverse" */
int ops[MAX_OP][3];
} mako_txnspec_t;
#define LOGGROUP_MAX 256
#define KNOB_MAX 256
#define TAGPREFIXLENGTH_MAX 8
#define NUM_CLUSTERS_MAX 3
#define NUM_DATABASES_MAX 10
#define MAX_BG_IDS 1000
/* benchmark parameters */
typedef struct {
int api_version;
int json;
int num_processes;
int num_threads;
int mode;
int rows; /* is 2 billion enough? */
int seconds;
int iteration;
int tpsmax;
int tpsmin;
int tpsinterval;
int tpschange;
int sampling;
int key_length;
int value_length;
int zipf;
int commit_get;
int verbose;
mako_txnspec_t txnspec;
char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
int num_fdb_clusters;
int num_databases;
char log_group[LOGGROUP_MAX];
int prefixpadding;
int trace;
char tracepath[PATH_MAX];
int traceformat; /* 0 - XML, 1 - JSON */
char knobs[KNOB_MAX];
uint8_t flatbuffers;
int txntrace;
int txntagging;
char txntagging_prefix[TAGPREFIXLENGTH_MAX];
FDBStreamingMode streaming_mode;
int client_threads_per_version;
int disable_ryw;
char json_output_path[PATH_MAX];
bool bg_materialize_files;
char bg_file_path[PATH_MAX];
} mako_args_t;
/* shared memory */
#define SIGNAL_RED 0
#define SIGNAL_GREEN 1
#define SIGNAL_OFF 2
typedef struct {
int signal;
int readycount;
double throttle_factor;
int stopcount;
} mako_shmhdr_t;
/* memory block allocated to each operation when collecting detailed latency */
typedef struct {
uint64_t data[LAT_BLOCK_SIZE];
void* next_block;
} lat_block_t;
typedef struct {
uint64_t xacts;
uint64_t conflicts;
uint64_t ops[MAX_OP];
uint64_t errors[MAX_OP];
uint64_t latency_samples[MAX_OP];
uint64_t latency_us_total[MAX_OP];
uint64_t latency_us_min[MAX_OP];
uint64_t latency_us_max[MAX_OP];
} mako_stats_t;
/* per-process information */
typedef struct {
int worker_id;
pid_t parent_id;
mako_args_t* args;
mako_shmhdr_t* shm;
FDBDatabase* databases[NUM_DATABASES_MAX];
} process_info_t;
/* args for threads */
typedef struct {
int thread_id;
int database_index; // index of the database to do work to
int elem_size[MAX_OP]; /* stores the multiple of LAT_BLOCK_SIZE to check the memory allocation of each operation */
bool is_memory_allocated[MAX_OP]; /* flag specified for each operation, whether the memory was allocated to that
specific operation */
lat_block_t* block[MAX_OP];
process_info_t* process;
} thread_args_t;
/* process type */
typedef enum { proc_master = 0, proc_worker, proc_stats } proc_type_t;
#endif /* MAKO_H */

View File

@ -0,0 +1,168 @@
/*
* mako.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_HPP
#define MAKO_HPP
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 720
#endif
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <list>
#include <vector>
#include <string_view>
#include <fdb_api.hpp>
#include <pthread.h>
#include <sys/types.h>
#include <stdbool.h>
#include "limit.hpp"
namespace mako {
constexpr const int MODE_INVALID = -1;
constexpr const int MODE_CLEAN = 0;
constexpr const int MODE_BUILD = 1;
constexpr const int MODE_RUN = 2;
/* for long arguments */
// Identifiers for mako's long command-line options; each value corresponds to
// one "--<name>" flag parsed in the option loop.
enum ArgKind {
ARG_KEYLEN,
ARG_VALLEN,
ARG_TPS,
ARG_ASYNC, // --async_xacts: number of async transactions per worker process
ARG_COMMITGET,
ARG_SAMPLING,
ARG_VERSION,
ARG_KNOBS,
ARG_FLATBUFFERS,
ARG_LOGGROUP,
ARG_PREFIXPADDING,
ARG_TRACE,
ARG_TRACEPATH,
ARG_TRACEFORMAT,
ARG_TPSMAX,
ARG_TPSMIN,
ARG_TPSINTERVAL,
ARG_TPSCHANGE,
ARG_TXNTRACE,
ARG_TXNTAGGING,
ARG_TXNTAGGINGPREFIX,
ARG_STREAMING_MODE,
ARG_DISABLE_RYW,
ARG_CLIENT_THREADS_PER_VERSION,
ARG_JSON_REPORT,
ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
};
constexpr const int OP_COUNT = 0;
constexpr const int OP_RANGE = 1;
constexpr const int OP_REVERSE = 2;
/* transaction specification */
// Benchmark operation kinds; these index opTable and the txnspec.ops matrix.
// The S-prefixed variants are the snapshot-read versions of GET/GETRANGE.
enum OpKind {
OP_GETREADVERSION,
OP_GET,
OP_GETRANGE,
OP_SGET,
OP_SGETRANGE,
OP_UPDATE,
OP_INSERT,
OP_INSERTRANGE,
OP_OVERWRITE,
OP_CLEAR,
OP_SETCLEAR,
OP_CLEARRANGE,
OP_SETCLEARRANGE,
OP_COMMIT,
OP_TRANSACTION, /* pseudo-operation - time it takes to run one iteration of ops sequence */
OP_READ_BG,
MAX_OP /* must be the last item */
};
enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };
/* we set WorkloadSpec and Arguments only once in the master process,
* and won't be touched by child processes.
*/
struct WorkloadSpec {
/* for each operation, it stores "count", "range" and "reverse" */
// Indexed as ops[OpKind][OP_COUNT | OP_RANGE | OP_REVERSE]; parsed from the
// -x/--transaction spec string.
int ops[MAX_OP][3];
};
constexpr const int LOGGROUP_MAX = 256;
constexpr const int KNOB_MAX = 256;
constexpr const int TAGPREFIXLENGTH_MAX = 8;
constexpr const int NUM_CLUSTERS_MAX = 3;
constexpr const int NUM_DATABASES_MAX = 10;
constexpr const std::string_view KEY_PREFIX{ "mako" };
constexpr const std::string_view TEMP_DATA_STORE{ "/tmp/makoTemp" };
/* benchmark parameters */
/* benchmark parameters */
// Populated once in the main process from the command line and shared
// read-only with forked workers (see the comment above WorkloadSpec).
struct Arguments {
int api_version;
int json; // nonzero: emit the final report as JSON
int num_processes;
int num_threads;
int async_xacts; // >0 switches workers to the async scheduler (--async_xacts)
int mode; // MODE_CLEAN / MODE_BUILD / MODE_RUN
int rows; /* is 2 billion enough? */
int row_digits; // number of decimal digits in 'rows' (cached for key generation)
int seconds;
int iteration;
int tpsmax;
int tpsmin;
int tpsinterval;
int tpschange; // one of TPSChangeTypes
int sampling;
int key_length;
int value_length;
int zipf; // nonzero: zipfian key distribution instead of uniform
int commit_get;
int verbose; // one of the VERBOSE_* levels
WorkloadSpec txnspec;
char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
int num_fdb_clusters;
int num_databases;
char log_group[LOGGROUP_MAX];
int prefixpadding;
int trace;
char tracepath[PATH_MAX];
int traceformat; /* 0 - XML, 1 - JSON */
char knobs[KNOB_MAX];
uint8_t flatbuffers;
int txntrace;
int txntagging;
char txntagging_prefix[TAGPREFIXLENGTH_MAX];
FDBStreamingMode streaming_mode;
int64_t client_threads_per_version;
int disable_ryw;
char json_output_path[PATH_MAX];
// when true, mako materializes blob granule files found under bg_file_path
bool bg_materialize_files;
char bg_file_path[PATH_MAX];
};
} // namespace mako
#endif /* MAKO_HPP */

View File

@ -53,6 +53,13 @@ Arguments
- | ``-t | --threads <threads>``
| Number of threads per worker process (Default: 1)
| With ``--async_xacts <xacts>`` == 0 (Default), each of the ``<threads>`` operates on a transaction object with blocking API calls
| Otherwise, all of the ``<threads>`` run an asynchronous job scheduler, serving ``<xacts>`` transactions
- | ``--async_xacts <xacts>``
| Number of transactions per worker process to run asynchronously (Default: 0)
| ``<xacts>`` > 0 switches the execution mode to non-blocking (see ``-t | --threads``), with the exception of the blob granules API
| Note: throttling options, e.g. ``--tpsmax``, ``--tpsmin``, ``--tpschange``, ``--tpsinterval``, are ignored in asynchronous mode
- | ``-r | --rows <rows>``
| Number of rows initially populated (Default: 100000)

View File

@ -0,0 +1,275 @@
/*
* operations.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "blob_granules.hpp"
#include "operations.hpp"
#include "mako.hpp"
#include "logger.hpp"
#include "utils.hpp"
#include <array>
extern thread_local mako::Logger logr;
namespace mako {
using namespace fdb;
// Operation lookup table, indexed by OpKind (mako.hpp).  Each entry bundles
// the op's display name, up to two steps (a future-producing StepFunction and
// an optional PostStepFunction invoked once the future is ready), the step
// count, and whether the op still needs an explicit commit after its last
// step.  Key/value buffers are caller-owned scratch space threaded through
// the step functions.
const std::array<Operation, MAX_OP> opTable{
	{ { "GRV",
	    { { StepKind::READ,
	        [](Transaction& tx, Arguments const&, ByteString&, ByteString&, ByteString&) {
	            return tx.getReadVersion().eraseType();
	        },
	        [](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString&) {
	            if (f && !f.error()) {
	                f.get<future_var::Int64>();
	            }
	        } } },
	    1,
	    false },
	  { "GET",
	    { { StepKind::READ,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
	            return tx.get(key, false /*snapshot*/).eraseType();
	        },
	        [](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
	            if (f && !f.error()) {
	                f.get<future_var::Value>();
	            }
	        } } },
	    1,
	    false },
	  { "GETRANGE",
	    { { StepKind::READ,
	        [](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
	            return tx
	                .getRange(key_select::firstGreaterOrEqual(begin),
	                          key_select::lastLessOrEqual(end, 1),
	                          0 /*limit*/,
	                          0 /*target_bytes*/,
	                          args.streaming_mode,
	                          0 /*iteration*/,
	                          false /*snapshot*/,
	                          args.txnspec.ops[OP_GETRANGE][OP_REVERSE])
	                .eraseType();
	        },
	        [](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
	            if (f && !f.error()) {
	                f.get<future_var::KeyValueArray>();
	            }
	        } } },
	    1,
	    false },
	  { "SGET",
	    { { StepKind::READ,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
	            return tx.get(key, true /*snapshot*/).eraseType();
	        },
	        [](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
	            if (f && !f.error()) {
	                f.get<future_var::Value>();
	            }
	        } } },
	    1,
	    false },
	  { "SGETRANGE",
	    { { StepKind::READ,
	        [](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
	            return tx
	                .getRange(key_select::firstGreaterOrEqual(begin),
	                          key_select::lastLessOrEqual(end, 1),
	                          0 /*limit*/,
	                          0 /*target_bytes*/,
	                          args.streaming_mode,
	                          0 /*iteration*/,
	                          true /*snapshot*/,
	                          // fix: was OP_GETRANGE (copy-paste from the GETRANGE
	                          // entry); the snapshot range op must honor its own spec
	                          args.txnspec.ops[OP_SGETRANGE][OP_REVERSE])
	                .eraseType();
	        },
	        [](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
	            if (f && !f.error()) {
	                f.get<future_var::KeyValueArray>();
	            }
	        } } },
	    1,
	    false },
	  { "UPDATE",
	    { { StepKind::READ,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
	            return tx.get(key, false /*snapshot*/).eraseType();
	        },
	        [](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
	            if (f && !f.error()) {
	                f.get<future_var::Value>();
	            }
	        } },
	      { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
	            randomString(value.data(), args.value_length);
	            tx.set(key, value);
	            return Future();
	        } } },
	    2,
	    true },
	  { "INSERT",
	    { { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
	            // key[0..args.key_length] := concat(key_prefix, random_string)
	            randomString(key.data() + intSize(KEY_PREFIX), args.key_length - intSize(KEY_PREFIX));
	            randomString(value.data(), args.value_length);
	            tx.set(key, value);
	            return Future();
	        } } },
	    1,
	    true },
	  { "INSERTRANGE",
	    { { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
	            randomString(value.data(), args.value_length);
	            // key[0..args.key_length] := concat(prefix, random_string, num[0..range_digits])
	            const auto range = args.txnspec.ops[OP_INSERTRANGE][OP_RANGE];
	            assert(range > 0);
	            const auto range_digits = digits(range);
	            const auto random_len = args.key_length - intSize(KEY_PREFIX) - range_digits;
	            randomString(&key[intSize(KEY_PREFIX)], random_len);
	            for (auto i = 0; i < range; i++) {
	                numericWithFill(&key[args.key_length - range_digits], range_digits, i);
	                tx.set(key, value);
	            }
	            return Future();
	        } } },
	    1,
	    true },
	  { "OVERWRITE",
	    { { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
	            randomString(value.data(), args.value_length);
	            tx.set(key, value);
	            return Future();
	        } } },
	    1,
	    true },
	  { "CLEAR",
	    { { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
	            tx.clear(key);
	            return Future();
	        } } },
	    1,
	    true },
	  { "SETCLEAR",
	    { { StepKind::COMMIT,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
	            randomString(&key[KEY_PREFIX.size()], args.key_length - intSize(KEY_PREFIX));
	            randomString(value.data(), args.value_length);
	            tx.set(key, value);
	            return tx.commit().eraseType();
	        } },
	      { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
	            tx.reset(); // assuming commit from step 0 worked.
	            tx.clear(key); // key should forward unchanged from step 0
	            return Future();
	        } } },
	    2,
	    true },
	  { "CLEARRANGE",
	    { { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
	            tx.clearRange(begin, end);
	            return Future();
	        } } },
	    1,
	    true },
	  { "SETCLEARRANGE",
	    { { StepKind::COMMIT,
	        [](Transaction& tx, Arguments const& args, ByteString& key_begin, ByteString& key, ByteString& value) {
	            randomString(value.data(), args.value_length);
	            // key[0..args.key_length] := concat(prefix, random_string, num[0..range_digits])
	            const auto range = args.txnspec.ops[OP_SETCLEARRANGE][OP_RANGE];
	            assert(range > 0);
	            const auto range_digits = digits(range);
	            const auto random_len = args.key_length - intSize(KEY_PREFIX) - range_digits;
	            randomString(&key[KEY_PREFIX.size()], random_len);
	            for (auto i = 0; i < range; i++) {
	                numericWithFill(&key[args.key_length - range_digits], range_digits, i);
	                tx.set(key, value);
	                if (i == 0)
	                    key_begin.assign(key);
	            }
	            return tx.commit().eraseType();
	        } },
	      { StepKind::IMM,
	        [](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
	            tx.reset();
	            tx.clearRange(begin, end);
	            return Future();
	        } } },
	    2,
	    true },
	  // COMMIT and TRANSACTION are abstract (measurement-only) ops with no steps.
	  { "COMMIT", { { StepKind::NONE, nullptr } }, 0, false },
	  { "TRANSACTION", { { StepKind::NONE, nullptr } }, 0, false },
	  { "READBLOBGRANULE",
	    { { StepKind::ON_ERROR,
	        [](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
	            auto err = Error{};
	            err = tx.setOptionNothrow(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, BytesRef());
	            if (err) {
	                // Issuing read/writes before disabling RYW results in error.
	                // Possible malformed workload?
	                // As workloads execute in sequence, retrying would likely repeat this error.
	                fmt::print(stderr, "ERROR: TR_OPTION_READ_YOUR_WRITES_DISABLE: {}", err.what());
	                return Future();
	            }
	            // Allocate a separate context per call to avoid multiple threads accessing
	            auto user_context = blob_granules::local_file::UserContext(args.bg_file_path);
	            auto api_context = blob_granules::local_file::createApiContext(user_context, args.bg_materialize_files);
	            auto r = tx.readBlobGranules(begin,
	                                         end,
	                                         0 /* beginVersion*/,
	                                         -2, /* endVersion. -2 (latestVersion) is use txn read version */
	                                         api_context);
	            user_context.clear();
	            auto out = Result::KeyValueArray{};
	            err = r.getKeyValueArrayNothrow(out);
	            if (!err || err.is(2037 /*blob_granule_not_materialized*/))
	                return Future();
	            const auto level = (err.is(1020 /*not_committed*/) || err.is(1021 /*commit_unknown_result*/) ||
	                                err.is(1213 /*tag_throttled*/))
	                                   ? VERBOSE_WARN
	                                   : VERBOSE_NONE;
	            logr.printWithLogLevel(level, "ERROR", "get_keyvalue_array() after readBlobGranules(): {}", err.what());
	            return tx.onError(err).eraseType();
	        } } },
	    1,
	    false } }
};
} // namespace mako

View File

@ -0,0 +1,140 @@
/*
* operations.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_OPERATIONS_HPP
#define MAKO_OPERATIONS_HPP
#include <fdb_api.hpp>
#include <array>
#include <cassert>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>
#include "macro.hpp"
#include "mako.hpp"
namespace mako {
// determines how resultant future will be handled
enum class StepKind {
NONE, ///< not part of the table: OP_TRANSACTION, OP_COMMIT
IMM, ///< non-future ops that return immediately: e.g. set, clear_range
READ, ///< blockable reads: get(), get_range(), get_read_version, ...
COMMIT, ///< self-explanatory
ON_ERROR ///< future is a result of tx.on_error()
};
// Ops that don't have concrete steps to execute and exist for measurement only
force_inline bool isAbstractOp(int op) noexcept {
	switch (op) {
	case OP_COMMIT:
	case OP_TRANSACTION:
		return true;
	default:
		return false;
	}
}
using StepFunction = fdb::Future (*)(fdb::Transaction& tx,
Arguments const&,
fdb::ByteString& /*key1*/,
fdb::ByteString& /*key2*/,
fdb::ByteString& /*value*/);
using PostStepFunction = void (*)(fdb::Future&,
fdb::Transaction& tx,
Arguments const&,
fdb::ByteString& /*key1*/,
fdb::ByteString& /*key2*/,
fdb::ByteString& /*value*/);
// One executable step of an operation: how to treat the resulting future
// (kind), the function that produces it, and an optional follow-up invoked
// after the future resolves (e.g. to extract the result).
struct Step {
StepKind kind;
StepFunction step_func_;
PostStepFunction post_step_func_{ nullptr }; // nullptr means no post-step
};
// Static description of a benchmark operation: display name, its (up to two)
// steps, and whether it must be followed by a commit.  Instances live in the
// global opTable, indexed by OpKind.
struct Operation {
std::string_view name_;
Step steps_[2];
int num_steps_;
bool needs_commit_;
std::string_view name() const noexcept { return name_; }
StepKind stepKind(int step) const noexcept {
assert(step < steps());
return steps_[step].kind;
}
StepFunction stepFunction(int step) const noexcept { return steps_[step].step_func_; }
PostStepFunction postStepFunction(int step) const noexcept { return steps_[step].post_step_func_; }
// how many steps in this op?
int steps() const noexcept { return num_steps_; }
// does the op needs to commit some time after its final step?
bool needsCommit() const noexcept { return needs_commit_; }
};
extern const std::array<Operation, MAX_OP> opTable;
// Returns the display name for a valid op code, or "" for an out-of-range one.
force_inline char const* getOpName(int ops_code) {
	if (ops_code < 0 || ops_code >= MAX_OP)
		return "";
	return opTable[ops_code].name().data();
}
// Cursor over the workload's (operation, repetition, step) space; drives one
// transaction's op sequence in order via getOpBegin()/getOpNext().
struct OpIterator {
int op, count, step; // current OpKind, repetition index, and step within the op
bool operator==(const OpIterator& other) const noexcept {
return op == other.op && count == other.count && step == other.step;
}
bool operator!=(const OpIterator& other) const noexcept { return !(*this == other); }
StepKind stepKind() const noexcept { return opTable[op].stepKind(step); }
char const* opName() const noexcept { return getOpName(op); }
};
constexpr const OpIterator OpEnd = OpIterator{ MAX_OP, -1, -1 };
// Returns the first (op, count=0, step=0) position of a concrete op that the
// workload spec enables, or OpEnd if the spec enables none.
force_inline OpIterator getOpBegin(Arguments const& args) noexcept {
	auto op = 0;
	while (op < MAX_OP) {
		if (!isAbstractOp(op) && args.txnspec.ops[op][OP_COUNT] != 0)
			return OpIterator{ op, 0, 0 };
		op++;
	}
	return OpEnd;
}
// Advances the iterator: next step of the same op if any remain; otherwise the
// next repetition of the op (up to its txnspec count); otherwise the first
// step of the next enabled concrete op; OpEnd once the sequence is exhausted.
force_inline OpIterator getOpNext(Arguments const& args, OpIterator current) noexcept {
// binds references into the local copy 'current' so the loop below can mutate it
auto& [op, count, step] = current;
assert(op < MAX_OP && !isAbstractOp(op));
if (opTable[op].steps() > step + 1)
return OpIterator{ op, count, step + 1 };
count++;
// scan forward; count carries over for the current op, resets to 0 for later ops
for (; op < MAX_OP; op++, count = 0) {
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] <= count)
continue;
return OpIterator{ op, count, 0 };
}
return OpEnd;
}
} // namespace mako
#endif /* MAKO_OPERATIONS_HPP */

View File

@ -0,0 +1,26 @@
/*
* process.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_PROCESS_HPP
#define MAKO_PROCESS_HPP
// Role of a process in mako's multi-process benchmark run.
enum class ProcKind { MAIN, WORKER, STATS };
#endif /*MAKO_PROCESS_HPP*/

View File

@ -0,0 +1,108 @@
/*
* shm.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_SHM_HPP
#define MAKO_SHM_HPP
#include <atomic>
#include <cassert>
#include <cstdint>
#include "stats.hpp"
/* shared memory */
constexpr const int SIGNAL_RED = 0;
constexpr const int SIGNAL_GREEN = 1;
constexpr const int SIGNAL_OFF = 2;
// controlled, safer access to shared memory
namespace mako::shared_memory {
// Control block at the head of the shared-memory region.
// signal broadcasts the run state (SIGNAL_RED/GREEN/OFF); the remaining
// counters are presumably worker-side rendezvous/throttle state -- confirm
// against the main-process logic in mako.cpp.
// Note: direct brace-initialization replaces ATOMIC_VAR_INIT, which is
// deprecated in C++20 and unnecessary since C++17.
struct Header {
	std::atomic<int> signal{ SIGNAL_OFF };
	std::atomic<int> readycount{ 0 };
	std::atomic<double> throttle_factor{ 1.0 };
	std::atomic<int> stopcount{ 0 };
};
// Describes the start of the shared-memory layout: one Header immediately
// followed by the stats slots. `stats` is slot [0][0]; the remaining
// (num_processes * num_threads - 1) slots follow it (see storageSize()).
struct LayoutHelper {
	Header hdr;
	ThreadStatistics stats;
};
// Total bytes required for the shared-memory region.
// LayoutHelper already contains one ThreadStatistics, hence the (count - 1).
inline size_t storageSize(int num_processes, int num_threads) noexcept {
	assert(num_processes >= 1 && num_threads >= 1);
	return sizeof(LayoutHelper) + sizeof(ThreadStatistics) * ((num_processes * num_threads) - 1);
}
// Typed accessor over the raw shared-memory region: one Header followed by a
// row-major (num_processes x num_threads) array of ThreadStatistics.
// Copyable view with pointer semantics; does not own the underlying memory.
class Access {
	void* base; // start of the region; the Header lives at this address
	int num_processes;
	int num_threads;

	// address of the stats slot for (process_idx, thread_idx);
	// LayoutHelper::stats is slot [0][0], remaining slots follow contiguously
	static inline ThreadStatistics& statsSlot(void* shm_base,
	                                          int num_threads,
	                                          int process_idx,
	                                          int thread_idx) noexcept {
		return (&static_cast<LayoutHelper*>(shm_base)->stats)[process_idx * num_threads + thread_idx];
	}

public:
	Access(void* shm, int num_processes, int num_threads) noexcept
	  : base(shm), num_processes(num_processes), num_threads(num_threads) {}

	Access() noexcept : Access(nullptr, 0, 0) {}

	Access(const Access&) noexcept = default;

	Access& operator=(const Access&) noexcept = default;

	// number of bytes this view expects the region to span
	size_t size() const noexcept { return storageSize(num_processes, num_threads); }

	// placement-construct the header and every stats slot in the raw region;
	// must run once before any other accessor is used on fresh memory
	void initMemory() noexcept {
		new (&header()) Header{};
		for (auto i = 0; i < num_processes; i++)
			for (auto j = 0; j < num_threads; j++)
				new (&statsSlot(i, j)) ThreadStatistics();
	}

	Header const& headerConst() const noexcept { return *static_cast<Header const*>(base); }

	// note: const member returning a mutable reference -- Access is a view
	Header& header() const noexcept { return *static_cast<Header*>(base); }

	// pointer to the first stats slot (the whole array is contiguous)
	ThreadStatistics const* statsConstArray() const noexcept {
		return &statsSlot(base, num_threads, 0 /*process_id*/, 0 /*thread_id*/);
	}

	ThreadStatistics* statsArray() const noexcept {
		return &statsSlot(base, num_threads, 0 /*process_id*/, 0 /*thread_id*/);
	}

	ThreadStatistics const& statsConstSlot(int process_idx, int thread_idx) const noexcept {
		return statsSlot(base, num_threads, process_idx, thread_idx);
	}

	ThreadStatistics& statsSlot(int process_idx, int thread_idx) const noexcept {
		return statsSlot(base, num_threads, process_idx, thread_idx);
	}
};
} // namespace mako::shared_memory
#endif /* MAKO_SHM_HPP */

View File

@ -0,0 +1,177 @@
/*
* stats.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_STATS_HPP
#define MAKO_STATS_HPP
#include <array>
#include <cstdint>
#include <cstring>
#include <list>
#include <new>
#include <utility>
#include "operations.hpp"
#include "time.hpp"
namespace mako {
/* rough cap on the number of samples to avoid OOM hindering benchmark */
constexpr const size_t SAMPLE_CAP = 2000000;
/* size of each block to get detailed latency for each operation */
constexpr const size_t LAT_BLOCK_SIZE = 4093;
/* hard cap on the number of sample blocks = 488 */
constexpr const size_t MAX_LAT_BLOCKS = SAMPLE_CAP / LAT_BLOCK_SIZE;
/* memory block allocated to each operation when collecting detailed latency */
class LatencySampleBlock {
uint64_t samples[LAT_BLOCK_SIZE]{
0,
};
uint64_t index{ 0 };
public:
LatencySampleBlock() noexcept = default;
bool full() const noexcept { return index >= LAT_BLOCK_SIZE; }
void put(timediff_t td) {
assert(!full());
samples[index++] = toIntegerMicroseconds(td);
}
// return {data block, number of samples}
std::pair<uint64_t const*, size_t> data() const noexcept { return { samples, index }; }
};
/* collect sampled latencies until OOM is hit */
class LatencySampleBin {
std::list<LatencySampleBlock> blocks;
bool noMoreAlloc{ false };
bool tryAlloc() {
try {
blocks.emplace_back();
} catch (const std::bad_alloc&) {
noMoreAlloc = true;
return false;
}
return true;
}
public:
void reserveOneBlock() {
if (blocks.empty())
tryAlloc();
}
void put(timediff_t td) {
if (blocks.empty() || blocks.back().full()) {
if (blocks.size() >= MAX_LAT_BLOCKS || noMoreAlloc || !tryAlloc())
return;
}
blocks.back().put(td);
}
// iterate & apply for each block user function void(uint64_t const*, size_t)
template <typename Func>
void forEachBlock(Func&& fn) const {
for (const auto& block : blocks) {
auto [ptr, cnt] = block.data();
fn(ptr, cnt);
}
}
};
// Per-thread benchmark counters, merged across threads via combine().
// alignas(64): presumably sizes each slot to a cache line to avoid false
// sharing in the shared-memory stats array -- confirm for the target arch.
class alignas(64) ThreadStatistics {
	uint64_t conflicts;
	uint64_t total_errors;
	uint64_t ops[MAX_OP]; // executions per op
	uint64_t errors[MAX_OP]; // failures per op
	uint64_t latency_samples[MAX_OP];
	uint64_t latency_us_total[MAX_OP];
	uint64_t latency_us_min[MAX_OP];
	uint64_t latency_us_max[MAX_OP];

public:
	ThreadStatistics() noexcept {
		// zero-fill all counters, then force the minimums to UINT64_MAX
		// so the first recorded sample always wins the min comparison
		memset(this, 0, sizeof(ThreadStatistics));
		memset(latency_us_min, 0xff, sizeof(latency_us_min));
	}

	ThreadStatistics(const ThreadStatistics& other) noexcept = default;
	ThreadStatistics& operator=(const ThreadStatistics& other) noexcept = default;

	uint64_t getConflictCount() const noexcept { return conflicts; }

	uint64_t getOpCount(int op) const noexcept { return ops[op]; }

	uint64_t getErrorCount(int op) const noexcept { return errors[op]; }

	uint64_t getTotalErrorCount() const noexcept { return total_errors; }

	uint64_t getLatencySampleCount(int op) const noexcept { return latency_samples[op]; }

	uint64_t getLatencyUsTotal(int op) const noexcept { return latency_us_total[op]; }

	uint64_t getLatencyUsMin(int op) const noexcept { return latency_us_min[op]; }

	uint64_t getLatencyUsMax(int op) const noexcept { return latency_us_max[op]; }

	// with 'this' as final aggregation, factor in 'other'
	void combine(const ThreadStatistics& other) {
		conflicts += other.conflicts;
		for (auto op = 0; op < MAX_OP; op++) {
			ops[op] += other.ops[op];
			errors[op] += other.errors[op];
			// accumulating other.errors[op] sums to other.total_errors overall
			total_errors += other.errors[op];
			latency_samples[op] += other.latency_samples[op];
			latency_us_total[op] += other.latency_us_total[op];
			if (latency_us_min[op] > other.latency_us_min[op])
				latency_us_min[op] = other.latency_us_min[op];
			if (latency_us_max[op] < other.latency_us_max[op])
				latency_us_max[op] = other.latency_us_max[op];
		}
	}

	void incrConflictCount() noexcept { conflicts++; }

	// non-commit write operations aren't measured for time.
	void incrOpCount(int op) noexcept { ops[op]++; }

	void incrErrorCount(int op) noexcept {
		total_errors++;
		errors[op]++;
	}

	// fold one latency sample into count/total/min/max for the op
	void addLatency(int op, timediff_t diff) noexcept {
		const auto latency_us = toIntegerMicroseconds(diff);
		latency_samples[op]++;
		latency_us_total[op] += latency_us;
		if (latency_us_min[op] > latency_us)
			latency_us_min[op] = latency_us;
		if (latency_us_max[op] < latency_us)
			latency_us_max[op] = latency_us;
	}
};
using LatencySampleBinArray = std::array<LatencySampleBin, MAX_OP>;
} // namespace mako
#endif /* MAKO_STATS_HPP */

View File

@ -0,0 +1,77 @@
/*
* time.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MAKO_TIME_HPP
#define MAKO_TIME_HPP
#include <chrono>
namespace mako {
/* time measurement helpers */
using std::chrono::steady_clock;
using timepoint_t = decltype(steady_clock::now());
using timediff_t = decltype(std::declval<timepoint_t>() - std::declval<timepoint_t>());

// convert any duration to floating-point seconds
template <typename Duration>
double toDoubleSeconds(Duration duration) {
	using FpSeconds = std::chrono::duration<double>;
	return std::chrono::duration_cast<FpSeconds>(duration).count();
}

// convert any duration to whole seconds (truncating)
template <typename Duration>
uint64_t toIntegerSeconds(Duration duration) {
	using UintSeconds = std::chrono::duration<uint64_t>;
	return std::chrono::duration_cast<UintSeconds>(duration).count();
}

// convert any duration to whole microseconds (truncating)
template <typename Duration>
uint64_t toIntegerMicroseconds(Duration duration) {
	using UintMicros = std::chrono::duration<uint64_t, std::micro>;
	return std::chrono::duration_cast<UintMicros>(duration).count();
}
// timing helpers
struct StartAtCtor {};
class Stopwatch {
timepoint_t p1, p2;
public:
Stopwatch() noexcept : p1(), p2() {}
Stopwatch(StartAtCtor) noexcept { start(); }
Stopwatch(timepoint_t start_time) noexcept : p1(start_time), p2() {}
Stopwatch(const Stopwatch&) noexcept = default;
Stopwatch& operator=(const Stopwatch&) noexcept = default;
timepoint_t getStart() const noexcept { return p1; }
timepoint_t getStop() const noexcept { return p2; }
void start() noexcept { p1 = steady_clock::now(); }
Stopwatch& stop() noexcept {
p2 = steady_clock::now();
return *this;
}
Stopwatch& setStop(timepoint_t p_stop) noexcept {
p2 = p_stop;
return *this;
}
void startFromStop() noexcept { p1 = p2; }
auto diff() const noexcept { return p2 - p1; }
};
} // namespace mako
#endif /* MAKO_TIME_HPP */

View File

@ -1,136 +0,0 @@
#include "utils.h"
#include "mako.h"
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* uniform-distribution random */
/* return a uniform random integer in [low, high], both inclusive */
int urand(int low, int high) {
	double unit = rand() / (1.0 + RAND_MAX); /* in [0, 1) */
	int span = high - low + 1;
	return (int)((unit * span) + low);
}

/* random printable string */
/* len is the buffer size; the last byte becomes the null terminator */
void randstr(char* str, int len) {
	int pos;
	for (pos = 0; pos < len - 1; pos++) {
		str[pos] = '!' + urand(0, 'z' - '!'); /* generate a char from '!' to 'z' */
	}
	str[len - 1] = '\0';
}

/* random numeric string */
/* len is the buffer size; the last byte becomes the null terminator */
void randnumstr(char* str, int len) {
	int pos;
	for (pos = 0; pos < len - 1; pos++) {
		str[pos] = '0' + urand(0, 9); /* generate a digit from '0' to '9' */
	}
	str[len - 1] = '\0';
}
/* return the first key to be inserted by this (process, thread) partition */
int insert_begin(int rows, int p_idx, int t_idx, int total_p, int total_t) {
	double share = (double)rows / total_p / total_t;
	int global_idx = p_idx * total_t + t_idx;
	return (int)(round(share * global_idx));
}

/* return the last key to be inserted by this (process, thread) partition */
int insert_end(int rows, int p_idx, int t_idx, int total_p, int total_t) {
	double share = (double)rows / total_p / total_t;
	int global_idx = p_idx * total_t + t_idx;
	return (int)(round(share * (global_idx + 1) - 1));
}
/* divide val as evenly as possible among all threads of all processes;
 * the first (val mod nthreads) threads absorb one extra unit each;
 * returns -1 when the base share is zero and this thread gets no remainder */
int compute_thread_portion(int val, int p_idx, int t_idx, int total_p, int total_t) {
	int base = val / total_p / total_t;
	int leftover = val - (base * total_p * total_t);
	int global_idx = p_idx * total_t + t_idx;
	if (global_idx < leftover)
		return base + 1;
	if (base == 0)
		return -1;
	return base;
}
/* number of decimal digits in num; returns 0 when num <= 0 */
int digits(int num) {
	int count;
	for (count = 0; num > 0; num /= 10) {
		count++;
	}
	return count;
}

/* generate a key for a given key number */
/* prefix is "mako" by default; prefixpadding = 1 puts the 'x' filler in front of the key name */
/* len is the buffer size: key length + null */
void genkey(char* str, char* prefix, int prefixlen, int prefixpadding, int num, int rows, int len) {
	const int rowdigit = digits(rows);
	const int bodylen = prefixlen + rowdigit;
	const int offset = prefixpadding ? len - bodylen - 1 : 0;
	char* body = (char*)malloc(bodylen + 1); /* heap scratch for "<prefix><zero-padded num>" */
	snprintf(body, bodylen + 1, "%s%0.*d", prefix, rowdigit, num);
	memset(str, 'x', len);
	memcpy(str + offset, body, bodylen);
	str[len - 1] = '\0';
	free(body);
}
/* This is another sorting algorithm used to calculate latency parameters */
/* We moved from radix sort to quick sort to avoid extra space used in radix sort */
#if 0
uint64_t get_max(uint64_t arr[], int n) {
uint64_t mx = arr[0];
for (int i = 1; i < n; i++) {
if (arr[i] > mx) {
mx = arr[i];
}
}
return mx;
}
void bucket_data(uint64_t arr[], int n, uint64_t exp) {
// uint64_t output[n];
int i, count[10] = { 0 };
uint64_t* output = (uint64_t*)malloc(sizeof(uint64_t) * n);
for (i = 0; i < n; i++) {
count[(arr[i] / exp) % 10]++;
}
for (i = 1; i < 10; i++) {
count[i] += count[i - 1];
}
for (i = n - 1; i >= 0; i--) {
output[count[(arr[i] / exp) % 10] - 1] = arr[i];
count[(arr[i] / exp) % 10]--;
}
for (i = 0; i < n; i++) {
arr[i] = output[i];
}
free(output);
}
// The main function is to sort arr[] of size n using Radix Sort
void radix_sort(uint64_t* arr, int n) {
// Find the maximum number to know number of digits
uint64_t m = get_max(arr, n);
for (uint64_t exp = 1; m / exp > 0; exp *= 10) bucket_data(arr, n, exp);
}
#endif
/* three-way ascending comparison of two uint64_t values for qsort */
int compare(const void* a, const void* b) {
	uint64_t lhs = *(const uint64_t*)a;
	uint64_t rhs = *(const uint64_t*)b;
	if (lhs < rhs)
		return -1;
	if (lhs > rhs)
		return 1;
	return 0;
}

// The main function is to sort arr[] of size n using the C library qsort
void quick_sort(uint64_t* arr, int n) {
	qsort(arr, n, sizeof(uint64_t), compare);
}

View File

@ -0,0 +1,54 @@
/*
* utils.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils.hpp"
#include "mako.hpp"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fmt/format.h>
namespace mako {
/* divide val as evenly as possible among all threads of all processes;
 * the first (val mod nthreads) threads absorb one extra unit each;
 * returns -1 when the base share is zero and this thread gets no remainder */
int computeThreadPortion(int val, int p_idx, int t_idx, int total_p, int total_t) {
	const auto base = val / total_p / total_t;
	const auto leftover = val - (base * total_p * total_t);
	const auto global_idx = p_idx * total_t + t_idx;
	if (global_idx < leftover)
		return base + 1;
	if (base == 0)
		return -1;
	return base;
}
/* number of decimal digits in num; returns 0 when num <= 0 */
int digits(int num) {
	auto count = 0;
	for (; num > 0; num /= 10)
		count++;
	return count;
}
} // namespace mako

View File

@ -1,65 +0,0 @@
#ifndef UTILS_H
#define UTILS_H
#pragma once
#include <stdint.h>
/* uniform-distribution random */
/* return a uniform random number between low and high, both inclusive */
int urand(int low, int high);
/* write a random string of the length of (len-1) to memory pointed by str
* with a null-termination character at str[len-1].
*/
void randstr(char* str, int len);
/* write a random numeric string of the length of (len-1) to memory pointed by str
* with a null-termination character at str[len-1].
*/
void randnumstr(char* str, int len);
/* given the total number of rows to be inserted,
* the worker process index p_idx and the thread index t_idx (both 0-based),
* and the total number of processes, total_p, and threads, total_t,
* returns the first row number assigned to this partition.
*/
int insert_begin(int rows, int p_idx, int t_idx, int total_p, int total_t);
/* similar to insert_begin, insert_end returns the last row numer */
int insert_end(int rows, int p_idx, int t_idx, int total_p, int total_t);
/* devide a value equally among threads */
int compute_thread_portion(int val, int p_idx, int t_idx, int total_p, int total_t);
/* similar to insert_begin/end, compute_thread_tps computes
* the per-thread target TPS for given configuration.
*/
#define compute_thread_tps(val, p_idx, t_idx, total_p, total_t) \
compute_thread_portion(val, p_idx, t_idx, total_p, total_t)
/* similar to compute_thread_tps,
* compute_thread_iters computs the number of iterations.
*/
#define compute_thread_iters(val, p_idx, t_idx, total_p, total_t) \
compute_thread_portion(val, p_idx, t_idx, total_p, total_t)
/* get the number of digits */
int digits(int num);
/* generate a key for a given key number */
/* prefix is "mako" by default, prefixpadding = 1 means 'x' will be in front rather than trailing the keyname */
/* len is the buffer size, key length + null */
void genkey(char* str, char* prefix, int prefixlen, int prefixpadding, int num, int rows, int len);
#if 0
// The main function is to sort arr[] of size n using Radix Sort
void radix_sort(uint64_t arr[], int n);
void bucket_data(uint64_t arr[], int n, uint64_t exp);
uint64_t get_max(uint64_t arr[], int n);
#endif
// The main function is to sort arr[] of size n using Quick Sort
void quick_sort(uint64_t arr[], int n);
int compare(const void* a, const void* b);
#endif /* UTILS_H */

View File

@ -0,0 +1,195 @@
/*
* utils.hpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef UTILS_HPP
#define UTILS_HPP
#pragma once
#include "macro.hpp"
#include "mako.hpp"
#include "fdbclient/zipf.h"
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string_view>
#include <type_traits>
#include <fmt/format.h>
namespace mako {
/* uniform-distribution random */
/* return a uniform random number between low and high, both inclusive */
force_inline int urand(int low, int high) {
	const auto unit = rand() / (1.0 + RAND_MAX); // in [0, 1)
	const auto span = high - low + 1;
	return static_cast<int>((unit * span) + low);
}
// pick the next row number: zipfian-distributed when configured, uniform otherwise
force_inline int nextKey(Arguments const& args) {
	if (!args.zipf)
		return urand(0, args.rows - 1);
	return zipfian_next();
}
// length of a string_view as int
force_inline int intSize(std::string_view sv) {
	return static_cast<int>(sv.length());
}
/* random string: fill [str, str + len) with printable chars in ['!', 'z'] */
template <typename Char>
force_inline void randomString(Char* str, int len) {
	assert(len >= 0);
	auto cursor = str;
	for (auto remaining = len; remaining > 0; remaining--) {
		*cursor++ = ('!' + urand(0, 'z' - '!'));
	}
}
/* given the total number of rows to be inserted,
 * the worker process index p_idx and the thread index t_idx (both 0-based),
 * and the total number of processes, total_p, and threads, total_t,
 * returns the first row number assigned to this partition.
 */
force_inline int insertBegin(int rows, int p_idx, int t_idx, int total_p, int total_t) {
	const auto share = (double)rows / total_p / total_t;
	const auto global_idx = p_idx * total_t + t_idx;
	return (int)(round(share * global_idx));
}

/* similar to insertBegin, insertEnd returns the last row number */
force_inline int insertEnd(int rows, int p_idx, int t_idx, int total_p, int total_t) {
	const auto share = (double)rows / total_p / total_t;
	const auto global_idx = p_idx * total_t + t_idx;
	return (int)(round(share * (global_idx + 1) - 1));
}
/* divide a value equally among threads (defined in utils.cpp) */
int computeThreadPortion(int val, int p_idx, int t_idx, int total_p, int total_t);

/* similar to insertBegin/end, computeThreadTps computes
 * the per-thread target TPS for a given configuration.
 */
#define computeThreadTps(val, p_idx, t_idx, total_p, total_t) computeThreadPortion(val, p_idx, t_idx, total_p, total_t)

/* similar to computeThreadTps,
 * computeThreadIters computes the per-thread number of iterations.
 */
#define computeThreadIters(val, p_idx, t_idx, total_p, total_t) \
	computeThreadPortion(val, p_idx, t_idx, total_p, total_t)

/* get the number of decimal digits (defined in utils.cpp) */
int digits(int num);
/* fill memory slice [str, str + len) as stringified, zero-padded num */
template <typename Char>
force_inline void numericWithFill(Char* str, int len, int num) {
	static_assert(sizeof(Char) == 1);
	assert(num >= 0);
	memset(str, '0', len);
	auto pos = len - 1;
	while (num > 0 && pos >= 0) {
		str[pos--] = (num % 10) + '0';
		num /= 10;
	}
}
/* generate a key for a given key number */
/* prefix is "mako" by default; prefixpadding means the 'x' filler goes in front of the key name */
template <typename Char>
void genKey(Char* str, std::string_view prefix, Arguments const& args, int num) {
	static_assert(sizeof(Char) == 1);
	const auto prefix_len = static_cast<int>(prefix.length());
	memset(str, 'x', args.key_length);
	auto cursor = args.prefixpadding ? (args.key_length - prefix_len - args.row_digits) : 0;
	memcpy(&str[cursor], prefix.data(), prefix_len);
	cursor += prefix_len;
	numericWithFill(&str[cursor], args.row_digits, num);
}
// generate the key(s) for one execution of op: key1 always,
// key2 only when the op specifies a positive range
template <typename Char>
force_inline void prepareKeys(int op,
                              std::basic_string<Char>& key1,
                              std::basic_string<Char>& key2,
                              Arguments const& args) {
	const auto num1 = nextKey(args);
	genKey(key1.data(), KEY_PREFIX, args, num1);
	const auto range = args.txnspec.ops[op][OP_RANGE];
	if (range > 0) {
		const auto num2 = std::min(num1 + range - 1, args.rows - 1);
		genKey(key2.data(), KEY_PREFIX, args, num2);
	}
}
// invoke user-provided callable when object goes out of scope.
// Non-copyable: copying a scope guard would invoke the callable once per copy.
template <typename Func>
class ExitGuard {
	std::decay_t<Func> fn;

public:
	ExitGuard(Func&& fn) : fn(std::forward<Func>(fn)) {}
	ExitGuard(const ExitGuard&) = delete;
	ExitGuard& operator=(const ExitGuard&) = delete;
	~ExitGuard() { fn(); }
};
// invoke user-provided callable when stack unwinds by exception.
// Non-copyable: copying a guard would invoke the callable once per copy.
// The callable must not throw -- it runs during unwinding.
template <typename Func>
class FailGuard {
	std::decay_t<Func> fn;

public:
	FailGuard(Func&& fn) : fn(std::forward<Func>(fn)) {}
	FailGuard(const FailGuard&) = delete;
	FailGuard& operator=(const FailGuard&) = delete;
	~FailGuard() {
		if (std::uncaught_exceptions()) {
			fn();
		}
	}
};
// trace helpers: fixed-width column output for the stats report
constexpr const int STATS_TITLE_WIDTH = 12;
constexpr const int STATS_FIELD_WIDTH = 12;

// left-aligned, space-padded title cell
template <typename Value>
void putTitle(Value&& value) {
	fmt::print("{0: <{1}} ", std::forward<Value>(value), STATS_TITLE_WIDTH);
}

// right-aligned, space-padded title cell
template <typename Value>
void putTitleRight(Value&& value) {
	fmt::print("{0: >{1}} ", std::forward<Value>(value), STATS_TITLE_WIDTH);
}

// '=' separator spanning a title cell
inline void putTitleBar() {
	fmt::print("{0:=<{1}} ", "", STATS_TITLE_WIDTH);
}

// right-aligned, space-padded data cell
template <typename Value>
void putField(Value&& value) {
	fmt::print("{0: >{1}} ", std::forward<Value>(value), STATS_FIELD_WIDTH);
}

// '=' separator spanning a data cell
inline void putFieldBar() {
	fmt::print("{0:=>{1}} ", "", STATS_FIELD_WIDTH);
}

// right-aligned fixed-precision floating-point data cell
template <typename Value>
void putFieldFloat(Value&& value, int precision) {
	fmt::print("{0: >{1}.{2}f} ", std::forward<Value>(value), STATS_FIELD_WIDTH, precision);
}
} // namespace mako
#endif /* UTILS_HPP */