Add cleanUpChangeFeeds(). (#7598)
Co-authored-by: He Liu <heliu@apple.com>
This commit is contained in:
parent
4cbe5dd683
commit
4dc742493e
|
@ -6384,6 +6384,65 @@ void ShardInfo::addMutation(Version version, bool fromFetch, MutationRef const&
|
|||
enum ChangeServerKeysContext { CSK_UPDATE, CSK_RESTORE, CSK_ASSIGN_EMPTY };
|
||||
const char* changeServerKeysContextName[] = { "Update", "Restore" };
|
||||
|
||||
void cleanUpChangeFeeds(StorageServer* data, const KeyRangeRef& keys, Version version) {
|
||||
std::map<Key, KeyRange> candidateFeeds;
|
||||
auto ranges = data->keyChangeFeed.intersectingRanges(keys);
|
||||
for (auto r : ranges) {
|
||||
for (auto feed : r.value()) {
|
||||
candidateFeeds[feed->id] = feed->range;
|
||||
}
|
||||
}
|
||||
for (auto f : candidateFeeds) {
|
||||
bool foundAssigned = false;
|
||||
auto shards = data->shards.intersectingRanges(f.second);
|
||||
for (auto shard : shards) {
|
||||
if (shard->value()->assigned()) {
|
||||
foundAssigned = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!foundAssigned) {
|
||||
Version durableVersion = data->data().getLatestVersion();
|
||||
TraceEvent(SevDebug, "ChangeFeedCleanup", data->thisServerID)
|
||||
.detail("FeedID", f.first)
|
||||
.detail("Version", version)
|
||||
.detail("DurableVersion", durableVersion);
|
||||
|
||||
data->changeFeedCleanupDurable[f.first] = durableVersion;
|
||||
|
||||
Key beginClearKey = f.first.withPrefix(persistChangeFeedKeys.begin);
|
||||
auto& mLV = data->addVersionToMutationLog(durableVersion);
|
||||
data->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||
++data->counters.kvSystemClearRanges;
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
changeFeedDurableKey(f.first, 0),
|
||||
changeFeedDurableKey(f.first, version)));
|
||||
|
||||
// We can't actually remove this change feed fully until the mutations clearing its data become durable.
|
||||
// If the SS restarted at version R before the clearing mutations became durable at version D (R < D),
|
||||
// then the restarted SS would restore the change feed clients would be able to read data and would miss
|
||||
// mutations from versions [R, D), up until we got the private mutation triggering the cleanup again.
|
||||
|
||||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->removing = true;
|
||||
feed->second->refreshInProgress = false;
|
||||
feed->second->moved(feed->second->range);
|
||||
feed->second->newMutations.trigger();
|
||||
}
|
||||
} else {
|
||||
// if just part of feed's range is moved away
|
||||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->moved(keys);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void changeServerKeys(StorageServer* data,
|
||||
const KeyRangeRef& keys,
|
||||
bool nowAssigned,
|
||||
|
@ -6528,62 +6587,7 @@ void changeServerKeys(StorageServer* data,
|
|||
|
||||
// find any change feeds that no longer have shards on this server, and clean them up
|
||||
if (!nowAssigned) {
|
||||
std::map<Key, KeyRange> candidateFeeds;
|
||||
auto ranges = data->keyChangeFeed.intersectingRanges(keys);
|
||||
for (auto r : ranges) {
|
||||
for (auto feed : r.value()) {
|
||||
candidateFeeds[feed->id] = feed->range;
|
||||
}
|
||||
}
|
||||
for (auto f : candidateFeeds) {
|
||||
bool foundAssigned = false;
|
||||
auto shards = data->shards.intersectingRanges(f.second);
|
||||
for (auto shard : shards) {
|
||||
if (shard->value()->assigned()) {
|
||||
foundAssigned = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!foundAssigned) {
|
||||
Version durableVersion = data->data().getLatestVersion();
|
||||
TraceEvent(SevDebug, "ChangeFeedCleanup", data->thisServerID)
|
||||
.detail("FeedID", f.first)
|
||||
.detail("Version", version)
|
||||
.detail("DurableVersion", durableVersion);
|
||||
|
||||
data->changeFeedCleanupDurable[f.first] = durableVersion;
|
||||
|
||||
Key beginClearKey = f.first.withPrefix(persistChangeFeedKeys.begin);
|
||||
auto& mLV = data->addVersionToMutationLog(durableVersion);
|
||||
data->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||
++data->counters.kvSystemClearRanges;
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::ClearRange,
|
||||
changeFeedDurableKey(f.first, 0),
|
||||
changeFeedDurableKey(f.first, version)));
|
||||
|
||||
// We can't actually remove this change feed fully until the mutations clearing its data become durable.
|
||||
// If the SS restarted at version R before the clearing mutations became durable at version D (R < D),
|
||||
// then the restarted SS would restore the change feed clients would be able to read data and would miss
|
||||
// mutations from versions [R, D), up until we got the private mutation triggering the cleanup again.
|
||||
|
||||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->removing = true;
|
||||
feed->second->refreshInProgress = false;
|
||||
feed->second->moved(feed->second->range);
|
||||
feed->second->newMutations.trigger();
|
||||
}
|
||||
} else {
|
||||
// if just part of feed's range is moved away
|
||||
auto feed = data->uidChangeFeed.find(f.first);
|
||||
if (feed != data->uidChangeFeed.end()) {
|
||||
feed->second->moved(keys);
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanUpChangeFeeds(data, keys, version);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue