diff --git a/fdbserver/ConfigBroadcaster.actor.cpp b/fdbserver/ConfigBroadcaster.actor.cpp index bb9fa0dae7..a0809854c2 100644 --- a/fdbserver/ConfigBroadcaster.actor.cpp +++ b/fdbserver/ConfigBroadcaster.actor.cpp @@ -169,12 +169,26 @@ class ConfigBroadcasterImpl { } template - Future setSnapshot(Snapshot&& snapshot, Version snapshotVersion) { - this->snapshot = std::forward(snapshot); + Future setSnapshot(Snapshot& snapshot, Version snapshotVersion) { + this->snapshot = snapshot; this->lastCompactedVersion = snapshotVersion; + std::vector> futures; for (const auto& client : clients) { - actors.add(brokenPromiseToNever(pushSnapshot(this, snapshotVersion, client))); + futures.push_back(brokenPromiseToNever(pushSnapshot(this, snapshotVersion, client))); } + return waitForAll(futures); + } + + ACTOR template + static Future pushSnapshotAndChanges(ConfigBroadcasterImpl* self, + Snapshot snapshot, + Version snapshotVersion, + Standalone> changes, + Version changesVersion, + Standalone> annotations) { + // Make sure all snapshot messages were received before sending changes. + wait(self->setSnapshot(snapshot, snapshotVersion)); + self->addChanges(changes, changesVersion, annotations); return Void(); } @@ -232,8 +246,7 @@ public: .detail("ChangesSize", changes.size()) .detail("ChangesVersion", changesVersion) .detail("AnnotationsSize", annotations.size()); - setSnapshot(std::forward(snapshot), snapshotVersion); - addChanges(changes, changesVersion, annotations); + actors.add(pushSnapshotAndChanges(this, snapshot, snapshotVersion, changes, changesVersion, annotations)); } ConfigBroadcasterImpl(ConfigFollowerInterface const& cfi) : ConfigBroadcasterImpl() {