Added ClientWorkload implementation

This commit is contained in:
Markus Pilman 2022-02-22 14:29:51 +01:00
parent dc973fb67e
commit 53b4d8a307
4 changed files with 155 additions and 6 deletions

View File

@ -172,6 +172,7 @@ set(FDBSERVER_SRCS
workloads/ClearSingleRange.actor.cpp
workloads/ClientLibManagementWorkload.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/ClientWorkload.actor.cpp
workloads/TriggerRecovery.actor.cpp
workloads/SuspendProcesses.actor.cpp
workloads/CommitBugCheck.actor.cpp

View File

@ -342,12 +342,14 @@ struct CompoundWorkload : TestWorkload {
};
Reference<TestWorkload> getWorkloadIface(WorkloadRequest work,
Reference<IClusterConnectionRecord> ccr,
VectorRef<KeyValueRef> options,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
Value testName = getOption(options, LiteralStringRef("testName"), LiteralStringRef("no-test-specified"));
WorkloadContext wcx;
wcx.clientId = work.clientId;
wcx.clientCount = work.clientCount;
wcx.ccr = ccr;
wcx.dbInfo = dbInfo;
wcx.options = options;
wcx.sharedRandomNumber = work.sharedRandomNumber;
@ -377,14 +379,16 @@ Reference<TestWorkload> getWorkloadIface(WorkloadRequest work,
return workload;
}
Reference<TestWorkload> getWorkloadIface(WorkloadRequest work, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
Reference<TestWorkload> getWorkloadIface(WorkloadRequest work,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
if (work.options.size() < 1) {
TraceEvent(SevError, "TestCreationError").detail("Reason", "No options provided");
fprintf(stderr, "ERROR: No options were provided for workload.\n");
throw test_specification_invalid();
}
if (work.options.size() == 1)
return getWorkloadIface(work, work.options[0], dbInfo);
return getWorkloadIface(work, ccr, work.options[0], dbInfo);
WorkloadContext wcx;
wcx.clientId = work.clientId;
@ -394,7 +398,7 @@ Reference<TestWorkload> getWorkloadIface(WorkloadRequest work, Reference<AsyncVa
// getWorkloadIface()?
auto compound = makeReference<CompoundWorkload>(wcx);
for (int i = 0; i < work.options.size(); i++) {
compound->add(getWorkloadIface(work, work.options[i], dbInfo));
compound->add(getWorkloadIface(work, ccr, work.options[i], dbInfo));
}
return compound;
}
@ -647,7 +651,7 @@ ACTOR Future<Void> testerServerWorkload(WorkloadRequest work,
// add test for "done" ?
TraceEvent("WorkloadReceived", workIface.id()).detail("Title", work.title);
auto workload = getWorkloadIface(work, dbInfo);
auto workload = getWorkloadIface(work, ccr, dbInfo);
if (!workload) {
TraceEvent("TestCreationError").detail("Reason", "Workload could not be created");
fprintf(stderr, "ERROR: The workload could not be created.\n");

View File

@ -0,0 +1,120 @@
/*
* workloads.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include <fmt/format.h>
#include "flow/actorcompiler.h" // has to be last include
struct ClientWorkloadImpl {
Reference<TestWorkload> child;
ISimulator::ProcessInfo* self = g_pSimulator->getCurrentProcess();
ISimulator::ProcessInfo* childProcess = nullptr;
IPAddress childAddress;
std::string processName;
Database cx;
Future<Void> databaseOpened;
ClientWorkloadImpl(Reference<TestWorkload> const& child) : child(child) {
if (self->address.isV6()) {
childAddress =
IPAddress::parse(fmt::format("2001:fdb1:fdb2:fdb3:fdb4:fdb5:fdb6:{:40x}", child->clientId + 2)).get();
} else {
childAddress = IPAddress::parse(fmt::format("192.168.0.{}", child->clientId + 2)).get();
}
processName = fmt::format("TestClient{}", child->clientId);
childProcess = g_simulator.newProcess(processName.c_str(),
childAddress,
0,
self->address.isTLS(),
1,
self->locality,
ProcessClass(ProcessClass::TesterClass, ProcessClass::AutoSource),
self->dataFolder,
self->coordinationFolder,
self->protocolVersion);
databaseOpened = openDatabase(this);
}
~ClientWorkloadImpl() {
g_simulator.destroyProcess(childProcess);
}
ACTOR static Future<Void> openDatabase(ClientWorkloadImpl* self) {
wait(g_simulator.onProcess(self->childProcess));
self->cx = Database::createDatabase(self->child->ccr, -1);
wait(g_simulator.onProcess(self->self));
return Void();
}
ACTOR template<class Ret, class Fun> Future<Ret> runActor(ClientWorkloadImpl* self, Fun f) {
state Optional<Error> err;
state Ret res;
wait(g_simulator.onProcess(self->childProcess));
wait(self->databaseOpened);
try {
Ret r = wait(f(self->cx));
res = r;
} catch(Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
err = e;
}
wait(g_simulator.onProcess(self->self));
if (err.present()) {
throw err.get();
}
return res;
}
};
ClientWorkload::ClientWorkload(Reference<TestWorkload> const& child, WorkloadContext const& wcx)
: TestWorkload(wcx), impl(new ClientWorkloadImpl(child)) {}
std::string ClientWorkload::description() const {
return impl->child->description();
}
Future<Void> ClientWorkload::setup(Database const& cx) {
return impl->runActor<Void>(impl, [this](Database const& db) {
return impl->child->setup(db);
});
}
Future<Void> ClientWorkload::start(Database const& cx) {
return impl->runActor<Void>(impl, [this](Database const& db) {
return impl->child->start(db);
});
}
Future<bool> ClientWorkload::check(Database const& cx) {
return impl->runActor<bool>(impl, [this](Database const& db) {
return impl->child->check(db);
});
}
void ClientWorkload::getMetrics(std::vector<PerfMetric>& m) {
return impl->child->getMetrics(m);
}
double ClientWorkload::getCheckTimeout() const {
return impl->child->getCheckTimeout();
}

View File

@ -50,6 +50,7 @@ struct WorkloadContext {
int clientId, clientCount;
int64_t sharedRandomNumber;
Reference<AsyncVar<struct ServerDBInfo> const> dbInfo;
Reference<IClusterConnectionRecord> ccr;
WorkloadContext();
WorkloadContext(const WorkloadContext&);
@ -79,6 +80,20 @@ struct TestWorkload : NonCopyable, WorkloadContext, ReferenceCounted<TestWorkloa
enum WorkloadPhase { SETUP = 1, EXECUTION = 2, CHECK = 4, METRICS = 8 };
};
struct ClientWorkloadImpl;
struct ClientWorkload : TestWorkload {
ClientWorkloadImpl* impl;
ClientWorkload(Reference<TestWorkload> const& child, WorkloadContext const& wcx);
~ClientWorkload();
std::string description() const override;
Future<Void> setup(Database const& cx) override;
Future<Void> start(Database const& cx) override;
Future<bool> check(Database const& cx) override;
void getMetrics(std::vector<PerfMetric>& m) override;
double getCheckTimeout() const override;
};
struct KVWorkload : TestWorkload {
uint64_t nodeCount;
int64_t nodePrefix;
@ -121,8 +136,17 @@ struct IWorkloadFactory : ReferenceCounted<IWorkloadFactory> {
template <class WorkloadType>
struct WorkloadFactory : IWorkloadFactory {
WorkloadFactory(const char* name) { factories()[name] = Reference<IWorkloadFactory>::addRef(this); }
Reference<TestWorkload> create(WorkloadContext const& wcx) override { return makeReference<WorkloadType>(wcx); }
bool asClient;
WorkloadFactory(const char* name, bool asClient = false) : asClient(asClient) { factories()[name] = Reference<IWorkloadFactory>::addRef(this); }
Reference<TestWorkload> create(WorkloadContext const& wcx) override {
if (g_network->isSimulated() && asClient) {
WorkloadContext clientContext = wcx;
clientContext.dbInfo = decltype(clientContext.dbInfo)();
auto child = makeReference<WorkloadType>(wcx);
return makeReference<ClientWorkload>(child, wcx);
}
return makeReference<WorkloadType>(wcx);
}
};
#define REGISTER_WORKLOAD(classname) WorkloadFactory<classname> classname##WorkloadFactory(#classname)