diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index a2b47e0913..b14c1c667d 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -4572,7 +4572,6 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // we have written) state Future fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1); - wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion, changeFeedsToFetch)); holdingFKPL.release(); wait(fetchDurable); @@ -4607,6 +4606,9 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { ASSERT(shard->transferredVersion > data->storageVersion()); ASSERT(shard->transferredVersion == data->data().getLatestVersion()); + state Future cfFetch = + dispatchChangeFeeds(data, fetchKeysID, keys, shard->transferredVersion, changeFeedsToFetch); + TraceEvent(SevDebug, "FetchKeysHaveData", data->thisServerID) .detail("FKID", interval.pairID) .detail("Version", shard->transferredVersion) @@ -4643,6 +4645,9 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { shard->updates.clear(); + // wait on change feed fetch to complete before marking data as available + wait(cfFetch); + setAvailableStatus(data, keys, true); // keys will be available when getLatestVersion()==transferredVersion is durable @@ -4726,40 +4731,6 @@ void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const } // Add the mutation to the version. updates.back().mutations.push_back_deep(updates.back().arena(), mutation); - if (!fromFetch) { - if (mutation.type == MutationRef::SetValue) { - for (auto& it : server->keyChangeFeed[mutation.param1]) { - if (!it->stopped) { - if (it->mutations.empty() || it->mutations.back().version != version) { - it->mutations.push_back(MutationsAndVersionRef(version, server->knownCommittedVersion)); - } - it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation); - server->currentChangeFeeds.insert(it->id); - DEBUG_MUTATION("ChangeFeedWriteSet", version, mutation, server->thisServerID) - .detail("Range", it->range) - .detail("ChangeFeedID", it->id) - .detail("Source", "Adding"); - } - } - } else if (mutation.type == MutationRef::ClearRange) { - auto ranges = server->keyChangeFeed.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2)); - for (auto& r : ranges) { - for (auto& it : r.value()) { - if (!it->stopped) { - if (it->mutations.empty() || it->mutations.back().version != version) { - it->mutations.push_back(MutationsAndVersionRef(version, server->knownCommittedVersion)); - } - it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation); - server->currentChangeFeeds.insert(it->id); - DEBUG_MUTATION("ChangeFeedWriteClear", version, mutation, server->thisServerID) - .detail("Range", it->range) - .detail("ChangeFeedID", it->id) - .detail("Source", "Adding"); - } - } - } - } - } } else if (phase == Waiting) { server->addMutation(version, fromFetch, mutation, keys, server->updateEagerReads); } else