added a workload to test change feeds

This commit is contained in:
Evan Tschannen 2021-09-03 14:19:27 -07:00
parent 8309c2ce26
commit b02e8d99c6
4 changed files with 154 additions and 18 deletions

View File

@ -6544,7 +6544,7 @@ ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> getChangeFeedMutatio
if (!val.present()) {
throw unsupported_operation();
}
KeyRange keys = std::get<0>(decodeChangeFeedValue(val.get()));
state KeyRange keys = std::get<0>(decodeChangeFeedValue(val.get())) & range;
state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx,
keys,
@ -6707,7 +6707,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
results.sendError(unsupported_operation());
return Void();
}
keys = std::get<0>(decodeChangeFeedValue(val.get()));
keys = std::get<0>(decodeChangeFeedValue(val.get())) & range;
break;
} catch (Error& e) {
wait(tr.onError(e));
@ -6773,8 +6773,8 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
if (locations.size() > 1) {
std::vector<std::pair<StorageServerInterface, KeyRange>> interfs;
for (int i = 0; i < locations.size(); i++) {
interfs.push_back(
std::make_pair(locations[i].second->getInterface(chosenLocations[i]), locations[i].first));
interfs.push_back(std::make_pair(locations[i].second->getInterface(chosenLocations[i]),
locations[i].first & range));
}
wait(mergeChangeFeedStream(interfs, results, rangeID, &begin, end) || cx->connectionFileChanged());
} else {

View File

@ -30,8 +30,103 @@
#include "flow/serialize.h"
#include <cstring>
ACTOR Future<std::pair<Standalone<VectorRef<KeyValueRef>>, Version>> readDatabase(Database cx) {
state Transaction tr(cx);
loop {
state Standalone<VectorRef<KeyValueRef>> output;
state Version readVersion;
try {
Version ver = wait(tr.getReadVersion());
readVersion = ver;
state PromiseStream<Standalone<RangeResultRef>> results;
state Future<Void> stream = tr.getRangeStream(results, normalKeys, 1e6);
loop {
Standalone<RangeResultRef> res = waitNext(results.getFuture());
output.append(output.arena(), res.begin(), res.size());
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
return std::make_pair(output, readVersion);
}
wait(tr.onError(e));
}
}
}
ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> readMutations(Database cx,
Key rangeID,
Version begin,
Version end) {
state Standalone<VectorRef<MutationsAndVersionRef>> output;
loop {
try {
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results;
state Future<Void> stream = cx->getChangeFeedStream(results, rangeID, begin, end, normalKeys);
loop {
Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results.getFuture());
output.arena().dependsOn(res.arena());
output.append(output.arena(), res.begin(), res.size());
begin = res.back().version + 1;
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
return output;
}
throw;
}
}
}
Standalone<VectorRef<KeyValueRef>> advanceData(Standalone<VectorRef<KeyValueRef>> source,
Standalone<VectorRef<MutationsAndVersionRef>> mutations) {
std::map<KeyRef, ValueRef> data;
for (auto& kv : source) {
data[kv.key] = kv.value;
}
for (auto& it : mutations) {
for (auto& m : it.mutations) {
if (m.type == MutationRef::SetValue) {
data[m.param1] = m.param2;
} else {
ASSERT(m.type == MutationRef::ClearRange);
data.erase(data.lower_bound(m.param1), data.lower_bound(m.param2));
}
}
}
Standalone<VectorRef<KeyValueRef>> output;
output.arena().dependsOn(source.arena());
output.arena().dependsOn(mutations.arena());
for (auto& kv : data) {
output.push_back(output.arena(), KeyValueRef(kv.first, kv.second));
}
return output;
}
bool compareData(Standalone<VectorRef<KeyValueRef>> source, Standalone<VectorRef<KeyValueRef>> dest) {
if (source.size() != dest.size()) {
TraceEvent(SevError, "ChangeFeedSizeMismatch").detail("SrcSize", source.size()).detail("DestSize", dest.size());
return false;
}
for (int i = 0; i < source.size(); i++) {
if (source[i] != dest[i]) {
TraceEvent("ChangeFeedMutationMismatch")
.detail("Index", i)
.detail("SrcKey", source[i].key)
.detail("DestKey", dest[i].key)
.detail("SrcValue", source[i].value)
.detail("DestValue", dest[i].value);
return false;
}
}
return true;
}
struct ChangeFeedsWorkload : TestWorkload {
double testDuration;
Future<Void> client;
ChangeFeedsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
testDuration = getOption(options, "testDuration"_sr, 10.0);
@ -39,16 +134,19 @@ struct ChangeFeedsWorkload : TestWorkload {
std::string description() const override { return "ChangeFeedsWorkload"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return Void(); /*changeFeedClient(cx->clone(), this, testDuration);*/ }
Future<bool> check(Database const& cx) override { return true; }
Future<Void> start(Database const& cx) override {
client = changeFeedClient(cx->clone(), this);
return delay(testDuration);
}
Future<bool> check(Database const& cx) override {
client = Future<Void>();
return true;
}
void getMetrics(vector<PerfMetric>& m) override {}
/*
ACTOR Future<Void> changeFeedClient(Database cx, ChangeFeedsWorkload* self, double duration) {
// Enable change feed for a key range
state UID rangeUID = deterministicRandom()->randomUniqueID();
state Key rangeID = StringRef(rangeUID.toString());
state Key logPath = LiteralStringRef("\xff\x02/cftest/");
ACTOR Future<Void> changeFeedClient(Database cx, ChangeFeedsWorkload* self) {
// Enable change feeds for a key range
state Key rangeID = StringRef(deterministicRandom()->randomUniqueID().toString());
state Transaction tr(cx);
loop {
try {
@ -60,13 +158,40 @@ struct ChangeFeedsWorkload : TestWorkload {
}
}
// Periodically read from both and compare results (merging clears)
state PromiseStream<Standalone<VectorRef<MutationsAndVersionRef>>> results;
cx->getChangeFeedStream(results, rangeID);
loop {
wait(delay(deterministicRandom()->random01()));
// Pop from both
return Void();
}*/
state std::pair<Standalone<VectorRef<KeyValueRef>>, Version> firstResults = wait(readDatabase(cx));
wait(delay(10 * deterministicRandom()->random01()));
state std::pair<Standalone<VectorRef<KeyValueRef>>, Version> secondResults = wait(readDatabase(cx));
state Standalone<VectorRef<MutationsAndVersionRef>> mutations =
wait(readMutations(cx, rangeID, firstResults.second, secondResults.second));
Standalone<VectorRef<KeyValueRef>> advancedResults = advanceData(firstResults.first, mutations);
if (!compareData(secondResults.first, advancedResults)) {
TraceEvent(SevError, "ChangeFeedMismatch")
.detail("FirstVersion", firstResults.second)
.detail("SecondVersion", secondResults.second);
for (int i = 0; i < secondResults.first.size(); i++) {
TraceEvent("ChangeFeedBase")
.detail("Index", i)
.detail("K", secondResults.first[i].key)
.detail("V", secondResults.first[i].value);
}
for (int i = 0; i < advancedResults.size(); i++) {
TraceEvent("ChangeFeedAdvanced")
.detail("Index", i)
.detail("K", advancedResults[i].key)
.detail("V", advancedResults[i].value);
}
}
wait(cx->popChangeFeedMutations(rangeID, secondResults.second));
}
}
};
WorkloadFactory<ChangeFeedsWorkload> ChangeFeedsWorkloadFactory("ChangeFeeds");

View File

@ -125,6 +125,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.toml)
add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
add_fdb_test(TEST_FILES fast/CycleTest.toml)
add_fdb_test(TEST_FILES fast/ChangeFeeds.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)

View File

@ -0,0 +1,10 @@
[[test]]
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 2500.0
testDuration = 60.0
expectedRate = 0
[[test.workload]]
testName = 'ChangeFeeds'
testDuration = 60.0