Merge pull request #3720 from sfc-gh-xwang/mako

update mako and WriteTagThrottling workload so as to trigger tag throttling event
This commit is contained in:
Trevor Clinkenbeard 2020-09-10 15:21:00 -07:00 committed by GitHub
commit 732a457ba4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 176 additions and 91 deletions

View File

@ -54,7 +54,8 @@ FILE* debugme; /* descriptor used for debug messages */
int err = wait_future(_f); \
if (err) { \
int err2; \
if ((err != 1020 /* not_committed */) && (err != 1021 /* commit_unknown_result */)) { \
if ((err != 1020 /* not_committed */) && (err != 1021 /* commit_unknown_result */) && \
(err != 1213 /* tag_throttled */)) { \
fprintf(stderr, "ERROR: Error %s (%d) occured at %s\n", #_func, err, fdb_get_error(err)); \
} else { \
fprintf(annoyme, "ERROR: Error %s (%d) occured at %s\n", #_func, err, fdb_get_error(err)); \
@ -698,7 +699,7 @@ retryTxn:
}
int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps, volatile double* throttle_factor,
int thread_iters, volatile int* signal, mako_stats_t* stats, int dotrace, lat_block_t* block[],
int thread_iters, volatile int* signal, mako_stats_t* stats, int dotrace, int dotagging, lat_block_t* block[],
int* elem_size, bool* is_memory_allocated) {
int xacts = 0;
int64_t total_xacts = 0;
@ -710,6 +711,7 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
int current_tps;
char* traceid;
int tracetimer = 0;
char* tagstr;
if (thread_tps < 0) return 0;
@ -717,6 +719,12 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
traceid = (char*)malloc(32);
}
if(dotagging) {
tagstr = (char*)calloc(16, 1);
memcpy(tagstr, KEYPREFIX, KEYPREFIXLEN);
memcpy(tagstr + KEYPREFIXLEN, args->txntagging_prefix, TAGPREFIXLENGTH_MAX);
}
current_tps = (int)((double)thread_tps * *throttle_factor);
keystr = (char*)malloc(sizeof(char) * args->key_length + 1);
@ -774,6 +782,7 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
}
}
} else {
if (thread_tps > 0) {
/* 1 second not passed, throttle */
@ -783,6 +792,17 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
}
} /* throttle or txntrace */
/* enable transaction tagging */
if (dotagging > 0) {
sprintf(tagstr + KEYPREFIXLEN + TAGPREFIXLENGTH_MAX, "%03d", urand(0, args->txntagging - 1));
fdb_error_t err = fdb_transaction_set_option(transaction, FDB_TR_OPTION_AUTO_THROTTLE_TAG,
(uint8_t*)tagstr, 16);
if (err) {
fprintf(stderr, "ERROR: FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER: %s\n",
fdb_get_error(err));
}
}
rc = run_one_transaction(transaction, args, stats, keystr, keystr2, valstr, block, elem_size,
is_memory_allocated);
if (rc) {
@ -808,6 +828,9 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
if (dotrace) {
free(traceid);
}
if(dotagging) {
free(tagstr);
}
return rc;
}
@ -876,6 +899,7 @@ void* worker_thread(void* thread_args) {
int op;
int i, size;
int dotrace = (worker_id == 0 && thread_id == 0 && args->txntrace) ? args->txntrace : 0;
int dotagging = args->txntagging;
volatile int* signal = &((thread_args_t*)thread_args)->process->shm->signal;
volatile double* throttle_factor = &((thread_args_t*)thread_args)->process->shm->throttle_factor;
volatile int* readycount = &((thread_args_t*)thread_args)->process->shm->readycount;
@ -940,8 +964,8 @@ void* worker_thread(void* thread_args) {
/* run the workload */
else if (args->mode == MODE_RUN) {
rc = run_workload(transaction, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace, block,
elem_size, is_memory_allocated);
rc = run_workload(transaction, args, thread_tps, throttle_factor, thread_iters,
signal, stats, dotrace, dotagging, block, elem_size, is_memory_allocated);
if (rc < 0) {
fprintf(stderr, "ERROR: run_workload failed\n");
}
@ -1209,6 +1233,8 @@ int init_args(mako_args_t* args) {
args->tracepath[0] = '\0';
args->traceformat = 0; /* default to client's default (XML) */
args->txntrace = 0;
args->txntagging = 0;
memset(args->txntagging_prefix, 0, TAGPREFIXLENGTH_MAX);
for (i = 0; i < MAX_OP; i++) {
args->txnspec.ops[i][OP_COUNT] = 0;
}
@ -1366,6 +1392,8 @@ void usage() {
printf("%-24s %s\n", " --tracepath=PATH", "Set trace file path");
printf("%-24s %s\n", " --trace_format <xml|json>", "Set trace format (Default: json)");
printf("%-24s %s\n", " --txntrace=sec", "Specify transaction tracing interval (Default: 0)");
printf("%-24s %s\n", " --txntagging", "Specify the number of different transaction tag (Default: 0, max = 1000)");
printf("%-24s %s\n", " --txntagging_prefix", "Specify the prefix of transaction tag - mako${txntagging_prefix} (Default: '')");
printf("%-24s %s\n", " --knobs=KNOBS", "Set client knobs");
printf("%-24s %s\n", " --flatbuffers", "Use flatbuffers");
}
@ -1407,6 +1435,8 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
{ "commitget", no_argument, NULL, ARG_COMMITGET },
{ "flatbuffers", no_argument, NULL, ARG_FLATBUFFERS },
{ "trace", no_argument, NULL, ARG_TRACE },
{ "txntagging", required_argument, NULL, ARG_TXNTAGGING },
{ "txntagging_prefix", required_argument, NULL, ARG_TXNTAGGINGPREFIX},
{ "version", no_argument, NULL, ARG_VERSION },
{ NULL, 0, NULL, 0 }
};
@ -1522,8 +1552,25 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
case ARG_TXNTRACE:
args->txntrace = atoi(optarg);
break;
case ARG_TXNTAGGING:
args->txntagging = atoi(optarg);
if(args->txntagging > 1000) {
args->txntagging = 1000;
}
break;
case ARG_TXNTAGGINGPREFIX: {
if(strlen(optarg) > TAGPREFIXLENGTH_MAX) {
fprintf(stderr, "Error: the length of txntagging_prefix is larger than %d\n", TAGPREFIXLENGTH_MAX);
exit(0);
}
memcpy(args->txntagging_prefix, optarg, strlen(optarg));
break;
}
}
}
if ((args->tpsmin == -1) || (args->tpsmin > args->tpsmax)) {
args->tpsmin = args->tpsmax;
}
@ -1580,6 +1627,10 @@ int validate_args(mako_args_t* args) {
fprintf(stderr, "ERROR: Must specify either seconds or iteration\n");
return -1;
}
if(args->txntagging < 0) {
fprintf(stderr, "ERROR: --txntagging must be a non-negative integer\n");
return -1;
}
}
return 0;
}

View File

@ -75,7 +75,9 @@ enum Arguments {
ARG_TPSMIN,
ARG_TPSINTERVAL,
ARG_TPSCHANGE,
ARG_TXNTRACE
ARG_TXNTRACE,
ARG_TXNTAGGING,
ARG_TXNTAGGINGPREFIX
};
enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };
@ -95,6 +97,7 @@ typedef struct {
} mako_txnspec_t;
#define KNOB_MAX 256
#define TAGPREFIXLENGTH_MAX 8
/* benchmark parameters */
typedef struct {
@ -124,6 +127,8 @@ typedef struct {
char knobs[KNOB_MAX];
uint8_t flatbuffers;
int txntrace;
int txntagging;
char txntagging_prefix[TAGPREFIXLENGTH_MAX];
} mako_args_t;
/* shared memory */

View File

@ -20,6 +20,7 @@
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/TagThrottle.h"
@ -34,6 +35,7 @@ constexpr int SAMPLE_SIZE = 10000;
// output TPS and latency of read/set/clear operations to do eyeball check. We want this new feature would not cause
// more load to the cluster (Maybe some visualization tools is needed to show the trend of metrics)
struct WriteTagThrottlingWorkload : KVWorkload {
// Performance metrics
int goodActorTrNum = 0, goodActorRetries = 0, goodActorTooOldRetries = 0, goodActorCommitFailedRetries = 0;
int badActorTrNum = 0, badActorRetries = 0, badActorTooOldRetries = 0, badActorCommitFailedRetries = 0;
@ -46,30 +48,31 @@ struct WriteTagThrottlingWorkload : KVWorkload {
int goodActorPerClient, badActorPerClient;
int numWritePerTr, numReadPerTr, numClearPerTr;
int keyCount;
double badOpRate;
double testDuration, throttleDuration;
double tpsRate;
double badOpRate, hotRangeRate;
double testDuration;
bool writeThrottle;
bool populateData;
// internal states
double trInterval;
TransactionTag badReadTag, badWriteTag, goodTag;
TransactionTag badTag, goodTag;
bool fastSuccess = false;
int rangeEachBadActor = 0;
std::set<std::string> throttledTags;
static constexpr const char* NAME = "WriteTagThrottling";
static constexpr int MIN_TAGS_PER_TRANSACTION = 2;
static constexpr int MIN_MANUAL_THROTTLED_TRANSACTION_TAGS = 3;
static constexpr int MIN_TAGS_PER_TRANSACTION = 1;
static constexpr int MIN_TRANSACTION_TAG_LENGTH = 2;
WriteTagThrottlingWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), badActorCommitLatency(SAMPLE_SIZE), badActorReadLatency(SAMPLE_SIZE),
goodActorCommitLatency(SAMPLE_SIZE), goodActorReadLatency(SAMPLE_SIZE) {
testDuration = getOption(options, LiteralStringRef("testDuration"), 120.0);
throttleDuration = getOption(options, LiteralStringRef("throttleDuration"), testDuration + 120.0);
badOpRate = getOption(options, LiteralStringRef("badOpRate"), 0.9);
tpsRate = getOption(options, LiteralStringRef("tpsRate"), 300.0);
numWritePerTr = getOption(options, LiteralStringRef("numWritePerTr"), 1);
numReadPerTr = getOption(options, LiteralStringRef("numReadPerTr"), 1);
numClearPerTr = getOption(options, LiteralStringRef("numClearPerTr"), 1);
hotRangeRate = getOption(options, LiteralStringRef("hotRangeRate"), 0.1);
populateData = getOption(options, LiteralStringRef("populateData"), true);
writeThrottle = getOption(options, LiteralStringRef("writeThrottle"), false);
badActorPerClient = getOption(options, LiteralStringRef("badActorPerClient"), 1);
@ -77,47 +80,36 @@ struct WriteTagThrottlingWorkload : KVWorkload {
actorCount = goodActorPerClient + badActorPerClient;
keyCount = getOption(options, LiteralStringRef("keyCount"),
std::max(3000, actorCount * 3)); // enough keys to avoid too many conflicts
std::max(3000, clientCount * actorCount * 3)); // enough keys to avoid too many conflicts
trInterval = actorCount * 1.0 / getOption(options, LiteralStringRef("trPerSecond"), 1000);
if(badActorPerClient > 0) {
rangeEachBadActor = keyCount / (clientCount * badActorPerClient);
}
badReadTag = TransactionTag(std::string("bR"));
badWriteTag = TransactionTag(std::string("bW"));
badTag = TransactionTag(std::string("bT"));
goodTag = TransactionTag(std::string("gT"));
}
virtual std::string description() { return WriteTagThrottlingWorkload::NAME; }
// choose a tag
// NOTE: this workload make sense if and only if all following tags are attached to db successfully.
ACTOR static Future<Void> _setup(Database cx, WriteTagThrottlingWorkload* self) {
state TransactionPriority priority = deterministicRandom()->randomChoice(allTransactionPriorities);
state TagSet tagSet1;
state TagSet tagSet2;
state TagSet tagSet3;
ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION >= MIN_TAGS_PER_TRANSACTION &&
CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH >= MIN_TRANSACTION_TAG_LENGTH);
tagSet1.addTag(self->badWriteTag);
tagSet2.addTag(self->badReadTag);
tagSet3.addTag(self->goodTag);
ASSERT(SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS >= MIN_MANUAL_THROTTLED_TRANSACTION_TAGS);
wait(ThrottleApi::throttleTags(cx, tagSet1, self->tpsRate, self->throttleDuration, TagThrottleType::MANUAL,
priority));
wait(ThrottleApi::throttleTags(cx, tagSet2, self->tpsRate, self->throttleDuration, TagThrottleType::MANUAL,
priority));
wait(ThrottleApi::throttleTags(cx, tagSet3, self->tpsRate, self->throttleDuration, TagThrottleType::MANUAL,
priority));
if(self->populateData) {
wait(bulkSetup(cx, self, self->keyCount, Promise<double>()));
}
if(self->clientId == 0) {
wait(ThrottleApi::enableAuto(cx, true));
}
return Void();
}
Future<Void> setup(const Database& cx) override {
if (CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION < MIN_TAGS_PER_TRANSACTION ||
CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < MIN_TRANSACTION_TAG_LENGTH ||
SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS < MIN_MANUAL_THROTTLED_TRANSACTION_TAGS) {
CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < MIN_TRANSACTION_TAG_LENGTH ) {
fastSuccess = true;
return Void();
}
return clientId ? Void() : _setup(cx, this);
return _setup(cx, this);
}
ACTOR static Future<Void> _start(Database cx, WriteTagThrottlingWorkload* self) {
vector<Future<Void>> clientActors;
@ -128,6 +120,7 @@ struct WriteTagThrottlingWorkload : KVWorkload {
for (actorId = 0; actorId < self->badActorPerClient; ++actorId) {
clientActors.push_back(clientActor(true, actorId, self->badOpRate, cx, self));
}
clientActors.push_back(throttledTagUpdater(cx, self));
wait(timeout(waitForAll(clientActors), self->testDuration, Void()));
return Void();
}
@ -137,21 +130,21 @@ struct WriteTagThrottlingWorkload : KVWorkload {
}
virtual Future<bool> check(Database const& cx) {
if(fastSuccess) return true;
if (badActorTrNum == 0 && goodActorTrNum == 0) {
TraceEvent(SevError, "NoTransactionsCommitted");
return false;
}
if (writeThrottle) {
if (!badActorThrottleRetries && !goodActorThrottleRetries) {
TraceEvent(SevWarn, "NoThrottleTriggered");
}
if (badActorThrottleRetries < goodActorThrottleRetries) {
TraceEvent(SevError, "IncorrectThrottle")
TraceEvent(SevWarnAlways, "IncorrectThrottle")
.detail("BadActorThrottleRetries", badActorThrottleRetries)
.detail("GoodActorThrottleRetries", goodActorThrottleRetries);
}
if(!throttledTags.empty() && throttledTags.count(badTag.toString()) == 0) {
TraceEvent(SevWarnAlways, "IncorrectThrottle")
.detail("ThrottledTagNumber", throttledTags.size())
.detail("ThrottledTags", setToString(throttledTags));
return false;
}
// TODO check whether write throttled tag is correct after enabling that feature
// NOTE also do eyeball check of Retries.throttle and Avg Latency
}
return true;
@ -193,25 +186,27 @@ struct WriteTagThrottlingWorkload : KVWorkload {
m.push_back(PerfMetric("50% Commit Latency (ms, goodActor)", 1000 * goodActorCommitLatency.median(), true));
}
Standalone<KeyValueRef> operator()(uint64_t n) {
return KeyValueRef(keyForIndex(n), generateVal());
}
// return a key based on useReadKey
Key generateKey(bool useReadKey, int idx) {
Key generateKey(bool useReadKey, int startIdx, int availableRange) {
if (useReadKey) {
return keyForIndex(idx, false);
return keyForIndex(startIdx + deterministicRandom()->randomInt(0, availableRange), false);
}
return getRandomKey();
}
// return a range based on useClearKey
KeyRange generateRange(bool useClearKey, int idx) {
int a = deterministicRandom()->randomInt(0, keyCount / 3);
if (useClearKey) {
if (a < idx) {
return KeyRange(KeyRangeRef(keyForIndex(a, false), keyForIndex(idx, false)));
} else if (a > idx) {
return KeyRange(KeyRangeRef(keyForIndex(idx, false), keyForIndex(a, false)));
} else
return singleKeyRange(keyForIndex(a, false));
KeyRange generateRange(bool useClearKey, int startIdx, int availableRange) {
int a, b;
if(useClearKey) {
a = deterministicRandom()->randomInt(startIdx, availableRange + startIdx);
b = deterministicRandom()->randomInt(startIdx, availableRange + startIdx);
}
else {
a = deterministicRandom()->randomInt(0, keyCount);
b = deterministicRandom()->randomInt(0, keyCount);
}
int b = deterministicRandom()->randomInt(0, keyCount / 3);
if (a > b) std::swap(a, b);
if (a == b) return singleKeyRange(keyForIndex(a, false));
return KeyRange(KeyRangeRef(keyForIndex(a, false), keyForIndex(b, false)));
@ -221,6 +216,8 @@ struct WriteTagThrottlingWorkload : KVWorkload {
// read and write value on particular/random Key
ACTOR static Future<Void> clientActor(bool isBadActor, int actorId, double badOpRate, Database cx,
WriteTagThrottlingWorkload* self) {
state int startIdx = (self->clientId * self->badActorPerClient + actorId) * self->rangeEachBadActor;
state int availableRange = std::max(int(self->rangeEachBadActor * self->hotRangeRate), 1);
state double lastTime = now();
state double opStart;
state StringRef key;
@ -234,11 +231,11 @@ struct WriteTagThrottlingWorkload : KVWorkload {
if (self->writeThrottle) {
ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION >= MIN_TAGS_PER_TRANSACTION);
tr.options.tags.clear();
tr.options.readTags.clear();
if (isBadActor) {
tr.options.tags.addTag(self->badWriteTag);
tr.options.tags.addTag(self->badReadTag);
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, self->badTag);
} else if (deterministicRandom()->coinflip()) {
tr.options.tags.addTag(self->goodTag);
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, self->goodTag);
}
}
@ -247,17 +244,17 @@ struct WriteTagThrottlingWorkload : KVWorkload {
try {
for (i = 0; i < self->numClearPerTr; ++i) {
bool useClearKey = deterministicRandom()->random01() < badOpRate;
tr.clear(self->generateRange(useClearKey, actorId));
tr.clear(self->generateRange(useClearKey, startIdx, availableRange));
}
for (i = 0; i < self->numWritePerTr; ++i) {
bool useReadKey = deterministicRandom()->random01() < badOpRate;
key = self->generateKey(useReadKey, actorId + self->keyCount / 3);
key = self->generateKey(useReadKey, startIdx, availableRange);
tr.set(key, self->generateVal());
}
for (i = 0; i < self->numReadPerTr; ++i) {
bool useReadKey = deterministicRandom()->random01() < badOpRate;
ASSERT(self->keyCount >= actorId);
key = self->generateKey(useReadKey, self->keyCount - actorId);
key = self->generateKey(useReadKey, startIdx, availableRange);
opStart = now();
Optional<Value> v = wait(tr.get(key));
double duration = now() - opStart;
@ -296,10 +293,28 @@ struct WriteTagThrottlingWorkload : KVWorkload {
throw;
}
}
// TODO periodically to ask which tag is throttled; collect other health metrics
// ACTOR static Future<Void> healthMetricsChecker(Database cx, WriteTagThrottlingWorkload* self) {
//
// }
void recordThrottledTags(std::vector<TagThrottleInfo>& tags) {
for(auto& tag: tags) {
throttledTags.insert(tag.tag.toString());
}
}
ACTOR static Future<Void> throttledTagUpdater(Database cx, WriteTagThrottlingWorkload* self) {
state std::vector<TagThrottleInfo> tags;
loop {
wait(delay(1.0));
wait(store(tags, ThrottleApi::getThrottledTags(cx, CLIENT_KNOBS->TOO_MANY, true)));
self->recordThrottledTags(tags);
};
}
static std::string setToString(const std::set<std::string>& myset) {
std::string res;
for(auto& s: myset) {
res.append(s).push_back(' ');
}
return res;
}
};
WorkloadFactory<WriteTagThrottlingWorkload> WriteTagThrottlingWorkloadFactory(WriteTagThrottlingWorkload::NAME);

View File

@ -166,7 +166,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES rare/RedwoodCorrectnessBTree.toml)
add_fdb_test(TEST_FILES rare/TransactionTagApiCorrectness.toml)
add_fdb_test(TEST_FILES rare/TransactionTagSwizzledApiCorrectness.toml)
add_fdb_test(TEST_FILES rare/WriteTagThrottling.toml)
add_fdb_test(
TEST_FILES restarting/from_7.0.0/ConfigureTestRestart-1.txt
restarting/from_7.0.0/ConfigureTestRestart-2.txt)

View File

@ -1,25 +0,0 @@
testTitle=withoutWriteThrottling
testName=WriteTagThrottling
trPerSecond=10000000
keyCount=5000
absentFrac=0.01
numWritePerTr=1
numReadPerTr=1
numClearPerTr=1
badOpRate = 0.9
testName=HealthMetricsApi
testTitle=withWriteThrottling
testName=WriteTagThrottling
writeThrottle = true
trPerSecond=10000000
keyCount=5000
absentFrac=0.01
numWritePerTr=1
numReadPerTr=1
numClearPerTr=1
badOpRate = 0.9
tpsRate=20
testName=HealthMetricsApi

View File

@ -0,0 +1,39 @@
[[test]]
testTitle = 'withoutWriteThrottling'
[[test.workload]]
testName = 'WriteTagThrottling'
populateData = true
trPerSecond=10000000
keyCount=100000
absentFrac=0.01
numWritePerTr=10
numReadPerTr=10
numClearPerTr=0
badOpRate = 0.9
hotRangeRate = 0.001
badActorPerClient = 1
goodActorPerClient = 1
[[test.workload]]
testName='HealthMetricsApi'
[[test]]
testTitle = 'withWriteThrottling'
[[test.workload]]
testName = 'WriteTagThrottling'
writeThrottle = true
trPerSecond=10000000
keyCount=100000
absentFrac=0.01
numWritePerTr=10
numReadPerTr=10
numClearPerTr=0
badOpRate = 0.9
hotRangeRate = 0.001
badActorPerClient = 1
goodActorPerClient = 1
[[test.workload]]
testName='HealthMetricsApi'