Add pollingInterval and compactionInterval fields to SimpleConfigConsumer

This commit is contained in:
sfc-gh-tclinkenbeard 2021-05-14 11:03:22 -07:00
parent c4a5662d1b
commit 65ea89fa44
5 changed files with 53 additions and 18 deletions

View File

@ -166,7 +166,8 @@ public:
"ConfigBroadcasterMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigBroadcasterMetrics");
auto consumerID = deterministicRandom()->randomUniqueID();
TraceEvent(SevDebug, "BroadcasterStartingConsumer", id).detail("Consumer", consumerID);
consumer = std::make_unique<SimpleConfigConsumer>(configSource, Optional<ConfigClassSet>{}, 0, consumerID);
consumer = std::make_unique<SimpleConfigConsumer>(
configSource, Optional<ConfigClassSet>{}, 0, 0.5, Optional<double>{}, consumerID);
}
UID getID() const { return id; }

View File

@ -229,12 +229,12 @@ TEST_CASE("/fdbserver/ConfigDB/ConfigBroadcaster/CheckpointedUpdates") {
actors.add(dummyConfigSource.serve());
actors.add(broadcaster.serve(cfi->get()));
actors.add(localConfiguration.consume(cfi));
while (version <= 100) {
while (version <= 10) {
versionedMutations = Standalone<VectorRef<VersionedConfigMutationRef>>{};
appendVersionedMutation(versionedMutations, version, "class-A"_sr, "test_int"_sr, versionToValue(version));
appendVersionedMutation(versionedMutations, version, "class-A"_sr, "test_long"_sr, versionToValue(version));
wait(broadcaster.addVersionedMutations(versionedMutations, version));
loop {
if (localConfiguration.getTestKnobs().TEST_INT == version) {
if (localConfiguration.getTestKnobs().TEST_LONG == version) {
++version;
break;
}

View File

@ -261,6 +261,8 @@ class LocalConfigurationImpl : public NonCopyable {
state SimpleConfigConsumer consumer(broadcaster->get(),
impl->configKnobOverrides.getConfigClassSet(),
impl->lastSeenVersion,
Optional<double>{},
Optional<double>{},
deterministicRandom()->randomUniqueID());
TraceEvent(SevDebug, "LocalConfigurationStartingConsumer", impl->id)
.detail("Consumer", consumer.getID())

View File

@ -25,6 +25,8 @@ class SimpleConfigConsumerImpl {
ConfigFollowerInterface cfi;
Version lastSeenVersion;
Optional<ConfigClassSet> configClassSet;
Optional<double> pollingInterval;
Optional<double> compactionInterval;
UID id;
CounterCollection cc;
@ -34,12 +36,13 @@ class SimpleConfigConsumerImpl {
Counter snapshotRequest;
Future<Void> logger;
static const double POLLING_INTERVAL; // TODO: Make knob?
static const double COMPACTION_INTERVAL; // TODO: Make knob?
ACTOR static Future<Void> compactor(SimpleConfigConsumerImpl* self) {
if (!self->compactionInterval.present()) {
wait(Never());
return Void();
}
loop {
wait(delayJittered(COMPACTION_INTERVAL));
wait(delayJittered(self->compactionInterval.get()));
// TODO: Enable compaction once bugs are fixed
// wait(self->cfi.compact.getReply(ConfigFollowerCompactRequest{ self->lastSeenVersion }));
//++self->compactRequest;
@ -63,7 +66,9 @@ class SimpleConfigConsumerImpl {
}
self->lastSeenVersion = reply.mostRecentVersion;
wait(configStore->addVersionedMutations(reply.versionedMutations, reply.mostRecentVersion));
wait(delayJittered(POLLING_INTERVAL));
if (self->pollingInterval.present()) {
wait(delayJittered(self->pollingInterval.get()));
}
} catch (Error& e) {
++self->failedChangeRequest;
if (e.code() == error_code_version_already_compacted) {
@ -124,10 +129,13 @@ public:
SimpleConfigConsumerImpl(InterfaceSource const& interfaceSource,
Optional<ConfigClassSet> const& configClassSet,
Version lastSeenVersion,
Optional<double> const& pollingInterval,
Optional<double> const& compactionInterval,
UID id)
: lastSeenVersion(lastSeenVersion), configClassSet(configClassSet), id(id), cc("ConfigConsumer"),
compactRequest("CompactRequest", cc), successfulChangeRequest("SuccessfulChangeRequest", cc),
failedChangeRequest("FailedChangeRequest", cc), snapshotRequest("SnapshotRequest", cc) {
: configClassSet(configClassSet), lastSeenVersion(lastSeenVersion), pollingInterval(pollingInterval),
compactionInterval(compactionInterval), id(id), cc("ConfigConsumer"), compactRequest("CompactRequest", cc),
successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc),
snapshotRequest("SnapshotRequest", cc) {
cfi = getConfigFollowerInterface(interfaceSource);
logger = traceCounters(
"ConfigConsumerMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigConsumerMetrics");
@ -147,26 +155,44 @@ public:
UID getID() const { return id; }
};
const double SimpleConfigConsumerImpl::POLLING_INTERVAL = 0.5;
const double SimpleConfigConsumerImpl::COMPACTION_INTERVAL = 5.0;
SimpleConfigConsumer::SimpleConfigConsumer(ConfigFollowerInterface const& cfi,
Optional<ConfigClassSet> const& configClassSet,
Version lastSeenVersion,
Optional<double> const& pollingInterval,
Optional<double> const& compactionInterval,
UID id)
: impl(std::make_unique<SimpleConfigConsumerImpl>(cfi, configClassSet, lastSeenVersion, id)) {}
: impl(std::make_unique<SimpleConfigConsumerImpl>(cfi,
configClassSet,
lastSeenVersion,
pollingInterval,
compactionInterval,
id)) {}
SimpleConfigConsumer::SimpleConfigConsumer(ClusterConnectionString const& ccs,
Optional<ConfigClassSet> const& configClassSet,
Version lastSeenVersion,
Optional<double> const& pollingInterval,
Optional<double> const& compactionInterval,
UID id)
: impl(std::make_unique<SimpleConfigConsumerImpl>(ccs, configClassSet, lastSeenVersion, id)) {}
: impl(std::make_unique<SimpleConfigConsumerImpl>(ccs,
configClassSet,
lastSeenVersion,
pollingInterval,
compactionInterval,
id)) {}
SimpleConfigConsumer::SimpleConfigConsumer(ServerCoordinators const& coordinators,
Optional<ConfigClassSet> const& configClassSet,
Version lastSeenVersion,
Optional<double> const& pollingInterval,
Optional<double> const& compactionInterval,
UID id)
: impl(std::make_unique<SimpleConfigConsumerImpl>(coordinators, configClassSet, lastSeenVersion, id)) {}
: impl(std::make_unique<SimpleConfigConsumerImpl>(coordinators,
configClassSet,
lastSeenVersion,
pollingInterval,
compactionInterval,
id)) {}
Future<Void> SimpleConfigConsumer::getInitialSnapshot(ConfigBroadcaster& broadcaster) {
return impl->getInitialSnapshot(broadcaster);

View File

@ -31,14 +31,20 @@ public:
SimpleConfigConsumer(ConfigFollowerInterface const& cfi,
Optional<ConfigClassSet> const& configClassSet,
Version lastSeenVersion,
Optional<double> const& pollingInterval,
Optional<double> const& compactionInterval,
UID id);
SimpleConfigConsumer(ClusterConnectionString const& ccs,
Optional<ConfigClassSet> const& configClassSet,
Version lastSeenVersion,
Optional<double> const& pollingInterval,
Optional<double> const& compactionInterval,
UID id);
SimpleConfigConsumer(ServerCoordinators const& coordinators,
Optional<ConfigClassSet> const& configClassSet,
Version lastSeenVersion,
Optional<double> const& pollingInterval,
Optional<double> const& compactionInterval,
UID id);
~SimpleConfigConsumer();
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) override;