From 2da7732637ae67d774678b96937f13c5f278f3c0 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Wed, 15 Sep 2021 10:55:39 -0700 Subject: [PATCH] added a new workload to test streaming reads --- fdbserver/CMakeLists.txt | 1 + .../workloads/StreamingRangeRead.actor.cpp | 175 ++++++++++++++++++ tests/CMakeLists.txt | 1 + tests/fast/StreamRangeRead.toml | 29 +++ 4 files changed, 206 insertions(+) create mode 100644 fdbserver/workloads/StreamingRangeRead.actor.cpp create mode 100644 tests/fast/StreamRangeRead.toml diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 1590f3c199..4f1a82bc77 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -231,6 +231,7 @@ set(FDBSERVER_SRCS workloads/SlowTaskWorkload.actor.cpp workloads/SnapTest.actor.cpp workloads/SpecialKeySpaceCorrectness.actor.cpp + workloads/StreamingRangeRead.actor.cpp workloads/StatusWorkload.actor.cpp workloads/Storefront.actor.cpp workloads/StreamingRead.actor.cpp diff --git a/fdbserver/workloads/StreamingRangeRead.actor.cpp b/fdbserver/workloads/StreamingRangeRead.actor.cpp new file mode 100644 index 0000000000..0f7e4ffa2e --- /dev/null +++ b/fdbserver/workloads/StreamingRangeRead.actor.cpp @@ -0,0 +1,175 @@ +/* + * StreamingRangeRead.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/FDBOptions.g.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbserver/TesterInterface.actor.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/workloads/BulkSetup.actor.h" +#include "flow/Arena.h" +#include "flow/Error.h" +#include "flow/IRandom.h" +#include "flow/Trace.h" +#include "flow/actorcompiler.h" // This must be the last #include. +#include "flow/serialize.h" +#include + +ACTOR Future streamUsingGetRange(PromiseStream results, Transaction* tr, KeyRange keys) { + state KeySelectorRef begin = firstGreaterOrEqual(keys.begin); + state KeySelectorRef end = firstGreaterOrEqual(keys.end); + + try { + loop { + GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, 1e6); + limits.minRows = 0; + state RangeResult rep = wait(tr->getRange(begin, end, limits, Snapshot::True)); + if (!rep.more) { + rep.readThrough = keys.end; + } + results.send(rep); + + if (!rep.more) { + results.sendError(end_of_stream()); + return Void(); + } + + if (rep.readThrough.present()) { + begin = firstGreaterOrEqual(rep.readThrough.get()); + } else { + begin = firstGreaterThan(rep.end()[-1].key); + } + } + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + results.sendError(e); + throw; + } +} + +ACTOR Future convertStream(PromiseStream input, PromiseStream output) { + try { + loop { + RangeResult res = waitNext(input.getFuture()); + for (auto& kv : res) { + output.send(kv); + } + } + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + output.sendError(e); + } + return Void(); +} + +struct StreamingRangeReadWorkload : KVWorkload { + double testDuration; + std::string valueString; + Future client; + + StreamingRangeReadWorkload(WorkloadContext const& wcx) : KVWorkload(wcx) { + testDuration = getOption(options, "testDuration"_sr, 60.0); + valueString = std::string(maxValueBytes, '.'); + } + + Value randomValue() { + return StringRef((uint8_t*)valueString.c_str(), + deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1)); + } + + Standalone operator()(uint64_t n) { return KeyValueRef(keyForIndex(n, false), randomValue()); } + + std::string description() const override { return "StreamingRangeReadWorkload"; } + Future setup(Database const& cx) override { return bulkSetup(cx, this, nodeCount, Promise()); } + Future start(Database const& cx) override { + client = timeout(streamingClient(cx->clone(), this), testDuration, Void()); + return delay(testDuration); + } + + Future check(Database const& cx) override { + client = Void(); + return true; + } + + void getMetrics(vector& m) override {} + + ACTOR Future streamingClient(Database cx, StreamingRangeReadWorkload* self) { + state Transaction tr(cx); + state Key next; + state Future rateLimit = delay(0.01); + loop { + state PromiseStream streamRaw; + state PromiseStream compareRaw; + state PromiseStream streamResults; + state PromiseStream compareResults; + + try { + state Future compareConvert = convertStream(compareRaw, compareResults); + state Future streamConvert = convertStream(streamRaw, streamResults); + state Future compare = streamUsingGetRange(compareRaw, &tr, KeyRangeRef(next, normalKeys.end)); + state Future stream = tr.getRangeStream(streamRaw, + KeySelector(firstGreaterOrEqual(next), next.arena()), + KeySelector(firstGreaterOrEqual(normalKeys.end)), + GetRangeLimits()); + loop { + state Optional cmp; + state Optional res; + try { + KeyValue _cmp = waitNext(streamResults.getFuture()); + cmp = _cmp; + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw; + } + cmp = Optional(); + } + try { + KeyValue _res = waitNext(compareResults.getFuture()); + res = _res; + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw; + } + res = Optional(); + } + if (cmp != res) { + TraceEvent(SevError, "RangeStreamMismatch"); + ASSERT(false); + } + if (cmp.present()) { + next = keyAfter(cmp.get().key); + } else { + next = Key(); + break; + } + } + } catch (Error& e) { + wait(tr.onError(e)); + } + wait(rateLimit); + rateLimit = delay(0.01); + } + } +}; + +WorkloadFactory StreamingRangeReadWorkloadFactory("StreamingRangeRead"); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ff59b9e1d5..0bb917f32a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -156,6 +156,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/SidebandWithStatus.toml) add_fdb_test(TEST_FILES fast/SimpleAtomicAdd.toml) add_fdb_test(TEST_FILES fast/SpecialKeySpaceCorrectness.toml) + add_fdb_test(TEST_FILES fast/StreamingRangeRead.toml) add_fdb_test(TEST_FILES fast/SwizzledRollbackSideband.toml) add_fdb_test(TEST_FILES fast/SystemRebootTestCycle.toml) add_fdb_test(TEST_FILES fast/TaskBucketCorrectness.toml) diff --git a/tests/fast/StreamRangeRead.toml b/tests/fast/StreamRangeRead.toml new file mode 100644 index 0000000000..630ba8c97d --- /dev/null +++ b/tests/fast/StreamRangeRead.toml @@ -0,0 +1,29 @@ +[[test]] +testTitle = 'StreamingRangeReadTest' + + [[test.workload]] + testName = 'StreamingRangeRead' + testDuration = 60.0 + + [[test.workload]] + testName = 'RandomClogging' + testDuration = 60.0 + + [[test.workload]] + testName = 'Rollback' + meanDelay = 60.0 + testDuration = 60.0 + + [[test.workload]] + testName = 'Attrition' + machinesToKill = 10 + machinesToLeave = 3 + reboot = true + testDuration = 60.0 + + [[test.workload]] + testName = 'Attrition' + machinesToKill = 10 + machinesToLeave = 3 + reboot = true + testDuration = 60.0