Added FeedDiskReadState to make change feeds reading a large amount of disk data more efficient (#8830)
* Added FeedDiskReadState to make change feeds reading a large amount of disk data more efficient * documentation for FeedDiskReadState
This commit is contained in:
parent
fe06114bcd
commit
90b1ad9941
|
@ -2751,10 +2751,35 @@ static std::deque<Standalone<MutationsAndVersionRef>>::const_iterator searchChan
|
|||
}
|
||||
}
|
||||
|
||||
// The normal read case for a change feed stream query is that it will first read the disk portion, which is at a lower
|
||||
// version than the memory portion, and then will effectively switch to reading only the memory portion. The complexity
|
||||
// lies in the fact that the feed does not know the switchover point ahead of time before reading from disk, and the
|
||||
// switchover point is constantly changing as the SS persists the in-memory data to disk. As a result, the
|
||||
// implementation first reads from memory, then reads from disk if necessary, then merges the result and potentially
|
||||
// discards the in-memory read data if the disk data is large and behind the in-memory data. The goal of
|
||||
// FeedDiskReadState is that we want to skip doing the full memory read if we still have a lot of disk reads to catch up
|
||||
// on. In the DISK_CATCHUP phase, the feed query will read only the first row from memory, to
|
||||
// determine if it's hit the switchover point, instead of reading (potentially) both in the normal phase. We also want
|
||||
// to default to the normal behavior at the start in case there is not a lot of disk data. This guarantees that if we
|
||||
// somehow incorrectly went into DISK_CATCHUP when there wasn't much more data on disk, we only have one cycle of
|
||||
// getChangeFeedMutations in the incorrect mode that returns a smaller result before switching to NORMAL mode.
|
||||
//
|
||||
// Put another way, the state transitions are:
|
||||
//
|
||||
// STARTING ->
|
||||
// DISK_CATCHUP (if after the first read, there is more disk data to read before the first memory data)
|
||||
// NORMAL (otherwise)
|
||||
// DISK_CATCHUP ->
|
||||
// still DISK_CATCHUP (if there is still more disk data to read before the first memory data)
|
||||
// NORMAL (otherwise)
|
||||
// NORMAL -> NORMAL (always)
|
||||
enum FeedDiskReadState { STARTING, NORMAL, DISK_CATCHUP };
|
||||
|
||||
ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(StorageServer* data,
|
||||
ChangeFeedStreamRequest req,
|
||||
bool inverted,
|
||||
bool atLatest) {
|
||||
bool atLatest,
|
||||
FeedDiskReadState* feedDiskReadState) {
|
||||
state ChangeFeedStreamReply reply;
|
||||
state ChangeFeedStreamReply memoryReply;
|
||||
state int remainingLimitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
|
||||
|
@ -2823,9 +2848,15 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
|
|||
if (req.end > emptyVersion + 1) {
|
||||
auto it = searchChangeFeedStart(feedInfo->mutations, req.begin, atLatest);
|
||||
while (it != feedInfo->mutations.end()) {
|
||||
// If DISK_CATCHUP, only read 1 mutation from the memory queue
|
||||
if (it->version >= req.end || it->version > dequeVersion || remainingLimitBytes <= 0) {
|
||||
break;
|
||||
}
|
||||
if ((*feedDiskReadState) == FeedDiskReadState::DISK_CATCHUP && !memoryReply.mutations.empty()) {
|
||||
// so we don't add an empty mutation at the end
|
||||
remainingLimitBytes = -1;
|
||||
break;
|
||||
}
|
||||
MutationsAndVersionRef m = *it;
|
||||
if (doFilterMutations) {
|
||||
m = filterMutations(memoryReply.arena, *it, req.range, inverted);
|
||||
|
@ -2980,6 +3011,28 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
|
|||
lastVersion = version;
|
||||
lastKnownCommitted = knownCommittedVersion;
|
||||
}
|
||||
|
||||
if ((*feedDiskReadState) == FeedDiskReadState::STARTING ||
|
||||
(*feedDiskReadState) == FeedDiskReadState::DISK_CATCHUP) {
|
||||
if (!memoryReply.mutations.empty() && !reply.mutations.empty() &&
|
||||
reply.mutations.back().version < memoryReply.mutations.front().version && remainingDurableBytes <= 0) {
|
||||
// if we read a full batch from disk and the entire disk read was still less than the first memory
|
||||
// mutation, switch to disk_catchup mode
|
||||
*feedDiskReadState = FeedDiskReadState::DISK_CATCHUP;
|
||||
CODE_PROBE(true, "Feed switching to disk_catchup mode");
|
||||
} else {
|
||||
// for testing
|
||||
if ((*feedDiskReadState) == FeedDiskReadState::STARTING && BUGGIFY_WITH_PROB(0.001)) {
|
||||
*feedDiskReadState = FeedDiskReadState::DISK_CATCHUP;
|
||||
CODE_PROBE(true, "Feed forcing disk_catchup mode");
|
||||
} else {
|
||||
// else switch to normal mode
|
||||
CODE_PROBE(true, "Feed switching to normal mode");
|
||||
*feedDiskReadState = FeedDiskReadState::NORMAL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (remainingDurableBytes > 0) {
|
||||
reply.arena.dependsOn(memoryReply.arena);
|
||||
auto it = memoryReply.mutations.begin();
|
||||
|
@ -3001,6 +3054,7 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
|
|||
}
|
||||
} else {
|
||||
reply = memoryReply;
|
||||
*feedDiskReadState = FeedDiskReadState::NORMAL;
|
||||
}
|
||||
|
||||
bool gotAll = remainingLimitBytes > 0 && remainingDurableBytes > 0 && data->version.get() == startVersion;
|
||||
|
@ -3159,6 +3213,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
|
|||
state Span span("SS:getChangeFeedStream"_loc, req.spanContext);
|
||||
state bool atLatest = false;
|
||||
state bool removeUID = false;
|
||||
state FeedDiskReadState feedDiskReadState = STARTING;
|
||||
state Optional<Version> blockedVersion;
|
||||
|
||||
try {
|
||||
|
@ -3244,7 +3299,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
|
|||
|
||||
// keep this as not state variable so it is freed after sending to reduce memory
|
||||
Future<std::pair<ChangeFeedStreamReply, bool>> feedReplyFuture =
|
||||
getChangeFeedMutations(data, req, false, atLatest);
|
||||
getChangeFeedMutations(data, req, false, atLatest, &feedDiskReadState);
|
||||
if (atLatest && !removeUID && !feedReplyFuture.isReady()) {
|
||||
data->changeFeedClientVersions[req.reply.getEndpoint().getPrimaryAddress()][req.id] =
|
||||
blockedVersion.present() ? blockedVersion.get() : data->prevVersion;
|
||||
|
|
Loading…
Reference in New Issue