From 53b4d8a307b4154427f2260618fa12707f948ae5 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Tue, 22 Feb 2022 14:29:51 +0100 Subject: [PATCH] Added ClientWorkload implementation --- fdbserver/CMakeLists.txt | 1 + fdbserver/tester.actor.cpp | 12 +- fdbserver/workloads/ClientWorkload.actor.cpp | 120 +++++++++++++++++++ fdbserver/workloads/workloads.actor.h | 28 ++++- 4 files changed, 155 insertions(+), 6 deletions(-) create mode 100644 fdbserver/workloads/ClientWorkload.actor.cpp diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 4dea381051..4118f831ee 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -172,6 +172,7 @@ set(FDBSERVER_SRCS workloads/ClearSingleRange.actor.cpp workloads/ClientLibManagementWorkload.actor.cpp workloads/ClientTransactionProfileCorrectness.actor.cpp + workloads/ClientWorkload.actor.cpp workloads/TriggerRecovery.actor.cpp workloads/SuspendProcesses.actor.cpp workloads/CommitBugCheck.actor.cpp diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 1581732625..8eb5724229 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -342,12 +342,14 @@ struct CompoundWorkload : TestWorkload { }; Reference getWorkloadIface(WorkloadRequest work, + Reference ccr, VectorRef options, Reference const> dbInfo) { Value testName = getOption(options, LiteralStringRef("testName"), LiteralStringRef("no-test-specified")); WorkloadContext wcx; wcx.clientId = work.clientId; wcx.clientCount = work.clientCount; + wcx.ccr = ccr; wcx.dbInfo = dbInfo; wcx.options = options; wcx.sharedRandomNumber = work.sharedRandomNumber; @@ -377,14 +379,16 @@ Reference getWorkloadIface(WorkloadRequest work, return workload; } -Reference getWorkloadIface(WorkloadRequest work, Reference const> dbInfo) { +Reference getWorkloadIface(WorkloadRequest work, + Reference ccr, + Reference const> dbInfo) { if (work.options.size() < 1) { TraceEvent(SevError, "TestCreationError").detail("Reason", "No options provided"); fprintf(stderr, "ERROR: No options were provided for workload.\n"); throw test_specification_invalid(); } if (work.options.size() == 1) - return getWorkloadIface(work, work.options[0], dbInfo); + return getWorkloadIface(work, ccr, work.options[0], dbInfo); WorkloadContext wcx; wcx.clientId = work.clientId; @@ -394,7 +398,7 @@ Reference getWorkloadIface(WorkloadRequest work, Reference(wcx); for (int i = 0; i < work.options.size(); i++) { - compound->add(getWorkloadIface(work, work.options[i], dbInfo)); + compound->add(getWorkloadIface(work, ccr, work.options[i], dbInfo)); } return compound; } @@ -647,7 +651,7 @@ ACTOR Future testerServerWorkload(WorkloadRequest work, // add test for "done" ? TraceEvent("WorkloadReceived", workIface.id()).detail("Title", work.title); - auto workload = getWorkloadIface(work, dbInfo); + auto workload = getWorkloadIface(work, ccr, dbInfo); if (!workload) { TraceEvent("TestCreationError").detail("Reason", "Workload could not be created"); fprintf(stderr, "ERROR: The workload could not be created.\n"); diff --git a/fdbserver/workloads/ClientWorkload.actor.cpp b/fdbserver/workloads/ClientWorkload.actor.cpp new file mode 100644 index 0000000000..8fd104a1e9 --- /dev/null +++ b/fdbserver/workloads/ClientWorkload.actor.cpp @@ -0,0 +1,120 @@ +/* + * workloads.actor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbrpc/simulator.h" + +#include + +#include "flow/actorcompiler.h" // has to be last include + +struct ClientWorkloadImpl { + Reference child; + ISimulator::ProcessInfo* self = g_pSimulator->getCurrentProcess(); + ISimulator::ProcessInfo* childProcess = nullptr; + IPAddress childAddress; + std::string processName; + Database cx; + Future databaseOpened; + + ClientWorkloadImpl(Reference const& child) : child(child) { + if (self->address.isV6()) { + childAddress = + IPAddress::parse(fmt::format("2001:fdb1:fdb2:fdb3:fdb4:fdb5:fdb6:{:40x}", child->clientId + 2)).get(); + } else { + childAddress = IPAddress::parse(fmt::format("192.168.0.{}", child->clientId + 2)).get(); + } + processName = fmt::format("TestClient{}", child->clientId); + childProcess = g_simulator.newProcess(processName.c_str(), + childAddress, + 0, + self->address.isTLS(), + 1, + self->locality, + ProcessClass(ProcessClass::TesterClass, ProcessClass::AutoSource), + self->dataFolder, + self->coordinationFolder, + self->protocolVersion); + databaseOpened = openDatabase(this); + } + + ~ClientWorkloadImpl() { + g_simulator.destroyProcess(childProcess); + } + + + ACTOR static Future openDatabase(ClientWorkloadImpl* self) { + wait(g_simulator.onProcess(self->childProcess)); + self->cx = Database::createDatabase(self->child->ccr, -1); + wait(g_simulator.onProcess(self->self)); + return Void(); + } + + ACTOR template Future runActor(ClientWorkloadImpl* self, Fun f) { + state Optional err; + state Ret res; + wait(g_simulator.onProcess(self->childProcess)); + wait(self->databaseOpened); + try { + Ret r = wait(f(self->cx)); + res = r; + } catch(Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + err = e; + } + wait(g_simulator.onProcess(self->self)); + if (err.present()) { + throw err.get(); + } + return res; + } + +}; + +ClientWorkload::ClientWorkload(Reference const& child, WorkloadContext const& wcx) + : TestWorkload(wcx), impl(new ClientWorkloadImpl(child)) {} + +std::string ClientWorkload::description() const { + return impl->child->description(); +} +Future ClientWorkload::setup(Database const& cx) { + return impl->runActor(impl, [this](Database const& db) { + return impl->child->setup(db); + }); +} +Future ClientWorkload::start(Database const& cx) { + return impl->runActor(impl, [this](Database const& db) { + return impl->child->start(db); + }); +} +Future ClientWorkload::check(Database const& cx) { + return impl->runActor(impl, [this](Database const& db) { + return impl->child->check(db); + }); +} +void ClientWorkload::getMetrics(std::vector& m) { + return impl->child->getMetrics(m); +} + +double ClientWorkload::getCheckTimeout() const { + return impl->child->getCheckTimeout(); +} diff --git a/fdbserver/workloads/workloads.actor.h b/fdbserver/workloads/workloads.actor.h index 1770c7eb52..9de4316e62 100644 --- a/fdbserver/workloads/workloads.actor.h +++ b/fdbserver/workloads/workloads.actor.h @@ -50,6 +50,7 @@ struct WorkloadContext { int clientId, clientCount; int64_t sharedRandomNumber; Reference const> dbInfo; + Reference ccr; WorkloadContext(); WorkloadContext(const WorkloadContext&); @@ -79,6 +80,20 @@ struct TestWorkload : NonCopyable, WorkloadContext, ReferenceCounted const& child, WorkloadContext const& wcx); + ~ClientWorkload(); + std::string description() const override; + Future setup(Database const& cx) override; + Future start(Database const& cx) override; + Future check(Database const& cx) override; + void getMetrics(std::vector& m) override; + + double getCheckTimeout() const override; +}; + struct KVWorkload : TestWorkload { uint64_t nodeCount; int64_t nodePrefix; @@ -121,8 +136,17 @@ struct IWorkloadFactory : ReferenceCounted { template struct WorkloadFactory : IWorkloadFactory { - WorkloadFactory(const char* name) { factories()[name] = Reference::addRef(this); } - Reference create(WorkloadContext const& wcx) override { return makeReference(wcx); } + bool asClient; + WorkloadFactory(const char* name, bool asClient = false) : asClient(asClient) { factories()[name] = Reference::addRef(this); } + Reference create(WorkloadContext const& wcx) override { + if (g_network->isSimulated() && asClient) { + WorkloadContext clientContext = wcx; + clientContext.dbInfo = decltype(clientContext.dbInfo)(); + auto child = makeReference(wcx); + return makeReference(child, wcx); + } + return makeReference(wcx); + } }; #define REGISTER_WORKLOAD(classname) WorkloadFactory classname##WorkloadFactory(#classname)