diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8466a8f906..34afdc7b5b 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6665,9 +6665,7 @@ ACTOR Future singleChangeFeedStream(StorageServerInterface interf, } } } catch (Error& e) { - if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || - e.code() == error_code_connection_failed || e.code() == error_code_unknown_change_feed || - e.code() == error_code_actor_cancelled) { + if (e.code() == error_code_actor_cancelled) { throw; } results.sendError(e); @@ -6746,12 +6744,12 @@ ACTOR Future getChangeFeedStreamActor(Reference db, Version end, KeyRange range) { state Database cx(db); - state Transaction tr(cx); state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix); state Span span("NAPI:GetChangeFeedStream"_loc); state KeyRange keys; loop { + state Transaction tr(cx); loop { try { Version readVer = wait(tr.getReadVersion()); @@ -6983,14 +6981,23 @@ ACTOR static Future popChangeFeedBackup(Database cx, StringRef rangeID, Ve ACTOR Future popChangeFeedMutationsActor(Reference db, StringRef rangeID, Version version) { state Database cx(db); - state Transaction tr(cx); state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix); state Span span("NAPI:PopChangeFeedMutations"_loc); - Optional val = wait(tr.get(rangeIDKey)); - if (!val.present()) { - throw unsupported_operation(); + + state Transaction tr(cx); + state KeyRange keys; + loop { + try { + Optional val = wait(tr.get(rangeIDKey)); + if (!val.present()) { + throw unsupported_operation(); + } + keys = std::get<0>(decodeChangeFeedValue(val.get())); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } } - state KeyRange keys = std::get<0>(decodeChangeFeedValue(val.get())); state vector>> locations = wait(getKeyRangeLocations(cx, keys, diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index b9ce4214b4..c47d794252 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1591,43 +1591,44 @@ ACTOR Future overlappingChangeFeedsQ(StorageServer* data, OverlappingChang return Void(); } -void filterMutations(Arena& arena, VectorRef& mutations, KeyRange const& range) { - if (mutations.size() == 1 && mutations.back().param1 == lastEpochEndPrivateKey) { - return; +MutationsAndVersionRef filterMutations(Arena& arena, MutationsAndVersionRef const& m, KeyRange const& range) { + if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) { + return m; } Optional> modifiedMutations; - for (int i = 0; i < mutations.size(); i++) { - if (mutations[i].type == MutationRef::SetValue) { - if (modifiedMutations.present() && range.contains(mutations[i].param1)) { - modifiedMutations.get().push_back(arena, mutations[i]); + for (int i = 0; i < m.mutations.size(); i++) { + if (m.mutations[i].type == MutationRef::SetValue) { + if (modifiedMutations.present() && range.contains(m.mutations[i].param1)) { + modifiedMutations.get().push_back(arena, m.mutations[i]); } - if (!modifiedMutations.present() && !range.contains(mutations[i].param1)) { - modifiedMutations = mutations; + if (!modifiedMutations.present() && !range.contains(m.mutations[i].param1)) { + modifiedMutations = m.mutations; modifiedMutations.get().resize(arena, i); arena.dependsOn(range.arena()); } } else { - ASSERT(mutations[i].type == MutationRef::ClearRange); + ASSERT(m.mutations[i].type == MutationRef::ClearRange); if (!modifiedMutations.present() && - (mutations[i].param1 < range.begin || mutations[i].param2 > range.end)) { - modifiedMutations = mutations; + (m.mutations[i].param1 < range.begin || m.mutations[i].param2 > range.end)) { + modifiedMutations = m.mutations; modifiedMutations.get().resize(arena, i); arena.dependsOn(range.arena()); } if (modifiedMutations.present()) { - if (mutations[i].param1 < range.end && range.begin < mutations[i].param2) { + if (m.mutations[i].param1 < range.end && range.begin < m.mutations[i].param2) { modifiedMutations.get().push_back(arena, MutationRef(MutationRef::ClearRange, - std::max(range.begin, mutations[i].param1), - std::min(range.end, mutations[i].param2))); + std::max(range.begin, m.mutations[i].param1), + std::min(range.end, m.mutations[i].param2))); } } } } if (modifiedMutations.present()) { - mutations = modifiedMutations.get(); + return MutationsAndVersionRef(modifiedMutations.get(), m.version); } + return m; } ACTOR Future getChangeFeedMutations(StorageServer* data, ChangeFeedRequest req) { @@ -1646,6 +1647,7 @@ ACTOR Future getChangeFeedMutations(StorageServer* data, Change if (feed == data->uidChangeFeed.end()) { throw unknown_change_feed(); } + state Version dequeVersion = data->version.get(); if (req.end <= feed->second->emptyVersion + 1) { } else if (feed->second->durableVersion == invalidVersion || req.begin > feed->second->durableVersion) { for (auto it : feed->second->mutations) { @@ -1654,14 +1656,15 @@ ACTOR Future getChangeFeedMutations(StorageServer* data, Change } if (it.version >= req.begin) { reply.arena.dependsOn(it.arena()); - filterMutations(reply.arena, it.mutations, req.range); - reply.mutations.push_back(reply.arena, it); - remainingLimitBytes -= sizeof(MutationsAndVersionRef) + it.expectedSize(); + auto m = filterMutations(reply.arena, it, req.range); + reply.mutations.push_back(reply.arena, m); + remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize(); } } } else { state std::deque> mutationsDeque = feed->second->mutations; state Version startingDurableVersion = feed->second->durableVersion; + RangeResult res = wait(data->storage.readRange( KeyRangeRef(changeFeedDurableKey(req.rangeID, req.begin), changeFeedDurableKey(req.rangeID, req.end)), 1 << 30, @@ -1679,8 +1682,8 @@ ACTOR Future getChangeFeedMutations(StorageServer* data, Change std::tie(id, version) = decodeChangeFeedDurableKey(kv.key); auto mutations = decodeChangeFeedDurableValue(kv.value); reply.arena.dependsOn(mutations.arena()); - filterMutations(reply.arena, mutations, req.range); - reply.mutations.push_back(reply.arena, MutationsAndVersionRef(mutations, version)); + auto m = filterMutations(reply.arena, MutationsAndVersionRef(mutations, version), req.range); + reply.mutations.push_back(reply.arena, m); remainingLimitBytes -= sizeof(KeyValueRef) + kv.expectedSize(); // FIXME: this is currently tracking the size on disk rather than the reply size @@ -1696,9 +1699,9 @@ ACTOR Future getChangeFeedMutations(StorageServer* data, Change isEmpty = false; } reply.arena.dependsOn(it.arena()); - filterMutations(reply.arena, it.mutations, req.range); - reply.mutations.push_back(reply.arena, it); - remainingLimitBytes -= sizeof(MutationsAndVersionRef) + it.expectedSize(); + auto m = filterMutations(reply.arena, it, req.range); + reply.mutations.push_back(reply.arena, m); + remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize(); } } if (isEmpty) { @@ -1729,7 +1732,7 @@ ACTOR Future getChangeFeedMutations(StorageServer* data, Change } } } - Version finalVersion = std::min(req.end - 1, data->version.get()); + Version finalVersion = std::min(req.end - 1, dequeVersion); if ((reply.mutations.empty() || reply.mutations.back().version < finalVersion) && remainingLimitBytes > 0) { reply.mutations.push_back(reply.arena, MutationsAndVersionRef(finalVersion)); }