Using higher transferred version when fetching change feeds

This commit is contained in:
Josh Slocum 2022-02-10 07:59:23 -06:00
parent 7a76b86b53
commit 4af1b24c74
1 changed files with 21 additions and 7 deletions

View File

@ -4439,9 +4439,12 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
Version endVersion,
std::vector<Key> feedIds,
std::unordered_set<Key> newFeedIds) {
state std::unordered_map<Key, Version> feedMaxFetched;
if (feedIds.empty() && newFeedIds.empty()) {
return feedMaxFetched;
}
// find overlapping range feeds
state std::unordered_map<Key, Version> feedMaxFetched;
state std::map<Key, Future<Version>> feedFetches;
state PromiseStream<Key> removals;
data->changeFeedRemovals[fetchKeysID] = removals;
@ -4846,12 +4849,20 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
keys,
true); // keys will be available when getLatestVersion()==transferredVersion is durable
// Similar to transferred version, but wait for all feed data and
Version feedTransferredVersion = data->version.get() + 1;
TraceEvent(SevDebug, "FetchKeysHaveFeedData", data->thisServerID)
.detail("FKID", interval.pairID)
.detail("Version", feedTransferredVersion)
.detail("StorageVersion", data->storageVersion());
// Note that since it receives a pointer to FetchInjectionInfo, the thread does not leave this actor until this
// point.
// Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(shard->transferredVersion));
// Wait for the transferred version (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(feedTransferredVersion));
// TODO if this works, remove all of the fetch version stuff
// Also wait on all fetched change feed data to become committed and durable
while (!feedFetchedVersions.empty()) {
auto feed = feedFetchedVersions.begin();
@ -4860,10 +4871,13 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
feedFetchedVersions.erase(feed);
auto feedIt = data->uidChangeFeed.find(feedId);
if (feedIt != data->uidChangeFeed.end() && feedIt->second->durableFetchVersion.get() < maxFetched) {
wait(feedIt->second->durableFetchVersion.whenAtLeast(maxFetched));
// return to updateStorage
wait(delay(0));
/*if (feedIt != data->uidChangeFeed.end() && feedIt->second->durableFetchVersion.get() < maxFetched) {
wait(feedIt->second->durableFetchVersion.whenAtLeast(maxFetched));
// return to updateStorage
wait(delay(0));
}*/
if (feedIt != data->uidChangeFeed.end()) {
ASSERT(feedIt->second->durableFetchVersion.get() >= maxFetched);
}
}