Only register each worker once with config broadcaster (consumer currently disabled)
This commit is contained in:
parent
7d87aa8203
commit
616a01d01d
|
@ -62,6 +62,14 @@ class ConfigBroadcasterImpl {
|
|||
broadcastInterface == rhs.broadcastInterface;
|
||||
}
|
||||
bool operator!=(BroadcastClientDetails const& rhs) const { return !(*this == rhs); }
|
||||
|
||||
BroadcastClientDetails() = default;
|
||||
BroadcastClientDetails(Future<Void> watcher,
|
||||
ConfigClassSet const& configClassSet,
|
||||
Version lastSeenVersion,
|
||||
ConfigBroadcastInterface broadcastInterface)
|
||||
: watcher(watcher), configClassSet(configClassSet), lastSeenVersion(lastSeenVersion),
|
||||
broadcastInterface(broadcastInterface) {}
|
||||
};
|
||||
|
||||
std::map<ConfigKey, KnobValue> snapshot;
|
||||
|
@ -72,7 +80,7 @@ class ConfigBroadcasterImpl {
|
|||
std::unique_ptr<IConfigConsumer> consumer;
|
||||
Future<Void> consumerFuture;
|
||||
ActorCollection actors{ false };
|
||||
std::vector<BroadcastClientDetails> clients;
|
||||
std::map<UID, BroadcastClientDetails> clients;
|
||||
|
||||
UID id;
|
||||
CounterCollection cc;
|
||||
|
@ -82,16 +90,14 @@ class ConfigBroadcasterImpl {
|
|||
Counter snapshotRequest;
|
||||
Future<Void> logger;
|
||||
|
||||
Future<Void> pushSnapshot(ConfigBroadcasterImpl* self,
|
||||
Version snapshotVersion,
|
||||
BroadcastClientDetails const& client) {
|
||||
Future<Void> pushSnapshot(Version snapshotVersion, BroadcastClientDetails const& client) {
|
||||
if (client.lastSeenVersion >= snapshotVersion) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
++snapshotRequest;
|
||||
ConfigBroadcastSnapshotRequest request;
|
||||
for (const auto& [key, value] : self->snapshot) {
|
||||
for (const auto& [key, value] : snapshot) {
|
||||
if (matchesConfigClass(client.configClassSet, key.configClass)) {
|
||||
request.snapshot[key] = value;
|
||||
}
|
||||
|
@ -163,7 +169,7 @@ class ConfigBroadcasterImpl {
|
|||
}
|
||||
}
|
||||
|
||||
for (auto& client : clients) {
|
||||
for (auto& [id, client] : clients) {
|
||||
actors.add(brokenPromiseToNever(pushChanges(client, changes)));
|
||||
}
|
||||
}
|
||||
|
@ -173,8 +179,8 @@ class ConfigBroadcasterImpl {
|
|||
this->snapshot = snapshot;
|
||||
this->lastCompactedVersion = snapshotVersion;
|
||||
std::vector<Future<Void>> futures;
|
||||
for (const auto& client : clients) {
|
||||
futures.push_back(brokenPromiseToNever(pushSnapshot(this, snapshotVersion, client)));
|
||||
for (const auto& [id, client] : clients) {
|
||||
futures.push_back(brokenPromiseToNever(pushSnapshot(snapshotVersion, client)));
|
||||
}
|
||||
return waitForAll(futures);
|
||||
}
|
||||
|
@ -192,34 +198,38 @@ class ConfigBroadcasterImpl {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> waitForFailure(ConfigBroadcasterImpl* self,
|
||||
ACTOR static Future<Void> waitForFailure(ConfigBroadcasterImpl* self, Future<Void> watcher, UID clientUID) {
|
||||
wait(watcher);
|
||||
self->clients.erase(clientUID);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> registerWorker(ConfigBroadcasterImpl* self,
|
||||
Version lastSeenVersion,
|
||||
ConfigClassSet configClassSet,
|
||||
Future<Void> watcher,
|
||||
BroadcastClientDetails* client) {
|
||||
wait(success(watcher));
|
||||
self->clients.erase(std::remove(self->clients.begin(), self->clients.end(), *client));
|
||||
ConfigBroadcastInterface broadcastInterface) {
|
||||
state BroadcastClientDetails client(
|
||||
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface));
|
||||
|
||||
if (self->clients.count(broadcastInterface.id())) {
|
||||
// Client already registered
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Push full snapshot to worker if it isn't up to date.
|
||||
wait(self->pushSnapshot(self->mostRecentVersion, client));
|
||||
self->clients[broadcastInterface.id()] = client;
|
||||
self->actors.add(waitForFailure(self, watcher, broadcastInterface.id()));
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
Future<Void> registerWorker(ConfigBroadcaster* self,
|
||||
Version lastSeenVersion,
|
||||
Future<Void> registerWorker(Version lastSeenVersion,
|
||||
ConfigClassSet configClassSet,
|
||||
Future<Void> watcher,
|
||||
ConfigBroadcastInterface broadcastInterface) {
|
||||
if (!consumerFuture.isValid()) {
|
||||
consumerFuture = consumer->consume(*self);
|
||||
}
|
||||
|
||||
auto client = BroadcastClientDetails{
|
||||
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface)
|
||||
};
|
||||
|
||||
// Push all dynamic knobs to worker if it isn't up to date.
|
||||
Future<Void> result = pushSnapshot(this, mostRecentVersion, client);
|
||||
|
||||
clients.push_back(std::move(client));
|
||||
actors.add(waitForFailure(this, watcher, &clients.back()));
|
||||
return result;
|
||||
return registerWorker(this, lastSeenVersion, configClassSet, watcher, broadcastInterface);
|
||||
}
|
||||
|
||||
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
|
||||
|
@ -342,10 +352,10 @@ ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default;
|
|||
ConfigBroadcaster::~ConfigBroadcaster() = default;
|
||||
|
||||
Future<Void> ConfigBroadcaster::registerWorker(Version lastSeenVersion,
|
||||
ConfigClassSet configClassSet,
|
||||
ConfigClassSet const& configClassSet,
|
||||
Future<Void> watcher,
|
||||
ConfigBroadcastInterface broadcastInterface) {
|
||||
return impl->registerWorker(this, lastSeenVersion, std::move(configClassSet), watcher, broadcastInterface);
|
||||
return impl->registerWorker(lastSeenVersion, configClassSet, watcher, broadcastInterface);
|
||||
}
|
||||
|
||||
void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
|
||||
|
|
|
@ -44,7 +44,7 @@ public:
|
|||
ConfigBroadcaster& operator=(ConfigBroadcaster&&);
|
||||
~ConfigBroadcaster();
|
||||
Future<Void> registerWorker(Version lastSeenVersion,
|
||||
ConfigClassSet configClassSet,
|
||||
ConfigClassSet const& configClassSet,
|
||||
Future<Void> watcher,
|
||||
ConfigBroadcastInterface worker);
|
||||
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
|
||||
|
|
Loading…
Reference in New Issue