Fixing another change feed fetch race by making refreshing idempotent
This commit is contained in:
parent
058f44a4ae
commit
094999439d
|
@ -445,6 +445,7 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
|
|||
bool removing = false;
|
||||
bool destroyed = false;
|
||||
bool possiblyDestroyed = false;
|
||||
bool refreshInProgress = false;
|
||||
|
||||
KeyRangeMap<std::unordered_map<UID, Promise<Void>>> moveTriggers;
|
||||
|
||||
|
@ -481,6 +482,7 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
|
|||
void destroy(Version destroyVersion) {
|
||||
removing = true;
|
||||
destroyed = true;
|
||||
refreshInProgress = false;
|
||||
moved(range);
|
||||
newMutations.trigger();
|
||||
}
|
||||
|
@ -5499,8 +5501,12 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
cfInfo->removing = false;
|
||||
// because we now have a gap in the metadata, it's possible this feed was destroyed
|
||||
cfInfo->possiblyDestroyed = true;
|
||||
// Set refreshInProgress, so that if this actor is replaced by an expanded move actor, the new actor
|
||||
// picks up the refresh
|
||||
cfInfo->refreshInProgress = true;
|
||||
// reset fetch versions because everything previously fetched was cleaned up
|
||||
cfInfo->fetchVersion = invalidVersion;
|
||||
|
||||
cfInfo->durableFetchVersion = NotifiedVersion();
|
||||
|
||||
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
|
||||
|
@ -5510,6 +5516,9 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
.detail("EmptyVersion", cfInfo->emptyVersion)
|
||||
.detail("StopVersion", cfInfo->stopVersion)
|
||||
.detail("FKID", fetchKeysID);
|
||||
} else if (cfInfo->refreshInProgress) {
|
||||
TEST(true); // Racing refreshes for same change feed in fetch
|
||||
destroyedFeedIds.insert(cfInfo->id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5618,7 +5627,8 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
TEST(!destroyedFeedIds.empty()); // Feed destroyed between move away and move back
|
||||
for (auto& feedId : refreshedFeedIds) {
|
||||
auto existingEntry = data->uidChangeFeed.find(feedId);
|
||||
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
|
||||
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed ||
|
||||
!existingEntry->second->refreshInProgress) {
|
||||
TEST(true); // feed refreshed
|
||||
continue;
|
||||
}
|
||||
|
@ -5647,6 +5657,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
|
|||
.detail("StopVersion", existingEntry->second->stopVersion)
|
||||
.detail("FKID", fetchKeysID)
|
||||
.detail("MetadataVersion", metadataVersion);
|
||||
existingEntry->second->refreshInProgress = false;
|
||||
}
|
||||
for (auto& feedId : destroyedFeedIds) {
|
||||
auto existingEntry = data->uidChangeFeed.find(feedId);
|
||||
|
@ -6468,6 +6479,7 @@ void changeServerKeys(StorageServer* data,
|
|||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->removing = true;
|
||||
feed->second->refreshInProgress = false;
|
||||
feed->second->moved(feed->second->range);
|
||||
feed->second->newMutations.trigger();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue