Fixing change feed private mutation handling

Josh Slocum 2022-01-18 20:17:28 -06:00
parent 08f8700636
commit 79704aa05f
2 changed files with 57 additions and 12 deletions

View File

@@ -7160,6 +7160,10 @@ Version ChangeFeedData::getVersion() {
#define DEBUG_CF_VERSION(cfId, v) \
DEBUG_CF_START_VERSION <= v&& v <= DEBUG_CF_END_VERSION && (""_sr == DEBUG_CF_ID || cfId.printable() == DEBUG_CF_ID)
#define DEBUG_CF_VERSION_RANGE(cfId, vStart, vEnd) \
DEBUG_CF_START_VERSION <= vEnd&& vStart <= DEBUG_CF_END_VERSION && \
(""_sr == DEBUG_CF_ID || cfId.printable() == DEBUG_CF_ID)
#define DEBUG_CF_WAIT(cfId, v) DEBUG_CF_WAIT_VERSION == v && (""_sr == DEBUG_CF_ID || cfId.printable() == DEBUG_CF_ID)
// This function is essentially bubbling the information about what has been processed from the server through the
@@ -7314,7 +7318,8 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
Version end,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData,
int idx /* TODO REMOVE this param after correctness clean */) {
int idx /* TODO REMOVE this param after correctness clean */,
KeyRange range /* TODO REMOVE this param after correctness clean */) {
// calling lastReturnedVersion's callbacks could cause us to be cancelled
state Promise<Void> refresh = feedData->refresh;
@@ -7323,12 +7328,20 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
try {
loop {
if (nextVersion >= end) {
if (DEBUG_CF_VERSION(feedData->id, end)) {
fmt::print(" single {0} {1} [{2} - {3}): sending EOS\n",
idx,
interf.id().toString().substr(0, 4),
range.begin.printable(),
range.end.printable());
}
results.sendError(end_of_stream());
return Void();
}
choose {
when(state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture())) {
if (DEBUG_CF_VERSION(feedData->id, rep.mutations.back().version)) {
if (DEBUG_CF_VERSION_RANGE(
feedData->id, rep.mutations.front().version, rep.mutations.back().version)) {
fmt::print(" single {0} {1}: response {2} - {3} ({4}), atLatest={5}, rep.atLatest={6}, "
"notAtLatest={7}, "
"minSV={8}\n",
@@ -7343,6 +7356,12 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
rep.minStreamVersion);
}
// TODO REMOVE, just for debugging
// set next version so debug statements trigger
if (nextVersion == 0) {
nextVersion = rep.mutations.front().version;
}
if (rep.mutations.back().version > feedData->maxSeenVersion) {
feedData->maxSeenVersion = rep.mutations.back().version;
}
@@ -7351,9 +7370,11 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
while (resultLoc < rep.mutations.size()) {
wait(results.onEmpty());
if (DEBUG_CF_VERSION(feedData->id, rep.mutations[resultLoc].version)) {
fmt::print(" single {0} {1}: sending {2}/{3} {4} ({5})\n",
fmt::print(" single {0} {1} [{2} - {3}): sending {4}/{5} {6} ({7})\n",
idx,
interf.id().toString().substr(0, 4),
range.begin.printable(),
range.end.printable(),
resultLoc,
rep.mutations.size(),
rep.mutations[resultLoc].version,
@@ -7423,7 +7444,14 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
}
} catch (Error& e) {
// TODO REMOVE eventually, useful for debugging for now
fmt::print("NAS: CFError {}\n", e.name());
if (DEBUG_CF_VERSION(feedData->id, nextVersion)) {
fmt::print(" single {0} {1} [{2} - {3}): CFError {4}\n",
idx,
interf.id().toString().substr(0, 4),
range.begin.printable(),
range.end.printable(),
e.name());
}
if (e.code() == error_code_actor_cancelled) {
throw;
}
@@ -7451,10 +7479,6 @@ ACTOR Future<Void> doCFMerge(Reference<ChangeFeedData> results,
wait(delay(0));
ASSERT(results->mutations.isEmpty());
// push one initial mutation from each stream
// Without this delay, weird issues with the last stream getting on another stream's callstack can happen
wait(delay(0));
state int interfNum = 0;
// TODO minor optimization - could make this just a vector of indexes if each MutationAndVersionStream remembered
@@ -7465,6 +7489,7 @@ ACTOR Future<Void> doCFMerge(Reference<ChangeFeedData> results,
streamsUsed.push_back(stream);
}
state Version nextVersion;
loop {
// bring all of the streams up to date to ensure we have the latest element from each stream in mutations
interfNum = 0;
@@ -7495,7 +7520,7 @@ ACTOR Future<Void> doCFMerge(Reference<ChangeFeedData> results,
// pop first item off queue - this will be mutation with the lowest version
Standalone<VectorRef<MutationsAndVersionRef>> nextOut;
state Version nextVersion = mutations.top().next.version;
nextVersion = mutations.top().next.version;
streamsUsed.push_back(mutations.top());
nextOut.push_back_deep(nextOut.arena(), mutations.top().next);
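
The loop in this hunk is a k-way merge: every storage server stream delivers mutations in version order, and doCFMerge repeatedly pops the stream whose next mutation has the lowest version. Below is a minimal standalone sketch of the same idea using plain std::priority_queue; the types and names are simplified stand-ins, not the FoundationDB ones, and none of the flow/actor machinery is modeled.

// Standalone sketch of the merge performed by doCFMerge: each storage server stream
// yields mutations in version order, and the client pops whichever stream has the
// lowest next version. Types here are simplified stand-ins, not the FoundationDB ones.
#include <cstdio>
#include <queue>
#include <vector>

struct MutationAndVersion {
	long version;
	int streamId; // which per-server stream produced it
};

struct StreamCursor {
	std::vector<MutationAndVersion> buffered; // already-received, version-ordered
	size_t pos = 0;
	bool hasNext() const { return pos < buffered.size(); }
	MutationAndVersion peek() const { return buffered[pos]; }
};

struct CursorCompare {
	// std::priority_queue is a max-heap, so invert the comparison to pop the lowest version
	bool operator()(const StreamCursor* a, const StreamCursor* b) const {
		return a->peek().version > b->peek().version;
	}
};

int main() {
	std::vector<StreamCursor> streams(2);
	streams[0].buffered = { { 10, 0 }, { 40, 0 } };
	streams[1].buffered = { { 20, 1 }, { 30, 1 }, { 50, 1 } };

	std::priority_queue<StreamCursor*, std::vector<StreamCursor*>, CursorCompare> merge;
	// push one initial element from each stream, mirroring the merge cursor setup
	for (auto& s : streams) {
		if (s.hasNext())
			merge.push(&s);
	}

	while (!merge.empty()) {
		StreamCursor* lowest = merge.top();
		merge.pop();
		MutationAndVersion m = lowest->peek();
		std::printf("version %ld from stream %d\n", m.version, m.streamId);
		lowest->pos++;
		if (lowest->hasNext())
			merge.push(lowest); // its next version competes with the other streams again
	}
	return 0;
}

Re-pushing a cursor after consuming its head keeps the heap keyed on each stream's next version, which is what makes the merged output globally version-ordered; the real actor additionally waits on each stream's replies and handles end_of_stream, which the sketch skips.
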
@@ -7593,10 +7618,19 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
results->notAtLatest.set(interfs.size());
refresh.send(Void());
if (DEBUG_CF_START_VERSION != invalidVersion) {
fmt::print("Starting merge cursor for {0} @ {1} - {2}\n", interfs.size(), *begin, end);
}
for (int i = 0; i < interfs.size(); i++) {
onErrors[i] = results->streams[i].onError();
fetchers[i] = partialChangeFeedStream(
interfs[i].first, streams[i].results, results->streams[i], end, results, results->storageData[i], i);
fetchers[i] = partialChangeFeedStream(interfs[i].first,
streams[i].results,
results->streams[i],
end,
results,
results->storageData[i],
i,
interfs[i].second);
}
wait(onCFErrors(onErrors) || doCFMerge(results, interfs, streams, begin, end));
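
Most of the other changes in this file thread the per-stream key range (interfs[i].second) into partialChangeFeedStream so the debug prints can say which shard a message came from, with all of the printing gated by the DEBUG_CF_* macros defined in the first hunk. Below is a standalone sketch of that gating pattern; the constants and the plain function are placeholders standing in for the actual macros.

// Standalone sketch of the DEBUG_CF_* gating used by the prints in this file.
// The values below are placeholders; in the real code they are compile-time knobs
// edited while chasing a specific change feed.
#include <cstdio>
#include <string>

static const long DEBUG_CF_START_VERSION = 1000;
static const long DEBUG_CF_END_VERSION = 2000;
static const std::string DEBUG_CF_ID = ""; // empty string matches any feed id

// Plain-function stand-in for the DEBUG_CF_VERSION macro.
static bool debugCFVersion(const std::string& cfId, long v) {
	return DEBUG_CF_START_VERSION <= v && v <= DEBUG_CF_END_VERSION &&
	       (DEBUG_CF_ID.empty() || cfId == DEBUG_CF_ID);
}

int main() {
	if (debugCFVersion("abcd", 1500)) {
		std::printf(" single 0 abcd [a - m): sending 1500\n"); // inside the window: printed
	}
	if (!debugCFVersion("abcd", 5000)) {
		std::printf("version 5000 is outside the window, nothing printed\n");
	}
	return 0;
}

The new "Starting merge cursor" print above uses the same idea, only firing once DEBUG_CF_START_VERSION has been changed from invalidVersion.
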

View File

@@ -4023,6 +4023,7 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("FetchVersion", fetchVersion)
.detail("Existing", existing);
if (!existing) {
@@ -4068,6 +4069,7 @@ ACTOR Future<Void> dispatchChangeFeeds(StorageServer* data, UID fetchKeysID, Key
try {
state std::vector<OverlappingChangeFeedEntry> feeds =
wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1));
// TODO add trace events for some of these
for (auto& feed : feeds) {
feedFetches[feed.rangeId] = fetchChangeFeed(data, feed.rangeId, feed.range, feed.stopped, fetchVersion);
}
@@ -4492,6 +4494,10 @@ void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation);
server->currentChangeFeeds.insert(it->id);
DEBUG_MUTATION("ChangeFeedWriteSet", version, mutation, server->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id)
.detail("Source", "Adding");
}
}
} else if (mutation.type == MutationRef::ClearRange) {
@@ -4504,6 +4510,10 @@ void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation);
server->currentChangeFeeds.insert(it->id);
DEBUG_MUTATION("ChangeFeedWriteClear", version, mutation, server->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id)
.detail("Source", "Adding");
}
}
}
@@ -4972,7 +4982,8 @@ private:
}
data->keyChangeFeed.coalesce(feed->second->range.contents());
data->uidChangeFeed.erase(feed);
} else {
} else if (status != ChangeFeedStatus::CHANGE_FEED_CREATE) {
// Can be a change feed create from move, ignore
// must be pop or stop
if (status == ChangeFeedStatus::CHANGE_FEED_STOP) {
TraceEvent(SevDebug, "StoppingChangeFeed", data->thisServerID)
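
The final hunk is the fix the commit title refers to: the trailing else in the private-mutation handler assumed any remaining status must be a pop or stop, but, as the new comment notes, a create for an already-known feed can also arrive (for example from a shard move) and should simply be ignored. Below is a standalone sketch of that dispatch with a simplified enum and a stand-in handler, not the storage server's actual code; CHANGE_FEED_DESTROY is an assumed third value for the branch that erases the feed.

// Standalone sketch of the change feed status dispatch after the fix.
// CHANGE_FEED_CREATE and CHANGE_FEED_STOP appear in the diff; CHANGE_FEED_DESTROY
// is assumed here, and the handler is a simplified stand-in.
#include <cstdio>

enum class ChangeFeedStatus { CHANGE_FEED_CREATE, CHANGE_FEED_STOP, CHANGE_FEED_DESTROY };

void applyFeedPrivateMutation(ChangeFeedStatus status, bool feedExists) {
	if (!feedExists && status == ChangeFeedStatus::CHANGE_FEED_CREATE) {
		std::printf("create feed\n");
	} else if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {
		std::printf("destroy feed\n");
	} else if (status != ChangeFeedStatus::CHANGE_FEED_CREATE) {
		// must be a pop or stop; a redundant create (which can arrive when a shard
		// is moved) is now ignored instead of landing in this branch
		std::printf("stop or pop feed\n");
	}
}

int main() {
	applyFeedPrivateMutation(ChangeFeedStatus::CHANGE_FEED_STOP, true);   // stop or pop feed
	applyFeedPrivateMutation(ChangeFeedStatus::CHANGE_FEED_CREATE, true); // no output: ignored
	return 0;
}

With the old plain else, the duplicate create in the second call would have fallen into the stop-or-pop branch; with the else-if guard it becomes a no-op.
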