Simplify change feed fetching to not inject anything at all during fetch and rely on reading from the remote feeds

This commit is contained in:
Josh Slocum 2022-01-28 14:21:26 -06:00
parent 2d3b216f60
commit cf7ed4ee40
1 changed files with 6 additions and 35 deletions

View File

@ -4572,7 +4572,6 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// we have written)
state Future<Void> fetchDurable = data->durableVersion.whenAtLeast(data->storageVersion() + 1);
wait(dispatchChangeFeeds(data, fetchKeysID, keys, fetchVersion, changeFeedsToFetch));
holdingFKPL.release();
wait(fetchDurable);
@ -4607,6 +4606,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
ASSERT(shard->transferredVersion > data->storageVersion());
ASSERT(shard->transferredVersion == data->data().getLatestVersion());
state Future<Void> cfFetch =
dispatchChangeFeeds(data, fetchKeysID, keys, shard->transferredVersion, changeFeedsToFetch);
TraceEvent(SevDebug, "FetchKeysHaveData", data->thisServerID)
.detail("FKID", interval.pairID)
.detail("Version", shard->transferredVersion)
@ -4643,6 +4645,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
shard->updates.clear();
// wait on change feed fetch to complete before marking data as available
wait(cfFetch);
setAvailableStatus(data,
keys,
true); // keys will be available when getLatestVersion()==transferredVersion is durable
@ -4726,40 +4731,6 @@ void AddingShard::addMutation(Version version, bool fromFetch, MutationRef const
}
// Add the mutation to the version.
updates.back().mutations.push_back_deep(updates.back().arena(), mutation);
if (!fromFetch) {
if (mutation.type == MutationRef::SetValue) {
for (auto& it : server->keyChangeFeed[mutation.param1]) {
if (!it->stopped) {
if (it->mutations.empty() || it->mutations.back().version != version) {
it->mutations.push_back(MutationsAndVersionRef(version, server->knownCommittedVersion));
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation);
server->currentChangeFeeds.insert(it->id);
DEBUG_MUTATION("ChangeFeedWriteSet", version, mutation, server->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id)
.detail("Source", "Adding");
}
}
} else if (mutation.type == MutationRef::ClearRange) {
auto ranges = server->keyChangeFeed.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2));
for (auto& r : ranges) {
for (auto& it : r.value()) {
if (!it->stopped) {
if (it->mutations.empty() || it->mutations.back().version != version) {
it->mutations.push_back(MutationsAndVersionRef(version, server->knownCommittedVersion));
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), mutation);
server->currentChangeFeeds.insert(it->id);
DEBUG_MUTATION("ChangeFeedWriteClear", version, mutation, server->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id)
.detail("Source", "Adding");
}
}
}
}
}
} else if (phase == Waiting) {
server->addMutation(version, fromFetch, mutation, keys, server->updateEagerReads);
} else