fix bug
This commit is contained in:
parent
12a51b2d39
commit
0bbcc75e4b
|
@ -2,6 +2,7 @@
|
||||||
#include "fdbserver/TesterInterface.actor.h"
|
#include "fdbserver/TesterInterface.actor.h"
|
||||||
#include "workloads.actor.h"
|
#include "workloads.actor.h"
|
||||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||||
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
#include "flow/actorcompiler.h"
|
#include "flow/actorcompiler.h"
|
||||||
|
|
||||||
|
|
||||||
|
@ -158,11 +159,10 @@ struct MakoWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void randStr(std::string& str, const int& len) {
|
static void randStr(std::string& str, const int& len) {
|
||||||
static char randomStr[BUFFERSIZE];
|
str.resize(len);
|
||||||
for (int i = 0; i < len; ++i) {
|
for (int i = 0; i < len; ++i) {
|
||||||
randomStr[i] = g_random->randomAlphaNumeric();
|
str[i] = g_random->randomAlphaNumeric();
|
||||||
}
|
}
|
||||||
str.assign(randomStr, len);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void randStr(char *str, const int& len){
|
static void randStr(char *str, const int& len){
|
||||||
|
@ -175,14 +175,14 @@ struct MakoWorkload : KVWorkload {
|
||||||
// TODO : add limit on maxvaluebytes
|
// TODO : add limit on maxvaluebytes
|
||||||
const int length = g_random->randomInt(minValueBytes, maxValueBytes + 1);
|
const int length = g_random->randomInt(minValueBytes, maxValueBytes + 1);
|
||||||
randStr(valueString, length);
|
randStr(valueString, length);
|
||||||
return StringRef((uint8_t*)valueString.c_str(), length);
|
return StringRef(reinterpret_cast<const uint8_t*>(valueString.c_str()), length);
|
||||||
}
|
}
|
||||||
|
|
||||||
Key ind2key(const uint64_t& ind) {
|
Key ind2key(const uint64_t& ind) {
|
||||||
// allocate one more byte for potential null-character added by snprintf
|
// allocate one more byte for potential null-character added by sprintf
|
||||||
Key result = makeString(keyBytes+1);
|
Key result = makeString(keyBytes+1);
|
||||||
uint8_t* data = mutateString(result);
|
char* data = reinterpret_cast<char*>(mutateString(result));
|
||||||
sprintf((char*)data, (KEYPREFIX + "%0*d").c_str(), seqNumLen, ind);
|
sprintf(data, (KEYPREFIX + "%0*d").c_str(), seqNumLen, ind);
|
||||||
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
|
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
|
||||||
data[i] = 'x';
|
data[i] = 'x';
|
||||||
return StringRef(result.begin(), keyBytes);
|
return StringRef(result.begin(), keyBytes);
|
||||||
|
@ -210,7 +210,6 @@ struct MakoWorkload : KVWorkload {
|
||||||
loop {
|
loop {
|
||||||
elapsed += self->periodicLoggingInterval;
|
elapsed += self->periodicLoggingInterval;
|
||||||
wait( delayUntil(start + elapsed));
|
wait( delayUntil(start + elapsed));
|
||||||
// TODO: if there is any other traceEvent to add
|
|
||||||
TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->opLatencies[OP_COMMIT].mean()).detail("Median", self->opLatencies[OP_COMMIT].median()).detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05)).detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95)).detail("Count", self->opCounters[OP_COMMIT].getValue()).detail("Elapsed", elapsed);
|
TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->opLatencies[OP_COMMIT].mean()).detail("Median", self->opLatencies[OP_COMMIT].median()).detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05)).detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95)).detail("Count", self->opCounters[OP_COMMIT].getValue()).detail("Elapsed", elapsed);
|
||||||
TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->opLatencies[OP_GETREADVERSION].mean()).detail("Median", self->opLatencies[OP_GETREADVERSION].median()).detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05)).detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95)).detail("Count", self->opCounters[OP_GETREADVERSION].getValue());
|
TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->opLatencies[OP_GETREADVERSION].mean()).detail("Median", self->opLatencies[OP_GETREADVERSION].median()).detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05)).detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95)).detail("Count", self->opCounters[OP_GETREADVERSION].getValue());
|
||||||
|
|
||||||
|
@ -266,7 +265,7 @@ struct MakoWorkload : KVWorkload {
|
||||||
|
|
||||||
state Key rkey;
|
state Key rkey;
|
||||||
state Value rval;
|
state Value rval;
|
||||||
state Transaction tr(cx);
|
state ReadYourWritesTransaction tr(cx);
|
||||||
state bool doCommit;
|
state bool doCommit;
|
||||||
state int i, count;
|
state int i, count;
|
||||||
state uint64_t range, indBegin, indEnd, rangeLen;
|
state uint64_t range, indBegin, indEnd, rangeLen;
|
||||||
|
@ -275,7 +274,7 @@ struct MakoWorkload : KVWorkload {
|
||||||
state KeyRangeRef rkeyRangeRef;
|
state KeyRangeRef rkeyRangeRef;
|
||||||
state vector<int> perOpCount(MAX_OP, 0);
|
state vector<int> perOpCount(MAX_OP, 0);
|
||||||
|
|
||||||
TraceEvent("ClientStarting").detail("AcotrIndex", actorIndex).detail("ClientIndex", self->clientId).detail("NumActors", self->actorCountPerClient);
|
TraceEvent("ClientStarting").detail("ActorIndex", actorIndex).detail("ClientIndex", self->clientId).detail("NumActors", self->actorCountPerClient);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// used for throttling
|
// used for throttling
|
||||||
|
@ -314,46 +313,46 @@ struct MakoWorkload : KVWorkload {
|
||||||
} else if (i == OP_UPDATE){
|
} else if (i == OP_UPDATE){
|
||||||
// get followed by set, TODO: do I need to add counter of get here
|
// get followed by set, TODO: do I need to add counter of get here
|
||||||
wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET]));
|
wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET]));
|
||||||
tr.set(rkey, rval, true);
|
tr.set(rkey, rval);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_INSERT){
|
} else if (i == OP_INSERT){
|
||||||
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly generated characters
|
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly generated characters
|
||||||
randStr((char*) mutateString(rkey) + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
randStr(reinterpret_cast<char*>(mutateString(rkey)) + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
tr.set(rkey, rval, true);
|
tr.set(rkey, rval);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_INSERTRANGE){
|
} else if (i == OP_INSERTRANGE){
|
||||||
uint8_t *rkeyPtr = mutateString(rkey);
|
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||||
randStr((char*) rkeyPtr + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
randStr(rkeyPtr + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
for (int range_i = 0; range_i < range; ++range_i){
|
for (int range_i = 0; range_i < range; ++range_i){
|
||||||
sprintf((char*) rkeyPtr + self->keyBytes - rangeLen, "%0.*d", rangeLen, range_i);
|
sprintf(rkeyPtr + self->keyBytes - rangeLen, "%0.*d", rangeLen, range_i);
|
||||||
tr.set(rkey, self->randomValue(), true);
|
tr.set(rkey, self->randomValue());
|
||||||
}
|
}
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_CLEAR){
|
} else if (i == OP_CLEAR){
|
||||||
tr.clear(rkey, true);
|
tr.clear(rkey);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if(i == OP_SETCLEAR){
|
} else if(i == OP_SETCLEAR){
|
||||||
randStr((char*) mutateString(rkey) + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
randStr(reinterpret_cast<char*>(mutateString(rkey)) + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
tr.set(rkey, rval, true);
|
tr.set(rkey, rval);
|
||||||
// commit the change and update metrics
|
// commit the change and update metrics
|
||||||
commitStart = now();
|
commitStart = now();
|
||||||
wait(tr.commit());
|
wait(tr.commit());
|
||||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||||
++perOpCount[OP_COMMIT];
|
++perOpCount[OP_COMMIT];
|
||||||
tr.reset();
|
tr.reset();
|
||||||
tr.clear(rkey, true);
|
tr.clear(rkey);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_CLEARRANGE){
|
} else if (i == OP_CLEARRANGE){
|
||||||
tr.clear(rkeyRangeRef, true);
|
tr.clear(rkeyRangeRef);
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
} else if (i == OP_SETCLEARRANGE){
|
} else if (i == OP_SETCLEARRANGE){
|
||||||
uint8_t *rkeyPtr = mutateString(rkey);
|
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||||
randStr((char*) rkeyPtr + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
randStr(rkeyPtr + KEYPREFIXLEN, self->keyBytes-KEYPREFIXLEN);
|
||||||
state std::string scr_start_key;
|
state std::string scr_start_key;
|
||||||
state std::string scr_end_key;
|
state std::string scr_end_key;
|
||||||
for (int range_i = 0; range_i < range; ++range_i){
|
for (int range_i = 0; range_i < range; ++range_i){
|
||||||
sprintf((char*) rkeyPtr + self->keyBytes - rangeLen, "%0.*d", rangeLen, range_i);
|
sprintf(rkeyPtr + self->keyBytes - rangeLen, "%0.*d", rangeLen, range_i);
|
||||||
tr.set(rkey, self->randomValue(), true);
|
tr.set(rkey, self->randomValue());
|
||||||
if (range_i == 0)
|
if (range_i == 0)
|
||||||
scr_start_key = rkey.toString();
|
scr_start_key = rkey.toString();
|
||||||
}
|
}
|
||||||
|
@ -363,7 +362,7 @@ struct MakoWorkload : KVWorkload {
|
||||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||||
++perOpCount[OP_COMMIT];
|
++perOpCount[OP_COMMIT];
|
||||||
tr.reset();
|
tr.reset();
|
||||||
tr.clear(KeyRangeRef(StringRef(scr_start_key), StringRef(scr_end_key)), true);
|
tr.clear(KeyRangeRef(StringRef(scr_start_key), StringRef(scr_end_key)));
|
||||||
doCommit = true;
|
doCommit = true;
|
||||||
}
|
}
|
||||||
++perOpCount[i];
|
++perOpCount[i];
|
||||||
|
@ -400,18 +399,18 @@ struct MakoWorkload : KVWorkload {
|
||||||
|
|
||||||
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self){
|
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self){
|
||||||
// clear all data starts with 'mako' in the database
|
// clear all data starts with 'mako' in the database
|
||||||
state std::string startKey("mako");
|
state std::string keyPrefix(KEYPREFIX);
|
||||||
state std::string endKey("mako\xff");
|
// state std::string endKey("mako\xff");
|
||||||
state Transaction tr(cx);
|
state ReadYourWritesTransaction tr(cx);
|
||||||
|
|
||||||
loop{
|
loop{
|
||||||
try {
|
try {
|
||||||
tr.clear(KeyRangeRef(KeyRef(startKey), KeyRef(endKey)));
|
// tr.clear(KeyRangeRef(KeyRef(startKey), KeyRef(endKey)));
|
||||||
|
tr.clear(prefixRange(keyPrefix));
|
||||||
wait(tr.commit());
|
wait(tr.commit());
|
||||||
break;
|
break;
|
||||||
} catch (Error &e){
|
} catch (Error &e){
|
||||||
wait(tr.onError(e));
|
wait(tr.onError(e));
|
||||||
tr.reset();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue