Reorder registerWorker to prevent potential conflict

This commit is contained in:
Lukas Joswiak 2021-08-10 15:09:35 -07:00
parent f018af6ee4
commit b112560c94
1 changed files with 20 additions and 20 deletions

View File

@ -114,7 +114,6 @@ class ConfigBroadcasterImpl {
client.lastSeenVersion = mostRecentVersion; client.lastSeenVersion = mostRecentVersion;
req.mostRecentVersion = mostRecentVersion; req.mostRecentVersion = mostRecentVersion;
// TODO: Retry in event of failure
++successfulChangeRequest; ++successfulChangeRequest;
return success(client.broadcastInterface.changes.getReply(req)); return success(client.broadcastInterface.changes.getReply(req));
} }
@ -144,7 +143,7 @@ class ConfigBroadcasterImpl {
} }
for (auto& client : clients) { for (auto& client : clients) {
actors.add(pushChanges(client, changes)); actors.add(brokenPromiseToNever(pushChanges(client, changes)));
} }
} }
@ -172,27 +171,28 @@ public:
if (!consumerFuture.isValid()) { if (!consumerFuture.isValid()) {
consumerFuture = consumer->consume(*self); consumerFuture = consumer->consume(*self);
} }
Future<Void> result = Void();
// Push all dynamic knobs to worker if it isn't up to date.
if (lastSeenVersion < mostRecentVersion) {
++snapshotRequest;
ConfigBroadcastSnapshotRequest request;
for (const auto& [key, value] : snapshot) {
if (matchesConfigClass(configClassSet, key.configClass)) {
request.snapshot[key] = value;
}
}
request.version = mostRecentVersion;
TraceEvent(SevDebug, "ConfigBroadcasterSnapshotRequest", id)
.detail("Size", request.snapshot.size())
.detail("Version", request.version);
result = success(broadcastInterface.snapshot.getReply(request));
}
clients.push_back(BroadcastClientDetails{ clients.push_back(BroadcastClientDetails{
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface) }); watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface) });
actors.add(waitForFailure(this, watcher, &clients.back())); actors.add(waitForFailure(this, watcher, &clients.back()));
return result;
// Push all dynamic knobs to worker if it isn't up to date.
if (clients.back().lastSeenVersion >= mostRecentVersion) {
return Void();
}
++snapshotRequest;
ConfigBroadcastSnapshotRequest request;
for (const auto& [key, value] : snapshot) {
if (matchesConfigClass(clients.back().configClassSet, key.configClass)) {
request.snapshot[key] = value;
}
}
request.version = mostRecentVersion;
TraceEvent(SevDebug, "ConfigBroadcasterSnapshotRequest", id)
.detail("Size", request.snapshot.size())
.detail("Version", request.version);
return success(clients.back().broadcastInterface.snapshot.getReply(request));
} }
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes, void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,