Support global configuration database updates
This commit is contained in:
parent
1eeb341f16
commit
d7692b628f
|
@ -24,5 +24,9 @@
|
|||
ConfigKey ConfigKeyRef::decodeKey(KeyRef const& key) {
|
||||
auto tuple = Tuple::unpack(key);
|
||||
ASSERT(tuple.size() == 2); // TODO: Fail gracefully
|
||||
return ConfigKeyRef(tuple.getString(0), tuple.getString(1));
|
||||
if (tuple.getType(0) == Tuple::NULL_TYPE) {
|
||||
return ConfigKeyRef({}, tuple.getString(1));
|
||||
} else {
|
||||
return ConfigKeyRef(tuple.getString(0), tuple.getString(1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,14 +25,16 @@
|
|||
#include "fdbclient/FDBTypes.h"
|
||||
|
||||
struct ConfigKeyRef {
|
||||
KeyRef configClass;
|
||||
// Empty config class means the update is global
|
||||
Optional<KeyRef> configClass;
|
||||
KeyRef knobName;
|
||||
|
||||
ConfigKeyRef() = default;
|
||||
ConfigKeyRef(KeyRef configClass, KeyRef knobName) : configClass(configClass), knobName(knobName) {}
|
||||
ConfigKeyRef(Arena& arena, KeyRef configClass, KeyRef knobName)
|
||||
explicit ConfigKeyRef(Optional<KeyRef> configClass, KeyRef knobName)
|
||||
: configClass(configClass), knobName(knobName) {}
|
||||
explicit ConfigKeyRef(Arena& arena, Optional<KeyRef> configClass, KeyRef knobName)
|
||||
: configClass(arena, configClass), knobName(arena, knobName) {}
|
||||
ConfigKeyRef(Arena& arena, ConfigKeyRef const& rhs) : ConfigKeyRef(arena, rhs.configClass, rhs.knobName) {}
|
||||
explicit ConfigKeyRef(Arena& arena, ConfigKeyRef const& rhs) : ConfigKeyRef(arena, rhs.configClass, rhs.knobName) {}
|
||||
|
||||
static Standalone<ConfigKeyRef> decodeKey(KeyRef const&);
|
||||
|
||||
|
@ -72,11 +74,11 @@ public:
|
|||
|
||||
ConfigKeyRef getKey() const { return key; }
|
||||
|
||||
KeyRef getConfigClass() const { return key.configClass; }
|
||||
Optional<KeyRef> getConfigClass() const { return key.configClass; }
|
||||
|
||||
KeyRef getKnobName() const { return key.knobName; }
|
||||
|
||||
ValueRef getValue() const { return value.get(); }
|
||||
Optional<ValueRef> getValue() const { return value; }
|
||||
|
||||
ConfigMutationRef(Arena& arena, ConfigMutationRef const& rhs)
|
||||
: key(arena, rhs.key), value(arena, rhs.value), description(arena, rhs.description), timestamp(rhs.timestamp) {}
|
||||
|
|
|
@ -25,8 +25,8 @@
|
|||
|
||||
namespace {
|
||||
|
||||
bool matchesConfigClass(Optional<ConfigClassSet> const& configClassSet, KeyRef configClass) {
|
||||
return (!configClassSet.present() || configClassSet.get().contains(configClass));
|
||||
bool matchesConfigClass(Optional<ConfigClassSet> const& configClassSet, Optional<KeyRef> configClass) {
|
||||
return !configClassSet.present() || !configClass.present() || configClassSet.get().contains(configClass.get());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -61,7 +61,11 @@ class ConfigBroadcasterImpl {
|
|||
when(ConfigFollowerGetSnapshotRequest req = waitNext(cfi.getSnapshot.getFuture())) {
|
||||
++impl->snapshotRequest;
|
||||
ConfigFollowerGetSnapshotReply reply;
|
||||
reply.snapshot = impl->snapshot;
|
||||
for (const auto& [key, value] : impl->snapshot) {
|
||||
if (matchesConfigClass(req.configClassSet, key.configClass)) {
|
||||
reply.snapshot[key] = value;
|
||||
}
|
||||
}
|
||||
for (const auto& versionedMutation : impl->versionedMutations) {
|
||||
const auto& version = versionedMutation.version;
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
|
@ -76,7 +80,7 @@ class ConfigBroadcasterImpl {
|
|||
.detail("KnobName", mutation.getKnobName())
|
||||
.detail("KnobValue", mutation.getValue());
|
||||
if (mutation.isSet()) {
|
||||
reply.snapshot[mutation.getKey()] = mutation.getValue();
|
||||
reply.snapshot[mutation.getKey()] = mutation.getValue().get();
|
||||
} else {
|
||||
reply.snapshot.erase(mutation.getKey());
|
||||
}
|
||||
|
@ -125,7 +129,7 @@ class ConfigBroadcasterImpl {
|
|||
.detail("KnobValue", mutation.getValue())
|
||||
.detail("LastCompactedVersion", impl->lastCompactedVersion);
|
||||
if (mutation.isSet()) {
|
||||
impl->snapshot[mutation.getKey()] = mutation.getValue();
|
||||
impl->snapshot[mutation.getKey()] = mutation.getValue().get();
|
||||
} else {
|
||||
impl->snapshot.erase(mutation.getKey());
|
||||
}
|
||||
|
|
|
@ -69,11 +69,15 @@ Future<Void> setTestSnapshot(ConfigStore* configStore, Version* lastWrittenVersi
|
|||
|
||||
void appendVersionedMutation(Standalone<VectorRef<VersionedConfigMutationRef>>& versionedMutations,
|
||||
Version version,
|
||||
KeyRef configClass,
|
||||
Optional<KeyRef> configClass,
|
||||
KeyRef knobName,
|
||||
Optional<ValueRef> knobValue) {
|
||||
Tuple tuple;
|
||||
tuple << configClass;
|
||||
if (configClass.present()) {
|
||||
tuple.append(configClass.get());
|
||||
} else {
|
||||
tuple.appendNull();
|
||||
}
|
||||
tuple << knobName;
|
||||
auto mutation = ConfigMutationRef::createConfigMutation(tuple.pack(), knobValue);
|
||||
versionedMutations.emplace_back_deep(versionedMutations.arena(), version, mutation);
|
||||
|
@ -113,6 +117,14 @@ Future<Void> addClearTestUpdate(ConfigStore& configStore, Version& lastWrittenVe
|
|||
return configStore.addVersionedMutations(versionedMutations, lastWrittenVersion);
|
||||
}
|
||||
|
||||
template <class ConfigStore>
|
||||
Future<Void> addGlobalTestUpdate(ConfigStore& configStore, Version& lastWrittenVersion, ValueRef value) {
|
||||
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
|
||||
++lastWrittenVersion;
|
||||
appendVersionedMutation(versionedMutations, lastWrittenVersion, {}, "test_long"_sr, value);
|
||||
return configStore.addVersionedMutations(versionedMutations, lastWrittenVersion);
|
||||
}
|
||||
|
||||
ACTOR template <class ConfigStore>
|
||||
Future<Void> runTestUpdates(ConfigStore* configStore, Version* lastWrittenVersion) {
|
||||
wait(setTestSnapshot(configStore, lastWrittenVersion));
|
||||
|
@ -158,7 +170,7 @@ TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/FreshRestart") {
|
|||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Manual") {
|
||||
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/ManualOverride") {
|
||||
state LocalConfiguration localConfiguration("class-A/class-B", { { "test_long"_sr, "1000"_sr } });
|
||||
state Version lastWrittenVersion = 0;
|
||||
wait(localConfiguration.initialize("./", deterministicRandom()->randomUniqueID()));
|
||||
|
@ -167,7 +179,7 @@ TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Manual") {
|
|||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Conflicting") {
|
||||
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/ConfigClassOverride") {
|
||||
state LocalConfiguration localConfiguration("class-A/class-B", {});
|
||||
state Version lastWrittenVersion = 0;
|
||||
wait(localConfiguration.initialize("./", deterministicRandom()->randomUniqueID()));
|
||||
|
@ -176,6 +188,16 @@ TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Conflicting") {
|
|||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/GlobalConfigClassOverride") {
|
||||
state LocalConfiguration localConfiguration("class-A/class-B", {});
|
||||
state Version lastWrittenVersion = 0;
|
||||
wait(localConfiguration.initialize("./", deterministicRandom()->randomUniqueID()));
|
||||
wait(addSequentialTestUpdates(localConfiguration, lastWrittenVersion));
|
||||
wait(addGlobalTestUpdate(localConfiguration, lastWrittenVersion, "100"_sr));
|
||||
ASSERT(localConfiguration.getTestKnobs().TEST_LONG == 100);
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Clear") {
|
||||
state LocalConfiguration localConfiguration("class-A", {});
|
||||
state Version lastWrittenVersion = 0;
|
||||
|
|
|
@ -71,9 +71,8 @@ struct ConfigFollowerGetSnapshotReply {
|
|||
std::map<ConfigKey, Value> snapshot;
|
||||
|
||||
ConfigFollowerGetSnapshotReply() = default;
|
||||
explicit ConfigFollowerGetSnapshotReply(std::map<ConfigKey, Value> const& snapshot) : snapshot(snapshot) {
|
||||
// TODO: Support move constructor as well
|
||||
}
|
||||
explicit ConfigFollowerGetSnapshotReply(std::map<ConfigKey, Value>&& snapshot) : snapshot(std::move(snapshot)) {}
|
||||
explicit ConfigFollowerGetSnapshotReply(std::map<ConfigKey, Value> const& snapshot) : snapshot(snapshot) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
@ -133,11 +132,11 @@ struct ConfigFollowerGetChangesReply {
|
|||
|
||||
struct ConfigFollowerGetChangesRequest {
|
||||
static constexpr FileIdentifier file_identifier = 178935;
|
||||
Version lastSeenVersion;
|
||||
Version lastSeenVersion{ 0 };
|
||||
Optional<ConfigClassSet> configClassSet;
|
||||
ReplyPromise<ConfigFollowerGetChangesReply> reply;
|
||||
|
||||
ConfigFollowerGetChangesRequest() : lastSeenVersion(::invalidVersion) {}
|
||||
ConfigFollowerGetChangesRequest() = default;
|
||||
explicit ConfigFollowerGetChangesRequest(Version lastSeenVersion, Optional<ConfigClassSet> const& configClassSet)
|
||||
: lastSeenVersion(lastSeenVersion), configClassSet(configClassSet) {}
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ bool updateSingleKnob(Key knobName, Value knobValue, K& k, Rest&... rest) {
|
|||
|
||||
class ConfigKnobOverrides {
|
||||
Standalone<VectorRef<KeyRef>> configPath;
|
||||
std::map<Key, std::map<Key, Value>> configClassToKnobToValue;
|
||||
std::map<Optional<Key>, std::map<Key, Value>> configClassToKnobToValue;
|
||||
|
||||
public:
|
||||
ConfigKnobOverrides() = default;
|
||||
|
@ -60,23 +60,30 @@ public:
|
|||
configPath.push_back_deep(configPath.arena(), s.eat("/"_sr));
|
||||
configClassToKnobToValue[configPath.back()] = {};
|
||||
}
|
||||
configClassToKnobToValue[{}] = {};
|
||||
}
|
||||
ConfigClassSet getConfigClassSet() const { return ConfigClassSet(configPath); }
|
||||
void set(KeyRef configClass, KeyRef knobName, ValueRef value) {
|
||||
configClassToKnobToValue[configClass][knobName] = value;
|
||||
void set(Optional<KeyRef> configClass, KeyRef knobName, ValueRef value) {
|
||||
configClassToKnobToValue[configClass.castTo<Key>()][knobName] = value;
|
||||
}
|
||||
void remove(Optional<KeyRef> configClass, KeyRef knobName) {
|
||||
configClassToKnobToValue[configClass.castTo<Key>()].erase(knobName);
|
||||
}
|
||||
void remove(KeyRef configClass, KeyRef knobName) { configClassToKnobToValue[configClass].erase(knobName); }
|
||||
|
||||
template <class... KS>
|
||||
void update(KS&... knobCollections) const {
|
||||
for (const auto& configClass : configPath) {
|
||||
const auto& knobToValue = configClassToKnobToValue.find(configClass);
|
||||
ASSERT(knobToValue != configClassToKnobToValue.end());
|
||||
for (const auto& [knobName, knobValue] : knobToValue->second) {
|
||||
const auto& knobToValue = configClassToKnobToValue.at(configClass);
|
||||
for (const auto& [knobName, knobValue] : knobToValue) {
|
||||
// Assert here because we should be validating on the client
|
||||
ASSERT(updateSingleKnob(knobName, knobValue, knobCollections...));
|
||||
}
|
||||
}
|
||||
// TODO: Test this
|
||||
const auto& knobToValue = configClassToKnobToValue.at({});
|
||||
for (const auto& [knobName, knobValue] : knobToValue) {
|
||||
ASSERT(updateSingleKnob(knobName, knobValue, knobCollections...));
|
||||
}
|
||||
}
|
||||
|
||||
bool hasSameConfigPath(ConfigKnobOverrides const& other) const { return configPath == other.configPath; }
|
||||
|
@ -105,6 +112,11 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
ACTOR template <class F>
|
||||
Future<Void> forever(F f) {
|
||||
loop { wait(f()); }
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
class LocalConfigurationImpl : public NonCopyable {
|
||||
|
@ -241,8 +253,10 @@ class LocalConfigurationImpl : public NonCopyable {
|
|||
const auto &mutation = versionedMutation.mutation;
|
||||
auto serializedKey = BinaryWriter::toValue(mutation.getKey(), IncludeVersion());
|
||||
if (mutation.isSet()) {
|
||||
self->kvStore->set(KeyValueRef(serializedKey.withPrefix(knobOverrideKeys.begin), mutation.getValue()));
|
||||
self->configKnobOverrides.set(mutation.getConfigClass(), mutation.getKnobName(), mutation.getValue());
|
||||
self->kvStore->set(
|
||||
KeyValueRef(serializedKey.withPrefix(knobOverrideKeys.begin), mutation.getValue().get()));
|
||||
self->configKnobOverrides.set(
|
||||
mutation.getConfigClass(), mutation.getKnobName(), mutation.getValue().get());
|
||||
} else {
|
||||
self->kvStore->clear(singleKeyRange(serializedKey.withPrefix(knobOverrideKeys.begin)));
|
||||
self->configKnobOverrides.remove(mutation.getConfigClass(), mutation.getKnobName());
|
||||
|
@ -278,7 +292,7 @@ class LocalConfigurationImpl : public NonCopyable {
|
|||
ACTOR static Future<Void> consume(LocalConfiguration* self,
|
||||
LocalConfigurationImpl* impl,
|
||||
Reference<IDependentAsyncVar<ConfigFollowerInterface> const> broadcaster) {
|
||||
wait(impl->initFuture);
|
||||
ASSERT(impl->initFuture.isReady());
|
||||
loop { wait(consumeLoopIteration(self, impl, broadcaster)); }
|
||||
}
|
||||
|
||||
|
|
|
@ -210,14 +210,8 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations = wait(getMutations(self, 0, req.version));
|
||||
for (const auto &versionedMutation : versionedMutations) {
|
||||
const auto &mutation = versionedMutation.mutation;
|
||||
if (mutation.isSet()) {
|
||||
if (mutation.getKey() == req.key) {
|
||||
value = mutation.getValue();
|
||||
}
|
||||
} else {
|
||||
if (mutation.getKey() == req.key) {
|
||||
value = Optional<Value>{};
|
||||
}
|
||||
if (mutation.getKey() == req.key) {
|
||||
value = mutation.getValue().castTo<Value>();
|
||||
}
|
||||
}
|
||||
req.reply.send(ConfigTransactionGetReply(value));
|
||||
|
@ -370,7 +364,7 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
for (const auto& versionedMutation : versionedMutations) {
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
if (mutation.isSet()) {
|
||||
reply.snapshot[mutation.getKey()] = mutation.getValue();
|
||||
reply.snapshot[mutation.getKey()] = mutation.getValue().get();
|
||||
} else {
|
||||
reply.snapshot.erase(mutation.getKey());
|
||||
}
|
||||
|
@ -407,7 +401,7 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
.detail("ReqVersion", req.version);
|
||||
auto serializedKey = BinaryWriter::toValue(mutation.getKey(), IncludeVersion());
|
||||
if (mutation.isSet()) {
|
||||
self->kvStore->set(KeyValueRef(serializedKey.withPrefix(kvKeys.begin), mutation.getValue()));
|
||||
self->kvStore->set(KeyValueRef(serializedKey.withPrefix(kvKeys.begin), mutation.getValue().get()));
|
||||
} else {
|
||||
self->kvStore->clear(singleKeyRange(serializedKey.withPrefix(kvKeys.begin)));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue