diff --git a/fdbserver/ConfigFollowerInterface.h b/fdbserver/ConfigFollowerInterface.h index 8b396d7558..033d0e6f5b 100644 --- a/fdbserver/ConfigFollowerInterface.h +++ b/fdbserver/ConfigFollowerInterface.h @@ -78,19 +78,32 @@ struct ConfigFollowerGetFullDatabaseRequest { } }; +struct VersionedMutationRef { + Version version; + MutationRef mutation; + + VersionedMutationRef()=default; + explicit VersionedMutationRef(Arena &arena, Version version, MutationRef mutation) : version(version), mutation(arena, mutation) {} + + template + void serialize(Ar &ar) { + serializer(ar, version, mutation); + } +}; + struct ConfigFollowerGetChangesReply { static constexpr FileIdentifier file_identifier = 234859; Version mostRecentVersion; - Standalone> mutations; + Standalone> versionedMutations; ConfigFollowerGetChangesReply() : mostRecentVersion(-1) {} explicit ConfigFollowerGetChangesReply(Version mostRecentVersion, - Standalone> const& mutations) - : mostRecentVersion(mostRecentVersion), mutations(mutations) {} + Standalone> const& versionedMutations) + : mostRecentVersion(mostRecentVersion), versionedMutations(versionedMutations) {} template void serialize(Ar& ar) { - serializer(ar, mostRecentVersion, mutations); + serializer(ar, mostRecentVersion, versionedMutations); } }; diff --git a/fdbserver/SimpleConfigBroadcaster.actor.cpp b/fdbserver/SimpleConfigBroadcaster.actor.cpp index 7ef8b0b48d..b238bdd38b 100644 --- a/fdbserver/SimpleConfigBroadcaster.actor.cpp +++ b/fdbserver/SimpleConfigBroadcaster.actor.cpp @@ -22,21 +22,100 @@ class SimpleConfigBroadcasterImpl { ConfigFollowerInterface subscriber; + std::map fullDatabase; + Standalone> versionedMutations; + Version lastCompactedVersion; + Version mostRecentVersion; + ActorCollection actors{ false }; + + static const double POLLING_INTERVAL; // TODO: Make knob? + + ACTOR static Future fetchUpdates(SimpleConfigBroadcasterImpl *self) { + loop { + ConfigFollowerGetChangesReply reply = wait(self->subscriber.getChanges.getReply(ConfigFollowerGetChangesRequest{self->mostRecentVersion, {}})); + for (const auto &versionedMutation : reply.versionedMutations) { + self->versionedMutations.push_back(self->versionedMutations.arena(), versionedMutation); + } + self->mostRecentVersion = reply.mostRecentVersion; + wait(delay(POLLING_INTERVAL)); + } + } + + void traceQueuedMutations() { + TraceEvent te("SimpleConfigBroadcasterQueuedMutations"); + te.detail("Size", versionedMutations.size()); + int index = 0; + for (const auto &versionedMutation : versionedMutations) { + te.detail(format("Version%d", index), versionedMutation.version); + te.detail(format("Mutation%d", index), versionedMutation.mutation.type); + te.detail(format("FirstParam%d", index), versionedMutation.mutation.param1); + te.detail(format("SecondParam%d", index), versionedMutation.mutation.param2); + ++index; + } + } + + ACTOR static Future serve(SimpleConfigBroadcasterImpl *self, ConfigFollowerInterface *publisher) { + ConfigFollowerGetVersionReply versionReply = wait(self->subscriber.getVersion.getReply(ConfigFollowerGetVersionRequest{})); + self->mostRecentVersion = versionReply.version; + ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber.getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{self->mostRecentVersion, Optional{}})); + self->fullDatabase = reply.database; + self->actors.add(fetchUpdates(self)); + loop { + self->traceQueuedMutations(); + choose { + when(ConfigFollowerGetVersionRequest req = waitNext(publisher->getVersion.getFuture())) { + req.reply.send(self->mostRecentVersion); + } + when(ConfigFollowerGetFullDatabaseRequest req = waitNext(publisher->getFullDatabase.getFuture())) { + ConfigFollowerGetFullDatabaseReply reply; + reply.database = self->fullDatabase; + for (const auto &versionedMutation : self->versionedMutations) { + const auto &version = versionedMutation.version; + const auto &mutation = versionedMutation.mutation; + if (version > req.version) { + break; + } + if (mutation.type == MutationRef::SetValue) { + reply.database[mutation.param1] = mutation.param2; + } else if (mutation.type == MutationRef::ClearRange) { + reply.database.erase(reply.database.find(mutation.param1), reply.database.find(mutation.param2)); + } + } + req.reply.send(ConfigFollowerGetFullDatabaseReply{self->fullDatabase}); + } + when(ConfigFollowerGetChangesRequest req = waitNext(publisher->getChanges.getFuture())) { + ConfigFollowerGetChangesReply reply; + reply.mostRecentVersion = self->mostRecentVersion; + for (const auto &versionedMutation : self->versionedMutations) { + if (versionedMutation.version > req.lastSeenVersion) { + reply.versionedMutations.push_back(reply.versionedMutations.arena(), versionedMutation); + } + } + req.reply.send(reply); + } + when(ConfigFollowerCompactRequest req = waitNext(publisher->compact.getFuture())) { + // TODO: Implement + req.reply.send(Void()); + } + when(wait(self->actors.getResult())) { ASSERT(false); } + } + } + } public: - SimpleConfigBroadcasterImpl(ClusterConnectionString const& ccs) { + SimpleConfigBroadcasterImpl(ClusterConnectionString const& ccs) : lastCompactedVersion(0), mostRecentVersion(0) { auto coordinators = ccs.coordinators(); std::sort(coordinators.begin(), coordinators.end()); subscriber = ConfigFollowerInterface(coordinators[0]); } Future serve(ConfigFollowerInterface& publisher) { - // TODO: Implement - wait(Never()); - return Void(); + return serve(this, &publisher); } }; +const double SimpleConfigBroadcasterImpl::POLLING_INTERVAL = 0.5; + SimpleConfigBroadcaster::SimpleConfigBroadcaster(ClusterConnectionString const& ccs) : impl(std::make_unique(ccs)) {} diff --git a/fdbserver/SimpleConfigDatabaseNode.actor.cpp b/fdbserver/SimpleConfigDatabaseNode.actor.cpp index 680fade7f2..3de2a75a07 100644 --- a/fdbserver/SimpleConfigDatabaseNode.actor.cpp +++ b/fdbserver/SimpleConfigDatabaseNode.actor.cpp @@ -29,18 +29,12 @@ namespace { -const KeyRef versionKey = LiteralStringRef("version"); +const KeyRef liveTransactionVersionKey = LiteralStringRef("liveTransactionVersion"); +const KeyRef committedVersionKey = LiteralStringRef("committedVersion"); const KeyRangeRef kvKeys = KeyRangeRef(LiteralStringRef("kv/"), LiteralStringRef("kv0")); const KeyRangeRef mutationKeys = KeyRangeRef(LiteralStringRef("mutation/"), LiteralStringRef("mutation0")); -struct VersionedMutationRef { - Version version; - MutationRef mutation; - - VersionedMutationRef()=default; // FIXME Undefined memory? - explicit VersionedMutationRef(Arena &arena, Version version, MutationRef mutation) : version(version), mutation(arena, mutation) {} -}; - +// FIXME: negative versions break ordering Key versionedMutationKey(Version version, int index) { BinaryWriter bw(IncludeVersion()); bw << version; @@ -63,24 +57,33 @@ class SimpleConfigDatabaseNodeImpl { ActorCollection actors{ false }; FlowLock globalLock; - ACTOR static Future getCurrentVersion(SimpleConfigDatabaseNodeImpl *self) { - Optional value = wait(self->kvStore->readValue(versionKey)); - state Version version = 0; + ACTOR static Future getLiveTransactionVersion(SimpleConfigDatabaseNodeImpl *self) { + Optional value = wait(self->kvStore->readValue(liveTransactionVersionKey)); + state Version liveTransactionVersion = 0; if (value.present()) { - BinaryReader br(value.get(), IncludeVersion()); - br >> version; + liveTransactionVersion = BinaryReader::fromStringRef(value.get(), IncludeVersion()); } else { - BinaryWriter bw(IncludeVersion()); - bw << version; - self->kvStore->set(KeyValueRef(versionKey, bw.toValue())); + self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(liveTransactionVersion, IncludeVersion()))); wait(self->kvStore->commit()); } - return version; + return liveTransactionVersion; } - ACTOR static Future>> getMutations(SimpleConfigDatabaseNodeImpl *self, int startVersion, int endVersion) { - Value serializedStartVersion = BinaryWriter::toValue(startVersion, IncludeVersion()); - KeyRangeRef keys(serializedStartVersion.withPrefix(mutationKeys.begin), mutationKeys.end); + ACTOR static Future getCommittedVersion(SimpleConfigDatabaseNodeImpl *self) { + Optional value = wait(self->kvStore->readValue(committedVersionKey)); + state Version committedVersion = 0; + if (value.present()) { + committedVersion = BinaryReader::fromStringRef(value.get(), IncludeVersion()); + } else { + self->kvStore->set(KeyValueRef(committedVersionKey, BinaryWriter::toValue(committedVersion, IncludeVersion()))); + wait(self->kvStore->commit()); + } + return committedVersion; + } + + ACTOR static Future>> getMutations(SimpleConfigDatabaseNodeImpl *self, Version startVersion, Version endVersion) { + Key startVersionKey = versionedMutationKey(startVersion, 0); + state KeyRangeRef keys(startVersionKey, mutationKeys.end); Standalone range = wait(self->kvStore->readRange(keys)); Standalone> result; for (const auto &kv : range) { @@ -94,17 +97,24 @@ class SimpleConfigDatabaseNodeImpl { return result; } - ACTOR static Future getCurrentVersion(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetVersionRequest req) { - Version version = wait(getCurrentVersion(self)); - req.reply.send(ConfigFollowerGetVersionReply(version)); + ACTOR static Future getChanges(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetChangesRequest req) { + state Version committedVersion = wait(getCommittedVersion(self)); + Standalone> mutations = wait(getMutations(self, req.lastSeenVersion+1, committedVersion)); + req.reply.send(ConfigFollowerGetChangesReply{committedVersion, mutations}); + return Void(); + } + + ACTOR static Future getCommittedVersion(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetVersionRequest req) { + Version committedVersion = wait(getCommittedVersion(self)); + req.reply.send(ConfigFollowerGetVersionReply(committedVersion)); return Void(); } ACTOR static Future getNewVersion(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetVersionRequest req) { wait(self->globalLock.take()); state FlowLock::Releaser releaser(self->globalLock); - state Version currentVersion = wait(getCurrentVersion(self)); - self->kvStore->set(KeyValueRef(versionKey, BinaryWriter::toValue(++currentVersion, IncludeVersion()))); + state Version currentVersion = wait(getLiveTransactionVersion(self)); + self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(++currentVersion, IncludeVersion()))); wait(self->kvStore->commit()); req.reply.send(ConfigTransactionGetVersionReply(currentVersion)); return Void(); @@ -113,13 +123,13 @@ class SimpleConfigDatabaseNodeImpl { ACTOR static Future get(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetRequest req) { wait(self->globalLock.take()); state FlowLock::Releaser releaser(self->globalLock); - Version currentVersion = wait(getCurrentVersion(self)); + Version currentVersion = wait(getLiveTransactionVersion(self)); if (req.version != currentVersion) { req.reply.sendError(transaction_too_old()); return Void(); } state Optional value = wait(self->kvStore->readValue(req.key.withPrefix(kvKeys.begin))); - Standalone> versionedMutations = wait(getMutations(self, -1, req.version)); + Standalone> versionedMutations = wait(getMutations(self, 0, req.version)); for (const auto &versionedMutation : versionedMutations) { const auto &mutation = versionedMutation.mutation; if (mutation.type == MutationRef::Type::SetValue) { @@ -141,7 +151,7 @@ class SimpleConfigDatabaseNodeImpl { ACTOR static Future commit(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionCommitRequest req) { wait(self->globalLock.take()); state FlowLock::Releaser releaser(self->globalLock); - Version currentVersion = wait(getCurrentVersion(self)); + Version currentVersion = wait(getLiveTransactionVersion(self)); if (req.version != currentVersion) { req.reply.sendError(transaction_too_old()); return Void(); @@ -152,13 +162,32 @@ class SimpleConfigDatabaseNodeImpl { Value value = BinaryWriter::toValue(mutation, IncludeVersion()); self->kvStore->set(KeyValueRef(key, value)); } + self->kvStore->set(KeyValueRef(committedVersionKey, BinaryWriter::toValue(req.version, IncludeVersion()))); wait(self->kvStore->commit()); req.reply.send(Void()); return Void(); } + ACTOR static Future traceQueuedMutations(SimpleConfigDatabaseNodeImpl *self) { + state Version currentVersion = wait(getCommittedVersion(self)); + Standalone> versionedMutations = wait(getMutations(self, 0, currentVersion)); + TraceEvent te("SimpleConfigNodeQueuedMutations"); + te.detail("Size", versionedMutations.size()); + te.detail("CommittedVersion", currentVersion); + int index = 0; + for (const auto &versionedMutation : versionedMutations) { + te.detail(format("Version%d", index), versionedMutation.version); + te.detail(format("Mutation%d", index), versionedMutation.mutation.type); + te.detail(format("FirstParam%d", index), versionedMutation.mutation.param1); + te.detail(format("SecondParam%d", index), versionedMutation.mutation.param2); + ++index; + } + return Void(); + } + ACTOR static Future serve(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionInterface* cti) { loop { + wait(traceQueuedMutations(self)); choose { when(ConfigTransactionGetVersionRequest req = waitNext(cti->getVersion.getFuture())) { self->actors.add(getNewVersion(self, req)); @@ -182,7 +211,7 @@ class SimpleConfigDatabaseNodeImpl { reply.database[kv.key] = kv.value; } Standalone> versionedMutations = - wait(getMutations(self, ::invalidVersion, req.version)); + wait(getMutations(self, 0, req.version)); for (const auto& versionedMutation : versionedMutations) { const auto& mutation = versionedMutation.mutation; if (mutation.type == MutationRef::SetValue) { @@ -199,15 +228,13 @@ class SimpleConfigDatabaseNodeImpl { loop { choose { when(ConfigFollowerGetVersionRequest req = waitNext(cfi->getVersion.getFuture())) { - self->actors.add(getCurrentVersion(self, req)); + self->actors.add(getCommittedVersion(self, req)); } when(ConfigFollowerGetFullDatabaseRequest req = waitNext(cfi->getFullDatabase.getFuture())) { self->actors.add(getFullDatabase(self, req)); - continue; } when(ConfigFollowerGetChangesRequest req = waitNext(cfi->getChanges.getFuture())) { - // TODO: Implement - continue; + self->actors.add(getChanges(self, req)); } when(ConfigFollowerCompactRequest req = waitNext(cfi->compact.getFuture())) { // TODO: Implement diff --git a/fdbserver/workloads/ConfigurationDatabase.actor.cpp b/fdbserver/workloads/ConfigurationDatabase.actor.cpp index d60bd161db..2c5b29be5e 100644 --- a/fdbserver/workloads/ConfigurationDatabase.actor.cpp +++ b/fdbserver/workloads/ConfigurationDatabase.actor.cpp @@ -20,6 +20,8 @@ #include "fdbclient/IConfigTransaction.h" #include "fdbclient/NativeAPI.actor.h" +#include "fdbserver/ConfigFollowerInterface.h" +#include "fdbserver/IConfigBroadcaster.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" @@ -52,7 +54,7 @@ class ConfigurationDatabaseWorkload : public TestWorkload { } } - ACTOR static Future start(ConfigurationDatabaseWorkload* self, Database cx) { + ACTOR static Future runClient(Database cx) { state Key key = LiteralStringRef("key"); state Key value = LiteralStringRef("value"); Optional currentValue = wait(get(cx, key)); @@ -65,6 +67,41 @@ class ConfigurationDatabaseWorkload : public TestWorkload { return Void(); } + ACTOR static Future runBroadcaster(Database cx, ConfigFollowerInterface cfi) { + state SimpleConfigBroadcaster broadcaster(cx->getConnectionFile()->getConnectionString()); + wait(success(timeout(broadcaster.serve(cfi), 60.0))); + return Void(); + } + + ACTOR static Future getCurrentVersion(ConfigFollowerInterface cfi) { + ConfigFollowerGetVersionReply versionReply = wait(cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{})); + return versionReply.version; + } + + ACTOR static Future runConsumer(ConfigFollowerInterface cfi) { + state Version mostRecentVersion = wait(getCurrentVersion(cfi)); + ConfigFollowerGetFullDatabaseReply fullDBReply = wait(cfi.getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{mostRecentVersion, {}})); + state std::map database = fullDBReply.database; + state int runs = 0; + loop { + state ConfigFollowerGetChangesReply changesReply = wait(cfi.getChanges.getReply(ConfigFollowerGetChangesRequest{mostRecentVersion, {}})); + wait(delay(1.0)); + if (++runs > 5) { + return Void(); + } + } + } + + ACTOR static Future start(ConfigurationDatabaseWorkload *self, Database cx) { + state std::vector> futures; + state ConfigFollowerInterface cfi; + futures.push_back(runClient(cx)); + futures.push_back(runBroadcaster(cx, cfi)); + futures.push_back(runConsumer(cfi)); + wait(waitForAll(futures)); + return Void(); + } + public: ConfigurationDatabaseWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {} Future setup(Database const& cx) override { return Void(); }