Wait for all snapshot replies before sending incremental changes

This commit is contained in:
Lukas Joswiak 2021-08-11 11:17:51 -07:00
parent c098a1128d
commit 1faec36bc6
1 changed files with 18 additions and 5 deletions

View File

@ -169,12 +169,26 @@ class ConfigBroadcasterImpl {
}
template <class Snapshot>
Future<Void> setSnapshot(Snapshot&& snapshot, Version snapshotVersion) {
this->snapshot = std::forward<Snapshot>(snapshot);
Future<Void> setSnapshot(Snapshot& snapshot, Version snapshotVersion) {
this->snapshot = snapshot;
this->lastCompactedVersion = snapshotVersion;
std::vector<Future<Void>> futures;
for (const auto& client : clients) {
actors.add(brokenPromiseToNever(pushSnapshot(this, snapshotVersion, client)));
futures.push_back(brokenPromiseToNever(pushSnapshot(this, snapshotVersion, client)));
}
return waitForAll(futures);
}
ACTOR template <class Snapshot>
static Future<Void> pushSnapshotAndChanges(ConfigBroadcasterImpl* self,
Snapshot snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> annotations) {
// Make sure all snapshot messages were received before sending changes.
wait(self->setSnapshot(snapshot, snapshotVersion));
self->addChanges(changes, changesVersion, annotations);
return Void();
}
@ -232,8 +246,7 @@ public:
.detail("ChangesSize", changes.size())
.detail("ChangesVersion", changesVersion)
.detail("AnnotationsSize", annotations.size());
setSnapshot(std::forward<Snapshot>(snapshot), snapshotVersion);
addChanges(changes, changesVersion, annotations);
actors.add(pushSnapshotAndChanges(this, snapshot, snapshotVersion, changes, changesVersion, annotations));
}
ConfigBroadcasterImpl(ConfigFollowerInterface const& cfi) : ConfigBroadcasterImpl() {