Optimizations and restructuring
This commit is contained in:
parent
e74a676e26
commit
0cf1349c37
|
@ -107,7 +107,7 @@ void ResumableStateForRunWorkload::runOneTick() {
|
|||
assert(iter != OpEnd);
|
||||
watch_step.start();
|
||||
if (iter.step == 0 /* first step */) {
|
||||
watch_per_op[iter.op] = Stopwatch(watch_step.getStart());
|
||||
watch_op = Stopwatch(watch_step.getStart());
|
||||
}
|
||||
auto f = Future{};
|
||||
// to minimize context switch overhead, repeat immediately completed ops
|
||||
|
@ -208,9 +208,9 @@ void ResumableStateForRunWorkload::updateStepStats() {
|
|||
if (iter.step + 1 == opTable[iter.op].steps()) {
|
||||
if (opTable[iter.op].needsCommit())
|
||||
needs_commit = true;
|
||||
watch_per_op[iter.op].setStop(watch_step.getStop());
|
||||
watch_op.setStop(watch_step.getStop());
|
||||
if (do_sample) {
|
||||
const auto op_latency = watch_per_op[iter.op].diff();
|
||||
const auto op_latency = watch_op.diff();
|
||||
stats.addLatency(iter.op, op_latency);
|
||||
sample_bins[iter.op].put(op_latency);
|
||||
}
|
||||
|
|
|
@ -87,8 +87,8 @@ struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStat
|
|||
fdb::ByteString key1;
|
||||
fdb::ByteString key2;
|
||||
fdb::ByteString val;
|
||||
std::array<Stopwatch, MAX_OP> watch_per_op;
|
||||
Stopwatch watch_step;
|
||||
Stopwatch watch_op;
|
||||
Stopwatch watch_commit;
|
||||
Stopwatch watch_tx;
|
||||
Stopwatch watch_task;
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* macro.hpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef MAKO_MACRO_HPP
|
||||
#define MAKO_MACRO_HPP
|
||||
|
||||
#if defined(__GNUG__)
|
||||
#define force_inline inline __attribute__((__always_inline__))
|
||||
#elif defined(_MSC_VER)
|
||||
#define force_inline __forceinline
|
||||
#else
|
||||
#error Missing force inline
|
||||
#endif
|
||||
|
||||
#endif /*MAKO_MACRO_HPP*/
|
|
@ -54,12 +54,32 @@
|
|||
#include <fmt/printf.h>
|
||||
#include <fdb_api.hpp>
|
||||
#include "fdbclient/zipf.h"
|
||||
|
||||
#include "async.hpp"
|
||||
#include "future.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "mako.hpp"
|
||||
#include "operations.hpp"
|
||||
#include "process.hpp"
|
||||
#include "utils.hpp"
|
||||
#include "future.hpp"
|
||||
#include "async.hpp"
|
||||
#include "shm.hpp"
|
||||
#include "stats.hpp"
|
||||
#include "time.hpp"
|
||||
|
||||
namespace mako {
|
||||
|
||||
/* args for threads */
|
||||
struct alignas(64) ThreadArgs {
|
||||
int worker_id;
|
||||
int thread_id;
|
||||
pid_t parent_id;
|
||||
LatencySampleBinArray sample_bins;
|
||||
Arguments const* args;
|
||||
shared_memory::Access shm;
|
||||
fdb::Database database; // database to work with
|
||||
};
|
||||
|
||||
} // namespace mako
|
||||
|
||||
using namespace fdb;
|
||||
using namespace mako;
|
||||
|
@ -201,7 +221,7 @@ int populate(Transaction tx,
|
|||
}
|
||||
|
||||
/* run one iteration of configured task */
|
||||
int runOneTask(Transaction tx,
|
||||
int runOneTask(Transaction& tx,
|
||||
Arguments const& args,
|
||||
ThreadStatistics& stats,
|
||||
LatencySampleBinArray& sample_bins,
|
||||
|
@ -209,25 +229,19 @@ int runOneTask(Transaction tx,
|
|||
ByteString& key2,
|
||||
ByteString& val) {
|
||||
const auto do_sample = (stats.getOpCount(OP_TASK) % args.sampling) == 0;
|
||||
auto watch_task = Stopwatch{};
|
||||
auto watch_tx = Stopwatch{};
|
||||
auto watch_task = Stopwatch(StartAtCtor{});
|
||||
auto watch_tx = Stopwatch(watch_task.getStart());
|
||||
auto watch_op = Stopwatch{};
|
||||
if (do_sample) {
|
||||
watch_tx.start();
|
||||
watch_task = Stopwatch(watch_tx.getStart());
|
||||
}
|
||||
|
||||
auto op_iter = getOpBegin(args);
|
||||
auto needs_commit = false;
|
||||
task_begin:
|
||||
while (op_iter != OpEnd) {
|
||||
const auto [op, count, step] = op_iter;
|
||||
const auto step_kind = opTable[op].stepKind(step);
|
||||
auto watch_step = Stopwatch{};
|
||||
if (do_sample) {
|
||||
watch_step.start();
|
||||
if (step == 0 /* first step */)
|
||||
watch_op = Stopwatch(watch_step.getStart());
|
||||
}
|
||||
auto watch_step = Stopwatch(StartAtCtor{});
|
||||
if (step == 0 /* first step */)
|
||||
watch_op = Stopwatch(watch_step.getStart());
|
||||
auto f = opTable[op].stepFunction(step)(tx, args, key1, key2, val);
|
||||
auto future_rc = FutureRC::OK;
|
||||
if (f) {
|
||||
|
@ -239,8 +253,7 @@ int runOneTask(Transaction tx,
|
|||
}
|
||||
if (auto postStepFn = opTable[op].postStepFunction(step))
|
||||
postStepFn(f, tx, args, key1, key2, val);
|
||||
if (do_sample)
|
||||
watch_step.stop();
|
||||
watch_step.stop();
|
||||
if (future_rc != FutureRC::OK) {
|
||||
if (future_rc == FutureRC::CONFLICT) {
|
||||
stats.incrConflictCount();
|
||||
|
@ -279,8 +292,8 @@ int runOneTask(Transaction tx,
|
|||
if (step + 1 == opTable[op].steps() /* last step */) {
|
||||
if (opTable[op].needsCommit())
|
||||
needs_commit = true;
|
||||
watch_op.setStop(watch_step.getStop());
|
||||
if (do_sample) {
|
||||
watch_op.setStop(watch_step.getStop());
|
||||
const auto op_latency = watch_op.diff();
|
||||
stats.addLatency(op, op_latency);
|
||||
sample_bins[op].put(op_latency);
|
||||
|
@ -289,46 +302,40 @@ int runOneTask(Transaction tx,
|
|||
}
|
||||
// move to next op
|
||||
op_iter = getOpNext(args, op_iter);
|
||||
|
||||
// reached the end?
|
||||
if (op_iter == OpEnd && (needs_commit || args.commit_get)) {
|
||||
auto watch_commit = Stopwatch();
|
||||
if (do_sample)
|
||||
watch_commit.start();
|
||||
auto f = tx.commit();
|
||||
const auto rc = waitAndHandleError(tx, f, "COMMIT_AT_TX_END");
|
||||
}
|
||||
// reached the end?
|
||||
if (needs_commit || args.commit_get) {
|
||||
auto watch_commit = Stopwatch(StartAtCtor{});
|
||||
auto f = tx.commit();
|
||||
const auto rc = waitAndHandleError(tx, f, "COMMIT_AT_TX_END");
|
||||
watch_commit.stop();
|
||||
watch_tx.setStop(watch_commit.getStop());
|
||||
auto tx_resetter = ExitGuard([&watch_tx, &tx]() {
|
||||
tx.reset();
|
||||
watch_tx.startFromStop();
|
||||
});
|
||||
if (rc == FutureRC::OK) {
|
||||
if (do_sample) {
|
||||
watch_commit.stop();
|
||||
watch_tx.setStop(watch_commit.getStop());
|
||||
const auto commit_latency = watch_commit.diff();
|
||||
const auto tx_duration = watch_tx.diff();
|
||||
stats.addLatency(OP_COMMIT, commit_latency);
|
||||
stats.addLatency(OP_TRANSACTION, tx_duration);
|
||||
sample_bins[OP_COMMIT].put(commit_latency);
|
||||
sample_bins[OP_TRANSACTION].put(tx_duration);
|
||||
}
|
||||
auto tx_resetter = ExitGuard([&do_sample, &watch_tx, &tx]() {
|
||||
tx.reset();
|
||||
if (do_sample)
|
||||
watch_tx.startFromStop();
|
||||
});
|
||||
if (rc == FutureRC::OK) {
|
||||
if (do_sample) {
|
||||
const auto commit_latency = watch_commit.diff();
|
||||
const auto tx_duration = watch_tx.diff();
|
||||
stats.addLatency(OP_COMMIT, commit_latency);
|
||||
stats.addLatency(OP_TRANSACTION, tx_duration);
|
||||
sample_bins[OP_COMMIT].put(commit_latency);
|
||||
sample_bins[OP_TRANSACTION].put(tx_duration);
|
||||
}
|
||||
stats.incrOpCount(OP_COMMIT);
|
||||
stats.incrOpCount(OP_TRANSACTION);
|
||||
} else {
|
||||
if (rc == FutureRC::CONFLICT)
|
||||
stats.incrConflictCount();
|
||||
else
|
||||
stats.incrErrorCount(OP_COMMIT);
|
||||
if (rc == FutureRC::ABORT) {
|
||||
return -1;
|
||||
}
|
||||
// restart from beginning
|
||||
op_iter = getOpBegin(args);
|
||||
stats.incrOpCount(OP_COMMIT);
|
||||
stats.incrOpCount(OP_TRANSACTION);
|
||||
} else {
|
||||
if (rc == FutureRC::CONFLICT)
|
||||
stats.incrConflictCount();
|
||||
else
|
||||
stats.incrErrorCount(OP_COMMIT);
|
||||
if (rc == FutureRC::ABORT) {
|
||||
return -1;
|
||||
}
|
||||
needs_commit = false;
|
||||
// restart from beginning
|
||||
op_iter = getOpBegin(args);
|
||||
goto task_begin;
|
||||
}
|
||||
}
|
||||
// one task iteration has completed successfully
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#define MAKO_HPP
|
||||
|
||||
#ifndef FDB_API_VERSION
|
||||
#define FDB_API_VERSION 710
|
||||
#define FDB_API_VERSION 720
|
||||
#endif
|
||||
|
||||
#include <array>
|
||||
|
@ -43,10 +43,6 @@
|
|||
#else
|
||||
#include <limits.h>
|
||||
#endif
|
||||
#include "operations.hpp"
|
||||
#include "shm.hpp"
|
||||
#include "stats.hpp"
|
||||
#include "time.hpp"
|
||||
|
||||
namespace mako {
|
||||
|
||||
|
@ -85,6 +81,28 @@ enum ArgKind {
|
|||
ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
|
||||
};
|
||||
|
||||
/* transaction specification */
|
||||
enum OpKind {
|
||||
OP_GETREADVERSION,
|
||||
OP_GET,
|
||||
OP_GETRANGE,
|
||||
OP_SGET,
|
||||
OP_SGETRANGE,
|
||||
OP_UPDATE,
|
||||
OP_INSERT,
|
||||
OP_INSERTRANGE,
|
||||
OP_OVERWRITE,
|
||||
OP_CLEAR,
|
||||
OP_SETCLEAR,
|
||||
OP_CLEARRANGE,
|
||||
OP_SETCLEARRANGE,
|
||||
OP_COMMIT,
|
||||
OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
|
||||
OP_TASK, /* pseudo-operation - cumulative time for each iteraton in runWorkload */
|
||||
OP_READ_BG,
|
||||
MAX_OP /* must be the last item */
|
||||
};
|
||||
|
||||
enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };
|
||||
|
||||
/* we set WorkloadSpec and Arguments only once in the master process,
|
||||
|
@ -153,17 +171,6 @@ constexpr const int SIGNAL_RED = 0;
|
|||
constexpr const int SIGNAL_GREEN = 1;
|
||||
constexpr const int SIGNAL_OFF = 2;
|
||||
|
||||
/* args for threads */
|
||||
struct alignas(64) ThreadArgs {
|
||||
int worker_id;
|
||||
int thread_id;
|
||||
pid_t parent_id;
|
||||
LatencySampleBinArray sample_bins;
|
||||
Arguments const* args;
|
||||
shared_memory::Access shm;
|
||||
fdb::Database database; // database to work with
|
||||
};
|
||||
|
||||
} // namespace mako
|
||||
|
||||
#endif /* MAKO_HPP */
|
||||
|
|
|
@ -30,31 +30,6 @@ extern thread_local mako::Logger logr;
|
|||
|
||||
namespace mako {
|
||||
|
||||
OpIterator getOpBegin(Arguments const& args) noexcept {
|
||||
for (auto op = 0; op < MAX_OP; op++) {
|
||||
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] == 0)
|
||||
continue;
|
||||
return OpIterator{ op, 0, 0 };
|
||||
}
|
||||
return OpEnd;
|
||||
}
|
||||
|
||||
OpIterator getOpNext(Arguments const& args, OpIterator current) noexcept {
|
||||
if (OpEnd == current)
|
||||
return OpEnd;
|
||||
auto [op, count, step] = current;
|
||||
assert(op < MAX_OP && !isAbstractOp(op));
|
||||
if (opTable[op].steps() > step + 1)
|
||||
return OpIterator{ op, count, step + 1 };
|
||||
count++;
|
||||
for (; op < MAX_OP; op++, count = 0) {
|
||||
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] <= count)
|
||||
continue;
|
||||
return OpIterator{ op, count, 0 };
|
||||
}
|
||||
return OpEnd;
|
||||
}
|
||||
|
||||
using namespace fdb;
|
||||
|
||||
inline int nextKey(Arguments const& args) {
|
||||
|
@ -63,40 +38,36 @@ inline int nextKey(Arguments const& args) {
|
|||
return urand(0, args.rows - 1);
|
||||
}
|
||||
|
||||
char const* getOpName(int ops_code) {
|
||||
if (ops_code >= 0 && ops_code < MAX_OP)
|
||||
return opTable[ops_code].name().data();
|
||||
return "";
|
||||
}
|
||||
|
||||
const std::array<Operation, MAX_OP> opTable{
|
||||
{ { "GRV",
|
||||
{ { StepKind::READ,
|
||||
[](Transaction tx, Arguments const&, ByteString&, ByteString&, ByteString&) {
|
||||
[](Transaction& tx, Arguments const&, ByteString&, ByteString&, ByteString&) {
|
||||
return tx.getReadVersion().eraseType();
|
||||
},
|
||||
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString&) {
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString&) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::Int64>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
false },
|
||||
{ "GET",
|
||||
{ { StepKind::READ,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
const auto num = nextKey(args);
|
||||
genKey(key, KEY_PREFIX, args, num);
|
||||
return tx.get(key, false /*snapshot*/).eraseType();
|
||||
},
|
||||
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::Value>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
false },
|
||||
{ "GETRANGE",
|
||||
{ { StepKind::READ,
|
||||
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = nextKey(args);
|
||||
genKey(begin, KEY_PREFIX, args, num_begin);
|
||||
auto num_end = num_begin + args.txnspec.ops[OP_GETRANGE][OP_RANGE] - 1;
|
||||
|
@ -114,30 +85,32 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
args.txnspec.ops[OP_GETRANGE][OP_REVERSE])
|
||||
.eraseType();
|
||||
},
|
||||
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::KeyValueArray>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
false },
|
||||
{ "SGET",
|
||||
{ { StepKind::READ,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
const auto num = nextKey(args);
|
||||
genKey(key, KEY_PREFIX, args, num);
|
||||
return tx.get(key, true /*snapshot*/).eraseType();
|
||||
},
|
||||
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::Value>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
false },
|
||||
{ "SGETRANGE",
|
||||
{ {
|
||||
|
||||
StepKind::READ,
|
||||
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = nextKey(args);
|
||||
genKey(begin, KEY_PREFIX, args, num_begin);
|
||||
auto num_end = num_begin + args.txnspec.ops[OP_SGETRANGE][OP_RANGE] - 1;
|
||||
|
@ -155,34 +128,36 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
args.txnspec.ops[OP_SGETRANGE][OP_REVERSE])
|
||||
.eraseType();
|
||||
},
|
||||
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::KeyValueArray>();
|
||||
}
|
||||
} } },
|
||||
1,
|
||||
false },
|
||||
{ "UPDATE",
|
||||
{ { StepKind::READ,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
const auto num = nextKey(args);
|
||||
genKey(key, KEY_PREFIX, args, num);
|
||||
return tx.get(key, false /*snapshot*/).eraseType();
|
||||
},
|
||||
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
[](Future& f, Transaction&, Arguments const&, ByteString&, ByteString&, ByteString& val) {
|
||||
if (f && !f.error()) {
|
||||
f.get<future_var::Value>();
|
||||
}
|
||||
} },
|
||||
{ StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
randomString(value, args.value_length);
|
||||
tx.set(key, value);
|
||||
return Future();
|
||||
} } },
|
||||
2,
|
||||
true },
|
||||
{ "INSERT",
|
||||
{ { StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genKeyPrefix(key, KEY_PREFIX, args);
|
||||
// concat([padding], key_prefix, random_string): reasonably unique
|
||||
randomString<false /*clear-before-append*/>(key, args.key_length - static_cast<int>(key.size()));
|
||||
|
@ -190,10 +165,11 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
tx.set(key, value);
|
||||
return Future();
|
||||
} } },
|
||||
1,
|
||||
true },
|
||||
{ "INSERTRANGE",
|
||||
{ { StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genKeyPrefix(key, KEY_PREFIX, args);
|
||||
const auto prefix_len = static_cast<int>(key.size());
|
||||
const auto range = args.txnspec.ops[OP_INSERTRANGE][OP_RANGE];
|
||||
|
@ -211,27 +187,30 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
}
|
||||
return Future();
|
||||
} } },
|
||||
1,
|
||||
true },
|
||||
{ "OVERWRITE",
|
||||
{ { StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genKey(key, KEY_PREFIX, args, nextKey(args));
|
||||
randomString(value, args.value_length);
|
||||
tx.set(key, value);
|
||||
return Future();
|
||||
} } },
|
||||
1,
|
||||
true },
|
||||
{ "CLEAR",
|
||||
{ { StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
genKey(key, KEY_PREFIX, args, nextKey(args));
|
||||
tx.clear(key);
|
||||
return Future();
|
||||
} } },
|
||||
1,
|
||||
true },
|
||||
{ "SETCLEAR",
|
||||
{ { StepKind::COMMIT,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genKeyPrefix(key, KEY_PREFIX, args);
|
||||
const auto prefix_len = static_cast<int>(key.size());
|
||||
randomString<false /*append-after-clear*/>(key, args.key_length - prefix_len);
|
||||
|
@ -240,15 +219,16 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
return tx.commit().eraseType();
|
||||
} },
|
||||
{ StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
tx.reset(); // assuming commit from step 0 worked.
|
||||
tx.clear(key); // key should forward unchanged from step 0
|
||||
return Future();
|
||||
} } },
|
||||
2,
|
||||
true },
|
||||
{ "CLEARRANGE",
|
||||
{ { StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = nextKey(args);
|
||||
genKey(begin, KEY_PREFIX, args, num_begin);
|
||||
const auto range = args.txnspec.ops[OP_CLEARRANGE][OP_RANGE];
|
||||
|
@ -257,10 +237,11 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
tx.clearRange(begin, end);
|
||||
return Future();
|
||||
} } },
|
||||
1,
|
||||
true },
|
||||
{ "SETCLEARRANGE",
|
||||
{ { StepKind::COMMIT,
|
||||
[](Transaction tx, Arguments const& args, ByteString& key_begin, ByteString& key, ByteString& value) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& key_begin, ByteString& key, ByteString& value) {
|
||||
genKeyPrefix(key, KEY_PREFIX, args);
|
||||
const auto prefix_len = static_cast<int>(key.size());
|
||||
const auto range = args.txnspec.ops[OP_SETCLEARRANGE][OP_RANGE];
|
||||
|
@ -285,18 +266,19 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
return tx.commit().eraseType();
|
||||
} },
|
||||
{ StepKind::IMM,
|
||||
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
tx.reset();
|
||||
tx.clearRange(begin, end);
|
||||
return Future();
|
||||
} } },
|
||||
2,
|
||||
true },
|
||||
{ "COMMIT", { { StepKind::NONE, nullptr } }, false },
|
||||
{ "TRANSACTION", { { StepKind::NONE, nullptr } }, false },
|
||||
{ "TASK", { { StepKind::NONE, nullptr } }, false },
|
||||
{ "COMMIT", { { StepKind::NONE, nullptr } }, 0, false },
|
||||
{ "TRANSACTION", { { StepKind::NONE, nullptr } }, 0, false },
|
||||
{ "TASK", { { StepKind::NONE, nullptr } }, 0, false },
|
||||
{ "READBLOBGRANULE",
|
||||
{ { StepKind::ON_ERROR,
|
||||
[](Transaction tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = nextKey(args);
|
||||
genKey(begin, KEY_PREFIX, args, num_begin);
|
||||
const auto range = args.txnspec.ops[OP_READ_BG][OP_RANGE];
|
||||
|
@ -337,6 +319,7 @@ const std::array<Operation, MAX_OP> opTable{
|
|||
logr.printWithLogLevel(level, "ERROR", "get_keyvalue_array() after readBlobGranules(): {}", err.what());
|
||||
return tx.onError(err).eraseType();
|
||||
} } },
|
||||
1,
|
||||
false } }
|
||||
};
|
||||
|
||||
|
|
|
@ -28,33 +28,11 @@
|
|||
#include <tuple>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "macro.hpp"
|
||||
#include "mako.hpp"
|
||||
|
||||
namespace mako {
|
||||
|
||||
struct Arguments;
|
||||
|
||||
/* transaction specification */
|
||||
enum OpKind {
|
||||
OP_GETREADVERSION,
|
||||
OP_GET,
|
||||
OP_GETRANGE,
|
||||
OP_SGET,
|
||||
OP_SGETRANGE,
|
||||
OP_UPDATE,
|
||||
OP_INSERT,
|
||||
OP_INSERTRANGE,
|
||||
OP_OVERWRITE,
|
||||
OP_CLEAR,
|
||||
OP_SETCLEAR,
|
||||
OP_CLEARRANGE,
|
||||
OP_SETCLEARRANGE,
|
||||
OP_COMMIT,
|
||||
OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
|
||||
OP_TASK, /* pseudo-operation - cumulative time for each iteraton in run_workload */
|
||||
OP_READ_BG,
|
||||
MAX_OP /* must be the last item */
|
||||
};
|
||||
|
||||
constexpr const int OP_COUNT = 0;
|
||||
constexpr const int OP_RANGE = 1;
|
||||
constexpr const int OP_REVERSE = 2;
|
||||
|
@ -69,18 +47,18 @@ enum class StepKind {
|
|||
};
|
||||
|
||||
// Ops that doesn't have concrete steps to execute and are there for measurements only
|
||||
inline bool isAbstractOp(int op) noexcept {
|
||||
force_inline bool isAbstractOp(int op) noexcept {
|
||||
return op == OP_COMMIT || op == OP_TRANSACTION; // || op == OP_TASK;
|
||||
}
|
||||
|
||||
using StepFunction = fdb::Future (*)(fdb::Transaction tx,
|
||||
using StepFunction = fdb::Future (*)(fdb::Transaction& tx,
|
||||
Arguments const&,
|
||||
fdb::ByteString& /*key1*/,
|
||||
fdb::ByteString& /*key2*/,
|
||||
fdb::ByteString& /*value*/);
|
||||
|
||||
using PostStepFunction = void (*)(fdb::Future,
|
||||
fdb::Transaction tx,
|
||||
using PostStepFunction = void (*)(fdb::Future&,
|
||||
fdb::Transaction& tx,
|
||||
Arguments const&,
|
||||
fdb::ByteString& /*key1*/,
|
||||
fdb::ByteString& /*key2*/,
|
||||
|
@ -92,15 +70,12 @@ struct Step {
|
|||
PostStepFunction post_step_func_{ nullptr };
|
||||
};
|
||||
|
||||
class Operation {
|
||||
struct Operation {
|
||||
std::string_view name_;
|
||||
std::vector<Step> steps_;
|
||||
Step steps_[2];
|
||||
int num_steps_;
|
||||
bool needs_commit_;
|
||||
|
||||
public:
|
||||
Operation(std::string_view name, std::vector<Step>&& steps, bool needs_commit)
|
||||
: name_(name), steps_(std::move(steps)), needs_commit_(needs_commit) {}
|
||||
|
||||
std::string_view name() const noexcept { return name_; }
|
||||
|
||||
StepKind stepKind(int step) const noexcept {
|
||||
|
@ -112,15 +87,19 @@ public:
|
|||
|
||||
PostStepFunction postStepFunction(int step) const noexcept { return steps_[step].post_step_func_; }
|
||||
// how many steps in this op?
|
||||
int steps() const noexcept { return static_cast<int>(steps_.size()); }
|
||||
int steps() const noexcept { return num_steps_; }
|
||||
// does the op needs to commit some time after its final step?
|
||||
bool needsCommit() const noexcept { return needs_commit_; }
|
||||
};
|
||||
|
||||
char const* getOpName(int ops_code);
|
||||
|
||||
extern const std::array<Operation, MAX_OP> opTable;
|
||||
|
||||
force_inline char const* getOpName(int ops_code) {
|
||||
if (ops_code >= 0 && ops_code < MAX_OP)
|
||||
return opTable[ops_code].name().data();
|
||||
return "";
|
||||
}
|
||||
|
||||
struct OpIterator {
|
||||
int op, count, step;
|
||||
|
||||
|
@ -137,9 +116,30 @@ struct OpIterator {
|
|||
|
||||
constexpr const OpIterator OpEnd = OpIterator{ MAX_OP, -1, -1 };
|
||||
|
||||
OpIterator getOpBegin(Arguments const& args) noexcept;
|
||||
force_inline OpIterator getOpBegin(Arguments const& args) noexcept {
|
||||
for (auto op = 0; op < MAX_OP; op++) {
|
||||
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] == 0)
|
||||
continue;
|
||||
return OpIterator{ op, 0, 0 };
|
||||
}
|
||||
return OpEnd;
|
||||
}
|
||||
|
||||
OpIterator getOpNext(Arguments const& args, OpIterator current) noexcept;
|
||||
force_inline OpIterator getOpNext(Arguments const& args, OpIterator current) noexcept {
|
||||
if (OpEnd == current)
|
||||
return OpEnd;
|
||||
auto [op, count, step] = current;
|
||||
assert(op < MAX_OP && !isAbstractOp(op));
|
||||
if (opTable[op].steps() > step + 1)
|
||||
return OpIterator{ op, count, step + 1 };
|
||||
count++;
|
||||
for (; op < MAX_OP; op++, count = 0) {
|
||||
if (isAbstractOp(op) || args.txnspec.ops[op][OP_COUNT] <= count)
|
||||
continue;
|
||||
return OpIterator{ op, count, 0 };
|
||||
}
|
||||
return OpEnd;
|
||||
}
|
||||
|
||||
} // namespace mako
|
||||
|
||||
|
|
Loading…
Reference in New Issue