Also re-persisting change feed metadata on fetch after cleanup
This commit is contained in:
parent
a7a3dccba5
commit
f9f676abf9
|
@ -4331,6 +4331,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
|
|||
TraceEvent(SevDebug, "ChangeFeedPopQuery", self->thisServerID)
|
||||
.detail("RangeID", req.rangeID.printable())
|
||||
.detail("Version", req.version)
|
||||
.detail("SSVersion", self->version.get())
|
||||
.detail("Range", req.range.toString());
|
||||
|
||||
if (req.version - 1 > feed->second->emptyVersion) {
|
||||
|
@ -4662,6 +4663,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data, KeyR
|
|||
auto changeFeedInfo = existingEntry->second;
|
||||
auto feedCleanup = data->changeFeedCleanupDurable.find(cfEntry.rangeId);
|
||||
|
||||
bool writeToMutationLog = false;
|
||||
if (feedCleanup != data->changeFeedCleanupDurable.end() && changeFeedInfo->removing) {
|
||||
TEST(true); // re-fetching feed scheduled for deletion! Un-mark it as removing
|
||||
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
|
||||
|
@ -4678,9 +4680,19 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data, KeyR
|
|||
.detail("EmptyVersion", cfEntry.emptyVersion)
|
||||
.detail("CleanupVersion", feedCleanup->second)
|
||||
.detail("Stopped", cfEntry.stopped);
|
||||
|
||||
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
|
||||
// it
|
||||
// We may just want to refactor this so updateStorage does explicit deletes based on
|
||||
// changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
|
||||
// Then we wouldn't have to reset anything here
|
||||
writeToMutationLog = true;
|
||||
} else if (changeFeedInfo->emptyVersion < cfEntry.emptyVersion) {
|
||||
TEST(true); // Got updated CF emptyVersion from a parallel fetchChangeFeedMetadata
|
||||
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
|
||||
writeToMutationLog = true;
|
||||
}
|
||||
if (writeToMutationLog) {
|
||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
data->addMutationToMutationLog(
|
||||
mLV,
|
||||
|
|
Loading…
Reference in New Issue