Merge pull request #2345 from zjuLcg/add-consistency-verification-in-mako-workload
Add consistency verification in mako workload
This commit is contained in:
commit
6945a6ea01
|
@ -3,8 +3,10 @@
|
|||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
#include "fdbclient/zipf.h"
|
||||
#include "fdbrpc/crc32c.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
|
||||
|
||||
enum {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_UPDATE, OP_INSERT, OP_INSERTRANGE, OP_CLEAR, OP_SETCLEAR, OP_CLEARRANGE, OP_SETCLEARRANGE, OP_COMMIT, MAX_OP};
|
||||
|
@ -12,9 +14,9 @@ enum {OP_COUNT, OP_RANGE};
|
|||
constexpr int MAXKEYVALUESIZE = 1000;
|
||||
constexpr int RANGELIMIT = 10000;
|
||||
struct MakoWorkload : TestWorkload {
|
||||
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes;
|
||||
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes, csSize, csCount, csPartitionSize, csStepSizeInPartition;
|
||||
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval, zipfConstant;
|
||||
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf;
|
||||
bool enableLogging, commitGet, populateData, runBenchmark, preserveData, zipf, checksumVerification, doChecksumVerificationOnly;
|
||||
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
|
||||
std::vector<PerfIntCounter> opCounters;
|
||||
std::vector<uint64_t> insertionCountsToMeasure;
|
||||
|
@ -26,9 +28,11 @@ struct MakoWorkload : TestWorkload {
|
|||
std::vector<PerfMetric> periodicMetrics;
|
||||
// store latency of each operation with sampling
|
||||
std::vector<ContinuousSample<double>> opLatencies;
|
||||
// prefix of keys populated, e.g. 'mako00000xxxxxxx'
|
||||
const std::string KEYPREFIX = "mako";
|
||||
const int KEYPREFIXLEN = KEYPREFIX.size();
|
||||
// key used to store checkSum for given key range
|
||||
std::vector<Key> csKeys;
|
||||
// key prefix for all generated keys
|
||||
std::string keyPrefix;
|
||||
int KEYPREFIXLEN;
|
||||
const std::array<std::string, MAX_OP> opNames = {"GRV", "GET", "GETRANGE", "SGET", "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE", "COMMIT"};
|
||||
MakoWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx),
|
||||
|
@ -58,6 +62,9 @@ struct MakoWorkload : TestWorkload {
|
|||
// If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically()
|
||||
enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
|
||||
periodicLoggingInterval = getOption( options, LiteralStringRef("periodicLoggingInterval"), 5.0 );
|
||||
// All the generated keys will start with the specified prefix
|
||||
keyPrefix = getOption( options, LiteralStringRef("keyPrefix"), LiteralStringRef("mako")).toString();
|
||||
KEYPREFIXLEN = keyPrefix.size();
|
||||
// If true, the workload picks keys following a Zipfian distribution
|
||||
zipf = getOption(options, LiteralStringRef("zipf"), false);
|
||||
zipfConstant = getOption(options, LiteralStringRef("zipfConstant"), 0.99);
|
||||
|
@ -105,6 +112,22 @@ struct MakoWorkload : TestWorkload {
|
|||
if (zipf){
|
||||
zipfian_generator3(0, (int)rowCount-1, zipfConstant);
|
||||
}
|
||||
// Added for checksum verification
|
||||
csSize = getOption(options, LiteralStringRef("csSize"), rowCount / 100);
|
||||
ASSERT(csSize <= rowCount);
|
||||
csCount = getOption(options, LiteralStringRef("csCount"), 0);
|
||||
checksumVerification = (csCount != 0);
|
||||
doChecksumVerificationOnly = getOption(options, LiteralStringRef("doChecksumVerificationOnly"), false);
|
||||
if (doChecksumVerificationOnly)
|
||||
ASSERT(checksumVerification); // csCount should be non-zero when you do checksum verification only
|
||||
if (csCount) {
|
||||
csPartitionSize = rowCount / csSize;
|
||||
ASSERT(csCount <= csPartitionSize);
|
||||
csStepSizeInPartition = csPartitionSize / csCount;
|
||||
for (int i= 0; i < csCount; ++i) {
|
||||
csKeys.emplace_back(format((keyPrefix + "_crc32c_%u_%u").c_str(), i, rowCount));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::string description() override {
|
||||
|
@ -114,22 +137,27 @@ struct MakoWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// Setup phase of the workload.
// A verification-only run must not write anything, so it finishes immediately.
// Otherwise the work is delegated to _setup(), which itself checks populateData
// before loading keys and checksumVerification before seeding checksums.
Future<Void> setup(Database const& cx) override {
	if (doChecksumVerificationOnly)
		return Void();
	return _setup(cx, this);
}
|
||||
|
||||
// Benchmark phase: launches _start() unless the workload was configured to
// only verify checksums, in which case there is nothing to run.
Future<Void> start(Database const& cx) override {
	if (!doChecksumVerificationOnly) {
		return _start(cx, this);
	}
	return Void();
}
|
||||
|
||||
// Check phase of the workload.
// Succeeds trivially when checksum verification is disabled (csCount == 0);
// otherwise the result is whatever dochecksumVerification() reports.
Future<bool> check(Database const& cx) override {
	if (!checksumVerification) {
		return true;
	}
	// verify checksum consistency
	return dochecksumVerification(cx, this);
}
|
||||
|
||||
// disable the default timeout setting
|
||||
double getCheckTimeout() {return std::numeric_limits<double>::max();}
|
||||
double getCheckTimeout() override {return std::numeric_limits<double>::max();}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||
// metrics of population process
|
||||
|
@ -192,7 +220,7 @@ struct MakoWorkload : TestWorkload {
|
|||
Key keyForIndex(uint64_t ind) {
|
||||
Key result = makeString(keyBytes);
|
||||
char* data = reinterpret_cast<char*>(mutateString(result));
|
||||
format((KEYPREFIX + "%0*d").c_str(), seqNumLen, ind).copy(data, KEYPREFIXLEN + seqNumLen);
|
||||
format((keyPrefix + "%0*d").c_str(), seqNumLen, ind).copy(data, KEYPREFIXLEN + seqNumLen);
|
||||
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
|
||||
data[i] = 'x';
|
||||
return result;
|
||||
|
@ -207,6 +235,56 @@ struct MakoWorkload : TestWorkload {
|
|||
}
|
||||
return digits;
|
||||
}
|
||||
|
||||
// Marks in `flags` every checksum that has a sampled key inside the modified
// key-index range [startIdx, endIdx).
//
// Sampling layout (see calcCheckSum): checksum j covers the keys at index
//   j * csStepSizeInPartition + i * csPartitionSize,   for i in [0, csSize).
//
// flags    - one entry per checksum; entries are only ever switched to true here
// startIdx - first modified key index (inclusive)
// endIdx   - end of the modified key indices (exclusive)
static void updateCSFlags(MakoWorkload* self, std::vector<bool>& flags, uint64_t startIdx, uint64_t endIdx){
	// An empty (or inverted) half-open range covers no key. Bail out early so the
	// `endIdx - 1` arithmetic below cannot wrap around and mark every checksum.
	if (startIdx >= endIdx)
		return;

	const uint64_t partitionSize = self->csPartitionSize;
	const uint64_t stepSize = self->csStepSizeInPartition;
	const uint64_t csCount = self->csCount;

	// We deal with the cases where rowCount % csSize != 0 and csPartitionSize % csCount != 0:
	// - keys with index in [csSize * csPartitionSize, rowCount) are not used for any checksum;
	// - for any i in [0, csSize), keys with index in
	//   [i*csPartitionSize + csCount*csStepSizeInPartition, (i+1)*csPartitionSize)
	//   are not used for any checksum either.
	const uint64_t boundary = self->csSize * partitionSize;
	if (startIdx >= boundary)
		return;
	if (endIdx > boundary)
		endIdx = boundary;

	// If every checksum is already marked for update, there is nothing to add
	if (std::all_of(flags.begin(), flags.end(), [](bool flag){ return flag; }))
		return;

	if (startIdx + 1 == endIdx) {
		// single key case: only a key sitting exactly on a step boundary is sampled
		const uint64_t offset = startIdx % partitionSize;
		if ((offset < csCount * stepSize) && (offset % stepSize == 0)) {
			flags.at(offset / stepSize) = true;
		}
		return;
	}

	// key range case: work with offsets relative to the partition containing startIdx
	const uint64_t base = (startIdx / partitionSize) * partitionSize;
	startIdx -= base;
	endIdx -= base;
	const uint64_t startStepIdx = std::min(startIdx / stepSize, csCount - 1);

	// A range at least one partition long touches every checksum
	uint64_t count = csCount;
	if ((endIdx - startIdx) < partitionSize) {
		uint64_t endStepIdx;
		if (endIdx > partitionSize) {
			// the range spills into the next partition; the step index keeps counting past csCount
			endStepIdx = csCount + std::min((endIdx - 1 - partitionSize) / stepSize, csCount);
		} else {
			endStepIdx = std::min((endIdx - 1) / stepSize, csCount - 1);
		}
		// Every step whose left boundary lies inside the range must be updated.
		// The first step only counts when startIdx sits exactly on its left boundary.
		if (startIdx == stepSize * startStepIdx)
			flags[startStepIdx] = true;
		count = endStepIdx - startStepIdx;
	}
	for (uint64_t i = 1; i <= count; ++i) {
		flags[(startStepIdx + i) % csCount] = true;
	}
}
|
||||
// Produces the key/value pair for row `n`: the key comes from keyForIndex(n),
// the value from randomValue().
Standalone<KeyValueRef> operator()(uint64_t n) {
	const auto rowKey = keyForIndex(n);
	const auto rowValue = randomValue();
	return KeyValueRef(rowKey, rowValue);
}
|
||||
|
@ -232,17 +310,23 @@ struct MakoWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
// Setup actor: optionally bulk-loads rowCount keys, then (on client 0 only)
// seeds the checksum keys when checksum verification is enabled.
ACTOR Future<Void> _setup(Database cx, MakoWorkload* self) {
	// use all the clients to populate data
	if (self->populateData) {
		state Promise<double> loadTime;
		state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;

		wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay,
		               self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts));

		// This is the setup time
		self->loadTime = loadTime.getFuture().get();
		// This is the rates of importing keys
		self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();
	}
	// Use one client to initialize checksums
	// NOTE(review): this runs as soon as client 0's own bulkSetup returns; confirm
	// that other clients cannot still be populating keys at that point.
	if (self->checksumVerification && self->clientId == 0){
		wait(generateChecksum(cx, self));
	}

	return Void();
}
|
||||
|
@ -279,10 +363,12 @@ struct MakoWorkload : TestWorkload {
|
|||
state bool doCommit;
|
||||
state int i, count;
|
||||
state uint64_t range, indBegin, indEnd, rangeLen;
|
||||
state double lastTime = now();
|
||||
state double commitStart;
|
||||
state KeyRangeRef rkeyRangeRef;
|
||||
state std::vector<int> perOpCount(MAX_OP, 0);
|
||||
// flag at index-i indicates whether checksum-i need to be updated
|
||||
state std::vector<bool> csChangedFlags(self->csCount, false);
|
||||
state double lastTime = now();
|
||||
state double commitStart;
|
||||
|
||||
TraceEvent("ClientStarting").detail("ActorIndex", actorIndex).detail("ClientIndex", self->clientId).detail("NumActors", self->actorCountPerClient);
|
||||
|
||||
|
@ -307,6 +393,16 @@ struct MakoWorkload : TestWorkload {
|
|||
// KeyRangeRef(min, maxPlusOne)
|
||||
rkeyRangeRef = KeyRangeRef(rkey, rkey2);
|
||||
|
||||
// used for mako-level consistency check
|
||||
if (self->checksumVerification){
|
||||
if (i == OP_INSERT | i == OP_UPDATE | i == OP_CLEAR) {
|
||||
updateCSFlags(self, csChangedFlags, indBegin, indBegin + 1);
|
||||
}
|
||||
else if (i == OP_CLEARRANGE) {
|
||||
updateCSFlags(self, csChangedFlags, indBegin, indEnd);
|
||||
}
|
||||
}
|
||||
|
||||
if (i == OP_GETREADVERSION){
|
||||
wait(logLatency(tr.getReadVersion(), &self->opLatencies[i]));
|
||||
}
|
||||
|
@ -343,6 +439,7 @@ struct MakoWorkload : TestWorkload {
|
|||
} else if(i == OP_SETCLEAR){
|
||||
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
tr.set(rkey, rval);
|
||||
wait(self->updateCSBeforeCommit(&tr, self, &csChangedFlags));
|
||||
// commit the change and update metrics
|
||||
commitStart = now();
|
||||
wait(tr.commit());
|
||||
|
@ -359,6 +456,7 @@ struct MakoWorkload : TestWorkload {
|
|||
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
state std::string scr_start_key;
|
||||
state std::string scr_end_key;
|
||||
state KeyRangeRef scr_key_range_ref;
|
||||
for (int range_i = 0; range_i < range; ++range_i){
|
||||
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
||||
tr.set(rkey, self->randomValue());
|
||||
|
@ -366,12 +464,14 @@ struct MakoWorkload : TestWorkload {
|
|||
scr_start_key = rkey.toString();
|
||||
}
|
||||
scr_end_key = rkey.toString();
|
||||
scr_key_range_ref = KeyRangeRef(KeyRef(scr_start_key), KeyRef(scr_end_key));
|
||||
wait(self->updateCSBeforeCommit(&tr, self, &csChangedFlags));
|
||||
commitStart = now();
|
||||
wait(tr.commit());
|
||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||
++perOpCount[OP_COMMIT];
|
||||
tr.reset();
|
||||
tr.clear(KeyRangeRef(StringRef(scr_start_key), StringRef(scr_end_key)));
|
||||
tr.clear(scr_key_range_ref);
|
||||
doCommit = true;
|
||||
}
|
||||
++perOpCount[i];
|
||||
|
@ -379,6 +479,7 @@ struct MakoWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
if (doCommit) {
|
||||
wait(self->updateCSBeforeCommit(&tr, self, &csChangedFlags));
|
||||
commitStart = now();
|
||||
wait(tr.commit());
|
||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||
|
@ -408,13 +509,14 @@ struct MakoWorkload : TestWorkload {
|
|||
|
||||
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self){
|
||||
// clear all data whose keys start with the configured key prefix ('mako' by default)
|
||||
state std::string keyPrefix(self->KEYPREFIX);
|
||||
state std::string keyPrefix(self->keyPrefix);
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
|
||||
loop{
|
||||
try {
|
||||
tr.clear(prefixRange(keyPrefix));
|
||||
wait(tr.commit());
|
||||
TraceEvent("CleanUpMakoRelatedData").detail("KeyPrefix", self->keyPrefix);
|
||||
break;
|
||||
} catch (Error &e){
|
||||
TraceEvent("FailedToCleanData").error(e);
|
||||
|
@ -531,7 +633,7 @@ struct MakoWorkload : TestWorkload {
|
|||
}
|
||||
/* set range */
|
||||
if (num > RANGELIMIT)
|
||||
TraceEvent(SevError, "RangeExceedLimit").detail("RangeLimit", RANGELIMIT).detail("Range", num);
|
||||
TraceEvent(SevError, "TestFailure").detail("Reason", "RangeExceedLimit").detail("RangeLimit", RANGELIMIT).detail("Range", num);
|
||||
operations[op][OP_RANGE] = num;
|
||||
}
|
||||
}
|
||||
|
@ -539,9 +641,103 @@ struct MakoWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
if (error) {
|
||||
TraceEvent(SevError, "InvalidTransactionSpecification").detail("operations", operationsSpec);
|
||||
TraceEvent(SevError, "TestFailure").detail("Reason", "InvalidTransactionSpecification").detail("operations", operationsSpec);
|
||||
}
|
||||
}
|
||||
|
||||
// Computes checksum number `csIndex` within the given transaction.
// The checksum is a running crc32c over csSize sampled keys, one per partition:
// key index = csIndex * csStepSizeInPartition + i * csPartitionSize.
// A present key contributes its value; a missing key contributes its key bytes.
ACTOR static Future<uint32_t> calcCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIndex){
	state uint32_t result = 0;
	// 64-bit counter and index: csSize/csPartitionSize are uint64_t, and an int
	// index would be truncated for large row counts
	state uint64_t i;
	state Key csKey;
	for (i = 0; i < self->csSize; ++i) {
		uint64_t idx = csIndex * self->csStepSizeInPartition + i * self->csPartitionSize;
		csKey = self->keyForIndex(idx);
		Optional<Value> temp = wait(tr->get(csKey));
		if (temp.present()) {
			Value val = temp.get();
			result = crc32c_append(result, val.begin(), val.size());
		} else {
			// If the key does not exist, we use the key itself instead of the value to calculate the checksum
			result = crc32c_append(result, csKey.begin(), csKey.size());
		}
	}
	return result;
}
|
||||
|
||||
// Recomputes every checksum from the current database contents and compares it
// with the value stored under the matching checksum key.
// Yields false (after logging a SevError TestFailure event) on the first missing
// or mismatching checksum, true once all csCount checksums have matched.
// Errors are routed through tr.onError() and the loop body is re-run.
ACTOR static Future<bool> dochecksumVerification(Database cx, MakoWorkload* self) {
	state ReadYourWritesTransaction tr(cx);
	state int checksumIdx;
	state Value storedValue;

	loop {
		try {
			tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
			for (checksumIdx = 0; checksumIdx < self->csCount; ++checksumIdx) {
				Optional<Value> stored = wait(tr.get(self->csKeys[checksumIdx]));
				if (!stored.present()) {
					TraceEvent(SevError, "TestFailure")
					    .detail("Reason", "NoExistingChecksum")
					    .detail("missedChecksumIndex", checksumIdx);
					return false;
				}

				storedValue = stored.get();
				ASSERT(storedValue.size() == sizeof(uint32_t));
				uint32_t recomputed = wait(calcCheckSum(&tr, self, checksumIdx));
				uint32_t recorded = *(reinterpret_cast<const uint32_t*>(storedValue.begin()));
				if (recorded != recomputed) {
					TraceEvent(SevError, "TestFailure")
					    .detail("Reason", "ChecksumVerificationFailure")
					    .detail("ChecksumIndex", checksumIdx)
					    .detail("ExistingChecksum", recorded)
					    .detail("CurrentChecksum", recomputed);
					return false;
				}
				TraceEvent("ChecksumVerificationPass")
				    .detail("ChecksumIndex", checksumIdx)
				    .detail("ChecksumValue", recorded);
			}
			return true;
		} catch (Error& e) {
			TraceEvent("FailedToCalculateChecksum").detail("ChecksumIndex", checksumIdx).error(e);
			wait(tr.onError(e));
		}
	}
}
|
||||
|
||||
// Computes and stores all csCount checksums in a single transaction.
// An already existing checksum key is only reported through a trace event; the
// checksum is recomputed and overwritten regardless.
// Errors are routed through tr.onError() and the whole pass is re-run.
ACTOR static Future<Void> generateChecksum(Database cx, MakoWorkload* self) {
	state ReadYourWritesTransaction tr(cx);
	state int checksumIdx;
	loop {
		try {
			for (checksumIdx = 0; checksumIdx < self->csCount; ++checksumIdx) {
				Optional<Value> previous = wait(tr.get(self->csKeys[checksumIdx]));
				if (previous.present()) {
					TraceEvent("DuplicatePopulationOnSamePrefix").detail("KeyPrefix", self->keyPrefix);
				}
				wait(updateCheckSum(&tr, self, checksumIdx));
			}
			wait(tr.commit());
			return Void();
		} catch (Error& e) {
			TraceEvent("FailedToGenerateChecksumForPopulatedData").error(e);
			wait(tr.onError(e));
		}
	}
}
|
||||
|
||||
// Recomputes checksum `csIdx` inside transaction `tr` and writes its raw
// 4 bytes under csKeys[csIdx]. The transaction is not committed here.
ACTOR static Future<Void> updateCheckSum(ReadYourWritesTransaction* tr, MakoWorkload* self, int csIdx){
	state uint32_t checksum = wait(calcCheckSum(tr, self, csIdx));
	TraceEvent("UpdateCheckSum").detail("ChecksumIndex", csIdx).detail("Checksum", checksum);
	const uint8_t* checksumBytes = reinterpret_cast<const uint8_t*>(&checksum);
	tr->set(self->csKeys[csIdx], ValueRef(checksumBytes, sizeof(uint32_t)));
	return Void();
}
|
||||
|
||||
// Refreshes, inside `tr`, every checksum whose entry in `flags` is set, and
// clears the entry once that checksum has been rewritten.
// Does nothing when checksum verification is disabled.
ACTOR static Future<Void> updateCSBeforeCommit(ReadYourWritesTransaction* tr, MakoWorkload* self, std::vector<bool>* flags){
	if (!self->checksumVerification) {
		return Void();
	}

	state int pendingIdx;
	for (pendingIdx = 0; pendingIdx < self->csCount; ++pendingIdx) {
		if (!(*flags)[pendingIdx]) {
			continue;
		}
		wait(updateCheckSum(tr, self, pendingIdx));
		(*flags)[pendingIdx] = false;
	}
	return Void();
}
|
||||
};
|
||||
|
||||
WorkloadFactory<MakoWorkload> MakoloadFactory("Mako");
|
||||
|
|
Loading…
Reference in New Issue