Introduce post-step functions

- Try accessing data from ready futures for get* APIs,
to simulate a real workload.
- Adapt to modified blob granules API
This commit is contained in:
Junhyun Shim 2022-04-14 14:38:55 +02:00
parent dfa35c860e
commit 4b3f6cbd0b
8 changed files with 64 additions and 3332 deletions

View File

@ -23,7 +23,7 @@
#pragma once
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 710
#define FDB_API_VERSION 720
#endif
#include <cstdint>

View File

@ -125,6 +125,8 @@ repeat_immediate_steps:
} else {
// step is blocking. register a continuation and return
f.then([this, state = shared_from_this()](Future f) {
if (auto postStepFn = opTable[iter.op].postStepFunction(iter.step))
postStepFn(f, tx, args, key1, key2, val);
if (iter.stepKind() != StepKind::ON_ERROR) {
if (auto err = f.error()) {
logr.printWithLogLevel(err.retryable() ? VERBOSE_WARN : VERBOSE_NONE,

View File

@ -27,7 +27,12 @@ extern thread_local mako::Logger logr;
namespace mako::blob_granules::local_file {
int64_t startLoad(const char* filename, int filenameLength, int64_t offset, int64_t length, void* userContext) {
int64_t startLoad(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* userContext) {
FILE* fp;
char full_fname[PATH_MAX]{
0,
@ -103,6 +108,7 @@ fdb::native::FDBReadBlobGranuleContext createApiContext(UserContext& ctx, bool m
ret.get_load_f = &getLoad;
ret.free_load_f = &freeLoad;
ret.debugNoMaterialize = !materialize_files;
ret.granuleParallelism = 2; // TODO make knob or setting for changing this?
return ret;
}

File diff suppressed because it is too large Load Diff

View File

@ -233,6 +233,8 @@ int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, L
future_rc = waitAndHandleForOnError(tx, f, opTable[op].name());
}
}
if (auto postStepFn = opTable[op].postStepFunction(step))
postStepFn(f, tx, args, key1, key2, val);
watch_step.stop();
if (future_rc != FutureRC::OK) {
if (future_rc == FutureRC::CONFLICT) {

View File

@ -1,209 +0,0 @@
/* mako.h -- shared declarations for the mako benchmark tool: operation and
 * argument enums, benchmark parameters, the shared-memory control block, and
 * per-process / per-thread bookkeeping structures. */
#ifndef MAKO_H
#define MAKO_H
#pragma once
/* Select the FDB C API version unless the build system already chose one. */
#ifndef FDB_API_VERSION
#define FDB_API_VERSION 720
#endif
#include <foundationdb/fdb_c.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h> /* uint8_t / uint64_t used below; don't rely on transitive includes */
#include <sys/types.h>
/* PATH_MAX lives in a different header on each platform. */
#if defined(__linux__)
#include <linux/limits.h>
#elif defined(__APPLE__)
#include <sys/syslimits.h>
#else
#include <limits.h>
#endif
/* logging verbosity levels (higher value = more output) */
#define VERBOSE_NONE 0
#define VERBOSE_DEFAULT 1
#define VERBOSE_ANNOYING 2
#define VERBOSE_DEBUG 3
/* top-level run modes selected on the command line */
#define MODE_INVALID -1
#define MODE_CLEAN 0
#define MODE_BUILD 1
#define MODE_RUN 2
/* internal status codes for a transaction attempt */
#define FDB_SUCCESS 0
#define FDB_ERROR_RETRY -1
#define FDB_ERROR_ABORT -2
#define FDB_ERROR_CONFLICT -3
#define LAT_BLOCK_SIZE 511 /* size of each block to get detailed latency for each operation */
/* transaction specification */
enum Operations {
	OP_GETREADVERSION,
	OP_GET,
	OP_GETRANGE,
	OP_SGET,
	OP_SGETRANGE,
	OP_UPDATE,
	OP_INSERT,
	OP_INSERTRANGE,
	OP_OVERWRITE,
	OP_CLEAR,
	OP_SETCLEAR,
	OP_CLEARRANGE,
	OP_SETCLEARRANGE,
	OP_COMMIT,
	OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
	OP_READ_BG,
	MAX_OP /* must be the last item */
};
/* column indices into mako_txnspec_t.ops[op][...] */
#define OP_COUNT 0
#define OP_RANGE 1
#define OP_REVERSE 2
/* for long arguments */
enum Arguments {
	ARG_KEYLEN,
	ARG_VALLEN,
	ARG_TPS,
	ARG_COMMITGET,
	ARG_SAMPLING,
	ARG_VERSION,
	ARG_KNOBS,
	ARG_FLATBUFFERS,
	ARG_LOGGROUP,
	ARG_PREFIXPADDING,
	ARG_TRACE,
	ARG_TRACEPATH,
	ARG_TRACEFORMAT,
	ARG_TPSMAX,
	ARG_TPSMIN,
	ARG_TPSINTERVAL,
	ARG_TPSCHANGE,
	ARG_TXNTRACE,
	ARG_TXNTAGGING,
	ARG_TXNTAGGINGPREFIX,
	ARG_STREAMING_MODE,
	ARG_DISABLE_RYW,
	ARG_CLIENT_THREADS_PER_VERSION,
	ARG_JSON_REPORT,
	ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
};
/* waveform used when varying the target TPS over time */
enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };
#define KEYPREFIX "mako"
#define KEYPREFIXLEN 4
#define TEMP_DATA_STORE "/tmp/makoTemp"
/* mako_txnspec_t and mako_args_t are set only once, in the master process,
 * and are not modified by child processes.
 */
typedef struct {
	/* for each operation, it stores "count", "range" and "reverse" */
	int ops[MAX_OP][3];
} mako_txnspec_t;
#define LOGGROUP_MAX 256
#define KNOB_MAX 256
#define TAGPREFIXLENGTH_MAX 8
#define NUM_CLUSTERS_MAX 3
#define NUM_DATABASES_MAX 10
#define MAX_BG_IDS 1000
/* benchmark parameters */
typedef struct {
	int api_version;
	int json;
	int num_processes;
	int num_threads;
	int mode;
	int rows; /* is 2 billion enough? */
	int seconds;
	int iteration;
	int tpsmax;
	int tpsmin;
	int tpsinterval;
	int tpschange;
	int sampling;
	int key_length;
	int value_length;
	int zipf; /* nonzero: use zipfian key distribution instead of uniform */
	int commit_get;
	int verbose;
	mako_txnspec_t txnspec;
	char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
	int num_fdb_clusters;
	int num_databases;
	char log_group[LOGGROUP_MAX];
	int prefixpadding;
	int trace;
	char tracepath[PATH_MAX];
	int traceformat; /* 0 - XML, 1 - JSON */
	char knobs[KNOB_MAX];
	uint8_t flatbuffers;
	int txntrace;
	int txntagging;
	char txntagging_prefix[TAGPREFIXLENGTH_MAX];
	FDBStreamingMode streaming_mode;
	int client_threads_per_version;
	int disable_ryw;
	char json_output_path[PATH_MAX];
	bool bg_materialize_files; /* materialize blob granule files read from bg_file_path */
	char bg_file_path[PATH_MAX];
} mako_args_t;
/* shared memory */
#define SIGNAL_RED 0
#define SIGNAL_GREEN 1
#define SIGNAL_OFF 2
typedef struct {
	int signal; /* master -> worker start/stop signal (SIGNAL_*) */
	int readycount; /* number of workers that reported ready */
	double throttle_factor; /* scales the target TPS at runtime */
	int stopcount; /* number of workers that have stopped */
} mako_shmhdr_t;
/* memory block allocated to each operation when collecting detailed latency */
typedef struct {
	uint64_t data[LAT_BLOCK_SIZE];
	void* next_block; /* singly linked list of lat_block_t; NULL terminates */
} lat_block_t;
/* per-thread operation statistics, aggregated by the stats process */
typedef struct {
	uint64_t xacts;
	uint64_t conflicts;
	uint64_t ops[MAX_OP];
	uint64_t errors[MAX_OP];
	uint64_t latency_samples[MAX_OP];
	uint64_t latency_us_total[MAX_OP];
	uint64_t latency_us_min[MAX_OP];
	uint64_t latency_us_max[MAX_OP];
} mako_stats_t;
/* per-process information */
typedef struct {
	int worker_id;
	pid_t parent_id;
	mako_args_t* args;
	mako_shmhdr_t* shm;
	FDBDatabase* databases[NUM_DATABASES_MAX];
} process_info_t;
/* args for threads */
typedef struct {
	int thread_id;
	int database_index; // index of the database to do work to
	int elem_size[MAX_OP]; /* stores the multiple of LAT_BLOCK_SIZE to check the memory allocation of each operation */
	bool is_memory_allocated[MAX_OP]; /* flag specified for each operation, whether the memory was allocated to that
	                                     specific operation */
	lat_block_t* block[MAX_OP];
	process_info_t* process;
} thread_args_t;
/* process type */
typedef enum { proc_master = 0, proc_worker, proc_stats } proc_type_t;
#endif /* MAKO_H */

View File

@ -74,6 +74,11 @@ const std::array<Operation, MAX_OP> opTable{
{ { StepKind::READ,
[](Transaction tx, Arguments const&, ByteString&, ByteString&, ByteString&) {
return tx.getReadVersion().eraseType();
},
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString&) {
if (f && !f.error()) {
f.get<future_var::Int64>();
}
} } },
false },
{ "GET",
@ -82,6 +87,11 @@ const std::array<Operation, MAX_OP> opTable{
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, false /*snapshot*/).eraseType();
},
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
if (f && !f.error()) {
f.get<future_var::Value>();
}
} } },
false },
{ "GETRANGE",
@ -103,6 +113,11 @@ const std::array<Operation, MAX_OP> opTable{
false /*snapshot*/,
args.txnspec.ops[OP_GETRANGE][OP_REVERSE])
.eraseType();
},
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
if (f && !f.error()) {
f.get<future_var::KeyValueArray>();
}
} } },
false },
{ "SGET",
@ -111,6 +126,11 @@ const std::array<Operation, MAX_OP> opTable{
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, true /*snapshot*/).eraseType();
},
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
if (f && !f.error()) {
f.get<future_var::Value>();
}
} } },
false },
{ "SGETRANGE",
@ -134,9 +154,12 @@ const std::array<Operation, MAX_OP> opTable{
true /*snapshot*/,
args.txnspec.ops[OP_SGETRANGE][OP_REVERSE])
.eraseType();
}
} },
},
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
if (f && !f.error()) {
f.get<future_var::KeyValueArray>();
}
} } },
false },
{ "UPDATE",
{ { StepKind::READ,
@ -144,6 +167,11 @@ const std::array<Operation, MAX_OP> opTable{
const auto num = nextKey(args);
genKey(key, KEY_PREFIX, args, num);
return tx.get(key, false /*snapshot*/).eraseType();
},
[](Future f, Transaction, Arguments const&, ByteString&, ByteString&, ByteString& val) {
if (f && !f.error()) {
f.get<future_var::Value>();
}
} },
{ StepKind::IMM,
[](Transaction tx, Arguments const& args, ByteString& key, ByteString&, ByteString& value) {
@ -290,8 +318,11 @@ const std::array<Operation, MAX_OP> opTable{
auto api_context = blob_granules::local_file::createApiContext(user_context, args.bg_materialize_files);
auto r =
tx.readBlobGranules(begin, end, 0 /*begin_version*/, -1 /*end_version, use txn's*/, api_context);
auto r = tx.readBlobGranules(begin,
end,
0 /* beginVersion*/,
-2, /* endVersion. -2 (latestVersion) is use txn read version */
api_context);
user_context.clear();

View File

@ -79,8 +79,20 @@ using StepFunction = fdb::Future (*)(fdb::Transaction tx,
fdb::ByteString& /*key2*/,
fdb::ByteString& /*value*/);
// Optional hook invoked after a step's future becomes ready; it receives the
// same transaction and key/value buffers as the step that produced the future,
// so it can access the ready future's data (simulating a real workload).
using PostStepFunction = void (*)(fdb::Future,
fdb::Transaction tx,
Arguments const&,
fdb::ByteString& /*key1*/,
fdb::ByteString& /*key2*/,
fdb::ByteString& /*value*/);
// One step of a mako operation: its blocking kind, the function that issues
// it, and an optional post-step hook (nullptr when the step has none).
struct Step {
StepKind kind;
StepFunction step_func_;
PostStepFunction post_step_func_{ nullptr };
};
class Operation {
using Step = std::pair<StepKind, StepFunction>;
std::string_view name_;
std::vector<Step> steps_;
bool needs_commit_;
@ -93,11 +105,12 @@ public:
StepKind stepKind(int step) const noexcept {
assert(step < steps());
return steps_[step].first;
return steps_[step].kind;
}
StepFunction stepFunction(int step) const noexcept { return steps_[step].second; }
StepFunction stepFunction(int step) const noexcept { return steps_[step].step_func_; }
PostStepFunction postStepFunction(int step) const noexcept { return steps_[step].post_step_func_; }
// how many steps in this op?
int steps() const noexcept { return static_cast<int>(steps_.size()); }
// does the op needs to commit some time after its final step?