Change Feed Reading CPU optimization

This commit is contained in:
Josh Slocum 2022-06-17 17:36:15 -05:00
parent 48bcf6eaba
commit f943efb3b1
1 changed files with 62 additions and 10 deletions

View File

@ -24,6 +24,7 @@
#include <unordered_map>
#include "fmt/format.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
@ -2172,6 +2173,58 @@ MutationsAndVersionRef filterMutations(Arena& arena,
DEBUG_CF_MISSING_CF&& keyRange.contains(DEBUG_CF_MISSING_KEY) && \
beginVersion <= DEBUG_CF_MISSING_VERSION&& lastVersion >= DEBUG_CF_MISSING_VERSION
// efficiently searches for the change feed mutation start point at begin version
static std::deque<Standalone<MutationsAndVersionRef>>::const_iterator searchChangeFeedStart(
std::deque<Standalone<MutationsAndVersionRef>> const& mutations,
Version beginVersion,
bool atLatest) {
if (mutations.empty() || beginVersion > mutations.back().version) {
return mutations.end();
} else if (beginVersion <= mutations.front().version) {
return mutations.begin();
}
MutationsAndVersionRef searchKey;
searchKey.version = beginVersion;
if (atLatest) {
int jump = 1;
// exponential search backwards, because atLatest means the new mutations are likely only at the very end
auto lastEnd = mutations.end();
auto currentEnd = mutations.end() - 1;
while (currentEnd > mutations.begin()) {
if (beginVersion >= currentEnd->version) {
break;
}
lastEnd = currentEnd + 1;
currentEnd -= jump;
jump <<= 1;
}
if (currentEnd < mutations.begin()) {
currentEnd = mutations.begin();
}
auto ret = std::lower_bound(currentEnd, lastEnd, searchKey, MutationsAndVersionRef::OrderByVersion());
// TODO REMOVE: for validation
if (ret != mutations.end()) {
if (ret->version < beginVersion) {
fmt::print("ERROR: {0}) {1} < {2}\n", ret - mutations.begin(), ret->version, beginVersion);
}
ASSERT(ret->version >= beginVersion);
}
if (ret != mutations.begin()) {
if ((ret - 1)->version >= beginVersion) {
fmt::print("ERROR: {0}) {1} >= {2}\n", (ret - mutations.begin()) - 1, (ret - 1)->version, beginVersion);
}
ASSERT((ret - 1)->version < beginVersion);
}
return ret;
} else {
// binary search
return std::lower_bound(
mutations.begin(), mutations.end(), searchKey, MutationsAndVersionRef::OrderByVersion());
}
}
ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
ChangeFeedStreamRequest req,
bool inverted,
@ -2237,19 +2290,18 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
if (req.end > emptyVersion + 1) {
// FIXME: do exponential backwards search from end to find beginVersion if atLatest to reduce cpu
for (auto& it : feedInfo->mutations) {
if (it.version >= req.end || it.version > dequeVersion || remainingLimitBytes <= 0) {
auto it = searchChangeFeedStart(feedInfo->mutations, req.begin, atLatest);
while (it != feedInfo->mutations.end()) {
if (it->version >= req.end || it->version > dequeVersion || remainingLimitBytes <= 0) {
break;
}
if (it.version >= req.begin) {
auto m = filterMutations(memoryReply.arena, it, req.range, inverted);
if (m.mutations.size()) {
memoryReply.arena.dependsOn(it.arena());
memoryReply.mutations.push_back(memoryReply.arena, m);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize();
}
auto m = filterMutations(memoryReply.arena, *it, req.range, inverted);
if (m.mutations.size()) {
memoryReply.arena.dependsOn(it->arena());
memoryReply.mutations.push_back(memoryReply.arena, m);
remainingLimitBytes -= sizeof(MutationsAndVersionRef) + m.expectedSize();
}
it++;
}
}