diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index ea14244b74..fc41c113e6 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -633,6 +633,7 @@ public: std::set currentChangeFeeds; std::set fetchingChangeFeeds; std::unordered_map> changeFeedClientVersions; + std::unordered_map changeFeedCleanupDurable; // newestAvailableVersion[k] // == invalidVersion -> k is unavailable at all versions @@ -4154,6 +4155,18 @@ ACTOR Future fetchChangeFeed(StorageServer* data, .detail("RangeID", changeFeedInfo->id.printable()) .detail("Range", changeFeedInfo->range.toString()) .detail("FetchVersion", fetchVersion); + + auto cleanupPending = data->changeFeedCleanupDurable.find(changeFeedInfo->id); + if (cleanupPending != data->changeFeedCleanupDurable.end()) { + TraceEvent(SevDebug, "FetchChangeFeedWaitCleanup", data->thisServerID) + .detail("RangeID", changeFeedInfo->id.printable()) + .detail("Range", changeFeedInfo->range.toString()) + .detail("CleanupVersion", cleanupPending->second) + .detail("FetchVersion", fetchVersion); + wait(data->durableVersion.whenAtLeast(cleanupPending->second + 1)); + ASSERT(!data->changeFeedCleanupDurable.count(changeFeedInfo->id)); + } + loop { try { // TODO clean up existing param for !existing @@ -4176,14 +4189,18 @@ ACTOR Future> fetchChangeFeedMetadata(StorageServer* data, KeyR feedIds.reserve(feeds.size()); // create change feed metadata if it does not exist for (auto& cfEntry : feeds) { + bool cleanupPending = data->changeFeedCleanupDurable.count(cfEntry.rangeId); feedIds.push_back(cfEntry.rangeId); bool existing = data->uidChangeFeed.count(cfEntry.rangeId); if (!existing) { + TEST(cleanupPending); // Fetch change feed which is cleanup pending. This means there was a move away and a + // move back, this will remake the metadata TraceEvent(SevDebug, "FetchChangeFeedMetadata", data->thisServerID) .detail("RangeID", cfEntry.rangeId.printable()) .detail("Range", cfEntry.range.toString()) .detail("FetchVersion", fetchVersion) - .detail("Stopped", cfEntry.stopped); + .detail("Stopped", cfEntry.stopped) + .detail("CleanupPending", cleanupPending); Reference changeFeedInfo = Reference(new ChangeFeedInfo()); changeFeedInfo->range = cfEntry.range; @@ -4202,6 +4219,8 @@ ACTOR Future> fetchChangeFeedMetadata(StorageServer* data, KeyR MutationRef(MutationRef::SetValue, persistChangeFeedKeys.begin.toString() + cfEntry.rangeId.toString(), changeFeedValue(cfEntry.range, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE))); + } else { + ASSERT(!cleanupPending); } } return feedIds; @@ -4865,9 +4884,23 @@ void changeServerKeys(StorageServer* data, if (!foundAssigned) { // TODO REMOVE + + Version durableVersion = data->data().getLatestVersion(); TraceEvent(SevDebug, "ChangeFeedCleanup", data->thisServerID) .detail("FeedID", f.first) - .detail("Version", version); + .detail("Version", version) + .detail("DurableVersion", durableVersion); + + data->changeFeedCleanupDurable[f.first] = durableVersion; + + Key beginClearKey = f.first.withPrefix(persistChangeFeedKeys.begin); + auto& mLV = data->addVersionToMutationLog(durableVersion); + data->addMutationToMutationLog( + mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey))); + data->addMutationToMutationLog(mLV, + MutationRef(MutationRef::ClearRange, + changeFeedDurableKey(f.first, 0), + changeFeedDurableKey(f.first, version))); auto rs = data->keyChangeFeed.modify(f.second); for (auto r = rs.begin(); r != rs.end(); ++r) { @@ -4886,14 +4919,6 @@ void changeServerKeys(StorageServer* data, feed->second->newMutations.trigger(); data->uidChangeFeed.erase(feed); } - - Key beginClearKey = f.first.withPrefix(persistChangeFeedKeys.begin); - - // all fetching actors should be cancelled by now because removing=true and moved(), so it's safe to - // clear storage directly - data->storage.clearRange(KeyRangeRef(beginClearKey, keyAfter(beginClearKey))); - data->storage.clearRange( - KeyRangeRef(changeFeedDurableKey(f.first, 0), changeFeedDurableKey(f.first, version))); } else { // if just part of feed's range is moved away auto feed = data->uidChangeFeed.find(f.first); @@ -5196,7 +5221,8 @@ private: .detail("Range", changeFeedRange.toString()) .detail("Version", currentVersion); Key beginClearKey = changeFeedId.withPrefix(persistChangeFeedKeys.begin); - auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + Version cleanupVersion = data->data().getLatestVersion(); + auto& mLV = data->addVersionToMutationLog(cleanupVersion); data->addMutationToMutationLog( mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey))); data->addMutationToMutationLog(mLV, @@ -5214,6 +5240,7 @@ private: } data->keyChangeFeed.coalesce(feed->second->range.contents()); data->uidChangeFeed.erase(feed); + data->changeFeedCleanupDurable[feed->first] = cleanupVersion; } if (addMutationToLog) { @@ -5845,6 +5872,16 @@ ACTOR Future updateStorage(StorageServer* data) { curFeed++; } + // remove any entries from changeFeedCleanupPending that were persisted + auto cfCleanup = data->changeFeedCleanupDurable.begin(); + while (cfCleanup != data->changeFeedCleanupDurable.end()) { + if (cfCleanup->second <= newOldestVersion) { + cfCleanup = data->changeFeedCleanupDurable.erase(cfCleanup); + } else { + cfCleanup++; + } + } + durableInProgress.send(Void()); wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgess could cause the storage server to shut // down, so delay to check for cancellation