update WriteTagThrottling workload

This commit is contained in:
Xiaoxi Wang 2020-08-28 19:48:00 +00:00
parent f824f0296e
commit 944c78ac54
1 changed files with 75 additions and 60 deletions

View File

@ -20,6 +20,7 @@
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/TagThrottle.h"
@ -34,6 +35,7 @@ constexpr int SAMPLE_SIZE = 10000;
// output TPS and latency of read/set/clear operations to do eyeball check. We want this new feature would not cause
// more load to the cluster (Maybe some visualization tools is needed to show the trend of metrics)
struct WriteTagThrottlingWorkload : KVWorkload {
// Performance metrics
int goodActorTrNum = 0, goodActorRetries = 0, goodActorTooOldRetries = 0, goodActorCommitFailedRetries = 0;
int badActorTrNum = 0, badActorRetries = 0, badActorTooOldRetries = 0, badActorCommitFailedRetries = 0;
@ -46,30 +48,31 @@ struct WriteTagThrottlingWorkload : KVWorkload {
int goodActorPerClient, badActorPerClient;
int numWritePerTr, numReadPerTr, numClearPerTr;
int keyCount;
double badOpRate;
double testDuration, throttleDuration;
double tpsRate;
double badOpRate, hotRangeRate;
double testDuration;
bool writeThrottle;
bool populateData;
// internal states
double trInterval;
TransactionTag badReadTag, badWriteTag, goodTag;
TransactionTag badTag, goodTag;
bool fastSuccess = false;
int rangeEachBadActor = 0;
std::set<std::string> throttledTags;
static constexpr const char* NAME = "WriteTagThrottling";
static constexpr int MIN_TAGS_PER_TRANSACTION = 2;
static constexpr int MIN_TAGS_PER_TRANSACTION = 1;
static constexpr int MIN_TRANSACTION_TAG_LENGTH = 2;
WriteTagThrottlingWorkload(WorkloadContext const& wcx)
: KVWorkload(wcx), badActorCommitLatency(SAMPLE_SIZE), badActorReadLatency(SAMPLE_SIZE),
goodActorCommitLatency(SAMPLE_SIZE), goodActorReadLatency(SAMPLE_SIZE) {
testDuration = getOption(options, LiteralStringRef("testDuration"), 120.0);
throttleDuration = getOption(options, LiteralStringRef("throttleDuration"), testDuration + 120.0);
badOpRate = getOption(options, LiteralStringRef("badOpRate"), 0.9);
tpsRate = getOption(options, LiteralStringRef("tpsRate"), 300.0);
numWritePerTr = getOption(options, LiteralStringRef("numWritePerTr"), 1);
numReadPerTr = getOption(options, LiteralStringRef("numReadPerTr"), 1);
numClearPerTr = getOption(options, LiteralStringRef("numClearPerTr"), 1);
hotRangeRate = getOption(options, LiteralStringRef("hotRangeRate"), 0.1);
populateData = getOption(options, LiteralStringRef("populateData"), true);
writeThrottle = getOption(options, LiteralStringRef("writeThrottle"), false);
badActorPerClient = getOption(options, LiteralStringRef("badActorPerClient"), 1);
@ -77,47 +80,36 @@ struct WriteTagThrottlingWorkload : KVWorkload {
actorCount = goodActorPerClient + badActorPerClient;
keyCount = getOption(options, LiteralStringRef("keyCount"),
std::max(3000, actorCount * 3)); // enough keys to avoid too many conflicts
std::max(3000, clientCount * actorCount * 3)); // enough keys to avoid too many conflicts
trInterval = actorCount * 1.0 / getOption(options, LiteralStringRef("trPerSecond"), 1000);
if(badActorPerClient > 0) {
rangeEachBadActor = keyCount / (clientCount * badActorPerClient);
badReadTag = TransactionTag(std::string("bR"));
badWriteTag = TransactionTag(std::string("bW"));
badTag = TransactionTag(std::string("bT"));
goodTag = TransactionTag(std::string("gT"));
virtual std::string description() { return WriteTagThrottlingWorkload::NAME; }
// choose a tag
// NOTE: this workload make sense if and only if all following tags are attached to db successfully.
ACTOR static Future<Void> _setup(Database cx, WriteTagThrottlingWorkload* self) {
state TransactionPriority priority = deterministicRandom()->randomChoice(allTransactionPriorities);
state TagSet tagSet1;
state TagSet tagSet2;
state TagSet tagSet3;
wait(ThrottleApi::throttleTags(cx, tagSet1, self->tpsRate, self->throttleDuration, TagThrottleType::MANUAL,
wait(ThrottleApi::throttleTags(cx, tagSet2, self->tpsRate, self->throttleDuration, TagThrottleType::MANUAL,
wait(ThrottleApi::throttleTags(cx, tagSet3, self->tpsRate, self->throttleDuration, TagThrottleType::MANUAL,
if(self->populateData) {
wait(bulkSetup(cx, self, self->keyCount, Promise<double>()));
if(self->clientId == 0) {
wait(ThrottleApi::enableAuto(cx, true));
return Void();
Future<Void> setup(const Database& cx) override {
fastSuccess = true;
return Void();
return clientId ? Void() : _setup(cx, this);
return _setup(cx, this);
ACTOR static Future<Void> _start(Database cx, WriteTagThrottlingWorkload* self) {
vector<Future<Void>> clientActors;
@ -128,6 +120,7 @@ struct WriteTagThrottlingWorkload : KVWorkload {
for (actorId = 0; actorId < self->badActorPerClient; ++actorId) {
clientActors.push_back(clientActor(true, actorId, self->badOpRate, cx, self));
clientActors.push_back(throttledTagUpdater(cx, self));
wait(timeout(waitForAll(clientActors), self->testDuration, Void()));
return Void();
@ -137,21 +130,21 @@ struct WriteTagThrottlingWorkload : KVWorkload {
virtual Future<bool> check(Database const& cx) {
if(fastSuccess) return true;
if (badActorTrNum == 0 && goodActorTrNum == 0) {
TraceEvent(SevError, "NoTransactionsCommitted");
return false;
if (writeThrottle) {
if (!badActorThrottleRetries && !goodActorThrottleRetries) {
TraceEvent(SevWarn, "NoThrottleTriggered");
if (badActorThrottleRetries < goodActorThrottleRetries) {
TraceEvent(SevError, "IncorrectThrottle")
TraceEvent(SevWarnAlways, "IncorrectThrottle")
.detail("BadActorThrottleRetries", badActorThrottleRetries)
.detail("GoodActorThrottleRetries", goodActorThrottleRetries);
if(!throttledTags.empty() && throttledTags.count(badTag.toString()) == 0) {
TraceEvent(SevWarnAlways, "IncorrectThrottle")
.detail("ThrottledTagNumber", throttledTags.size())
.detail("ThrottledTags", setToString(throttledTags));
return false;
// TODO check whether write throttled tag is correct after enabling that feature
// NOTE also do eyeball check of Retries.throttle and Avg Latency
return true;
@ -193,25 +186,27 @@ struct WriteTagThrottlingWorkload : KVWorkload {
m.push_back(PerfMetric("50% Commit Latency (ms, goodActor)", 1000 * goodActorCommitLatency.median(), true));
Standalone<KeyValueRef> operator()(uint64_t n) {
return KeyValueRef(keyForIndex(n), generateVal());
// return a key based on useReadKey
Key generateKey(bool useReadKey, int idx) {
Key generateKey(bool useReadKey, int startIdx, int availableRange) {
if (useReadKey) {
return keyForIndex(idx, false);
return keyForIndex(startIdx + deterministicRandom()->randomInt(0, availableRange), false);
return getRandomKey();
// return a range based on useClearKey
KeyRange generateRange(bool useClearKey, int idx) {
int a = deterministicRandom()->randomInt(0, keyCount / 3);
if (useClearKey) {
if (a < idx) {
return KeyRange(KeyRangeRef(keyForIndex(a, false), keyForIndex(idx, false)));
} else if (a > idx) {
return KeyRange(KeyRangeRef(keyForIndex(idx, false), keyForIndex(a, false)));
} else
return singleKeyRange(keyForIndex(a, false));
KeyRange generateRange(bool useClearKey, int startIdx, int availableRange) {
int a, b;
if(useClearKey) {
a = deterministicRandom()->randomInt(startIdx, availableRange + startIdx);
b = deterministicRandom()->randomInt(startIdx, availableRange + startIdx);
else {
a = deterministicRandom()->randomInt(0, keyCount);
b = deterministicRandom()->randomInt(0, keyCount);
int b = deterministicRandom()->randomInt(0, keyCount / 3);
if (a > b) std::swap(a, b);
if (a == b) return singleKeyRange(keyForIndex(a, false));
return KeyRange(KeyRangeRef(keyForIndex(a, false), keyForIndex(b, false)));
@ -221,6 +216,8 @@ struct WriteTagThrottlingWorkload : KVWorkload {
// read and write value on particular/random Key
ACTOR static Future<Void> clientActor(bool isBadActor, int actorId, double badOpRate, Database cx,
WriteTagThrottlingWorkload* self) {
state int startIdx = (self->clientId * self->badActorPerClient + actorId) * self->rangeEachBadActor;
state int availableRange = std::max(int(self->rangeEachBadActor * self->hotRangeRate), 1);
state double lastTime = now();
state double opStart;
state StringRef key;
@ -234,11 +231,11 @@ struct WriteTagThrottlingWorkload : KVWorkload {
if (self->writeThrottle) {
if (isBadActor) {
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, self->badTag);
} else if (deterministicRandom()->coinflip()) {
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, self->goodTag);
@ -247,17 +244,17 @@ struct WriteTagThrottlingWorkload : KVWorkload {
try {
for (i = 0; i < self->numClearPerTr; ++i) {
bool useClearKey = deterministicRandom()->random01() < badOpRate;
tr.clear(self->generateRange(useClearKey, actorId));
tr.clear(self->generateRange(useClearKey, startIdx, availableRange));
for (i = 0; i < self->numWritePerTr; ++i) {
bool useReadKey = deterministicRandom()->random01() < badOpRate;
key = self->generateKey(useReadKey, actorId + self->keyCount / 3);
key = self->generateKey(useReadKey, startIdx, availableRange);
tr.set(key, self->generateVal());
for (i = 0; i < self->numReadPerTr; ++i) {
bool useReadKey = deterministicRandom()->random01() < badOpRate;
ASSERT(self->keyCount >= actorId);
key = self->generateKey(useReadKey, self->keyCount - actorId);
key = self->generateKey(useReadKey, startIdx, availableRange);
opStart = now();
Optional<Value> v = wait(tr.get(key));
double duration = now() - opStart;
@ -296,10 +293,28 @@ struct WriteTagThrottlingWorkload : KVWorkload {
// TODO periodically to ask which tag is throttled; collect other health metrics
// ACTOR static Future<Void> healthMetricsChecker(Database cx, WriteTagThrottlingWorkload* self) {
// }
void recordThrottledTags(std::vector<TagThrottleInfo>& tags) {
for(auto& tag: tags) {
ACTOR static Future<Void> throttledTagUpdater(Database cx, WriteTagThrottlingWorkload* self) {
state std::vector<TagThrottleInfo> tags;
loop {
wait(store(tags, ThrottleApi::getThrottledTags(cx, CLIENT_KNOBS->TOO_MANY, true)));
static std::string setToString(const std::set<std::string>& myset) {
std::string res;
for(auto& s: myset) {
res.append(s).push_back(' ');
return res;
WorkloadFactory<WriteTagThrottlingWorkload> WriteTagThrottlingWorkloadFactory(WriteTagThrottlingWorkload::NAME);