Add GetSnapshotAndChangesRequest type
This commit is contained in:
parent
fcc6efd3b1
commit
748a3ebfbe
|
@ -334,7 +334,7 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
|
|||
}
|
||||
|
||||
TransportData::TransportData(uint64_t transportId)
|
||||
: endpoints(/*wellKnownTokenCount*/ 19), endpointNotFoundReceiver(endpoints), pingReceiver(endpoints),
|
||||
: endpoints(/*wellKnownTokenCount*/ 18), endpointNotFoundReceiver(endpoints), pingReceiver(endpoints),
|
||||
warnAlwaysForLargePacket(true), lastIncompatibleMessage(0), transportId(transportId),
|
||||
numIncompatibleConnections(0) {
|
||||
degraded = makeReference<AsyncVar<bool>>(false);
|
||||
|
|
|
@ -50,21 +50,18 @@ class ConfigBroadcasterImpl {
|
|||
Future<Void> logger;
|
||||
|
||||
ACTOR static Future<Void> serve(ConfigBroadcaster* self, ConfigBroadcasterImpl* impl, ConfigFollowerInterface cfi) {
|
||||
wait(impl->consumer->getInitialSnapshot(*self));
|
||||
impl->actors.add(impl->consumer->consume(*self));
|
||||
loop {
|
||||
choose {
|
||||
when(ConfigFollowerGetVersionRequest req = waitNext(cfi.getVersion.getFuture())) {
|
||||
req.reply.send(impl->mostRecentVersion);
|
||||
}
|
||||
when(ConfigFollowerGetSnapshotRequest req = waitNext(cfi.getSnapshot.getFuture())) {
|
||||
when(ConfigFollowerGetSnapshotAndChangesRequest req = waitNext(cfi.getSnapshotAndChanges.getFuture())) {
|
||||
++impl->snapshotRequest;
|
||||
ConfigFollowerGetSnapshotReply reply;
|
||||
ConfigFollowerGetSnapshotAndChangesReply reply;
|
||||
for (const auto& [key, value] : impl->snapshot) {
|
||||
if (matchesConfigClass(req.configClassSet, key.configClass)) {
|
||||
reply.snapshot[key] = value;
|
||||
}
|
||||
}
|
||||
reply.snapshotVersion = reply.changesVersion = impl->mostRecentVersion;
|
||||
req.reply.send(reply);
|
||||
}
|
||||
when(ConfigFollowerGetChangesRequest req = waitNext(cfi.getChanges.getFuture())) {
|
||||
|
@ -73,6 +70,8 @@ class ConfigBroadcasterImpl {
|
|||
++impl->failedChangeRequest;
|
||||
continue;
|
||||
}
|
||||
// TODO: If there are no new changes, register the request and push
|
||||
// changes when ready
|
||||
ConfigFollowerGetChangesReply reply;
|
||||
reply.mostRecentVersion = impl->mostRecentVersion;
|
||||
for (const auto& versionedMutation : impl->versionedMutations) {
|
||||
|
@ -138,8 +137,9 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version snapshotVersion) {
|
||||
this->snapshot = std::move(snapshot);
|
||||
template <class Snapshot>
|
||||
Future<Void> setSnapshot(Snapshot&& snapshot, Version snapshotVersion) {
|
||||
this->snapshot = std::move(std::forward<Snapshot>(snapshot));
|
||||
this->lastCompactedVersion = snapshotVersion;
|
||||
return Void();
|
||||
}
|
||||
|
@ -221,6 +221,10 @@ Future<Void> ConfigBroadcaster::addVersionedMutations(
|
|||
return impl->addVersionedMutations(versionedMutations, mostRecentVersion);
|
||||
}
|
||||
|
||||
Future<Void> ConfigBroadcaster::setSnapshot(std::map<ConfigKey, Value> const& snapshot, Version snapshotVersion) {
|
||||
return impl->setSnapshot(snapshot, snapshotVersion);
|
||||
}
|
||||
|
||||
Future<Void> ConfigBroadcaster::setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version snapshotVersion) {
|
||||
return impl->setSnapshot(std::move(snapshot), snapshotVersion);
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ public:
|
|||
Future<Void> serve(ConfigFollowerInterface const&);
|
||||
Future<Void> addVersionedMutations(Standalone<VectorRef<VersionedConfigMutationRef>> const&,
|
||||
Version mostRecentVersion);
|
||||
Future<Void> setSnapshot(std::map<ConfigKey, Value> const& snapshot, Version snapshotVersion);
|
||||
Future<Void> setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version snapshotVersion);
|
||||
UID getID() const;
|
||||
JsonBuilderObject getStatus() const;
|
||||
|
|
|
@ -139,24 +139,6 @@ public:
|
|||
};
|
||||
|
||||
class BroadcasterToLocalConfigEnvironment {
|
||||
class DummyConfigSource {
|
||||
ConfigFollowerInterface cfi;
|
||||
ACTOR static Future<Void> serve(DummyConfigSource* self) {
|
||||
loop {
|
||||
choose {
|
||||
when(ConfigFollowerGetVersionRequest req = waitNext(self->cfi.getVersion.getFuture())) {
|
||||
req.reply.send(0);
|
||||
}
|
||||
when(ConfigFollowerGetSnapshotRequest req = waitNext(self->cfi.getSnapshot.getFuture())) {
|
||||
req.reply.send(ConfigFollowerGetSnapshotReply{});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
public:
|
||||
Future<Void> serve() { return serve(this); }
|
||||
ConfigFollowerInterface const& getInterface() { return cfi; }
|
||||
} dummyConfigSource;
|
||||
ConfigBroadcaster broadcaster;
|
||||
Reference<AsyncVar<ConfigFollowerInterface>> cfi;
|
||||
LocalConfiguration localConfiguration;
|
||||
|
@ -166,40 +148,32 @@ class BroadcasterToLocalConfigEnvironment {
|
|||
|
||||
ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self) {
|
||||
wait(self->localConfiguration.initialize("./", deterministicRandom()->randomUniqueID()));
|
||||
self->actors.add(self->dummyConfigSource.serve());
|
||||
self->broadcastServer = self->broadcaster.serve(self->cfi->get());
|
||||
self->actors.add(
|
||||
self->localConfiguration.consume(IDependentAsyncVar<ConfigFollowerInterface>::create(self->cfi)));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> compact(BroadcasterToLocalConfigEnvironment* self) {
|
||||
ConfigFollowerGetVersionReply reply =
|
||||
wait(self->cfi->get().getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
wait(self->cfi->get().compact.getReply(ConfigFollowerCompactRequest{ reply.version }));
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
BroadcasterToLocalConfigEnvironment(std::string const& configPath)
|
||||
: broadcaster(dummyConfigSource.getInterface()), cfi(makeReference<AsyncVar<ConfigFollowerInterface>>()),
|
||||
: broadcaster(ConfigFollowerInterface{}), cfi(makeReference<AsyncVar<ConfigFollowerInterface>>()),
|
||||
localConfiguration(configPath, {}) {}
|
||||
|
||||
Future<Void> setup() { return setup(this); }
|
||||
|
||||
Future<Void> set(Optional<KeyRef> configClass, int64_t value) {
|
||||
return addTestSetMutation(localConfiguration, configClass, value, ++lastWrittenVersion);
|
||||
return addTestSetMutation(broadcaster, configClass, value, ++lastWrittenVersion);
|
||||
}
|
||||
Future<Void> clear(Optional<KeyRef> configClass) {
|
||||
return addTestClearMutation(localConfiguration, configClass, ++lastWrittenVersion);
|
||||
return addTestClearMutation(broadcaster, configClass, ++lastWrittenVersion);
|
||||
}
|
||||
Future<Void> check(Optional<int64_t> value) const { return checkEventually(&localConfiguration, value); }
|
||||
void changeBroadcaster() {
|
||||
cfi->set(ConfigFollowerInterface());
|
||||
broadcaster = ConfigBroadcaster(dummyConfigSource.getInterface());
|
||||
broadcastServer.cancel();
|
||||
cfi->set(ConfigFollowerInterface{});
|
||||
broadcastServer = broadcaster.serve(cfi->get());
|
||||
}
|
||||
Future<Void> compact() { return compact(this); }
|
||||
Future<Void> compact() { return cfi->get().compact.getReply(ConfigFollowerCompactRequest{ lastWrittenVersion }); }
|
||||
|
||||
Future<Void> getError() const { return actors.getResult() || broadcastServer; }
|
||||
};
|
||||
|
|
|
@ -23,8 +23,8 @@
|
|||
#include "fdbserver/CoordinationInterface.h"
|
||||
|
||||
void ConfigFollowerInterface::setupWellKnownEndpoints() {
|
||||
getVersion.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_GETVERSION, TaskPriority::Coordination);
|
||||
getSnapshot.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOT, TaskPriority::Coordination);
|
||||
getSnapshotAndChanges.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOTANDCHANGES,
|
||||
TaskPriority::Coordination);
|
||||
getChanges.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_GETCHANGES, TaskPriority::Coordination);
|
||||
compact.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_COMPACT, TaskPriority::Coordination);
|
||||
}
|
||||
|
@ -32,8 +32,8 @@ void ConfigFollowerInterface::setupWellKnownEndpoints() {
|
|||
ConfigFollowerInterface::ConfigFollowerInterface() : _id(deterministicRandom()->randomUniqueID()) {}
|
||||
|
||||
ConfigFollowerInterface::ConfigFollowerInterface(NetworkAddress const& remote)
|
||||
: _id(deterministicRandom()->randomUniqueID()), getVersion(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETVERSION)),
|
||||
getSnapshot(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOT)),
|
||||
: _id(deterministicRandom()->randomUniqueID()),
|
||||
getSnapshotAndChanges(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOTANDCHANGES)),
|
||||
getChanges(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETCHANGES)),
|
||||
compact(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_COMPACT)) {}
|
||||
|
||||
|
|
|
@ -44,58 +44,6 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetVersionReply {
|
||||
static constexpr FileIdentifier file_identifier = 1028349;
|
||||
Version version;
|
||||
|
||||
explicit ConfigFollowerGetVersionReply(Version version = -1) : version(version) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetVersionRequest {
|
||||
static constexpr FileIdentifier file_identifier = 9840156;
|
||||
ReplyPromise<ConfigFollowerGetVersionReply> reply;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetSnapshotReply {
|
||||
static constexpr FileIdentifier file_identifier = 1734095;
|
||||
std::map<ConfigKey, Value> snapshot;
|
||||
|
||||
ConfigFollowerGetSnapshotReply() = default;
|
||||
explicit ConfigFollowerGetSnapshotReply(std::map<ConfigKey, Value>&& snapshot) : snapshot(std::move(snapshot)) {}
|
||||
explicit ConfigFollowerGetSnapshotReply(std::map<ConfigKey, Value> const& snapshot) : snapshot(snapshot) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, snapshot);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetSnapshotRequest {
|
||||
static constexpr FileIdentifier file_identifier = 294811;
|
||||
Version version;
|
||||
Optional<ConfigClassSet> configClassSet;
|
||||
ReplyPromise<ConfigFollowerGetSnapshotReply> reply;
|
||||
|
||||
ConfigFollowerGetSnapshotRequest() : version(-1) {}
|
||||
explicit ConfigFollowerGetSnapshotRequest(Version version, Optional<ConfigClassSet> const& configClassSet)
|
||||
: version(version), configClassSet(configClassSet) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, configClassSet, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct VersionedConfigMutationRef {
|
||||
Version version;
|
||||
ConfigMutationRef mutation;
|
||||
|
@ -108,12 +56,51 @@ struct VersionedConfigMutationRef {
|
|||
|
||||
size_t expectedSize() const { return sizeof(Version) + mutation.expectedSize(); }
|
||||
|
||||
template<class Ar>
|
||||
void serialize(Ar &ar) {
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, mutation);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetSnapshotAndChangesReply {
|
||||
static constexpr FileIdentifier file_identifier = 1734095;
|
||||
Version snapshotVersion;
|
||||
Version changesVersion;
|
||||
std::map<ConfigKey, Value> snapshot;
|
||||
Standalone<VectorRef<VersionedConfigMutationRef>> changes;
|
||||
|
||||
ConfigFollowerGetSnapshotAndChangesReply() = default;
|
||||
template <class Snapshot>
|
||||
explicit ConfigFollowerGetSnapshotAndChangesReply(Version snapshotVersion,
|
||||
Version changesVersion,
|
||||
Snapshot&& snapshot,
|
||||
Standalone<VectorRef<VersionedConfigMutationRef>> changes)
|
||||
: snapshotVersion(snapshotVersion), changesVersion(changesVersion), snapshot(std::forward<Snapshot>(snapshot)),
|
||||
changes(changes) {
|
||||
ASSERT(changesVersion >= snapshotVersion);
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, snapshotVersion, changesVersion, snapshot, changes);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetSnapshotAndChangesRequest {
|
||||
static constexpr FileIdentifier file_identifier = 294811;
|
||||
Optional<ConfigClassSet> configClassSet;
|
||||
ReplyPromise<ConfigFollowerGetSnapshotAndChangesReply> reply;
|
||||
|
||||
ConfigFollowerGetSnapshotAndChangesRequest() = default;
|
||||
explicit ConfigFollowerGetSnapshotAndChangesRequest(Optional<ConfigClassSet> const& configClassSet)
|
||||
: configClassSet(configClassSet) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, configClassSet, reply);
|
||||
}
|
||||
};
|
||||
|
||||
struct ConfigFollowerGetChangesReply {
|
||||
static constexpr FileIdentifier file_identifier = 234859;
|
||||
Version mostRecentVersion;
|
||||
|
@ -165,8 +152,7 @@ class ConfigFollowerInterface {
|
|||
|
||||
public:
|
||||
static constexpr FileIdentifier file_identifier = 7721102;
|
||||
RequestStream<ConfigFollowerGetVersionRequest> getVersion;
|
||||
RequestStream<ConfigFollowerGetSnapshotRequest> getSnapshot;
|
||||
RequestStream<ConfigFollowerGetSnapshotAndChangesRequest> getSnapshotAndChanges;
|
||||
RequestStream<ConfigFollowerGetChangesRequest> getChanges;
|
||||
RequestStream<ConfigFollowerCompactRequest> compact;
|
||||
|
||||
|
@ -179,6 +165,6 @@ public:
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, _id, getVersion, getSnapshot, getChanges);
|
||||
serializer(ar, _id, getSnapshotAndChanges, getChanges, compact);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -635,7 +635,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder, Optional<bool> use
|
|||
} else {
|
||||
configDatabaseNode = IConfigDatabaseNode::createPaxos();
|
||||
}
|
||||
wait(configDatabaseNode->initialize(dataFolder, deterministicRandom()->randomUniqueID()));
|
||||
wait(configDatabaseNode->initialize(dataFolder, UID{}));
|
||||
configDatabaseServer =
|
||||
configDatabaseNode->serve(configTransactionInterface) || configDatabaseNode->serve(configFollowerInterface);
|
||||
}
|
||||
|
|
|
@ -32,10 +32,9 @@ constexpr UID WLTOKEN_LEADERELECTIONREG_FORWARD(-1, 7);
|
|||
constexpr UID WLTOKEN_GENERATIONREG_READ(-1, 8);
|
||||
constexpr UID WLTOKEN_GENERATIONREG_WRITE(-1, 9);
|
||||
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETVERSION(-1, 15);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOT(-1, 16);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETCHANGES(-1, 17);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_COMPACT(-1, 18);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETSNAPSHOTANDCHANGES(-1, 15);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_GETCHANGES(-1, 16);
|
||||
constexpr UID WLTOKEN_CONFIGFOLLOWER_COMPACT(-1, 17);
|
||||
|
||||
struct GenerationRegInterface {
|
||||
constexpr static FileIdentifier file_identifier = 16726744;
|
||||
|
|
|
@ -30,7 +30,6 @@
|
|||
class IConfigConsumer {
|
||||
public:
|
||||
virtual ~IConfigConsumer() = default;
|
||||
virtual Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) = 0;
|
||||
virtual Future<Void> consume(ConfigBroadcaster& broadcaster) = 0;
|
||||
virtual UID getID() const = 0;
|
||||
|
||||
|
|
|
@ -31,12 +31,6 @@ PaxosConfigConsumer::PaxosConfigConsumer(ServerCoordinators const& cfi,
|
|||
|
||||
PaxosConfigConsumer::~PaxosConfigConsumer() = default;
|
||||
|
||||
Future<Void> PaxosConfigConsumer::getInitialSnapshot(ConfigBroadcaster& broadcaster) {
|
||||
// TODO: Implement
|
||||
ASSERT(false);
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> PaxosConfigConsumer::consume(ConfigBroadcaster& broadcaster) {
|
||||
// TODO: Implement
|
||||
ASSERT(false);
|
||||
|
|
|
@ -30,7 +30,6 @@ public:
|
|||
Optional<double> pollingInterval,
|
||||
Optional<double> compactionInterval);
|
||||
~PaxosConfigConsumer();
|
||||
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) override;
|
||||
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
|
||||
UID getID() const override;
|
||||
};
|
||||
|
|
|
@ -51,6 +51,7 @@ class SimpleConfigConsumerImpl {
|
|||
|
||||
ACTOR template <class ConfigStore>
|
||||
static Future<Void> fetchChanges(SimpleConfigConsumerImpl* self, ConfigStore* configStore) {
|
||||
wait(getInitialSnapshot(self, configStore));
|
||||
loop {
|
||||
try {
|
||||
ConfigFollowerGetChangesReply reply = wait(self->cfi.getChanges.getReply(
|
||||
|
@ -71,15 +72,7 @@ class SimpleConfigConsumerImpl {
|
|||
} catch (Error& e) {
|
||||
++self->failedChangeRequest;
|
||||
if (e.code() == error_code_version_already_compacted) {
|
||||
ConfigFollowerGetVersionReply versionReply =
|
||||
wait(self->cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
ASSERT(versionReply.version > self->lastSeenVersion);
|
||||
self->lastSeenVersion = versionReply.version;
|
||||
ConfigFollowerGetSnapshotReply dbReply = wait(self->cfi.getSnapshot.getReply(
|
||||
ConfigFollowerGetSnapshotRequest{ self->lastSeenVersion, self->configClassSet }));
|
||||
// TODO: Remove unnecessary copy
|
||||
auto snapshot = dbReply.snapshot;
|
||||
wait(configStore->setSnapshot(std::move(snapshot), self->lastSeenVersion));
|
||||
wait(getInitialSnapshot(self, configStore));
|
||||
++self->snapshotRequest;
|
||||
} else {
|
||||
throw e;
|
||||
|
@ -88,17 +81,30 @@ class SimpleConfigConsumerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR template <class ConfigStore>
|
||||
static Future<Void> getInitialSnapshot(SimpleConfigConsumerImpl* self, ConfigStore* configStore) {
|
||||
ConfigFollowerGetVersionReply versionReply =
|
||||
wait(self->cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
self->lastSeenVersion = versionReply.version;
|
||||
ConfigFollowerGetSnapshotReply reply = wait(self->cfi.getSnapshot.getReply(
|
||||
ConfigFollowerGetSnapshotRequest{ self->lastSeenVersion, self->configClassSet }));
|
||||
TraceEvent(SevDebug, "ConfigGotInitialSnapshot").detail("Version", self->lastSeenVersion);
|
||||
// TODO: Remove unnecessary copy
|
||||
auto snapshot = reply.snapshot;
|
||||
wait(configStore->setSnapshot(std::move(snapshot), self->lastSeenVersion));
|
||||
ACTOR static Future<Void> getInitialSnapshot(SimpleConfigConsumerImpl* self,
|
||||
LocalConfiguration* localConfiguration) {
|
||||
ConfigFollowerGetSnapshotAndChangesReply reply = wait(self->cfi.getSnapshotAndChanges.getReply(
|
||||
ConfigFollowerGetSnapshotAndChangesRequest{ self->configClassSet }));
|
||||
ASSERT(reply.changes.empty() && reply.changesVersion == reply.snapshotVersion);
|
||||
TraceEvent(SevDebug, "ConfigGotInitialSnapshot")
|
||||
.detail("Version", reply.snapshotVersion)
|
||||
.detail("SnapshotSize", reply.snapshot.size());
|
||||
self->lastSeenVersion = reply.snapshotVersion;
|
||||
wait(localConfiguration->setSnapshot(std::move(reply.snapshot), self->lastSeenVersion));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getInitialSnapshot(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
|
||||
ConfigFollowerGetSnapshotAndChangesReply reply = wait(self->cfi.getSnapshotAndChanges.getReply(
|
||||
ConfigFollowerGetSnapshotAndChangesRequest{ self->configClassSet }));
|
||||
TraceEvent(SevDebug, "BroadcasterGotInitialSnapshot", self->id)
|
||||
.detail("SnapshotVersion", reply.snapshotVersion)
|
||||
.detail("SnapshotSize", reply.snapshot.size())
|
||||
.detail("ChangesVersion", reply.changesVersion)
|
||||
.detail("ChangesSize", reply.changes.size());
|
||||
broadcaster->setSnapshot(std::move(reply.snapshot), reply.snapshotVersion);
|
||||
broadcaster->addVersionedMutations(reply.changes, reply.changesVersion);
|
||||
self->lastSeenVersion = reply.changesVersion;
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -138,11 +144,6 @@ public:
|
|||
"ConfigConsumerMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigConsumerMetrics");
|
||||
}
|
||||
|
||||
template <class ConfigStore>
|
||||
Future<Void> getInitialSnapshot(ConfigStore& configStore) {
|
||||
return getInitialSnapshot(this, &configStore);
|
||||
}
|
||||
|
||||
template <class ConfigStore>
|
||||
Future<Void> consume(ConfigStore& configStore) {
|
||||
// TODO: Reenable compaction
|
||||
|
@ -174,14 +175,6 @@ SimpleConfigConsumer::SimpleConfigConsumer(ServerCoordinators const& coordinator
|
|||
pollingInterval,
|
||||
compactionInterval)) {}
|
||||
|
||||
Future<Void> SimpleConfigConsumer::getInitialSnapshot(ConfigBroadcaster& broadcaster) {
|
||||
return impl->getInitialSnapshot(broadcaster);
|
||||
}
|
||||
|
||||
Future<Void> SimpleConfigConsumer::getInitialSnapshot(LocalConfiguration& localConfiguration) {
|
||||
return impl->getInitialSnapshot(localConfiguration);
|
||||
}
|
||||
|
||||
Future<Void> SimpleConfigConsumer::consume(ConfigBroadcaster& broadcaster) {
|
||||
return impl->consume(broadcaster);
|
||||
}
|
||||
|
|
|
@ -39,8 +39,6 @@ public:
|
|||
Optional<double> pollingInterval,
|
||||
Optional<double> compactionInterval);
|
||||
~SimpleConfigConsumer();
|
||||
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) override;
|
||||
Future<Void> getInitialSnapshot(LocalConfiguration& localConfiguration);
|
||||
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
|
||||
Future<Void> consume(LocalConfiguration& localConfiguration);
|
||||
UID getID() const override;
|
||||
|
|
|
@ -94,7 +94,6 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
Counter successfulChangeRequests;
|
||||
Counter failedChangeRequests;
|
||||
Counter snapshotRequests;
|
||||
Counter committedVersionRequests;
|
||||
|
||||
// Transaction counters
|
||||
Counter successfulCommits;
|
||||
|
@ -178,12 +177,6 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getCommittedVersion(SimpleConfigDatabaseNodeImpl *self, ConfigFollowerGetVersionRequest req) {
|
||||
Version committedVersion = wait(getCommittedVersion(self));
|
||||
req.reply.send(ConfigFollowerGetVersionReply(committedVersion));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getNewVersion(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionGetVersionRequest req) {
|
||||
state Version currentVersion = wait(getLiveTransactionVersion(self));
|
||||
self->kvStore->set(KeyValueRef(liveTransactionVersionKey, BinaryWriter::toValue(++currentVersion, IncludeVersion())));
|
||||
|
@ -288,25 +281,6 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> traceQueuedMutations(SimpleConfigDatabaseNodeImpl *self) {
|
||||
state Version currentVersion = wait(getCommittedVersion(self));
|
||||
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
|
||||
wait(getMutations(self, 0, currentVersion));
|
||||
TraceEvent te("SimpleConfigNodeQueuedMutations", self->id);
|
||||
te.detail("Size", versionedMutations.size());
|
||||
te.detail("CommittedVersion", currentVersion);
|
||||
int index = 0;
|
||||
for (const auto &versionedMutation : versionedMutations) {
|
||||
te.detail(format("Version%d", index), versionedMutation.version);
|
||||
te.detail(format("Mutation%d", index), versionedMutation.mutation.isSet());
|
||||
te.detail(format("ConfigClass%d", index), versionedMutation.mutation.getConfigClass());
|
||||
te.detail(format("KnobName%d", index), versionedMutation.mutation.getKnobName());
|
||||
te.detail(format("KnobValue%d", index), versionedMutation.mutation.getValue());
|
||||
++index;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> serve(SimpleConfigDatabaseNodeImpl* self, ConfigTransactionInterface const* cti) {
|
||||
ASSERT(self->initFuture.isValid() && self->initFuture.isReady());
|
||||
loop {
|
||||
|
@ -331,31 +305,23 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getSnapshot(SimpleConfigDatabaseNodeImpl* self, ConfigFollowerGetSnapshotRequest req) {
|
||||
state ConfigFollowerGetSnapshotReply reply;
|
||||
ACTOR static Future<Void> getSnapshotAndChanges(SimpleConfigDatabaseNodeImpl* self,
|
||||
ConfigFollowerGetSnapshotAndChangesRequest req) {
|
||||
state ConfigFollowerGetSnapshotAndChangesReply reply;
|
||||
Standalone<RangeResultRef> data = wait(self->kvStore->readRange(kvKeys));
|
||||
for (const auto& kv : data) {
|
||||
reply
|
||||
.snapshot[BinaryReader::fromStringRef<ConfigKey>(kv.key.removePrefix(kvKeys.begin), IncludeVersion())] =
|
||||
kv.value;
|
||||
}
|
||||
state Version lastCompactedVersion = wait(getLastCompactedVersion(self));
|
||||
wait(store(reply.snapshotVersion, getLastCompactedVersion(self)));
|
||||
wait(store(reply.changesVersion, getCommittedVersion(self)));
|
||||
wait(store(reply.changes, getMutations(self, reply.snapshotVersion + 1, reply.changesVersion)));
|
||||
TraceEvent(SevDebug, "ConfigDatabaseNodeGettingSnapshot")
|
||||
.detail("ReqVersion", req.version)
|
||||
.detail("LastCompactedVersion", lastCompactedVersion);
|
||||
if (lastCompactedVersion > req.version) {
|
||||
req.reply.sendError(version_already_compacted());
|
||||
}
|
||||
Standalone<VectorRef<VersionedConfigMutationRef>> versionedMutations =
|
||||
wait(getMutations(self, lastCompactedVersion + 1, req.version));
|
||||
for (const auto& versionedMutation : versionedMutations) {
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
if (mutation.isSet()) {
|
||||
reply.snapshot[mutation.getKey()] = mutation.getValue().get();
|
||||
} else {
|
||||
reply.snapshot.erase(mutation.getKey());
|
||||
}
|
||||
}
|
||||
.detail("SnapshotVersion", reply.snapshotVersion)
|
||||
.detail("ChangesVersion", reply.changesVersion)
|
||||
.detail("SnapshotSize", reply.snapshot.size())
|
||||
.detail("ChangesSize", reply.changes.size());
|
||||
req.reply.send(reply);
|
||||
return Void();
|
||||
}
|
||||
|
@ -404,13 +370,10 @@ class SimpleConfigDatabaseNodeImpl {
|
|||
ASSERT(self->initFuture.isValid() && self->initFuture.isReady());
|
||||
loop {
|
||||
choose {
|
||||
when(ConfigFollowerGetVersionRequest req = waitNext(cfi->getVersion.getFuture())) {
|
||||
++self->committedVersionRequests;
|
||||
wait(getCommittedVersion(self, req));
|
||||
}
|
||||
when(ConfigFollowerGetSnapshotRequest req = waitNext(cfi->getSnapshot.getFuture())) {
|
||||
when(ConfigFollowerGetSnapshotAndChangesRequest req =
|
||||
waitNext(cfi->getSnapshotAndChanges.getFuture())) {
|
||||
++self->snapshotRequests;
|
||||
wait(getSnapshot(self, req));
|
||||
wait(getSnapshotAndChanges(self, req));
|
||||
}
|
||||
when(ConfigFollowerGetChangesRequest req = waitNext(cfi->getChanges.getFuture())) {
|
||||
wait(getChanges(self, req));
|
||||
|
@ -427,9 +390,8 @@ public:
|
|||
SimpleConfigDatabaseNodeImpl()
|
||||
: cc("ConfigDatabaseNode"), compactRequests("CompactRequests", cc),
|
||||
successfulChangeRequests("SuccessfulChangeRequests", cc), failedChangeRequests("FailedChangeRequests", cc),
|
||||
snapshotRequests("SnapshotRequests", cc), committedVersionRequests("CommittedVersionRequests", cc),
|
||||
successfulCommits("SuccessfulCommits", cc), failedCommits("FailedCommits", cc),
|
||||
setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
|
||||
snapshotRequests("SnapshotRequests", cc), successfulCommits("SuccessfulCommits", cc),
|
||||
failedCommits("FailedCommits", cc), setMutations("SetMutations", cc), clearMutations("ClearMutations", cc),
|
||||
getValueRequests("GetValueRequests", cc), newVersionRequests("NewVersionRequests", cc) {}
|
||||
|
||||
~SimpleConfigDatabaseNodeImpl() {
|
||||
|
|
|
@ -2027,7 +2027,7 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
|
|||
|
||||
if (useTestConfigDB.present()) {
|
||||
// TODO: Shouldn't block here
|
||||
wait(localConfig.initialize(dataFolder, deterministicRandom()->randomUniqueID()));
|
||||
wait(localConfig.initialize(dataFolder, UID{}));
|
||||
}
|
||||
|
||||
actors.push_back(serveProtocolInfo());
|
||||
|
|
Loading…
Reference in New Issue