fixed a variety of bugs with change feeds

This commit is contained in:
Evan Tschannen 2021-10-09 19:24:01 -07:00
parent ac5b580e2d
commit efc4cec53f
2 changed files with 44 additions and 34 deletions

View File

@ -6665,9 +6665,7 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
}
}
} catch (Error& e) {
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
e.code() == error_code_connection_failed || e.code() == error_code_unknown_change_feed ||
e.code() == error_code_actor_cancelled) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
results.sendError(e);
@ -6746,12 +6744,12 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
Version end,
KeyRange range) {
state Database cx(db);
state Transaction tr(cx);
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
state Span span("NAPI:GetChangeFeedStream"_loc);
state KeyRange keys;
loop {
state Transaction tr(cx);
loop {
try {
Version readVer = wait(tr.getReadVersion());
@ -6983,14 +6981,23 @@ ACTOR static Future<Void> popChangeFeedBackup(Database cx, StringRef rangeID, Ve
ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, StringRef rangeID, Version version) {
state Database cx(db);
state Transaction tr(cx);
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
state Span span("NAPI:PopChangeFeedMutations"_loc);
Optional<Value> val = wait(tr.get(rangeIDKey));
if (!val.present()) {
throw unsupported_operation();
state Transaction tr(cx);
state KeyRange keys;
loop {
try {
Optional<Value> val = wait(tr.get(rangeIDKey));
if (!val.present()) {
throw unsupported_operation();
}
keys = std::get<0>(decodeChangeFeedValue(val.get()));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
state KeyRange keys = std::get<0>(decodeChangeFeedValue(val.get()));
state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
wait(getKeyRangeLocations(cx,
keys,

View File

@ -1591,43 +1591,44 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
return Void();
}
void filterMutations(Arena& arena, VectorRef<MutationRef>& mutations, KeyRange const& range) {
if (mutations.size() == 1 && mutations.back().param1 == lastEpochEndPrivateKey) {
return;
MutationsAndVersionRef filterMutations(Arena& arena, MutationsAndVersionRef const& m, KeyRange const& range) {
if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) {
return m;
}
Optional<VectorRef<MutationRef>> modifiedMutations;
for (int i = 0; i < mutations.size(); i++) {
if (mutations[i].type == MutationRef::SetValue) {
if (modifiedMutations.present() && range.contains(mutations[i].param1)) {
modifiedMutations.get().push_back(arena, mutations[i]);
for (int i = 0; i < m.mutations.size(); i++) {
if (m.mutations[i].type == MutationRef::SetValue) {
if (modifiedMutations.present() && range.contains(m.mutations[i].param1)) {
modifiedMutations.get().push_back(arena, m.mutations[i]);
}
if (!modifiedMutations.present() && !range.contains(mutations[i].param1)) {
modifiedMutations = mutations;
if (!modifiedMutations.present() && !range.contains(m.mutations[i].param1)) {
modifiedMutations = m.mutations;
modifiedMutations.get().resize(arena, i);
arena.dependsOn(range.arena());
}
} else {
ASSERT(mutations[i].type == MutationRef::ClearRange);
ASSERT(m.mutations[i].type == MutationRef::ClearRange);
if (!modifiedMutations.present() &&
(mutations[i].param1 < range.begin || mutations[i].param2 > range.end)) {
modifiedMutations = mutations;
(m.mutations[i].param1 < range.begin || m.mutations[i].param2 > range.end)) {
modifiedMutations = m.mutations;
modifiedMutations.get().resize(arena, i);
arena.dependsOn(range.arena());
}
if (modifiedMutations.present()) {
if (mutations[i].param1 < range.end && range.begin < mutations[i].param2) {
if (m.mutations[i].param1 < range.end && range.begin < m.mutations[i].param2) {
modifiedMutations.get().push_back(arena,
MutationRef(MutationRef::ClearRange,
std::max(range.begin, mutations[i].param1),
std::min(range.end, mutations[i].param2)));
std::max(range.begin, m.mutations[i].param1),
std::min(range.end, m.mutations[i].param2)));
}
}
}
}
if (modifiedMutations.present()) {
mutations = modifiedMutations.get();
return MutationsAndVersionRef(modifiedMutations.get(), m.version);
}
return m;
}
ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, ChangeFeedRequest req) {
@ -1646,6 +1647,7 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change
if (feed == data->uidChangeFeed.end()) {
throw unknown_change_feed();
}
state Version dequeVersion = data->version.get();
if (req.end <= feed->second->emptyVersion + 1) {
} else if (feed->second->durableVersion == invalidVersion || req.begin > feed->second->durableVersion) {
for (auto it : feed->second->mutations) {
@ -1654,14 +1656,15 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change
}
if (it.version >= req.begin) {
reply.arena.dependsOn(it.arena());
filterMutations(reply.arena, it.mutations, req.range);
reply.mutations.push_back(reply.arena, it);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + it.expectedSize();
auto m = filterMutations(reply.arena, it, req.range);
reply.mutations.push_back(reply.arena, m);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize();
}
}
} else {
state std::deque<Standalone<MutationsAndVersionRef>> mutationsDeque = feed->second->mutations;
state Version startingDurableVersion = feed->second->durableVersion;
RangeResult res = wait(data->storage.readRange(
KeyRangeRef(changeFeedDurableKey(req.rangeID, req.begin), changeFeedDurableKey(req.rangeID, req.end)),
1 << 30,
@ -1679,8 +1682,8 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change
std::tie(id, version) = decodeChangeFeedDurableKey(kv.key);
auto mutations = decodeChangeFeedDurableValue(kv.value);
reply.arena.dependsOn(mutations.arena());
filterMutations(reply.arena, mutations, req.range);
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(mutations, version));
auto m = filterMutations(reply.arena, MutationsAndVersionRef(mutations, version), req.range);
reply.mutations.push_back(reply.arena, m);
remainingLimitBytes -=
sizeof(KeyValueRef) +
kv.expectedSize(); // FIXME: this is currently tracking the size on disk rather than the reply size
@ -1696,9 +1699,9 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change
isEmpty = false;
}
reply.arena.dependsOn(it.arena());
filterMutations(reply.arena, it.mutations, req.range);
reply.mutations.push_back(reply.arena, it);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + it.expectedSize();
auto m = filterMutations(reply.arena, it, req.range);
reply.mutations.push_back(reply.arena, m);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize();
}
}
if (isEmpty) {
@ -1729,7 +1732,7 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change
}
}
}
Version finalVersion = std::min(req.end - 1, data->version.get());
Version finalVersion = std::min(req.end - 1, dequeVersion);
if ((reply.mutations.empty() || reply.mutations.back().version < finalVersion) && remainingLimitBytes > 0) {
reply.mutations.push_back(reply.arena, MutationsAndVersionRef(finalVersion));
}