From b7be356a98bbf80104f365b3e0ff9a78a069d5cc Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Wed, 21 Sep 2022 14:44:51 -0500 Subject: [PATCH] removing overhead of local change feed stream copy (#8205) --- fdbserver/storageserver.actor.cpp | 172 +++++++++--------------------- 1 file changed, 49 insertions(+), 123 deletions(-) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index e0b930ca98..a2bc419548 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2929,49 +2929,6 @@ ACTOR Future> getChangeFeedMutations(Stor return std::make_pair(reply, gotAll); } -ACTOR Future localChangeFeedStream(StorageServer* data, - PromiseStream> results, - Key rangeID, - Version begin, - Version end, - KeyRange range) { - try { - loop { - state ChangeFeedStreamRequest feedRequest; - feedRequest.rangeID = rangeID; - feedRequest.begin = begin; - feedRequest.end = end; - feedRequest.range = range; - state std::pair feedReply = - wait(getChangeFeedMutations(data, feedRequest, true, false, UID())); - begin = feedReply.first.mutations.back().version + 1; - state int resultLoc = 0; - while (resultLoc < feedReply.first.mutations.size()) { - if (feedReply.first.mutations[resultLoc].mutations.size() || - feedReply.first.mutations[resultLoc].version == end - 1) { - wait(results.onEmpty()); - results.send(feedReply.first.mutations[resultLoc]); - } - resultLoc++; - } - - if (begin == end) { - return Void(); - } - } - } catch (Error& e) { - if (e.code() == error_code_unknown_change_feed) { - CODE_PROBE(true, "CF was moved away, no more local data to merge with"); - // Send endVersion so local stream is effectively done. We couldn't have send that already, because that - // would mean the stream would have finished without error - results.send(MutationsAndVersionRef(end, invalidVersion)); - } else { - TraceEvent(SevError, "LocalChangeFeedError", data->thisServerID).error(e).detail("CFID", rangeID); - } - throw; - } -} - // Change feed stream must be sent an error as soon as it is moved away, or change feed can get incorrect results ACTOR Future stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamRequest req, UID streamUID) { auto feed = data->uidChangeFeed.find(req.rangeID); @@ -5834,17 +5791,9 @@ ACTOR Future fetchChangeFeedApplier(StorageServer* data, state Version lastVersion = invalidVersion; state int64_t versionsFetched = 0; - state PromiseStream> localResults; + // ensure SS is at least caught up to begin version, to maintain behavior with old fetch + wait(data->version.whenAtLeast(startVersion)); - // Add 1 to fetch version to make sure the local stream will have more versions in the stream than the remote stream - // to avoid edge cases in the merge logic - - state Future localStream = - localChangeFeedStream(data, localResults, rangeId, startVersion, endVersion + 1, range); - state Standalone localResult; - - Standalone _localResult = waitNext(localResults.getFuture()); - localResult = _localResult; try { loop { while (data->fetchKeysBudgetUsed.get()) { @@ -5854,6 +5803,10 @@ ACTOR Future fetchChangeFeedApplier(StorageServer* data, state Standalone> remoteResult = waitNext(feedResults->mutations.getFuture()); state int remoteLoc = 0; + // ensure SS is at least caught up to begin version, to maintain behavior with old fetch + if (!remoteResult.empty()) { + wait(data->version.whenAtLeast(remoteResult.back().version)); + } while (remoteLoc < remoteResult.size()) { if (feedResults->popVersion - 1 > changeFeedInfo->emptyVersion) { @@ -5881,80 +5834,53 @@ ACTOR Future fetchChangeFeedApplier(StorageServer* data, ++data->counters.kvSystemClearRanges; } - Version localVersion = localResult.version; Version remoteVersion = remoteResult[remoteLoc].version; - - if (remoteVersion <= localVersion) { - if (remoteVersion > changeFeedInfo->emptyVersion) { - // merge if same version - if (remoteVersion == localVersion && remoteResult[remoteLoc].mutations.size() && - remoteResult[remoteLoc].mutations.back().param1 != lastEpochEndPrivateKey) { - int remoteSize = remoteResult[remoteLoc].mutations.size(); - ASSERT(localResult.mutations.size()); - remoteResult[remoteLoc].mutations.append( - remoteResult.arena(), localResult.mutations.begin(), localResult.mutations.size()); - if (MUTATION_TRACKING_ENABLED) { - int midx = 0; - for (auto& m : remoteResult[remoteLoc].mutations) { - DEBUG_MUTATION("ChangeFeedWriteMoveMerge", remoteVersion, m, data->thisServerID) - .detail("Range", range) - .detail("FromLocal", midx >= remoteSize) - .detail("ChangeFeedID", rangeId); - midx++; - } - } - } else { - if (MUTATION_TRACKING_ENABLED) { - for (auto& m : remoteResult[remoteLoc].mutations) { - DEBUG_MUTATION("ChangeFeedWriteMove", remoteVersion, m, data->thisServerID) - .detail("Range", range) - .detail("ChangeFeedID", rangeId); - } - } - } - - data->storage.writeKeyValue( - KeyValueRef(changeFeedDurableKey(rangeId, remoteVersion), - changeFeedDurableValue(remoteResult[remoteLoc].mutations, - remoteResult[remoteLoc].knownCommittedVersion))); - ++data->counters.kvSystemClearRanges; - changeFeedInfo->fetchVersion = std::max(changeFeedInfo->fetchVersion, remoteVersion); - - if (firstVersion == invalidVersion) { - firstVersion = remoteVersion; - } - lastVersion = remoteVersion; - versionsFetched++; - } else { - CODE_PROBE(true, "Change feed ignoring write on move because it was popped concurrently"); - if (MUTATION_TRACKING_ENABLED) { - for (auto& m : remoteResult[remoteLoc].mutations) { - DEBUG_MUTATION("ChangeFeedWriteMoveIgnore", remoteVersion, m, data->thisServerID) - .detail("Range", range) - .detail("ChangeFeedID", rangeId) - .detail("EmptyVersion", changeFeedInfo->emptyVersion); - } - } - if (versionsFetched > 0) { - ASSERT(firstVersion != invalidVersion); - ASSERT(lastVersion != invalidVersion); - data->storage.clearRange( - KeyRangeRef(changeFeedDurableKey(changeFeedInfo->id, firstVersion), - changeFeedDurableKey(changeFeedInfo->id, lastVersion + 1))); - ++data->counters.kvSystemClearRanges; - firstVersion = invalidVersion; - lastVersion = invalidVersion; - versionsFetched = 0; + // ensure SS is at least caught up to this version, to maintain behavior with old fetch + ASSERT(remoteVersion <= data->version.get()); + if (remoteVersion > changeFeedInfo->emptyVersion) { + if (MUTATION_TRACKING_ENABLED) { + for (auto& m : remoteResult[remoteLoc].mutations) { + DEBUG_MUTATION("ChangeFeedWriteMove", remoteVersion, m, data->thisServerID) + .detail("Range", range) + .detail("ChangeFeedID", rangeId); } } - remoteLoc++; - } - if (localVersion <= remoteVersion) { - // Do this once per wait instead of once per version for efficiency - data->fetchingChangeFeeds.insert(changeFeedInfo->id); - Standalone _localResult = waitNext(localResults.getFuture()); - localResult = _localResult; + + data->storage.writeKeyValue( + KeyValueRef(changeFeedDurableKey(rangeId, remoteVersion), + changeFeedDurableValue(remoteResult[remoteLoc].mutations, + remoteResult[remoteLoc].knownCommittedVersion))); + ++data->counters.kvSystemClearRanges; + changeFeedInfo->fetchVersion = std::max(changeFeedInfo->fetchVersion, remoteVersion); + + if (firstVersion == invalidVersion) { + firstVersion = remoteVersion; + } + lastVersion = remoteVersion; + versionsFetched++; + } else { + CODE_PROBE(true, "Change feed ignoring write on move because it was popped concurrently"); + if (MUTATION_TRACKING_ENABLED) { + for (auto& m : remoteResult[remoteLoc].mutations) { + DEBUG_MUTATION("ChangeFeedWriteMoveIgnore", remoteVersion, m, data->thisServerID) + .detail("Range", range) + .detail("ChangeFeedID", rangeId) + .detail("EmptyVersion", changeFeedInfo->emptyVersion); + } + } + if (versionsFetched > 0) { + ASSERT(firstVersion != invalidVersion); + ASSERT(lastVersion != invalidVersion); + data->storage.clearRange( + KeyRangeRef(changeFeedDurableKey(changeFeedInfo->id, firstVersion), + changeFeedDurableKey(changeFeedInfo->id, lastVersion + 1))); + ++data->counters.kvSystemClearRanges; + firstVersion = invalidVersion; + lastVersion = invalidVersion; + versionsFetched = 0; + } } + remoteLoc++; } // Do this once per wait instead of once per version for efficiency data->fetchingChangeFeeds.insert(changeFeedInfo->id);