diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index b3c099f187..ea14244b74 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -4146,45 +4146,20 @@ ACTOR Future fetchChangeFeedApplier(StorageServer* data, } ACTOR Future fetchChangeFeed(StorageServer* data, - Key rangeId, - KeyRange range, - bool stopped, + Reference changeFeedInfo, Version fetchVersion) { - state Reference changeFeedInfo; wait(delay(0)); // allow this actor to be cancelled by removals - state bool existing = data->uidChangeFeed.count(rangeId); TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID) - .detail("RangeID", rangeId.printable()) - .detail("Range", range.toString()) - .detail("FetchVersion", fetchVersion) - .detail("Existing", existing); - - if (!existing) { - changeFeedInfo = Reference(new ChangeFeedInfo()); - changeFeedInfo->range = range; - changeFeedInfo->id = rangeId; - changeFeedInfo->stopped = stopped; - data->uidChangeFeed[rangeId] = changeFeedInfo; - auto rs = data->keyChangeFeed.modify(range); - for (auto r = rs.begin(); r != rs.end(); ++r) { - r->value().push_back(changeFeedInfo); - } - data->keyChangeFeed.coalesce(range.contents()); - auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); - data->addMutationToMutationLog( - mLV, - MutationRef(MutationRef::SetValue, - persistChangeFeedKeys.begin.toString() + rangeId.toString(), - changeFeedValue(range, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE))); - } else { - changeFeedInfo = data->uidChangeFeed[rangeId]; - } - + .detail("RangeID", changeFeedInfo->id.printable()) + .detail("Range", changeFeedInfo->range.toString()) + .detail("FetchVersion", fetchVersion); loop { try { - wait(fetchChangeFeedApplier(data, changeFeedInfo, rangeId, range, fetchVersion, existing)); - data->fetchingChangeFeeds.insert(rangeId); + // TODO clean up existing param for !existing + wait(fetchChangeFeedApplier( + data, changeFeedInfo, changeFeedInfo->id, changeFeedInfo->range, fetchVersion, false)); + data->fetchingChangeFeeds.insert(changeFeedInfo->id); return Void(); } catch (Error& e) { if (e.code() != error_code_change_feed_not_registered) { @@ -4195,17 +4170,62 @@ ACTOR Future fetchChangeFeed(StorageServer* data, } } -ACTOR Future dispatchChangeFeeds(StorageServer* data, UID fetchKeysID, KeyRange keys, Version fetchVersion) { +ACTOR Future> fetchChangeFeedMetadata(StorageServer* data, KeyRange keys, Version fetchVersion) { + std::vector feeds = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1)); + std::vector feedIds; + feedIds.reserve(feeds.size()); + // create change feed metadata if it does not exist + for (auto& cfEntry : feeds) { + feedIds.push_back(cfEntry.rangeId); + bool existing = data->uidChangeFeed.count(cfEntry.rangeId); + if (!existing) { + TraceEvent(SevDebug, "FetchChangeFeedMetadata", data->thisServerID) + .detail("RangeID", cfEntry.rangeId.printable()) + .detail("Range", cfEntry.range.toString()) + .detail("FetchVersion", fetchVersion) + .detail("Stopped", cfEntry.stopped); + + Reference changeFeedInfo = Reference(new ChangeFeedInfo()); + changeFeedInfo->range = cfEntry.range; + changeFeedInfo->id = cfEntry.rangeId; + + changeFeedInfo->stopped = cfEntry.stopped; + data->uidChangeFeed[cfEntry.rangeId] = changeFeedInfo; + auto rs = data->keyChangeFeed.modify(cfEntry.range); + for (auto r = rs.begin(); r != rs.end(); ++r) { + r->value().push_back(changeFeedInfo); + } + data->keyChangeFeed.coalesce(cfEntry.range.contents()); + auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + data->addMutationToMutationLog( + mLV, + MutationRef(MutationRef::SetValue, + persistChangeFeedKeys.begin.toString() + cfEntry.rangeId.toString(), + changeFeedValue(cfEntry.range, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE))); + } + } + return feedIds; +} + +ACTOR Future dispatchChangeFeeds(StorageServer* data, + UID fetchKeysID, + KeyRange keys, + Version fetchVersion, + std::vector feedIds) { + // find overlapping range feeds state std::map> feedFetches; state PromiseStream removals; data->changeFeedRemovals[fetchKeysID] = removals; try { - state std::vector feeds = - wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1)); + // TODO add trace events for some of these - for (auto& feed : feeds) { - feedFetches[feed.rangeId] = fetchChangeFeed(data, feed.rangeId, feed.range, feed.stopped, fetchVersion); + for (auto& feedId : feedIds) { + auto feedIt = data->uidChangeFeed.find(feedId); + // TODO REMOVE this assert once we enable change feed deletion + ASSERT(feedIt != data->uidChangeFeed.end()); + Reference feed = feedIt->second; + feedFetches[feed->id] = fetchChangeFeed(data, feed, fetchVersion); } loop { @@ -4264,7 +4284,10 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent(SevDebug, interval.begin(), data->thisServerID) .detail("KeyBegin", shard->keys.begin) - .detail("KeyEnd", shard->keys.end); + .detail("KeyEnd", shard->keys.end) + .detail("Version", data->version.get()); + + state Future> fetchCFMetadata = fetchChangeFeedMetadata(data, keys, data->version.get()); validate(data); @@ -4297,6 +4320,12 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // Fetch keys gets called while the update actor is processing mutations. data->version will not be updated // until all mutations for a version have been processed. We need to take the durableVersionLock to ensure // data->version is greater than the version of the mutation which caused the fetch to be initiated. + + // We must also ensure we have fetched all change feed metadata BEFORE changing the phase to fetching to ensure + // change feed mutations get applied correctly + state std::vector changeFeedsToFetch; + std::vector _cfToFetch = wait(fetchCFMetadata); + changeFeedsToFetch = _cfToFetch; wait(data->durableVersionLock.take()); shard->phase = AddingShard::Fetching; @@ -4465,7 +4494,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // we have written) state Future fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1); - wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion)); + wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion, changeFeedsToFetch)); holdingFKPL.release(); wait(fetchDurable); @@ -4960,10 +4989,9 @@ public: applyPrivateData(data, m); } } else { - // FIXME: enable when DEBUG_MUTATION is active - // for(auto m = changes[c].mutations.begin(); m; ++m) { - // DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m, data->thisServerID); - //} + if (MUTATION_TRACKING_ENABLED) { + DEBUG_MUTATION("SSUpdateMutation", ver, m, data->thisServerID).detail("FromFetch", fromFetch); + } splitMutation(data, data->shards, m, ver, fromFetch); } diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index 175c27b71a..d8f0495f40 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -341,6 +341,11 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { return; } + TraceEvent ev(SevError, "BGMismatch"); + ev.detail("DirectoryID", format("%08x", threadData->directoryID)) + .detail("RangeStart", format("%08x", startKey)) + .detail("RangeEnd", format("%08x", endKey)) + .detail("Version", readVersion); fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3}\n", format("%08x", threadData->directoryID), format("%08x", startKey),