Several change feed popping fixes for behind SS

This commit is contained in:
Josh Slocum 2022-02-01 10:27:41 -06:00
parent 45f4235f2b
commit 2c22d07b39
1 changed files with 34 additions and 3 deletions

View File

@ -1831,6 +1831,8 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
// wait for next commit
wait(data->durableVersion.whenAtLeast(data->durableVersion.get() + 1));
// TODO it may be safer to always just wait for durableVersion whenAtLeast feedVersion?
// To return control back to updateStorage
wait(delay(0));
}
}
RangeResult res = wait(
@ -3739,7 +3741,7 @@ void applyMutation(StorageServer* self,
void applyChangeFeedMutation(StorageServer* self, MutationRef const& m, Version version) {
if (m.type == MutationRef::SetValue) {
for (auto& it : self->keyChangeFeed[m.param1]) {
if (!it->stopped) {
if (!it->stopped && version > it->emptyVersion) {
if (it->mutations.empty() || it->mutations.back().version != version) {
it->mutations.push_back(MutationsAndVersionRef(version, self->knownCommittedVersion));
}
@ -3749,13 +3751,21 @@ void applyChangeFeedMutation(StorageServer* self, MutationRef const& m, Version
DEBUG_MUTATION("ChangeFeedWriteSet", version, m, self->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id);
} else {
TEST(it->stopped); // Skip CF write because stopped
TEST(version <= it->emptyVersion); // Skip CF write because popped and SS behind
DEBUG_MUTATION("ChangeFeedIgnoreSet", version, m, self->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id)
.detail("Stopped", it->stopped)
.detail("EmptyVersion", it->emptyVersion);
}
}
} else if (m.type == MutationRef::ClearRange) {
auto ranges = self->keyChangeFeed.intersectingRanges(KeyRangeRef(m.param1, m.param2));
for (auto& r : ranges) {
for (auto& it : r.value()) {
if (!it->stopped) {
if (!it->stopped && version > it->emptyVersion) {
if (it->mutations.empty() || it->mutations.back().version != version) {
it->mutations.push_back(MutationsAndVersionRef(version, self->knownCommittedVersion));
}
@ -3764,6 +3774,14 @@ void applyChangeFeedMutation(StorageServer* self, MutationRef const& m, Version
DEBUG_MUTATION("ChangeFeedWriteClear", version, m, self->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id);
} else {
TEST(it->stopped); // Skip CF clear because stopped
TEST(version <= it->emptyVersion); // Skip CF clear because popped and SS behind
DEBUG_MUTATION("ChangeFeedIgnoreClear", version, m, self->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id)
.detail("Stopped", it->stopped)
.detail("EmptyVersion", it->emptyVersion);
}
}
}
@ -4056,6 +4074,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
return Void();
}
// TODO handle being popped in the middle by removing after fetch
ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
Reference<ChangeFeedInfo> changeFeedInfo,
Key rangeId,
@ -4063,11 +4082,23 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
Version emptyVersion,
Version fetchVersion,
bool existing) {
state Reference<ChangeFeedData> feedResults = makeReference<ChangeFeedData>();
Version startVersion = emptyVersion + 1;
if (startVersion < 0) {
startVersion = 0;
}
if (startVersion >= fetchVersion) {
TEST(true); // Change Feed popped before fetch
TraceEvent(SevDebug, "FetchChangeFeedNoOp", data->thisServerID)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("StartVersion", startVersion)
.detail("FetchVersion", fetchVersion);
return Void();
}
state Reference<ChangeFeedData> feedResults = makeReference<ChangeFeedData>();
// TODO somwhere this is initialized to -2 instead of -1 but it's fine
state Future<Void> feed =
data->cx->getChangeFeedStream(feedResults, rangeId, startVersion, fetchVersion, range, true);