fixed problems with the change feed workload

This commit is contained in:
Evan Tschannen 2021-09-03 15:14:28 -07:00
parent b02e8d99c6
commit 367c7cda92
2 changed files with 22 additions and 7 deletions

View File

@ -40,10 +40,11 @@ ACTOR Future<std::pair<Standalone<VectorRef<KeyValueRef>>, Version>> readDatabas
readVersion = ver;
state PromiseStream<Standalone<RangeResultRef>> results;
state Future<Void> stream = tr.getRangeStream(results, normalKeys, 1e6);
state Future<Void> stream = tr.getRangeStream(results, normalKeys, GetRangeLimits());
loop {
Standalone<RangeResultRef> res = waitNext(results.getFuture());
output.arena().dependsOn(res.arena());
output.append(output.arena(), res.begin(), res.size());
}
} catch (Error& e) {
@ -60,7 +61,6 @@ ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> readMutations(Databa
Version begin,
Version end) {
state Standalone<VectorRef<MutationsAndVersionRef>> output;
loop {
try {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results;
@ -82,16 +82,29 @@ ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> readMutations(Databa
Standalone<VectorRef<KeyValueRef>> advanceData(Standalone<VectorRef<KeyValueRef>> source,
Standalone<VectorRef<MutationsAndVersionRef>> mutations) {
StringRef dbgKey = LiteralStringRef("");
std::map<KeyRef, ValueRef> data;
for (auto& kv : source) {
if (kv.key == dbgKey)
TraceEvent("ChangeFeedDbgStart").detail("K", kv.key).detail("V", kv.value);
data[kv.key] = kv.value;
}
for (auto& it : mutations) {
for (auto& m : it.mutations) {
if (m.type == MutationRef::SetValue) {
if (m.param1 == dbgKey)
TraceEvent("ChangeFeedDbgSet")
.detail("Ver", it.version)
.detail("K", m.param1)
.detail("V", m.param2);
data[m.param1] = m.param2;
} else {
ASSERT(m.type == MutationRef::ClearRange);
if (KeyRangeRef(m.param1, m.param2).contains(dbgKey))
TraceEvent("ChangeFeedDbgClear")
.detail("Ver", it.version)
.detail("Begin", m.param1)
.detail("End", m.param2);
data.erase(data.lower_bound(m.param1), data.lower_bound(m.param2));
}
}
@ -108,9 +121,8 @@ Standalone<VectorRef<KeyValueRef>> advanceData(Standalone<VectorRef<KeyValueRef>
bool compareData(Standalone<VectorRef<KeyValueRef>> source, Standalone<VectorRef<KeyValueRef>> dest) {
if (source.size() != dest.size()) {
TraceEvent(SevError, "ChangeFeedSizeMismatch").detail("SrcSize", source.size()).detail("DestSize", dest.size());
return false;
}
for (int i = 0; i < source.size(); i++) {
for (int i = 0; i < std::min(source.size(), dest.size()); i++) {
if (source[i] != dest[i]) {
TraceEvent("ChangeFeedMutationMismatch")
.detail("Index", i)
@ -121,7 +133,7 @@ bool compareData(Standalone<VectorRef<KeyValueRef>> source, Standalone<VectorRef
return false;
}
}
return true;
return source.size() == dest.size();
}
struct ChangeFeedsWorkload : TestWorkload {
@ -167,7 +179,7 @@ struct ChangeFeedsWorkload : TestWorkload {
state std::pair<Standalone<VectorRef<KeyValueRef>>, Version> secondResults = wait(readDatabase(cx));
state Standalone<VectorRef<MutationsAndVersionRef>> mutations =
wait(readMutations(cx, rangeID, firstResults.second, secondResults.second));
wait(readMutations(cx, rangeID, firstResults.second, secondResults.second + 1));
Standalone<VectorRef<KeyValueRef>> advancedResults = advanceData(firstResults.first, mutations);
@ -187,6 +199,7 @@ struct ChangeFeedsWorkload : TestWorkload {
.detail("K", advancedResults[i].key)
.detail("V", advancedResults[i].value);
}
ASSERT(false);
}
wait(cx->popChangeFeedMutations(rangeID, secondResults.second));

View File

@ -1,7 +1,9 @@
[[test]]
testTitle = 'ChangeFeed'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 2500.0
transactionsPerSecond = 250.0
testDuration = 60.0
expectedRate = 0