do not return send a reply to a pop request until the clear has been made durable

This commit is contained in:
Evan Tschannen 2021-08-09 22:06:53 -07:00
parent 208a5790ad
commit a1b0053b57
1 changed files with 28 additions and 20 deletions

View File

@ -1511,6 +1511,30 @@ ACTOR Future<Void> watchValueSendReply(StorageServer* data,
}
}
ACTOR Future<Void> rangeFeedPopQ(StorageServer* self, RangeFeedPopRequest req) {
auto& feed = self->uidRangeFeed[req.rangeID];
if (req.version - 1 > feed->emptyVersion) {
feed->emptyVersion = req.version - 1;
while (!feed->mutations.empty() && feed->mutations.front().version < req.version) {
self->uidRangeFeed[req.rangeID]->mutations.pop_front();
}
if (feed->storageVersion != invalidVersion) {
self->storage.clearRange(
KeyRangeRef(rangeFeedDurableKey(feed->id, 0), rangeFeedDurableKey(feed->id, req.version)));
if (req.version > feed->storageVersion) {
feed->storageVersion = invalidVersion;
feed->durableVersion = invalidVersion;
}
wait(self->durableVersion.whenAtLeast(self->storageVersion() + 1));
}
}
TraceEvent("RangeFeedPopQuery", self->thisServerID)
.detail("RangeID", req.rangeID.printable())
.detail("Version", req.version);
req.reply.send(Void());
return Void();
}
ACTOR Future<Void> overlappingRangeFeedsQ(StorageServer* data, OverlappingRangeFeedsRequest req) {
wait(delay(0));
auto ranges = data->keyRangeFeed.intersectingRanges(req.range);
@ -4359,7 +4383,9 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
while (info->mutations.front().version < newOldestVersion) {
info->mutations.pop_front();
}
info->durableVersion = info->mutations.front().version;
if (info->storageVersion != invalidVersion) {
info->durableVersion = info->mutations.front().version;
}
wait(yield(TaskPriority::UpdateStorage));
curFeed++;
}
@ -5284,25 +5310,7 @@ ACTOR Future<Void> serveOverlappingRangeFeedsRequests(
ACTOR Future<Void> serveRangeFeedPopRequests(StorageServer* self, FutureStream<RangeFeedPopRequest> rangeFeedPops) {
loop {
RangeFeedPopRequest req = waitNext(rangeFeedPops);
auto& feed = self->uidRangeFeed[req.rangeID];
if (req.version - 1 > feed->emptyVersion) {
feed->emptyVersion = req.version - 1;
while (!feed->mutations.empty() && feed->mutations.front().version < req.version) {
self->uidRangeFeed[req.rangeID]->mutations.pop_front();
}
if (feed->storageVersion != invalidVersion) {
self->storage.clearRange(
KeyRangeRef(rangeFeedDurableKey(feed->id, 0), rangeFeedDurableKey(feed->id, req.version)));
if (req.version > feed->storageVersion) {
feed->storageVersion = invalidVersion;
feed->durableVersion = invalidVersion;
}
}
}
TraceEvent("RangeFeedPopQuery", self->thisServerID)
.detail("RangeID", req.rangeID.printable())
.detail("Version", req.version);
req.reply.send(Void());
self->actors.add(self->readGuard(req, rangeFeedPopQ));
}
}