Fetch Change Feed metadata at the start of fetchKeys to fix mutation race

This commit is contained in:
Josh Slocum 2022-01-27 18:46:22 -06:00
parent c26e11c2c3
commit 91df569fdd
2 changed files with 77 additions and 44 deletions

View File

@ -4146,45 +4146,20 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
}
ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
Key rangeId,
KeyRange range,
bool stopped,
Reference<ChangeFeedInfo> changeFeedInfo,
Version fetchVersion) {
state Reference<ChangeFeedInfo> changeFeedInfo;
wait(delay(0)); // allow this actor to be cancelled by removals
state bool existing = data->uidChangeFeed.count(rangeId);
TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("FetchVersion", fetchVersion)
.detail("Existing", existing);
if (!existing) {
changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
changeFeedInfo->range = range;
changeFeedInfo->id = rangeId;
changeFeedInfo->stopped = stopped;
data->uidChangeFeed[rangeId] = changeFeedInfo;
auto rs = data->keyChangeFeed.modify(range);
for (auto r = rs.begin(); r != rs.end(); ++r) {
r->value().push_back(changeFeedInfo);
}
data->keyChangeFeed.coalesce(range.contents());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + rangeId.toString(),
changeFeedValue(range, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE)));
} else {
changeFeedInfo = data->uidChangeFeed[rangeId];
}
.detail("RangeID", changeFeedInfo->id.printable())
.detail("Range", changeFeedInfo->range.toString())
.detail("FetchVersion", fetchVersion);
loop {
try {
wait(fetchChangeFeedApplier(data, changeFeedInfo, rangeId, range, fetchVersion, existing));
data->fetchingChangeFeeds.insert(rangeId);
// TODO clean up existing param for !existing
wait(fetchChangeFeedApplier(
data, changeFeedInfo, changeFeedInfo->id, changeFeedInfo->range, fetchVersion, false));
data->fetchingChangeFeeds.insert(changeFeedInfo->id);
return Void();
} catch (Error& e) {
if (e.code() != error_code_change_feed_not_registered) {
@ -4195,17 +4170,62 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
}
}
ACTOR Future<Void> dispatchChangeFeeds(StorageServer* data, UID fetchKeysID, KeyRange keys, Version fetchVersion) {
ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data, KeyRange keys, Version fetchVersion) {
std::vector<OverlappingChangeFeedEntry> feeds = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1));
std::vector<Key> feedIds;
feedIds.reserve(feeds.size());
// create change feed metadata if it does not exist
for (auto& cfEntry : feeds) {
feedIds.push_back(cfEntry.rangeId);
bool existing = data->uidChangeFeed.count(cfEntry.rangeId);
if (!existing) {
TraceEvent(SevDebug, "FetchChangeFeedMetadata", data->thisServerID)
.detail("RangeID", cfEntry.rangeId.printable())
.detail("Range", cfEntry.range.toString())
.detail("FetchVersion", fetchVersion)
.detail("Stopped", cfEntry.stopped);
Reference<ChangeFeedInfo> changeFeedInfo = Reference<ChangeFeedInfo>(new ChangeFeedInfo());
changeFeedInfo->range = cfEntry.range;
changeFeedInfo->id = cfEntry.rangeId;
changeFeedInfo->stopped = cfEntry.stopped;
data->uidChangeFeed[cfEntry.rangeId] = changeFeedInfo;
auto rs = data->keyChangeFeed.modify(cfEntry.range);
for (auto r = rs.begin(); r != rs.end(); ++r) {
r->value().push_back(changeFeedInfo);
}
data->keyChangeFeed.coalesce(cfEntry.range.contents());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + cfEntry.rangeId.toString(),
changeFeedValue(cfEntry.range, invalidVersion, ChangeFeedStatus::CHANGE_FEED_CREATE)));
}
}
return feedIds;
}
ACTOR Future<Void> dispatchChangeFeeds(StorageServer* data,
UID fetchKeysID,
KeyRange keys,
Version fetchVersion,
std::vector<Key> feedIds) {
// find overlapping range feeds
state std::map<Key, Future<Void>> feedFetches;
state PromiseStream<Key> removals;
data->changeFeedRemovals[fetchKeysID] = removals;
try {
state std::vector<OverlappingChangeFeedEntry> feeds =
wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1));
// TODO add trace events for some of these
for (auto& feed : feeds) {
feedFetches[feed.rangeId] = fetchChangeFeed(data, feed.rangeId, feed.range, feed.stopped, fetchVersion);
for (auto& feedId : feedIds) {
auto feedIt = data->uidChangeFeed.find(feedId);
// TODO REMOVE this assert once we enable change feed deletion
ASSERT(feedIt != data->uidChangeFeed.end());
Reference<ChangeFeedInfo> feed = feedIt->second;
feedFetches[feed->id] = fetchChangeFeed(data, feed, fetchVersion);
}
loop {
@ -4264,7 +4284,10 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
TraceEvent(SevDebug, interval.begin(), data->thisServerID)
.detail("KeyBegin", shard->keys.begin)
.detail("KeyEnd", shard->keys.end);
.detail("KeyEnd", shard->keys.end)
.detail("Version", data->version.get());
state Future<std::vector<Key>> fetchCFMetadata = fetchChangeFeedMetadata(data, keys, data->version.get());
validate(data);
@ -4297,6 +4320,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// Fetch keys gets called while the update actor is processing mutations. data->version will not be updated
// until all mutations for a version have been processed. We need to take the durableVersionLock to ensure
// data->version is greater than the version of the mutation which caused the fetch to be initiated.
// We must also ensure we have fetched all change feed metadata BEFORE changing the phase to fetching to ensure
// change feed mutations get applied correctly
state std::vector<Key> changeFeedsToFetch;
std::vector<Key> _cfToFetch = wait(fetchCFMetadata);
changeFeedsToFetch = _cfToFetch;
wait(data->durableVersionLock.take());
shard->phase = AddingShard::Fetching;
@ -4465,7 +4494,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// we have written)
state Future<Void> fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1);
wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion));
wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion, changeFeedsToFetch));
holdingFKPL.release();
wait(fetchDurable);
@ -4960,10 +4989,9 @@ public:
applyPrivateData(data, m);
}
} else {
// FIXME: enable when DEBUG_MUTATION is active
// for(auto m = changes[c].mutations.begin(); m; ++m) {
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m, data->thisServerID);
//}
if (MUTATION_TRACKING_ENABLED) {
DEBUG_MUTATION("SSUpdateMutation", ver, m, data->thisServerID).detail("FromFetch", fromFetch);
}
splitMutation(data, data->shards, m, ver, fromFetch);
}

View File

@ -341,6 +341,11 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
return;
}
TraceEvent ev(SevError, "BGMismatch");
ev.detail("DirectoryID", format("%08x", threadData->directoryID))
.detail("RangeStart", format("%08x", startKey))
.detail("RangeEnd", format("%08x", endKey))
.detail("Version", readVersion);
fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3}\n",
format("%08x", threadData->directoryID),
format("%08x", startKey),