From f9f676abf9dc0433db85da2e62993d9dcfb4955e Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 17 Feb 2022 19:15:51 -0600 Subject: [PATCH] Also re-persisting change feed metadata on fetch after cleanup --- fdbserver/storageserver.actor.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index cbe1011770..67d7f3d4e2 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -4331,6 +4331,7 @@ ACTOR Future changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) TraceEvent(SevDebug, "ChangeFeedPopQuery", self->thisServerID) .detail("RangeID", req.rangeID.printable()) .detail("Version", req.version) + .detail("SSVersion", self->version.get()) .detail("Range", req.range.toString()); if (req.version - 1 > feed->second->emptyVersion) { @@ -4662,6 +4663,7 @@ ACTOR Future> fetchChangeFeedMetadata(StorageServer* data, KeyR auto changeFeedInfo = existingEntry->second; auto feedCleanup = data->changeFeedCleanupDurable.find(cfEntry.rangeId); + bool writeToMutationLog = false; if (feedCleanup != data->changeFeedCleanupDurable.end() && changeFeedInfo->removing) { TEST(true); // re-fetching feed scheduled for deletion! Un-mark it as removing changeFeedInfo->emptyVersion = cfEntry.emptyVersion; @@ -4678,9 +4680,19 @@ ACTOR Future> fetchChangeFeedMetadata(StorageServer* data, KeyR .detail("EmptyVersion", cfEntry.emptyVersion) .detail("CleanupVersion", feedCleanup->second) .detail("Stopped", cfEntry.stopped); + + // Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore + // it + // We may just want to refactor this so updateStorage does explicit deletes based on + // changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup. + // Then we wouldn't have to reset anything here + writeToMutationLog = true; } else if (changeFeedInfo->emptyVersion < cfEntry.emptyVersion) { TEST(true); // Got updated CF emptyVersion from a parallel fetchChangeFeedMetadata changeFeedInfo->emptyVersion = cfEntry.emptyVersion; + writeToMutationLog = true; + } + if (writeToMutationLog) { auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); data->addMutationToMutationLog( mLV,