Merge pull request #6074 from sfc-gh-etschannen/blob_integration

fix: fetched change feeds did not get their durable version updated
This commit is contained in:
Evan Tschannen 2021-11-30 13:33:30 -08:00 committed by GitHub
commit 0b9505c52a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 4 additions and 1 deletions

View File

@ -622,6 +622,7 @@ public:
Deque<std::pair<std::vector<Key>, Version>> changeFeedVersions;
std::map<UID, PromiseStream<Key>> changeFeedRemovals;
std::set<Key> currentChangeFeeds;
std::set<Key> fetchingChangeFeeds;
std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
// newestAvailableVersion[k]
@ -3981,6 +3982,7 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data,
loop {
try {
wait(fetchChangeFeedApplier(data, changeFeedInfo, rangeId, range, fetchVersion, existing));
data->fetchingChangeFeeds.insert(rangeId);
return Void();
} catch (Error& e) {
if (e.code() != error_code_change_feed_not_registered) {
@ -5457,7 +5459,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
break;
}
std::set<Key> modifiedChangeFeeds;
std::set<Key> modifiedChangeFeeds = data->fetchingChangeFeeds;
data->fetchingChangeFeeds.clear();
while (!data->changeFeedVersions.empty() && data->changeFeedVersions.front().second <= newOldestVersion) {
modifiedChangeFeeds.insert(data->changeFeedVersions.front().first.begin(),
data->changeFeedVersions.front().first.end());