Fixing granule mismatch bug caused by race in change feed fetch (#8019)
* Fixing another feed fetch race that caused incorrect data. * Limiting the size of the final data check in the granule workload so the mapping does not get too large.
This commit is contained in:
parent
756e33e7fe
commit
058c720ef3
|
@ -5621,7 +5621,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
|
|||
}
|
||||
|
||||
TraceEvent(SevDebug, "ChangeFeedPopQuery", self->thisServerID)
|
||||
.detail("RangeID", req.rangeID)
|
||||
.detail("FeedID", req.rangeID)
|
||||
.detail("Version", req.version)
|
||||
.detail("SSVersion", self->version.get())
|
||||
.detail("Range", req.range);
|
||||
|
@ -5680,7 +5680,7 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
|
|||
if (startVersion >= endVersion || (changeFeedInfo->removing)) {
|
||||
CODE_PROBE(true, "Change Feed popped before fetch");
|
||||
TraceEvent(SevDebug, "FetchChangeFeedNoOp", data->thisServerID)
|
||||
.detail("RangeID", rangeId)
|
||||
.detail("FeedID", rangeId)
|
||||
.detail("Range", range)
|
||||
.detail("StartVersion", startVersion)
|
||||
.detail("EndVersion", endVersion)
|
||||
|
@ -5832,7 +5832,7 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
|
|||
if (e.code() != error_code_end_of_stream) {
|
||||
TraceEvent(SevDebug, "FetchChangeFeedError", data->thisServerID)
|
||||
.errorUnsuppressed(e)
|
||||
.detail("RangeID", rangeId)
|
||||
.detail("FeedID", rangeId)
|
||||
.detail("Range", range)
|
||||
.detail("EndVersion", endVersion)
|
||||
.detail("Removing", changeFeedInfo->removing)
|
||||
|
@ -5881,7 +5881,7 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
|
|||
}
|
||||
|
||||
TraceEvent(SevDebug, "FetchChangeFeedDone", data->thisServerID)
|
||||
.detail("RangeID", rangeId)
|
||||
.detail("FeedID", rangeId)
|
||||
.detail("Range", range)
|
||||
.detail("StartVersion", startVersion)
|
||||
.detail("EndVersion", endVersion)
|
||||
|
@ -5901,7 +5901,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
|
|||
wait(delay(0)); // allow this actor to be cancelled by removals
|
||||
|
||||
TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedInfo->id)
|
||||
.detail("FeedID", changeFeedInfo->id)
|
||||
.detail("Range", changeFeedInfo->range)
|
||||
.detail("BeginVersion", beginVersion)
|
||||
.detail("EndVersion", endVersion);
|
||||
|
@ -5910,7 +5910,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
|
|||
if (cleanupPending != data->changeFeedCleanupDurable.end()) {
|
||||
CODE_PROBE(true, "Change feed waiting for dirty previous move to finish");
|
||||
TraceEvent(SevDebug, "FetchChangeFeedWaitCleanup", data->thisServerID)
|
||||
.detail("RangeID", changeFeedInfo->id)
|
||||
.detail("FeedID", changeFeedInfo->id)
|
||||
.detail("Range", changeFeedInfo->range)
|
||||
.detail("CleanupVersion", cleanupPending->second)
|
||||
.detail("EmptyVersion", changeFeedInfo->emptyVersion)
|
||||
|
@ -5923,7 +5923,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
|
|||
if (cleanupPendingAfter != data->changeFeedCleanupDurable.end()) {
|
||||
ASSERT(cleanupPendingAfter->second >= endVersion);
|
||||
TraceEvent(SevDebug, "FetchChangeFeedCancelledByCleanup", data->thisServerID)
|
||||
.detail("RangeID", changeFeedInfo->id)
|
||||
.detail("FeedID", changeFeedInfo->id)
|
||||
.detail("Range", changeFeedInfo->range)
|
||||
.detail("BeginVersion", beginVersion)
|
||||
.detail("EndVersion", endVersion);
|
||||
|
@ -5960,7 +5960,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
|
|||
Version cleanupVersion = data->data().getLatestVersion();
|
||||
|
||||
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetch", data->thisServerID)
|
||||
.detail("RangeID", changeFeedInfo->id)
|
||||
.detail("FeedID", changeFeedInfo->id)
|
||||
.detail("Range", changeFeedInfo->range)
|
||||
.detail("Version", cleanupVersion);
|
||||
|
||||
|
@ -6041,6 +6041,8 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
}
|
||||
}
|
||||
}
|
||||
// FIXME: might want to inject delay here sometimes in simulation, so that races that would only happen when a feed
|
||||
// destroy causes a wait are more prominent?
|
||||
|
||||
std::vector<Key> feedIds;
|
||||
feedIds.reserve(feedMetadata.feeds.size());
|
||||
|
@ -6052,7 +6054,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
bool existing = existingEntry != data->uidChangeFeed.end();
|
||||
|
||||
TraceEvent(SevDebug, "FetchedChangeFeedInfo", data->thisServerID)
|
||||
.detail("RangeID", cfEntry.feedId)
|
||||
.detail("FeedID", cfEntry.feedId)
|
||||
.detail("Range", cfEntry.range)
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", cfEntry.emptyVersion)
|
||||
|
@ -6093,27 +6095,30 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
|
||||
auto fid = missingFeeds.find(cfEntry.feedId);
|
||||
if (fid != missingFeeds.end()) {
|
||||
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
|
||||
.detail("RangeID", changeFeedInfo->id.printable())
|
||||
.detail("Range", changeFeedInfo->range)
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", changeFeedInfo->emptyVersion)
|
||||
.detail("StopVersion", changeFeedInfo->stopVersion)
|
||||
.detail("PreviousMetadataVersion", changeFeedInfo->metadataVersion)
|
||||
.detail("NewMetadataVersion", cfEntry.feedMetadataVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
missingFeeds.erase(fid);
|
||||
ASSERT(!changeFeedInfo->destroyed);
|
||||
ASSERT(changeFeedInfo->removing);
|
||||
CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
|
||||
// could possibly be not removing because it was reset while
|
||||
// waiting on destroyedFeeds by a private mutation or another fetch
|
||||
if (changeFeedInfo->removing) {
|
||||
TraceEvent(SevDebug, "ResetChangeFeedInfoFromFetch", data->thisServerID)
|
||||
.detail("FeedID", changeFeedInfo->id.printable())
|
||||
.detail("Range", changeFeedInfo->range)
|
||||
.detail("FetchVersion", fetchVersion)
|
||||
.detail("EmptyVersion", changeFeedInfo->emptyVersion)
|
||||
.detail("StopVersion", changeFeedInfo->stopVersion)
|
||||
.detail("PreviousMetadataVersion", changeFeedInfo->metadataVersion)
|
||||
.detail("NewMetadataVersion", cfEntry.feedMetadataVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
||||
changeFeedInfo->removing = false;
|
||||
// reset fetch versions because everything previously fetched was cleaned up
|
||||
changeFeedInfo->fetchVersion = invalidVersion;
|
||||
changeFeedInfo->durableFetchVersion = NotifiedVersion();
|
||||
CODE_PROBE(true, "re-fetching feed scheduled for deletion! Un-mark it as removing");
|
||||
|
||||
addMutationToLog = true;
|
||||
// TODO only reset data if feed is still removing
|
||||
changeFeedInfo->removing = false;
|
||||
// reset fetch versions because everything previously fetched was cleaned up
|
||||
changeFeedInfo->fetchVersion = invalidVersion;
|
||||
changeFeedInfo->durableFetchVersion = NotifiedVersion();
|
||||
addMutationToLog = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (changeFeedInfo->destroyed) {
|
||||
|
@ -6183,7 +6188,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
|
||||
CODE_PROBE(true, "Destroying change feed from fetch metadata"); //
|
||||
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetchMetadata", data->thisServerID)
|
||||
.detail("RangeID", feed.first)
|
||||
.detail("FeedID", feed.first)
|
||||
.detail("Range", existingEntry->second->range)
|
||||
.detail("Version", cleanupVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
|
@ -7664,7 +7669,7 @@ private:
|
|||
auto feed = data->uidChangeFeed.find(changeFeedId);
|
||||
|
||||
TraceEvent(SevDebug, "ChangeFeedPrivateMutation", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("Version", currentVersion)
|
||||
.detail("PopVersion", popVersion)
|
||||
|
@ -7693,7 +7698,7 @@ private:
|
|||
ASSERT(feed != data->uidChangeFeed.end());
|
||||
|
||||
TraceEvent(SevDebug, "AddingChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("EmptyVersion", feed->second->emptyVersion);
|
||||
|
||||
|
@ -7702,6 +7707,29 @@ private:
|
|||
r->value().push_back(changeFeedInfo);
|
||||
}
|
||||
data->keyChangeFeed.coalesce(changeFeedRange.contents());
|
||||
} else if (feed != data->uidChangeFeed.end() && feed->second->removing && !feed->second->destroyed &&
|
||||
status != ChangeFeedStatus::CHANGE_FEED_DESTROY) {
|
||||
// Because we got a private mutation for this change feed, the feed must have moved back after being
|
||||
// moved away. Normally we would later find out about this via a fetch, but in the particular case where
|
||||
// the private mutation is the creation of the change feed, and the following race occurred, we must
|
||||
// refresh it here:
|
||||
// 1. This SS found out about the feed from a fetch, from a SS with a higher version that already got
|
||||
// the feed create mutation
|
||||
// 2. The shard was moved away
|
||||
// 3. The shard was moved back, and this SS fetched change feed metadata from a different SS that did
|
||||
// not yet receive the private mutation, so the feed was not refreshed
|
||||
// 4. This SS gets the private mutation, the feed is still marked as removing
|
||||
TraceEvent(SevDebug, "ResetChangeFeedInfoFromPrivateMutation", data->thisServerID)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("Version", currentVersion);
|
||||
|
||||
CODE_PROBE(true, "private mutation for feed scheduled for deletion! Un-mark it as removing");
|
||||
|
||||
feed->second->removing = false;
|
||||
// reset fetch versions because everything previously fetched was cleaned up
|
||||
feed->second->fetchVersion = invalidVersion;
|
||||
feed->second->durableFetchVersion = NotifiedVersion();
|
||||
}
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->updateMetadataVersion(currentVersion);
|
||||
|
@ -7734,7 +7762,7 @@ private:
|
|||
|
||||
} else if (status == ChangeFeedStatus::CHANGE_FEED_CREATE && createdFeed) {
|
||||
TraceEvent(SevDebug, "CreatingChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("Version", currentVersion);
|
||||
// no-op, already created metadata
|
||||
|
@ -7742,7 +7770,7 @@ private:
|
|||
}
|
||||
if (status == ChangeFeedStatus::CHANGE_FEED_STOP && currentVersion < feed->second->stopVersion) {
|
||||
TraceEvent(SevDebug, "StoppingChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("Version", currentVersion);
|
||||
feed->second->stopVersion = currentVersion;
|
||||
|
@ -7750,7 +7778,7 @@ private:
|
|||
}
|
||||
if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY && !createdFeed && feed != data->uidChangeFeed.end()) {
|
||||
TraceEvent(SevDebug, "DestroyingChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("Version", currentVersion);
|
||||
Key beginClearKey = changeFeedId.withPrefix(persistChangeFeedKeys.begin);
|
||||
|
@ -9423,7 +9451,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||
std::tie(changeFeedRange, popVersion, stopVersion, metadataVersion) =
|
||||
decodeChangeFeedSSValue(changeFeeds[feedLoc].value);
|
||||
TraceEvent(SevDebug, "RestoringChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("StopVersion", stopVersion)
|
||||
.detail("PopVer", popVersion)
|
||||
|
|
|
@ -932,6 +932,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
loop {
|
||||
state RangeResult output;
|
||||
state Version readVersion = invalidVersion;
|
||||
state int64_t bufferedBytes = 0;
|
||||
try {
|
||||
Version ver = wait(tr.getReadVersion());
|
||||
readVersion = ver;
|
||||
|
@ -943,6 +944,11 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
Standalone<RangeResultRef> res = waitNext(results.getFuture());
|
||||
output.arena().dependsOn(res.arena());
|
||||
output.append(output.arena(), res.begin(), res.size());
|
||||
bufferedBytes += res.expectedSize();
|
||||
// force checking if we have enough data
|
||||
if (bufferedBytes >= 10 * SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
|
|
Loading…
Reference in New Issue