Use SimpleConfigConsumer in LocalConfiguration

This commit is contained in:
sfc-gh-tclinkenbeard 2021-05-12 17:23:32 -07:00
parent 93c9ccf47b
commit 511ce8d088
12 changed files with 205 additions and 83 deletions

View File

@ -85,7 +85,7 @@ public:
void setTimestamp(double timestamp) { this->timestamp = timestamp; }
bool isSet() const { return value.present(); }
static Standalone<ConfigMutationRef> createConfigMutationRef(KeyRef encodedKey, Optional<ValueRef> value) {
static Standalone<ConfigMutationRef> createConfigMutation(KeyRef encodedKey, Optional<ValueRef> value) {
auto key = ConfigKeyRef::decodeKey(encodedKey);
return ConfigMutationRef(key, value);
}

View File

@ -87,11 +87,11 @@ public:
}
void set(KeyRef key, ValueRef value) {
mutations.push_back_deep(mutations.arena(), ConfigMutationRef::createConfigMutationRef(key, value));
mutations.push_back_deep(mutations.arena(), ConfigMutationRef::createConfigMutation(key, value));
}
void clear(KeyRef key) {
mutations.emplace_back_deep(mutations.arena(), ConfigMutationRef::createConfigMutationRef(key, {}));
mutations.emplace_back_deep(mutations.arena(), ConfigMutationRef::createConfigMutation(key, {}));
}
Future<Optional<Value>> get(KeyRef key) { return get(this, key); }

View File

@ -96,6 +96,7 @@ set(FDBSERVER_SRCS
ServerDBInfo.actor.h
ServerDBInfo.h
SimpleConfigConsumer.actor.cpp
SimpleConfigConsumer.h
SimpleConfigDatabaseNode.actor.cpp
SimulatedCluster.actor.cpp
SimulatedCluster.h

View File

@ -19,7 +19,7 @@
*/
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/IConfigConsumer.h"
#include "fdbserver/SimpleConfigConsumer.h"
#include "flow/actorcompiler.h" // must be last include
class ConfigBroadcasterImpl {

View File

@ -33,14 +33,3 @@ public:
virtual Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) = 0;
virtual Future<Void> consume(ConfigBroadcaster& broadcaster) = 0;
};
class SimpleConfigConsumer : public IConfigConsumer {
std::unique_ptr<class SimpleConfigConsumerImpl> impl;
public:
SimpleConfigConsumer(ClusterConnectionString const& ccs);
SimpleConfigConsumer(ServerCoordinators const& coordinators);
~SimpleConfigConsumer();
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) override;
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
};

View File

@ -19,9 +19,11 @@
*/
#include "fdbclient/Knobs.h"
#include "fdbserver/Knobs.h"
#include "fdbclient/Tuple.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LocalConfiguration.h"
#include "fdbserver/SimpleConfigConsumer.h"
#include "flow/Knobs.h"
#include "flow/UnitTest.h"
@ -107,7 +109,6 @@ public:
class LocalConfigurationImpl {
IKeyValueStore* kvStore; // FIXME: fix leaks?
Version lastSeenVersion { 0 };
Future<Void> initFuture;
FlowKnobs flowKnobs;
ClientKnobs clientKnobs;
@ -115,6 +116,7 @@ class LocalConfigurationImpl {
TestKnobs testKnobs;
ManualKnobOverrides manualKnobOverrides;
ConfigKnobOverrides configKnobOverrides;
ActorCollection actors{ false };
ACTOR static Future<Void> saveConfigPath(LocalConfigurationImpl* self) {
self->kvStore->set(
@ -130,21 +132,20 @@ class LocalConfigurationImpl {
return Void();
}
ACTOR static Future<Void> getLastSeenVersion(LocalConfigurationImpl *self) {
ACTOR static Future<Version> getLastSeenVersion(LocalConfigurationImpl* self) {
state Version result = 0;
state Optional<Value> lastSeenVersionValue = wait(self->kvStore->readValue(lastSeenVersionKey));
if (!lastSeenVersionValue.present()) {
self->lastSeenVersion = 0;
self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(self->lastSeenVersion, IncludeVersion())));
self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(result, IncludeVersion())));
wait(self->kvStore->commit());
return Void();
} else {
result = BinaryReader::fromStringRef<Version>(lastSeenVersionValue.get(), IncludeVersion());
}
self->lastSeenVersion = BinaryReader::fromStringRef<Version>(lastSeenVersionValue.get(), IncludeVersion());
return Void();
return result;
}
ACTOR static Future<Void> init(LocalConfigurationImpl* self) {
wait(self->kvStore->init());
wait(getLastSeenVersion(self));
state Optional<Value> storedConfigPathValue = wait(self->kvStore->readValue(configPathKey));
if (!storedConfigPathValue.present()) {
wait(saveConfigPath(self));
@ -191,19 +192,27 @@ class LocalConfigurationImpl {
initializeKnobs();
}
ACTOR static Future<Void> applyKnobUpdates(LocalConfigurationImpl* self, ConfigFollowerGetSnapshotReply reply) {
ACTOR static Future<Void> setSnapshot(LocalConfigurationImpl* self,
std::map<ConfigKey, Value> snapshot,
Version lastCompactedVersion) {
// TODO: Concurrency control?
self->kvStore->clear(knobOverrideKeys);
for (const auto& [configKey, knobValue] : reply.snapshot) {
for (const auto& [configKey, knobValue] : snapshot) {
self->configKnobOverrides.set(configKey.configClass, configKey.knobName, knobValue);
}
self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(self->lastSeenVersion, IncludeVersion())));
self->kvStore->set(
KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(lastCompactedVersion, IncludeVersion())));
wait(self->kvStore->commit());
self->updateInMemoryKnobs();
return Void();
}
ACTOR static Future<Void> applyKnobUpdates(LocalConfigurationImpl *self, ConfigFollowerGetChangesReply reply) {
for (const auto &versionedMutation : reply.versionedMutations) {
ACTOR static Future<Void> addVersionedMutations(
LocalConfigurationImpl* self,
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations,
Version mostRecentVersion) {
// TODO: Concurrency control?
for (const auto& versionedMutation : versionedMutations) {
const auto &mutation = versionedMutation.mutation;
auto serializedKey = BinaryWriter::toValue(mutation.getKey(), IncludeVersion());
if (mutation.isSet()) {
@ -214,36 +223,12 @@ class LocalConfigurationImpl {
self->configKnobOverrides.remove(mutation.getConfigClass(), mutation.getKnobName());
}
}
self->lastSeenVersion = reply.mostRecentVersion;
self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(reply.mostRecentVersion, IncludeVersion())));
self->kvStore->set(KeyValueRef(lastSeenVersionKey, BinaryWriter::toValue(mostRecentVersion, IncludeVersion())));
wait(self->kvStore->commit());
self->updateInMemoryKnobs();
return Void();
}
ACTOR static Future<Void> fetchChanges(LocalConfigurationImpl *self, ConfigFollowerInterface broadcaster) {
try {
ConfigFollowerGetChangesReply changesReply =
wait(broadcaster.getChanges.getReply(ConfigFollowerGetChangesRequest{
self->lastSeenVersion, self->configKnobOverrides.getConfigClassSet() }));
// TODO: Avoid applying if there are no updates
wait(applyKnobUpdates(self, changesReply));
} catch (Error &e) {
if (e.code() == error_code_version_already_compacted) {
ConfigFollowerGetVersionReply versionReply = wait(broadcaster.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
self->lastSeenVersion = versionReply.version;
ConfigFollowerGetSnapshotReply snapshotReply =
wait(broadcaster.getSnapshot.getReply(ConfigFollowerGetSnapshotRequest{
self->lastSeenVersion, self->configKnobOverrides.getConfigClassSet() }));
// TODO: Avoid applying if there are no updates
wait(applyKnobUpdates(self, snapshotReply));
} else {
throw e;
}
}
return Void();
}
ACTOR static Future<Void> monitorBroadcaster(Reference<AsyncVar<ServerDBInfo> const> serverDBInfo,
Reference<AsyncVar<ConfigFollowerInterface>> broadcaster) {
loop {
@ -252,22 +237,28 @@ class LocalConfigurationImpl {
}
}
ACTOR static Future<Void> consume(LocalConfigurationImpl* self,
ACTOR static Future<Void> consumeLoopIteration(LocalConfiguration* self,
LocalConfigurationImpl* impl,
Reference<AsyncVar<ConfigFollowerInterface>> broadcaster) {
// TODO: Cache lastSeenVersion in memory
// state Version lastSeenVersion = wait(impl->getLastSeenVersion());
state SimpleConfigConsumer consumer(broadcaster->get());
choose {
when(wait(broadcaster->onChange())) {}
when(wait(brokenPromiseToNever(consumer.consume(*self)))) { ASSERT(false); }
when(wait(impl->actors.getResult())) { ASSERT(false); }
}
return Void();
}
ACTOR static Future<Void> consume(LocalConfiguration* self,
LocalConfigurationImpl* impl,
Reference<AsyncVar<ServerDBInfo> const> serverDBInfo) {
wait(self->initFuture);
state Future<ConfigFollowerGetChangesReply> getChangesReply = Never();
wait(impl->initFuture);
state Reference<AsyncVar<ConfigFollowerInterface>> broadcaster =
makeReference<AsyncVar<ConfigFollowerInterface>>(serverDBInfo->get().configBroadcaster);
state Future<Void> monitor = monitorBroadcaster(serverDBInfo, broadcaster);
loop {
choose {
when(wait(broadcaster->onChange())) {}
when(wait(brokenPromiseToNever(fetchChanges(self, broadcaster->get())))) {
wait(delay(5.0)); // TODO: Make knob?
}
when(wait(monitor)) { ASSERT(false); }
}
}
impl->actors.add(monitorBroadcaster(serverDBInfo, broadcaster));
loop { wait(consumeLoopIteration(self, impl, broadcaster)); }
}
public:
@ -286,6 +277,20 @@ public:
return initFuture;
}
Future<Void> setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion) {
// TODO: Remove unnecessary copy
auto f = setSnapshot(this, std::move(snapshot), lastCompactedVersion);
actors.add(f);
return f;
}
Future<Void> addVersionedMutations(Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations,
Version mostRecentVersion) {
auto f = addVersionedMutations(this, versionedMutations, mostRecentVersion);
actors.add(f);
return f;
}
FlowKnobs const& getFlowKnobs() const {
ASSERT(initFuture.isReady());
return flowKnobs;
@ -306,8 +311,8 @@ public:
return testKnobs;
}
Future<Void> consume(Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo) {
return consume(this, serverDBInfo);
Future<Void> consume(LocalConfiguration& self, Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo) {
return consume(&self, this, serverDBInfo);
}
};
@ -319,7 +324,7 @@ LocalConfiguration::LocalConfiguration(std::string const& configPath,
LocalConfiguration::~LocalConfiguration() = default;
Future<Void> LocalConfiguration::init() {
Future<Void> LocalConfiguration::initialize() {
return impl->init();
}
@ -340,7 +345,17 @@ TestKnobs const& LocalConfiguration::getTestKnobs() const {
}
Future<Void> LocalConfiguration::consume(Reference<AsyncVar<ServerDBInfo> const> const& serverDBInfo) {
return impl->consume(serverDBInfo);
return impl->consume(*this, serverDBInfo);
}
Future<Void> LocalConfiguration::setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion) {
return impl->setSnapshot(std::move(snapshot), lastCompactedVersion);
}
Future<Void> LocalConfiguration::addVersionedMutations(
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations,
Version mostRecentVersion) {
return impl->addVersionedMutations(versionedMutations, mostRecentVersion);
}
#define init(knob, value) initKnob(knob, value, #knob)
@ -444,3 +459,54 @@ TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/ConfigKnobOverrides") {
ASSERT(k2.TEST2_INT == 10);
return Void();
}
namespace {
std::map<ConfigKey, Value> startingTestSnapshot = {
{ ConfigKeyRef("class-A"_sr, "test_int"_sr), "1"_sr },
{ ConfigKeyRef("class-B"_sr, "test_int"_sr), "2"_sr },
{ ConfigKeyRef("class-C"_sr, "test_int"_sr), "3"_sr },
};
ACTOR Future<Void> runLocalConfig(std::string configPath, std::string dataFolder) {
state LocalConfiguration localConfiguration(configPath, dataFolder, {}, UID{});
wait(localConfiguration.initialize());
std::map<ConfigKey, Value> snapshot;
snapshot[ConfigKeyRef("class-A"_sr, "test_int"_sr)] = "5"_sr;
wait(localConfiguration.setSnapshot(std::move(snapshot), 1));
ASSERT(localConfiguration.getTestKnobs().TEST_INT == 5);
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
{
Tuple tuple;
tuple << "class-A"_sr;
tuple << "test_int"_sr;
auto mutation = ConfigMutationRef::createConfigMutation(tuple.pack(), "7"_sr);
versionedMutations.emplace_back_deep(versionedMutations.arena(), 2, mutation);
}
wait(localConfiguration.addVersionedMutations(versionedMutations, 2));
ASSERT(localConfiguration.getTestKnobs().TEST_INT == 7);
return Void();
}
} // namespace
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Simple") {
wait(runLocalConfig("class-A/class-B", "./"));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Restart") {
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/FreshRestart") {
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/ConflictingOverrides") {
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Consume") {
return Void();
}

View File

@ -49,11 +49,14 @@ public:
std::map<Key, Value>&& manualKnobOverrides,
UID id);
~LocalConfiguration();
Future<Void> init();
Future<Void> initialize();
FlowKnobs const& getFlowKnobs() const;
ClientKnobs const& getClientKnobs() const;
ServerKnobs const& getServerKnobs() const;
TestKnobs const& getTestKnobs() const;
// TODO: Only one field of serverDBInfo is required, so improve encapsulation
Future<Void> consume(Reference<AsyncVar<ServerDBInfo> const> const&);
Future<Void> setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion);
Future<Void> addVersionedMutations(Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations,
Version mostRecentVersion);
};

View File

@ -1,5 +1,5 @@
/*
* SimpleConfigBroadcaster.actor.cpp
* SimpleConfigConsumer.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
@ -18,8 +18,8 @@
* limitations under the License.
*/
#include "fdbserver/IConfigConsumer.h"
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/SimpleConfigConsumer.h"
class SimpleConfigConsumerImpl {
ConfigFollowerInterface cfi;
@ -45,7 +45,9 @@ class SimpleConfigConsumerImpl {
}
}
ACTOR static Future<Void> fetchChanges(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
// TODO: Make static
ACTOR template <class ConfigStore>
Future<Void> fetchChanges(SimpleConfigConsumerImpl* self, ConfigStore* configStore) {
loop {
try {
ConfigFollowerGetChangesReply reply =
@ -59,7 +61,7 @@ class SimpleConfigConsumerImpl {
.detail("KnobValue", versionedMutation.mutation.getValue());
}
self->mostRecentVersion = reply.mostRecentVersion;
broadcaster->addVersionedMutations(reply.versionedMutations, reply.mostRecentVersion);
configStore->addVersionedMutations(reply.versionedMutations, reply.mostRecentVersion);
wait(delayJittered(POLLING_INTERVAL));
} catch (Error& e) {
++self->failedChangeRequest;
@ -72,7 +74,7 @@ class SimpleConfigConsumerImpl {
ConfigFollowerGetSnapshotRequest{ self->mostRecentVersion, {} }));
// TODO: Remove unnecessary copy
auto snapshot = dbReply.snapshot;
broadcaster->setSnapshot(std::move(snapshot), self->mostRecentVersion);
configStore->setSnapshot(std::move(snapshot), self->mostRecentVersion);
++self->snapshotRequest;
} else {
throw e;
@ -81,7 +83,9 @@ class SimpleConfigConsumerImpl {
}
}
ACTOR static Future<Void> getInitialSnapshot(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
// TODO: Make static
ACTOR template <class ConfigStore>
Future<Void> getInitialSnapshot(SimpleConfigConsumerImpl* self, ConfigStore* configStore) {
ConfigFollowerGetVersionReply versionReply =
wait(self->cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
self->mostRecentVersion = versionReply.version;
@ -90,7 +94,7 @@ class SimpleConfigConsumerImpl {
TraceEvent(SevDebug, "ConfigGotInitialSnapshot").detail("Version", self->mostRecentVersion);
// TODO: Remove unnecessary copy
auto snapshot = reply.snapshot;
broadcaster->setSnapshot(std::move(snapshot), self->mostRecentVersion);
configStore->setSnapshot(std::move(snapshot), self->mostRecentVersion);
return Void();
}
@ -103,6 +107,8 @@ class SimpleConfigConsumerImpl {
}
public:
SimpleConfigConsumerImpl(ConfigFollowerInterface const& cfi) : SimpleConfigConsumerImpl() { this->cfi = cfi; }
SimpleConfigConsumerImpl(ClusterConnectionString const& ccs) : SimpleConfigConsumerImpl() {
auto coordinators = ccs.coordinators();
std::sort(coordinators.begin(), coordinators.end());
@ -113,14 +119,23 @@ public:
cfi = ConfigFollowerInterface(coordinators.configServers[0]);
}
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) { return getInitialSnapshot(this, &broadcaster); }
template <class ConfigStore>
Future<Void> getInitialSnapshot(ConfigStore& configStore) {
return getInitialSnapshot(this, &configStore);
}
Future<Void> consume(ConfigBroadcaster& broadcaster) { return fetchChanges(this, &broadcaster) || compactor(this); }
template <class ConfigStore>
Future<Void> consume(ConfigStore& configStore) {
return fetchChanges(this, &configStore) || compactor(this);
}
};
const double SimpleConfigConsumerImpl::POLLING_INTERVAL = 0.5;
const double SimpleConfigConsumerImpl::COMPACTION_INTERVAL = 5.0;
SimpleConfigConsumer::SimpleConfigConsumer(ConfigFollowerInterface const& cfi)
: impl(std::make_unique<SimpleConfigConsumerImpl>(cfi)) {}
SimpleConfigConsumer::SimpleConfigConsumer(ClusterConnectionString const& ccs)
: impl(std::make_unique<SimpleConfigConsumerImpl>(ccs)) {}
@ -131,8 +146,16 @@ Future<Void> SimpleConfigConsumer::getInitialSnapshot(ConfigBroadcaster& broadca
return impl->getInitialSnapshot(broadcaster);
}
Future<Void> SimpleConfigConsumer::getInitialSnapshot(LocalConfiguration& localConfiguration) {
return impl->getInitialSnapshot(localConfiguration);
}
Future<Void> SimpleConfigConsumer::consume(ConfigBroadcaster& broadcaster) {
return impl->consume(broadcaster);
}
Future<Void> SimpleConfigConsumer::consume(LocalConfiguration& localConfiguration) {
return impl->consume(localConfiguration);
}
SimpleConfigConsumer::~SimpleConfigConsumer() = default;

View File

@ -0,0 +1,39 @@
/*
* SimpleConfigConsumer.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbserver/IConfigConsumer.h"
#include "fdbserver/LocalConfiguration.h"
#include <memory>
class SimpleConfigConsumer : public IConfigConsumer {
std::unique_ptr<class SimpleConfigConsumerImpl> impl;
public:
SimpleConfigConsumer(ConfigFollowerInterface const& cfi);
SimpleConfigConsumer(ClusterConnectionString const& ccs);
SimpleConfigConsumer(ServerCoordinators const& coordinators);
~SimpleConfigConsumer();
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) override;
Future<Void> getInitialSnapshot(LocalConfiguration& localConfiguration);
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
Future<Void> consume(LocalConfiguration& localConfiguration);
};

View File

@ -2024,7 +2024,7 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
state vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
state LocalConfiguration localConfig(configPath, dataFolder, {}, UID{});
wait(localConfig.init());
wait(localConfig.initialize());
actors.push_back(serveProtocolInfo());

View File

@ -75,6 +75,7 @@ ERROR( batch_transaction_throttled, 1051, "Batch GRV request rate limit exceeded
ERROR( dd_cancelled, 1052, "Data distribution components cancelled")
ERROR( dd_not_found, 1053, "Data distributor not found")
ERROR( version_already_compacted, 1054, "The requested changes have been compacted away")
ERROR( local_config_changed, 1055, "Local configuration file has changed. Restart and apply these changes" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )

View File

@ -4,4 +4,4 @@ useDB=false
testName=UnitTests
maxTestCases=10
testsMatching=/fdbserver/ConfigDB/
testsMatching=/fdbserver/ConfigDB/LocalConfiguration/Simple