Break down configuration database unit tests and fix some more bugs

This commit is contained in:
sfc-gh-tclinkenbeard 2021-05-16 23:14:46 -07:00
parent 1941d0eab3
commit 4b848867f9
8 changed files with 214 additions and 200 deletions

View File

@ -186,6 +186,10 @@ ConfigBroadcaster::ConfigBroadcaster(ClusterConnectionString const& ccs, UID id)
ConfigBroadcaster::ConfigBroadcaster(ServerCoordinators const& coordinators, UID id)
: impl(std::make_unique<ConfigBroadcasterImpl>(coordinators, id)) {}
ConfigBroadcaster::ConfigBroadcaster(ConfigBroadcaster&&) = default;
ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default;
ConfigBroadcaster::~ConfigBroadcaster() = default;
Future<Void> ConfigBroadcaster::serve(ConfigFollowerInterface const& cfi) {

View File

@ -33,6 +33,8 @@ public:
ConfigBroadcaster(ConfigFollowerInterface const&, UID id);
ConfigBroadcaster(ClusterConnectionString const&, UID id);
ConfigBroadcaster(ServerCoordinators const&, UID id);
ConfigBroadcaster(ConfigBroadcaster&&);
ConfigBroadcaster& operator=(ConfigBroadcaster&&);
~ConfigBroadcaster();
Future<Void> serve(ConfigFollowerInterface const&);
Future<Void> addVersionedMutations(Standalone<VectorRef<VersionedConfigMutationRef>> const&,

View File

@ -54,121 +54,86 @@ Value longToValue(int64_t v) {
return StringRef(reinterpret_cast<uint8_t const*>(s.c_str()), s.size());
}
Future<Void> addTestClearMutations(SimpleConfigTransaction& tr, Version version /* TODO: shouldn't need this */) {
Future<Void> addTestClearMutation(SimpleConfigTransaction& tr, Optional<KeyRef> configClass) {
tr.fullReset();
auto configKeyA = encodeConfigKey("class-A"_sr, "test_long"_sr);
auto configKeyB = encodeConfigKey("class-B"_sr, "test_long"_sr);
tr.clear(configKeyA);
tr.clear(configKeyB);
auto configKey = encodeConfigKey(configClass, "test_long"_sr);
tr.clear(configKey);
return tr.commit();
}
Future<Void> addTestGlobalSetMutation(SimpleConfigTransaction& tr, int64_t value) {
Future<Void> addTestSetMutation(SimpleConfigTransaction& tr, Optional<KeyRef> configClass, int64_t value) {
tr.fullReset();
auto configKey = encodeConfigKey({}, "test_long"_sr);
auto configKey = encodeConfigKey(configClass, "test_long"_sr);
tr.set(configKey, longToValue(value));
return tr.commit();
}
Future<Void> addTestSetMutations(SimpleConfigTransaction& tr, int64_t value) {
tr.fullReset();
auto configKeyA = encodeConfigKey("class-A"_sr, "test_long"_sr);
auto configKeyB = encodeConfigKey("class-B"_sr, "test_long"_sr);
tr.set(configKeyA, longToValue(value));
tr.set(configKeyB, longToValue(value * 10));
return tr.commit();
}
template <class WriteTo>
Future<Void> addTestClearMutations(WriteTo& writeTo, Version version) {
Future<Void> addTestClearMutation(WriteTo& writeTo, Optional<KeyRef> configKey, Version version) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
appendVersionedMutation(versionedMutations, version, "class-A"_sr, "test_long"_sr, {});
appendVersionedMutation(versionedMutations, version, "class-B"_sr, "test_long"_sr, {});
appendVersionedMutation(versionedMutations, version, configKey, "test_long"_sr, {});
return writeTo.addVersionedMutations(versionedMutations, version);
}
template <class WriteTo>
Future<Void> addTestGlobalSetMutation(WriteTo& writeTo, int64_t value) {
Future<Void> addTestSetMutation(WriteTo& writeTo, Optional<KeyRef> configClass, int64_t value, Version version) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
Version version = value;
appendVersionedMutation(versionedMutations, version, "class-A"_sr, "test_long"_sr, longToValue(value));
appendVersionedMutation(versionedMutations, version, configClass, "test_long"_sr, longToValue(value));
return writeTo.addVersionedMutations(versionedMutations, version);
}
template <class WriteTo>
Future<Void> addTestSetMutations(WriteTo& writeTo, int64_t value) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
Version version = value;
appendVersionedMutation(versionedMutations, version, "class-A"_sr, "test_long"_sr, longToValue(value));
appendVersionedMutation(versionedMutations, version, "class-B"_sr, "test_long"_sr, longToValue(value * 10));
return writeTo.addVersionedMutations(versionedMutations, version);
}
ACTOR Future<Void> readConfigState(SimpleConfigTransaction* tr,
Optional<int64_t> expected,
bool immediate /* TODO : remove this */) {
ACTOR Future<Void> checkImmediate(SimpleConfigTransaction* tr,
Optional<KeyRef> configClass,
Optional<int64_t> expected) {
tr->fullReset();
state Key configKeyA = encodeConfigKey("class-A"_sr, "test_long"_sr);
state Key configKeyB = encodeConfigKey("class-B"_sr, "test_long"_sr);
state Optional<Value> valueA = wait(tr->get(configKeyA));
state Optional<Value> valueB = wait(tr->get(configKeyB));
state Key configKey = encodeConfigKey(configClass, "test_long"_sr);
state Optional<Value> value = wait(tr->get(configKey));
if (expected.present()) {
ASSERT(valueA.get() == longToValue(expected.get()));
ASSERT(valueB.get() == longToValue(expected.get() * 10));
ASSERT(value.get() == longToValue(expected.get()));
} else {
ASSERT(!valueA.present());
ASSERT(!valueB.present());
ASSERT(!value.present());
}
return Void();
}
ACTOR Future<Void> readConfigState(LocalConfiguration const* localConfiguration,
Optional<int64_t> expected,
bool immediate) {
if (immediate) {
if (expected.present()) {
ASSERT_EQ(localConfiguration->getTestKnobs().TEST_LONG, expected.get());
} else {
ASSERT_EQ(localConfiguration->getTestKnobs().TEST_LONG, 0);
}
return Void();
void checkImmediate(LocalConfiguration const& localConfiguration, Optional<int64_t> expected) {
if (expected.present()) {
ASSERT_EQ(localConfiguration.getTestKnobs().TEST_LONG, expected.get());
} else {
loop {
if (localConfiguration->getTestKnobs().TEST_LONG == (expected.present() ? expected.get() : 0)) {
return Void();
}
wait(delayJittered(0.1));
}
ASSERT_EQ(localConfiguration.getTestKnobs().TEST_LONG, 0);
}
}
template <class ConfigStore>
Future<Void> addTestGlobalSetMutation(ConfigStore& configStore, Version& lastWrittenVersion, ValueRef value) {
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations;
++lastWrittenVersion;
appendVersionedMutation(versionedMutations, lastWrittenVersion, {}, "test_long"_sr, value);
return configStore.addVersionedMutations(versionedMutations, lastWrittenVersion);
ACTOR Future<Void> checkEventually(LocalConfiguration const* localConfiguration, Optional<int64_t> expected) {
loop {
if (localConfiguration->getTestKnobs().TEST_LONG == (expected.present() ? expected.get() : 0)) {
return Void();
}
wait(delayJittered(0.1));
}
}
class LocalConfigEnvironment {
LocalConfiguration localConfiguration;
UID id;
Version lastWrittenVersion{ 0 };
public:
LocalConfigEnvironment(std::string const& configPath, std::map<Key, Value> const& manualKnobOverrides)
: localConfiguration(configPath, manualKnobOverrides), id(deterministicRandom()->randomUniqueID()) {}
Future<Void> setup() { return localConfiguration.initialize("./", id); }
Future<Void> restart(std::string const& newConfigPath) {
localConfiguration = LocalConfiguration(newConfigPath, {});
return setup();
}
template <class TestType, class... Args>
Future<Void> run(Args&&... args) {
return TestType::run(localConfiguration, localConfiguration, std::forward<Args>(args)...);
Future<Void> getError() const { return Never(); }
Future<Void> clear(Optional<KeyRef> configClass) {
return addTestClearMutation(localConfiguration, configClass, ++lastWrittenVersion);
}
Future<Void> set(Optional<KeyRef> configClass, int64_t value) {
return addTestSetMutation(localConfiguration, configClass, value, ++lastWrittenVersion);
}
void check(Optional<int64_t> value) const { checkImmediate(localConfiguration, value); }
};
class BroadcasterToLocalConfigEnvironment {
@ -186,7 +151,6 @@ class BroadcasterToLocalConfigEnvironment {
}
}
}
public:
Future<Void> serve() { return serve(this); }
ConfigFollowerInterface const& getInterface() { return cfi; }
@ -194,29 +158,40 @@ class BroadcasterToLocalConfigEnvironment {
ConfigBroadcaster broadcaster;
Reference<AsyncVar<ConfigFollowerInterface>> cfi;
LocalConfiguration localConfiguration;
Version lastWrittenVersion{ 0 };
Future<Void> broadcastServer;
ActorCollection actors{ false };
ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self) {
wait(self->localConfiguration.initialize("./", deterministicRandom()->randomUniqueID()));
self->actors.add(self->dummyConfigSource.serve());
self->actors.add(self->broadcaster.serve(self->cfi->get()));
self->broadcastServer = self->broadcaster.serve(self->cfi->get());
self->actors.add(
self->localConfiguration.consume(IDependentAsyncVar<ConfigFollowerInterface>::create(self->cfi)));
return Void();
}
public:
BroadcasterToLocalConfigEnvironment()
BroadcasterToLocalConfigEnvironment(std::string const& configPath)
: broadcaster(dummyConfigSource.getInterface(), deterministicRandom()->randomUniqueID()),
cfi(makeReference<AsyncVar<ConfigFollowerInterface>>()), localConfiguration("class-A", {}) {}
cfi(makeReference<AsyncVar<ConfigFollowerInterface>>()), localConfiguration(configPath, {}) {}
Future<Void> setup() { return setup(this); }
template <class TestType, class... Args>
Future<Void> run(Args&&... args) {
return waitOrError(TestType::run(broadcaster, localConfiguration, std::forward<Args>(args)...),
actors.getResult());
Future<Void> set(Optional<KeyRef> configClass, int64_t value) {
return addTestSetMutation(localConfiguration, configClass, value, ++lastWrittenVersion);
}
Future<Void> clear(Optional<KeyRef> configClass) {
return addTestClearMutation(localConfiguration, configClass, ++lastWrittenVersion);
}
Future<Void> check(Optional<int64_t> value) const { return checkEventually(&localConfiguration, value); }
void changeBroadcaster() {
cfi->set(ConfigFollowerInterface());
broadcaster = ConfigBroadcaster(dummyConfigSource.getInterface(), deterministicRandom()->randomUniqueID());
broadcastServer = broadcaster.serve(cfi->get());
}
Future<Void> getError() const { return actors.getResult() || broadcastServer; }
};
class TransactionEnvironment {
@ -236,10 +211,12 @@ public:
Future<Void> setup() { return setup(this); }
template <class TestType, class... Args>
Future<Void> run(Args&&... args) {
return waitOrError(TestType::run(tr, tr, std::forward<Args>(args)...), actors.getResult());
Future<Void> set(Optional<KeyRef> configClass, int64_t value) { return addTestSetMutation(tr, configClass, value); }
Future<Void> clear(Optional<KeyRef> configClass) { return addTestClearMutation(tr, configClass); }
Future<Void> check(Optional<KeyRef> configClass, Optional<int64_t> expected) {
return checkImmediate(&tr, configClass, expected);
}
Future<Void> getError() const { return actors.getResult(); }
};
class TransactionToLocalConfigEnvironment {
@ -261,138 +238,181 @@ class TransactionToLocalConfigEnvironment {
}
public:
TransactionToLocalConfigEnvironment()
: cfi(makeReference<AsyncVar<ConfigFollowerInterface>>()), tr(cti), localConfiguration("class-A", {}) {}
TransactionToLocalConfigEnvironment(std::string const& configPath)
: cfi(makeReference<AsyncVar<ConfigFollowerInterface>>()), tr(cti), localConfiguration(configPath, {}) {}
Future<Void> setup() { return setup(this); }
template <class TestType, class... Args>
Future<Void> run(Args&&... args) {
return waitOrError(TestType::run(tr, localConfiguration, std::forward<Args>(args)...), actors.getResult());
}
Future<Void> set(Optional<KeyRef> configClass, int64_t value) { return addTestSetMutation(tr, configClass, value); }
Future<Void> clear(Optional<KeyRef> configClass) { return addTestClearMutation(tr, configClass); }
Future<Void> check(Optional<int64_t> value) { return checkEventually(&localConfiguration, value); }
Future<Void> getError() const { return actors.getResult(); }
};
class TestSet {
public:
template <class WriteTo, class ReadFrom>
static Future<Void> run(WriteTo& writeTo, ReadFrom& readFrom, int64_t expected, bool immediate) {
std::function<Future<Void>(Void const&)> check = [&readFrom, expected, immediate](Void const&) {
return readConfigState(&readFrom, expected, immediate);
};
return addTestSetMutations(writeTo, 1) >>= check;
}
};
class TestClear {
public:
template <class WriteTo, class ReadFrom>
static Future<Void> run(WriteTo& writeTo, ReadFrom& readFrom, bool immediate) {
std::function<Future<Void>(Void const&)> clear = [&writeTo](Void const&) {
return addTestClearMutations(writeTo, 2);
};
std::function<Future<Void>(Void const&)> check = [&readFrom, immediate](Void const&) {
return readConfigState(&readFrom, {}, immediate);
};
return (addTestSetMutations(writeTo, 1) >>= clear) >>= check;
}
};
class TestGlobal {
public:
template <class WriteTo>
static Future<Void> run(WriteTo& writeTo, LocalConfiguration const& readFrom, bool immediate) {
std::function<Future<Void>(Void const&)> globalWrite = [&writeTo](Void const&) {
return addTestGlobalSetMutation(writeTo, 2);
};
std::function<Future<Void>(Void const&)> check = [&readFrom, immediate](Void const&) {
return readConfigState(&readFrom, 2, immediate);
};
return (addTestSetMutations(writeTo, 1) >>= globalWrite) >>= check;
}
};
// TODO: Wait on error
template <class Env, class... Args>
Future<Void> set(Env& env, Args&&... args) {
return waitOrError(env.set(std::forward<Args>(args)...), env.getError());
}
template <class Env, class... Args>
Future<Void> clear(Env& env, Args&&... args) {
return waitOrError(env.clear(std::forward<Args>(args)...), env.getError());
}
template <class Env, class... Args>
Future<Void> check(Env& env, Args&&... args) {
return waitOrError(env.check(std::forward<Args>(args)...), env.getError());
}
} // namespace
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Set") {
state LocalConfigEnvironment environment("class-A", std::map<Key, Value>{});
wait(environment.setup());
wait(environment.run<TestSet>(1, true));
state LocalConfigEnvironment env("class-A", {});
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
env.check(1);
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Restart") {
state LocalConfigEnvironment env("class-A/class-B", {});
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(env.restart("class-A/class-B"));
env.check(1);
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/RestartFresh") {
state LocalConfigEnvironment env("class-A/class-B", {});
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(env.restart("class-B/class-A"));
env.check(Optional<int64_t>{});
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Clear") {
state LocalConfigEnvironment environment("class-A", std::map<Key, Value>{});
wait(environment.setup());
wait(environment.run<TestClear>(true));
state LocalConfigEnvironment env("class-A", {});
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(clear(env, "class-A"_sr));
env.check(Optional<int64_t>{});
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/ManualOverride") {
state LocalConfigEnvironment environment("class-A", std::map<Key, Value>{ { "test_long"_sr, "1000"_sr } });
wait(environment.setup());
wait(environment.run<TestSet>(1000, true));
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/GlobalSet") {
state LocalConfigEnvironment env("class-A", {});
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(set(env, Optional<KeyRef>{}, 10));
env.check(10);
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/ConflictingOverride") {
state LocalConfigEnvironment environment("class-A/class-B", std::map<Key, Value>{});
wait(environment.setup());
wait(environment.run<TestSet>(10, true));
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/ConflictingOverrides") {
state LocalConfigEnvironment env("class-A/class-B", {});
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(set(env, "class-B"_sr, 10));
env.check(10);
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Global") {
state LocalConfigEnvironment environment("class-A", std::map<Key, Value>{});
wait(environment.setup());
wait(environment.run<TestGlobal>(true));
TEST_CASE("/fdbserver/ConfigDB/LocalConfiguration/Manual") {
state LocalConfigEnvironment env("class-A", { { "test_long"_sr, "1000"_sr } });
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
env.check(1000);
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/Set") {
state BroadcasterToLocalConfigEnvironment environment;
wait(environment.setup());
wait(environment.run<TestSet>(1, false));
state BroadcasterToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(check(env, 1));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/Clear") {
state BroadcasterToLocalConfigEnvironment environment;
wait(environment.setup());
wait(environment.run<TestClear>(false));
state BroadcasterToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(clear(env, "class-A"_sr));
wait(check(env, Optional<int64_t>{}));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/Global") {
state BroadcasterToLocalConfigEnvironment environment;
wait(environment.setup());
wait(environment.run<TestGlobal>(false));
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/Ignore") {
state BroadcasterToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-B"_sr, 1));
choose {
when(wait(delay(5))) {}
when(wait(check(env, 1))) { ASSERT(false); }
}
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/Set") {
state TransactionEnvironment environment;
wait(environment.setup());
wait(environment.run<TestSet>(1, true));
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/GlobalSet") {
state BroadcasterToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(set(env, Optional<KeyRef>{}, 10));
wait(check(env, 10));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/Clear") {
state TransactionEnvironment environment;
wait(environment.setup());
wait(environment.run<TestClear>(true));
TEST_CASE("/fdbserver/ConfigDB/BroadcasterToLocalConfig/ChangeBroadcaster") {
state BroadcasterToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(check(env, 1));
env.changeBroadcaster();
wait(set(env, "class-A"_sr, 2));
wait(check(env, 2));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/Set") {
state TransactionToLocalConfigEnvironment environment;
wait(environment.setup());
wait(environment.run<TestSet>(1, false));
state TransactionToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(check(env, 1));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/Clear") {
state TransactionToLocalConfigEnvironment environment;
wait(environment.setup());
wait(environment.run<TestClear>(false));
state TransactionToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(clear(env, "class-A"_sr));
wait(check(env, Optional<int64_t>{}));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/TransactionToLocalConfig/GlobalSet") {
state TransactionToLocalConfigEnvironment env("class-A");
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(set(env, Optional<KeyRef>{}, 10));
wait(check(env, 10));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/Set") {
state TransactionEnvironment env;
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(check(env, "class-A"_sr, 1));
return Void();
}
TEST_CASE("/fdbserver/ConfigDB/Transaction/Clear") {
state TransactionEnvironment env;
wait(env.setup());
wait(set(env, "class-A"_sr, 1));
wait(clear(env, "class-A"_sr));
wait(check(env, "class-A"_sr, Optional<int64_t>{}));
return Void();
}

View File

@ -277,7 +277,7 @@ class LocalConfigurationImpl : public NonCopyable {
state SimpleConfigConsumer consumer(broadcaster->get(),
impl->configKnobOverrides.getConfigClassSet(),
impl->lastSeenVersion,
Optional<double>{},
0.5,
Optional<double>{},
deterministicRandom()->randomUniqueID());
TraceEvent(SevDebug, "LocalConfigurationStartingConsumer", impl->id)

View File

@ -84,8 +84,6 @@ TEST_CASE("/fdbserver/ConfigDB/SimpleConfigDatabaseNode/Internal/versionedMutati
class SimpleConfigDatabaseNodeImpl {
IKeyValueStore* kvStore; // FIXME: Prevent leak
std::map<std::string, std::string> config;
ActorCollection actors{ false };
FlowLock globalLock;
Future<Void> initFuture;
UID id;
@ -162,8 +160,6 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> getChanges(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetChangesRequest req) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
Version lastCompactedVersion = wait(getLastCompactedVersion(self));
if (req.lastSeenVersion < lastCompactedVersion) {
++self->failedChangeRequests;
@ -189,8 +185,6 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> getNewVersion(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetVersionRequest req) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
state Version currentVersion = wait(getLiveTransactionVersion(self));
self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(++currentVersion, IncludeVersion())));
wait(self->kvStore->commit());
@ -199,8 +193,6 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> get(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetRequest req) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
Version currentVersion = wait(getLiveTransactionVersion(self));
if (req.version != currentVersion) {
req.reply.sendError(transaction_too_old());
@ -272,8 +264,6 @@ class SimpleConfigDatabaseNodeImpl {
*/
ACTOR static Future<Void> commit(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionCommitRequest req) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
Version currentVersion = wait(getLiveTransactionVersion(self));
if (req.version != currentVersion) {
++self->failedCommits;
@ -299,8 +289,6 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> traceQueuedMutations(SimpleConfigDatabaseNodeImpl *self) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
state Version currentVersion = wait(getCommittedVersion(self));
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
wait(getMutations(self, 0, currentVersion));
@ -326,27 +314,24 @@ class SimpleConfigDatabaseNodeImpl {
choose {
when(ConfigTransactionGetVersionRequest req = waitNext(cti->getVersion.getFuture())) {
++self->newVersionRequests;
self->actors.add(getNewVersion(self, req));
wait(getNewVersion(self, req));
}
when(ConfigTransactionGetRequest req = waitNext(cti->get.getFuture())) {
++self->getValueRequests;
self->actors.add(get(self, req));
wait(get(self, req));
}
when(ConfigTransactionCommitRequest req = waitNext(cti->commit.getFuture())) {
self->actors.add(commit(self, req));
wait(commit(self, req));
}
when(ConfigTransactionGetRangeRequest req = waitNext(cti->getRange.getFuture())) {
// FIXME: Fix and reenable
// self->actors.add(getRange(self, req));
// wait(getRange(self, req));
}
when(wait(self->actors.getResult())) { ASSERT(false); }
}
}
}
ACTOR static Future<Void> getSnapshot(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerGetSnapshotRequest req) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
state ConfigFollowerGetSnapshotReply reply;
Standalone<RangeResultRef> data = wait(self->kvStore->readRange(kvKeys));
for (const auto& kv : data) {
@ -376,8 +361,6 @@ class SimpleConfigDatabaseNodeImpl {
}
ACTOR static Future<Void> compact(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerCompactRequest req) {
wait(self->globalLock.take());
state FlowLock::Releaser releaser(self->globalLock);
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
TraceEvent(SevDebug, "ConfigDatabaseNodeCompacting")
.detail("Version", req.version)
@ -423,20 +406,19 @@ class SimpleConfigDatabaseNodeImpl {
choose {
when(ConfigFollowerGetVersionRequest req = waitNext(cfi->getVersion.getFuture())) {
++self->committedVersionRequests;
self->actors.add(getCommittedVersion(self, req));
wait(getCommittedVersion(self, req));
}
when(ConfigFollowerGetSnapshotRequest req = waitNext(cfi->getSnapshot.getFuture())) {
++self->snapshotRequests;
self->actors.add(getSnapshot(self, req));
wait(getSnapshot(self, req));
}
when(ConfigFollowerGetChangesRequest req = waitNext(cfi->getChanges.getFuture())) {
self->actors.add(getChanges(self, req));
wait(getChanges(self, req));
}
when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) {
++self->compactRequests;
self->actors.add(compact(self, req));
wait(compact(self, req));
}
when(wait(self->actors.getResult())) { ASSERT(false); }
}
}
}
@ -450,6 +432,12 @@ public:
setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
getValueRequests("GetValueRequests", cc), newVersionRequests("NewVersionRequests", cc) {}
~SimpleConfigDatabaseNodeImpl() {
if (kvStore) {
kvStore->close();
}
}
Future<Void> serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); }
Future<Void> serve(ConfigFollowerInterface const& cfi) { return serve(this, &cfi); }
@ -457,7 +445,7 @@ public:
Future<Void> initialize(std::string const& dataFolder, UID id) {
platform::createDirectory(dataFolder);
this->id = id;
kvStore = keyValueStoreMemory(joinPath(dataFolder, "globalconf-"), id, 500e6);
kvStore = keyValueStoreMemory(joinPath(dataFolder, "globalconf-" + id.toString()), id, 500e6);
logger = traceCounters(
"ConfigDatabaseNodeMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigDatabaseNode");
initFuture = kvStore->init();

View File

@ -227,7 +227,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
"",
-1,
whitelistBinPaths,
{}));
""));
}
if (runBackupAgents != AgentNone) {
futures.push_back(runBackup(connFile));

View File

@ -72,7 +72,7 @@ public:
}
void add(Future<Void> a) { m_add.send(a); }
Future<Void> getResult() { return m_out; }
Future<Void> getResult() const { return m_out; }
void clear(bool returnWhenEmptied) {
m_out.cancel();
m_out = actorCollection(m_add.getFuture(), nullptr, nullptr, nullptr, nullptr, returnWhenEmptied);

View File

@ -1881,9 +1881,9 @@ Future<U> runAfter(Future<T> lhs, Future<U> rhs) {
return res;
}
template <class T, class Res>
Future<Res> operator>>=(Future<T> lhs, std::function<Future<Res>(T const&)> rhs) {
return runAfter(lhs, rhs);
template <class T, class Fun>
auto operator>>=(Future<T> lhs, Fun&& rhs) -> Future<decltype(rhs(std::declval<T>()))> {
return runAfter(lhs, std::forward<Fun>(rhs));
}
template <class T, class U>