Fixing issue with change_feed_popped

This commit is contained in:
Josh Slocum 2022-01-31 17:19:52 -06:00
parent d4931348c1
commit 25a0d857fa
1 changed files with 43 additions and 15 deletions

View File

@ -1717,7 +1717,9 @@ MutationsAndVersionRef filterMutations(Arena& arena,
ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
ChangeFeedStreamRequest req,
bool inverted) {
bool inverted,
UID streamUID
/*TODO REMOVE*/) {
state ChangeFeedStreamReply reply;
state ChangeFeedStreamReply memoryReply;
state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
@ -1725,9 +1727,10 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state Version startVersion = data->version.get();
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: GCFM [%s - %s) %lld - %lld\n",
printf("CFM: SS %s CF %s: GCFM %s [%s - %s) %lld - %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.range.begin.printable().c_str(),
req.range.end.printable().c_str(),
req.begin,
@ -1760,7 +1763,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state Reference<ChangeFeedInfo> feedInfo = feed->second;
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: got version %lld >= %lld\n",
printf("CFM: SS %s CF %s: got version %lld >= %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
data->version.get(),
@ -1768,6 +1771,12 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
if (!req.canReadPopped && req.begin <= feedInfo->emptyVersion) {
printf("CFM: SS %s CF %s: %s popped! req.begin=%lld, emptyVersion=%lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.begin,
feedInfo->emptyVersion);
throw change_feed_popped();
}
@ -1778,7 +1787,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
state Version emptyVersion = feedInfo->emptyVersion;
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: dequeVersion=%lld, emptyVersion=%lld, storageVersion=%lld, durableVersion=%lld, "
printf("CFM: SS %s CF %s: dequeVersion=%lld, emptyVersion=%lld, storageVersion=%lld, durableVersion=%lld, "
"fetchVersion=%lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
@ -1802,7 +1811,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: got %lld - %lld (%d) from memory\n",
printf("CFM: SS %s CF %s: got %lld - %lld (%d) from memory\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
memoryReply.mutations.empty() ? invalidVersion : memoryReply.mutations.front().version,
@ -1852,7 +1861,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
lastVersion = version;
}
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: got %lld - %lld (%d) from disk\n",
printf("CFM: SS %s CF %s: got %lld - %lld (%d) from disk\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
reply.mutations.empty() ? invalidVersion : reply.mutations.front().version,
@ -1875,6 +1884,12 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
// check if pop happened concurrently with read
if (!req.canReadPopped && req.begin <= feedInfo->emptyVersion) {
printf("SS %s: CF %s popped after read! req.begin=%lld, emptyVersion=%lld, emptyBeforeRead=%lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
req.begin,
feedInfo->emptyVersion,
emptyVersion);
throw change_feed_popped();
}
@ -1883,7 +1898,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
if ((reply.mutations.empty() || reply.mutations.back().version < finalVersion) && remainingLimitBytes > 0 &&
remainingDurableBytes > 0) {
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: adding empty %lld\n",
printf("CFM: SS %s CF %s: adding empty %lld\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
finalVersion);
@ -1909,7 +1924,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: result %lld - %lld (%d)\n",
printf("CFM: SS %s CF %s: result %lld - %lld (%d)\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
reply.mutations.empty() ? invalidVersion : reply.mutations.front().version,
@ -1945,7 +1960,7 @@ ACTOR Future<Void> localChangeFeedStream(StorageServer* data,
feedRequest.end = end;
feedRequest.range = range;
state std::pair<ChangeFeedStreamReply, bool> feedReply =
wait(getChangeFeedMutations(data, feedRequest, true));
wait(getChangeFeedMutations(data, feedRequest, true, UID()));
begin = feedReply.first.mutations.back().version + 1;
state int resultLoc = 0;
while (resultLoc < feedReply.first.mutations.size()) {
@ -1995,9 +2010,10 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
wait(delay(0, TaskPriority::DefaultEndpoint));
if (DEBUG_SS_CFM(data->thisServerID, req.rangeID, req.begin)) {
printf("CFM: SS %s CF %s: got CFSQ [%s - %s) %lld - %lld, crp=%s\n",
printf("CFM: SS %s CF %s: got CFSQ %s [%s - %s) %lld - %lld, crp=%s\n",
data->thisServerID.toString().substr(0, 4).c_str(),
req.rangeID.printable().substr(0, 6).c_str(),
streamUID.toString().substr(0, 8).c_str(),
req.range.begin.printable().c_str(),
req.range.end.printable().c_str(),
req.begin,
@ -2015,7 +2031,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
}
wait(onReady);
state Future<std::pair<ChangeFeedStreamReply, bool>> feedReplyFuture =
getChangeFeedMutations(data, req, false);
getChangeFeedMutations(data, req, false, streamUID);
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][streamUID] =
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
@ -2060,10 +2076,9 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
req.reply.sendError(unknown_change_feed());
return Void();
}
state Version emptyBefore = feed->second->emptyVersion;
choose {
when(wait(feed->second->newMutations.onTrigger())) {
} // FIXME: check that this is triggered when the range is moved to a different
// server, also check that the stream is closed
when(wait(feed->second->newMutations.onTrigger())) {}
when(wait(req.end == std::numeric_limits<Version>::max() ? Future<Void>(Never())
: data->version.whenAtLeast(req.end))) {}
}
@ -2072,11 +2087,21 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
req.reply.sendError(unknown_change_feed());
return Void();
}
if (emptyBefore != feed->second->emptyVersion && !req.canReadPopped &&
req.begin <= feed->second->emptyVersion) {
// Change feed was popped with no new mutations, update its begin version to skip those versions so
// it doesn't get change_feed_popped. This is safe because change_feed_popped is to ensure an old
// read can't miss mutations from a change feed stream, and this read is guaranteed not to (it was
// caught up before the pop, and trigger wasn't called on any new mutations before the pop)
req.begin = feed->second->emptyVersion + 1;
}
} else {
blockedVersion = feedReply.mutations.back().version;
}
}
} catch (Error& e) {
// TODO REMOVE
printf("CFSQ %s got error %s\n", streamUID.toString().substr(0, 8).c_str(), e.name());
auto it = data->changeFeedClientVersions.find(req.reply.getEndpoint().getPrimaryAddress());
if (it != data->changeFeedClientVersions.end()) {
if (removeUID) {
@ -4023,6 +4048,8 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
feed->second->durableVersion = invalidVersion;
}
}
// wake up requests that didn't get any mutations since the last pop to update their beginVersion
feed->second->newMutations.trigger();
wait(self->durableVersion.whenAtLeast(durableVersion));
}
req.reply.send(Void());
@ -4037,7 +4064,6 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
Version fetchVersion,
bool existing) {
state Reference<ChangeFeedData> feedResults = makeReference<ChangeFeedData>();
printf("DBG: fetching %lld %lld %lld\n", emptyVersion, emptyVersion + 1, fetchVersion);
Version startVersion = emptyVersion + 1;
if (startVersion < 0) {
startVersion = 0;
@ -5250,6 +5276,8 @@ private:
addMutationToLog = true;
}
feed->second->stopped = (status == ChangeFeedStatus::CHANGE_FEED_STOP);
// wake up requests that didn't get any mutations since the last pop to update their beginVersion
feed->second->newMutations.trigger();
} else if (status == ChangeFeedStatus::CHANGE_FEED_CREATE) {
TraceEvent(SevDebug, "CreatingChangeFeed", data->thisServerID)
.detail("RangeID", changeFeedId.printable())