added support for removing range feeds

This commit is contained in:
Evan Tschannen 2021-07-30 16:01:46 -07:00
parent 0989c28a6b
commit 125241743d
1 changed files with 33 additions and 19 deletions

View File

@ -3599,26 +3599,40 @@ private:
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2));
} else if (m.type == MutationRef::SetValue && m.param1.startsWith(rangeFeedPrivatePrefix)) {
Key rangeFeedId = m.param1.removePrefix(rangeFeedPrivatePrefix);
KeyRange rangeFeedRange = decodeRangeFeedValue(m.param2);
TraceEvent("AddingRangeFeed", data->thisServerID)
.detail("RangeID", rangeFeedId.printable())
.detail("Range", rangeFeedRange.toString());
Reference<RangeFeedInfo> rangeFeedInfo(new RangeFeedInfo());
rangeFeedInfo->range = rangeFeedRange;
rangeFeedInfo->id = rangeFeedId;
data->uidRangeFeed[rangeFeedId] = rangeFeedInfo;
auto rs = data->keyRangeFeed.modify(rangeFeedRange);
for (auto r = rs.begin(); r != rs.end(); ++r) {
r->value().push_back(rangeFeedInfo);
} else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) &&
m.param1.startsWith(rangeFeedPrivatePrefix)) {
if (m.type == MutationRef::SetValue) {
Key rangeFeedId = m.param1.removePrefix(rangeFeedPrivatePrefix);
KeyRange rangeFeedRange = decodeRangeFeedValue(m.param2);
TraceEvent("AddingRangeFeed", data->thisServerID)
.detail("RangeID", rangeFeedId.printable())
.detail("Range", rangeFeedRange.toString());
Reference<RangeFeedInfo> rangeFeedInfo(new RangeFeedInfo());
rangeFeedInfo->range = rangeFeedRange;
rangeFeedInfo->id = rangeFeedId;
data->uidRangeFeed[rangeFeedId] = rangeFeedInfo;
auto rs = data->keyRangeFeed.modify(rangeFeedRange);
for (auto r = rs.begin(); r != rs.end(); ++r) {
r->value().push_back(rangeFeedInfo);
}
data->keyRangeFeed.coalesce(rangeFeedRange.contents());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistRangeFeedKeys.begin.toString() + rangeFeedId.toString(),
m.param2));
} else {
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
auto beginFeed = m.param1.removePrefix(rangeFeedPrivatePrefix);
auto endFeed = m.param2.removePrefix(rangeFeedPrivatePrefix);
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
persistRangeFeedKeys.begin.toString() + beginFeed.toString(),
persistRangeFeedKeys.begin.toString() + endFeed.toString()));
data->uidRangeFeed.erase(data->uidRangeFeed.lower_bound(beginFeed),
data->uidRangeFeed.lower_bound(endFeed));
}
data->keyRangeFeed.coalesce(rangeFeedRange.contents());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::SetValue,
persistRangeFeedKeys.begin.toString() + rangeFeedId.toString(),
m.param2));
} else if (m.param1.substr(1).startsWith(tssMappingKeys.begin) &&
(m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange)) {
if (!data->isTss()) {