2021-08-03 05:03:20 +08:00
* ConfigIncrement.actor.cpp
* This source file is part of the FoundationDB open source project
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
#include "fdbclient/ISingleThreadTransaction.h"
#include "fdbclient/Tuple.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Workload that increments an integer stored under a single configuration key
// (see getConfigKey) from several actors, then verifies the final value in check().
class ConfigIncrementWorkload : public TestWorkload {
2021-08-03 06:50:10 +08:00
// Total number of increment actors, split across clients in start(); read from the "incrementActors" option.
int incrementActors{ 0 };
2021-08-03 05:03:20 +08:00
// Number of increments each actor aims to perform; read from the "incrementsPerActor" option.
int incrementsPerActor{ 0 };
2021-08-03 06:50:10 +08:00
// Last committed version and value recorded by incrementActor; used there to assert that
// committed versions strictly increase and observed values never go backwards.
Version lastKnownCommittedVersion{ ::invalidVersion };
int lastKnownValue{ -1 };
2021-08-03 05:03:20 +08:00
// Selects SIMPLE_CONFIG (true) or PAXOS_CONFIG (false) transactions in getTransaction().
bool useSimpleConfigDB{ true };
2021-08-03 08:47:33 +08:00
// Scale factors for the randomized delays in incrementActor: each delay is random01() * 2 * <value>.
double meanSleepWithinTransactions{ 0.01 };
double meanSleepBetweenTransactions{ 0.1 };
2021-08-03 05:03:20 +08:00
// Knob name that forms the second element of the configuration key tuple.
static KeyRef const testKnobName;
// NOTE(review): no use or definition of configKey is visible in this view of the file — possibly dead; confirm.
static Key configKey;
// Counters named "Transactions" and "Retries" by the constructor.
PerfIntCounter transactions, retries;
// Builds the key this workload reads and writes: a packed tuple consisting of a
// null element (the config class slot) followed by testKnobName.
static Key getConfigKey() {
Tuple tuple;
tuple.appendNull(); // config class
tuple << testKnobName;
return tuple.pack();
// Reads the value stored under getConfigKey() through the given transaction.
// Returns 0 when the key is absent; otherwise decodes the stored bytes as an int
// using BinaryReader with Unversioned().
// NOTE(review): set() writes the value with format("%d", ...) while this decodes it with
// BinaryReader — presumably the config transaction layer converts between the two; confirm.
ACTOR static Future<int> get(Reference<ISingleThreadTransaction> tr) {
2021-08-03 06:50:10 +08:00
TraceEvent(SevDebug, "ConfigIncrementGet");
2021-08-03 05:03:20 +08:00
Optional<Value> serializedValue = wait(tr->get(getConfigKey()));
if (!serializedValue.present()) {
// Key not written yet: treat as a starting value of zero.
return 0;
} else {
return BinaryReader::fromStringRef<int>(serializedValue.get(), Unversioned());
2021-08-03 06:50:10 +08:00
// Logs the new value and stages a write of it, formatted as decimal text ("%d"),
// under getConfigKey() on the given transaction. Does not commit the transaction itself.
static void set(Reference<ISingleThreadTransaction> tr, int value) {
TraceEvent(SevDebug, "ConfigIncrementSet").detail("Value", value);
tr->set(getConfigKey(), format("%d", value));
2021-08-03 05:03:20 +08:00
// Actor that keeps running read-increment-write transactions against the config key
// while trsComplete < self->incrementsPerActor.
// Each attempt creates a fresh transaction, reads the current value, stages value + 1,
// sleeps a randomized amount, then checks the shared bookkeeping on `self`.
// NOTE(review): the statements that commit the transaction, advance trsComplete, leave the
// inner retry loop, and handle the caught error are not visible in this view of the file.
ACTOR static Future<Void> incrementActor(ConfigIncrementWorkload* self, Database cx) {
2021-08-03 06:50:10 +08:00
TraceEvent(SevDebug, "ConfigIncrementStartIncrementActor");
2021-08-03 05:03:20 +08:00
state int trsComplete = 0;
while (trsComplete < self->incrementsPerActor) {
2021-08-03 06:50:10 +08:00
// Inner loop: one pass per transaction attempt.
loop {
try {
state Reference<ISingleThreadTransaction> tr = self->getTransaction(cx);
state int currentValue = wait(get(tr));
set(tr, currentValue + 1);
2021-08-03 08:47:33 +08:00
// Randomized pause while the transaction is open (scaled by meanSleepWithinTransactions).
wait(delay(deterministicRandom()->random01() * 2 * self->meanSleepWithinTransactions));
2021-08-03 06:50:10 +08:00
// The committed version must move strictly forward, and the value read must not be
// lower than the last value this workload recorded.
ASSERT_GT(tr->getCommittedVersion(), self->lastKnownCommittedVersion);
ASSERT_GE(currentValue, self->lastKnownValue);
self->lastKnownCommittedVersion = tr->getCommittedVersion();
self->lastKnownValue = currentValue + 1;
2021-08-03 08:47:33 +08:00
// Randomized pause before the next transaction (scaled by meanSleepBetweenTransactions).
wait(delay(deterministicRandom()->random01() * 2 * self->meanSleepBetweenTransactions));
2021-08-03 06:50:10 +08:00
} catch (Error& e) {
2021-08-03 08:47:33 +08:00
// Record the failure together with the last value known to be written.
TraceEvent(SevDebug, "ConfigIncrementError")
.detail("LastKnownValue", self->lastKnownValue)
2021-08-03 06:50:10 +08:00
2021-08-03 08:47:33 +08:00
2021-08-03 06:50:10 +08:00
2021-08-03 05:03:20 +08:00
return Void();
// Actor that reads the final value of the config key and compares it with the expected
// total, incrementActors * incrementsPerActor. Returns true when the value is at least
// the expected total.
// NOTE(review): the start of the trace-event statement that the .detail(...) calls belong to,
// and the body of the catch block, are not visible in this view of the file.
ACTOR static Future<bool> check(ConfigIncrementWorkload* self, Database cx) {
state Reference<ISingleThreadTransaction> tr = self->getTransaction(cx);
2021-08-03 06:50:10 +08:00
loop {
try {
state int currentValue = wait(get(tr));
2021-08-03 08:47:33 +08:00
// Every actor on every client is expected to have contributed incrementsPerActor increments.
auto expectedValue = self->incrementActors * self->incrementsPerActor;
2021-08-03 06:50:10 +08:00
.detail("CurrentValue", currentValue)
2021-08-03 08:47:33 +08:00
.detail("ExpectedValue", expectedValue);
return currentValue >= expectedValue; // >= because we may have maybe_committed errors
2021-08-03 06:50:10 +08:00
} catch (Error& e) {
2021-08-03 05:03:20 +08:00
// Creates a new transaction object for the given database. The transaction type is
// SIMPLE_CONFIG when useSimpleConfigDB is true and PAXOS_CONFIG otherwise.
Reference<ISingleThreadTransaction> getTransaction(Database cx) const {
auto type = useSimpleConfigDB ? ISingleThreadTransaction::Type::SIMPLE_CONFIG
: ISingleThreadTransaction::Type::PAXOS_CONFIG;
return ISingleThreadTransaction::create(type, cx);
// Constructs the workload, names the two perf counters, and reads the tunables from the
// workload options. Defaults when an option is absent: incrementActors = 10,
// incrementsPerActor = 10, useSimpleConfigDB = true, meanSleepWithinTransactions = 0.01,
// meanSleepBetweenTransactions = 0.1.
ConfigIncrementWorkload(WorkloadContext const& wcx)
2021-08-03 08:47:33 +08:00
: TestWorkload(wcx), transactions("Transactions"), retries("Retries") {
2021-08-03 06:50:10 +08:00
incrementActors = getOption(options, "incrementActors"_sr, 10);
incrementsPerActor = getOption(options, "incrementsPerActor"_sr, 10);
useSimpleConfigDB = getOption(options, "useSimpleConfigDB"_sr, true);
2021-08-03 08:47:33 +08:00
meanSleepWithinTransactions = getOption(options, "meanSleepWithinTransactions"_sr, 0.01);
meanSleepBetweenTransactions = getOption(options, "meanSleepBetweenTransactions"_sr, 0.1);
2021-08-03 05:03:20 +08:00
// Name under which this workload identifies itself.
std::string description() const override {
	return "ConfigIncrementWorkload";
}
// This workload has no setup work to do; returns Void() immediately.
Future<Void> setup(Database const& cx) override {
	return Void();
}
// Launches this client's share of the increment actors and returns a future that
// completes when all of them have finished.
Future<Void> start(Database const& cx) override {
std::vector<Future<Void>> actors;
2021-08-03 08:52:28 +08:00
// Share of the incrementActors total that this client runs. For 0 <= clientId < clientCount
// this equals the number of indices i in [0, incrementActors) with i % clientCount == clientId;
// clients whose id is >= incrementActors run none.
auto localIncrementActors =
(clientId < incrementActors) ? ((incrementActors - clientId - 1) / clientCount + 1) : 0;
2021-08-03 08:47:33 +08:00
for (int i = 0; i < localIncrementActors; ++i) {
2021-08-03 05:03:20 +08:00
actors.push_back(incrementActor(this, cx));
return waitForAll(actors);
2021-08-03 08:47:33 +08:00
// Only the client with id 0 runs the real verification actor; every other client
// reports success right away.
Future<bool> check(Database const& cx) override {
	if (clientId) {
		return Future<bool>{ true };
	}
	return check(this, cx);
}
2021-08-03 05:03:20 +08:00
2021-08-03 08:47:33 +08:00
// Metrics hook taking the output vector of PerfMetric entries.
// NOTE(review): the body of this function is not visible in this view of the file.
void getMetrics(std::vector<PerfMetric>& m) override {
2021-08-03 05:03:20 +08:00
// Registers the workload with the factory under the name "ConfigIncrement".
WorkloadFactory<ConfigIncrementWorkload> ConfigIncrementWorkloadFactory("ConfigIncrement");
// Knob name used as the second element of the key tuple built by getConfigKey().
KeyRef const ConfigIncrementWorkload::testKnobName = "test_int"_sr;