VersionStamp can co-exist with other workloads that write data to the database.
VersionStamp previously would range-read the entire database during validation. This has the unfortunate effect of making it fail during validation if run with any other workload that writes keys to the database. Now, all keys written and read are done with a configurable prefix, so that it may co-exist with a variety of other workloads.
This commit is contained in:
parent
370a6afb80
commit
23945b9fea
|
@ -34,6 +34,8 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
int64_t nodePrefix;
|
int64_t nodePrefix;
|
||||||
int keyBytes;
|
int keyBytes;
|
||||||
bool failIfDataLost;
|
bool failIfDataLost;
|
||||||
|
Key vsKeyPrefix;
|
||||||
|
Key vsValuePrefix;
|
||||||
std::map<Key, std::vector<std::pair<Version, Standalone<StringRef>>>> key_commit;
|
std::map<Key, std::vector<std::pair<Version, Standalone<StringRef>>>> key_commit;
|
||||||
std::map<Key, std::vector<std::pair<Version, Standalone<StringRef>>>> versionStampKey_commit;
|
std::map<Key, std::vector<std::pair<Version, Standalone<StringRef>>>> versionStampKey_commit;
|
||||||
|
|
||||||
|
@ -45,6 +47,9 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
nodeCount = getOption(options, LiteralStringRef("nodeCount"), (uint64_t)10000);
|
nodeCount = getOption(options, LiteralStringRef("nodeCount"), (uint64_t)10000);
|
||||||
keyBytes = std::max(getOption(options, LiteralStringRef("keyBytes"), 16), 4);
|
keyBytes = std::max(getOption(options, LiteralStringRef("keyBytes"), 16), 4);
|
||||||
failIfDataLost = getOption(options, LiteralStringRef("failIfDataLost"), true);
|
failIfDataLost = getOption(options, LiteralStringRef("failIfDataLost"), true);
|
||||||
|
const Key prefix = getOption(options, LiteralStringRef("prefix"), LiteralStringRef("VS_"));
|
||||||
|
vsKeyPrefix = LiteralStringRef("K_").withPrefix(prefix);
|
||||||
|
vsValuePrefix = LiteralStringRef("V_").withPrefix(prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual std::string description() { return "VersionStamp"; }
|
virtual std::string description() { return "VersionStamp"; }
|
||||||
|
@ -65,7 +70,7 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
double d = double(index) / nodeCount;
|
double d = double(index) / nodeCount;
|
||||||
emplaceIndex(data, 0, *(int64_t*)&d);
|
emplaceIndex(data, 0, *(int64_t*)&d);
|
||||||
|
|
||||||
return result;
|
return result.withPrefix(vsValuePrefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
Key versionStampKeyForIndex(uint64_t index) {
|
Key versionStampKeyForIndex(uint64_t index) {
|
||||||
|
@ -76,12 +81,12 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
double d = double(index) / nodeCount;
|
double d = double(index) / nodeCount;
|
||||||
emplaceIndex(data, 4, *(int64_t*)&d);
|
emplaceIndex(data, 4, *(int64_t*)&d);
|
||||||
|
|
||||||
data[38 - 2] = 24;
|
data[38 - 2] = 24 + vsKeyPrefix.size();
|
||||||
data[38 - 1] = 0;
|
data[38 - 1] = 0;
|
||||||
return result;
|
return result.withPrefix(vsKeyPrefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
Key endOfRange(Key startOfRange) {
|
static Key endOfRange(Key startOfRange) {
|
||||||
int n = startOfRange.size();
|
int n = startOfRange.size();
|
||||||
Key result = makeString(n);
|
Key result = makeString(n);
|
||||||
uint8_t* data = mutateString(result);
|
uint8_t* data = mutateString(result);
|
||||||
|
@ -102,13 +107,14 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
state Version readVersion = wait(tr.getReadVersion());
|
state Version readVersion = wait(tr.getReadVersion());
|
||||||
loop{
|
loop{
|
||||||
try {
|
try {
|
||||||
Standalone<RangeResultRef> result = wait(tr.getRange(KeyRangeRef(LiteralStringRef(""), LiteralStringRef("V")), self->nodeCount + 1));
|
Standalone<RangeResultRef> result = wait(tr.getRange(KeyRangeRef(self->vsValuePrefix, endOfRange(self->vsValuePrefix)), self->nodeCount + 1));
|
||||||
ASSERT(result.size() <= self->nodeCount);
|
ASSERT(result.size() <= self->nodeCount);
|
||||||
TraceEvent("VST_check0").detail("size", result.size()).detail("nodeCount", self->nodeCount).detail("key_commit", self->key_commit.size()).detail("readVersion", readVersion);
|
TraceEvent("VST_check0").detail("size", result.size()).detail("nodeCount", self->nodeCount).detail("key_commit", self->key_commit.size()).detail("readVersion", readVersion);
|
||||||
if (self->failIfDataLost) {
|
if (self->failIfDataLost) {
|
||||||
ASSERT(result.size() == self->key_commit.size());
|
ASSERT(result.size() == self->key_commit.size());
|
||||||
}
|
}
|
||||||
for (auto it : result) {
|
for (auto it : result) {
|
||||||
|
const Standalone<StringRef> key = it.key.removePrefix(self->vsValuePrefix);
|
||||||
Version parsedVersion = 0;
|
Version parsedVersion = 0;
|
||||||
Standalone<StringRef> parsedVersionstamp = makeString(10);
|
Standalone<StringRef> parsedVersionstamp = makeString(10);
|
||||||
memcpy(&parsedVersion, it.value.begin(), sizeof(Version));
|
memcpy(&parsedVersion, it.value.begin(), sizeof(Version));
|
||||||
|
@ -116,7 +122,7 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
parsedVersion = bigEndian64(parsedVersion);
|
parsedVersion = bigEndian64(parsedVersion);
|
||||||
|
|
||||||
TraceEvent("VST_check0a").detail("itKey", printable(it.key)).detail("itValue", printable(it.value)).detail("parsedVersion", parsedVersion);
|
TraceEvent("VST_check0a").detail("itKey", printable(it.key)).detail("itValue", printable(it.value)).detail("parsedVersion", parsedVersion);
|
||||||
const auto& all_values_iter = self->key_commit.find(it.key);
|
const auto& all_values_iter = self->key_commit.find(key);
|
||||||
ASSERT(all_values_iter != self->key_commit.end()); // Reading a key that we didn't commit.
|
ASSERT(all_values_iter != self->key_commit.end()); // Reading a key that we didn't commit.
|
||||||
const auto& all_values = all_values_iter->second;
|
const auto& all_values = all_values_iter->second;
|
||||||
|
|
||||||
|
@ -136,18 +142,19 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
ASSERT(commitVersionstamp.compare(parsedVersionstamp) == 0);
|
ASSERT(commitVersionstamp.compare(parsedVersionstamp) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
Standalone<RangeResultRef> result = wait(tr.getRange(KeyRangeRef(LiteralStringRef("V"), LiteralStringRef("\xFF")), self->nodeCount + 1));
|
Standalone<RangeResultRef> result = wait(tr.getRange(KeyRangeRef(self->vsKeyPrefix, endOfRange(self->vsKeyPrefix)), self->nodeCount + 1));
|
||||||
ASSERT(result.size() <= self->nodeCount);
|
ASSERT(result.size() <= self->nodeCount);
|
||||||
|
|
||||||
TraceEvent("VST_check1").detail("size", result.size()).detail("vsKey_commit_size", self->versionStampKey_commit.size());
|
TraceEvent("VST_check1").detail("size", result.size()).detail("vsKey_commit_size", self->versionStampKey_commit.size());
|
||||||
for (auto it : result) {
|
for (auto it : result) {
|
||||||
|
const Standalone<StringRef> key = it.key.removePrefix(self->vsKeyPrefix);
|
||||||
Version parsedVersion = 0;
|
Version parsedVersion = 0;
|
||||||
Standalone<StringRef> parsedVersionstamp = makeString(10);
|
Standalone<StringRef> parsedVersionstamp = makeString(10);
|
||||||
memcpy(&parsedVersion, &(it.key.begin())[24], sizeof(Version));
|
memcpy(&parsedVersion, &(key.begin())[24], sizeof(Version));
|
||||||
memcpy(mutateString(parsedVersionstamp), &(it.key.begin())[24], 10);
|
memcpy(mutateString(parsedVersionstamp), &(key.begin())[24], 10);
|
||||||
parsedVersion = bigEndian64(parsedVersion);
|
parsedVersion = bigEndian64(parsedVersion);
|
||||||
|
|
||||||
const Key vsKey = it.key.substr(4, 16);
|
const Key vsKey = key.substr(4, 16);
|
||||||
TraceEvent("VST_check1a").detail("itKey", printable(it.key)).detail("vsKey", printable(vsKey)).detail("itValue", printable(it.value)).detail("parsedVersion", parsedVersion);
|
TraceEvent("VST_check1a").detail("itKey", printable(it.key)).detail("vsKey", printable(vsKey)).detail("itValue", printable(it.value)).detail("parsedVersion", parsedVersion);
|
||||||
const auto& all_values_iter = self->versionStampKey_commit.find(vsKey);
|
const auto& all_values_iter = self->versionStampKey_commit.find(vsKey);
|
||||||
ASSERT(all_values_iter != self->versionStampKey_commit.end()); // Reading a key that we didn't commit.
|
ASSERT(all_values_iter != self->versionStampKey_commit.end()); // Reading a key that we didn't commit.
|
||||||
|
@ -209,9 +216,9 @@ struct VersionStampWorkload : TestWorkload {
|
||||||
Void _ = wait(tr.commit());
|
Void _ = wait(tr.commit());
|
||||||
Standalone<StringRef> trVs = wait(fTrVs);
|
Standalone<StringRef> trVs = wait(fTrVs);
|
||||||
std::pair<Version, Standalone<StringRef>> committedVersionPair = std::make_pair(tr.getCommittedVersion(), trVs);
|
std::pair<Version, Standalone<StringRef>> committedVersionPair = std::make_pair(tr.getCommittedVersion(), trVs);
|
||||||
Standalone<StringRef> vsKeyKey = versionStampKey.substr(4, 16);
|
Standalone<StringRef> vsKeyKey = versionStampKey.removePrefix(self->vsKeyPrefix).substr(4, 16);
|
||||||
TraceEvent("VST_commit_success").detail("key", printable(key)).detail("vsKey", printable(versionStampKey)).detail("vsKeyKey", printable(vsKeyKey)).detail("clear", printable(range)).detail("version", tr.getCommittedVersion()).detail("vsValue", printable(committedVersionPair.second));
|
TraceEvent("VST_commit_success").detail("key", printable(key)).detail("vsKey", printable(versionStampKey)).detail("vsKeyKey", printable(vsKeyKey)).detail("clear", printable(range)).detail("version", tr.getCommittedVersion()).detail("vsValue", printable(committedVersionPair.second));
|
||||||
self->key_commit[key].push_back(committedVersionPair);
|
self->key_commit[key.removePrefix(self->vsValuePrefix)].push_back(committedVersionPair);
|
||||||
self->versionStampKey_commit[vsKeyKey].push_back(committedVersionPair);
|
self->versionStampKey_commit[vsKeyKey].push_back(committedVersionPair);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue