removing overhead of local change feed stream copy (#8205)

This commit is contained in:
Josh Slocum 2022-09-21 14:44:51 -05:00 committed by GitHub
parent 6270016bed
commit b7be356a98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 49 additions and 123 deletions

View File

@ -2929,49 +2929,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
return std::make_pair(reply, gotAll);
}
// Streams this storage server's locally-known mutations for a change feed into `results`,
// covering versions [begin, end) on `range`. Repeatedly calls getChangeFeedMutations and
// forwards each batch, applying backpressure by waiting for the consumer to drain the
// promise stream before sending the next item. Returns Void once `begin` reaches `end`.
// NOTE(review): assumes getChangeFeedMutations always returns at least one entry so
// `mutations.back()` is safe — TODO confirm against its implementation.
ACTOR Future<Void> localChangeFeedStream(StorageServer* data,
PromiseStream<Standalone<MutationsAndVersionRef>> results,
Key rangeID,
Version begin,
Version end,
KeyRange range) {
try {
loop {
// Build a fresh request for the next window [begin, end).
state ChangeFeedStreamRequest feedRequest;
feedRequest.rangeID = rangeID;
feedRequest.begin = begin;
feedRequest.end = end;
feedRequest.range = range;
state std::pair<ChangeFeedStreamReply, bool> feedReply =
wait(getChangeFeedMutations(data, feedRequest, true, false, UID()));
// Advance past the last version returned so the next iteration resumes after it.
begin = feedReply.first.mutations.back().version + 1;
state int resultLoc = 0;
while (resultLoc < feedReply.first.mutations.size()) {
// Only forward non-empty entries, plus the final version (end - 1) even if empty,
// so the consumer can observe stream completion.
if (feedReply.first.mutations[resultLoc].mutations.size() ||
feedReply.first.mutations[resultLoc].version == end - 1) {
// Backpressure: wait for the consumer to take the previous item first.
wait(results.onEmpty());
results.send(feedReply.first.mutations[resultLoc]);
}
resultLoc++;
}
if (begin == end) {
return Void();
}
}
} catch (Error& e) {
if (e.code() == error_code_unknown_change_feed) {
CODE_PROBE(true, "CF was moved away, no more local data to merge with");
// Send endVersion so local stream is effectively done. We couldn't have sent that already,
// because that would mean the stream would have finished without error.
results.send(MutationsAndVersionRef(end, invalidVersion));
} else {
// Unexpected errors are logged at SevError; unknown_change_feed is expected on moves.
TraceEvent(SevError, "LocalChangeFeedError", data->thisServerID).error(e).detail("CFID", rangeID);
}
// Rethrow in all cases so the caller observes the failure (including actor_cancelled).
throw;
}
}
// Change feed stream must be sent an error as soon as it is moved away, or change feed can get incorrect results
ACTOR Future<Void> stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamRequest req, UID streamUID) {
auto feed = data->uidChangeFeed.find(req.rangeID);
@ -5834,17 +5791,9 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
state Version lastVersion = invalidVersion;
state int64_t versionsFetched = 0;
state PromiseStream<Standalone<MutationsAndVersionRef>> localResults;
// ensure SS is at least caught up to begin version, to maintain behavior with old fetch
wait(data->version.whenAtLeast(startVersion));
// Add 1 to fetch version to make sure the local stream will have more versions in the stream than the remote stream
// to avoid edge cases in the merge logic
state Future<Void> localStream =
localChangeFeedStream(data, localResults, rangeId, startVersion, endVersion + 1, range);
state Standalone<MutationsAndVersionRef> localResult;
Standalone<MutationsAndVersionRef> _localResult = waitNext(localResults.getFuture());
localResult = _localResult;
try {
loop {
while (data->fetchKeysBudgetUsed.get()) {
@ -5854,6 +5803,10 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
state Standalone<VectorRef<MutationsAndVersionRef>> remoteResult =
waitNext(feedResults->mutations.getFuture());
state int remoteLoc = 0;
// ensure SS is at least caught up to begin version, to maintain behavior with old fetch
if (!remoteResult.empty()) {
wait(data->version.whenAtLeast(remoteResult.back().version));
}
while (remoteLoc < remoteResult.size()) {
if (feedResults->popVersion - 1 > changeFeedInfo->emptyVersion) {
@ -5881,80 +5834,53 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
++data->counters.kvSystemClearRanges;
}
Version localVersion = localResult.version;
Version remoteVersion = remoteResult[remoteLoc].version;
if (remoteVersion <= localVersion) {
if (remoteVersion > changeFeedInfo->emptyVersion) {
// merge if same version
if (remoteVersion == localVersion && remoteResult[remoteLoc].mutations.size() &&
remoteResult[remoteLoc].mutations.back().param1 != lastEpochEndPrivateKey) {
int remoteSize = remoteResult[remoteLoc].mutations.size();
ASSERT(localResult.mutations.size());
remoteResult[remoteLoc].mutations.append(
remoteResult.arena(), localResult.mutations.begin(), localResult.mutations.size());
if (MUTATION_TRACKING_ENABLED) {
int midx = 0;
for (auto& m : remoteResult[remoteLoc].mutations) {
DEBUG_MUTATION("ChangeFeedWriteMoveMerge", remoteVersion, m, data->thisServerID)
.detail("Range", range)
.detail("FromLocal", midx >= remoteSize)
.detail("ChangeFeedID", rangeId);
midx++;
}
}
} else {
if (MUTATION_TRACKING_ENABLED) {
for (auto& m : remoteResult[remoteLoc].mutations) {
DEBUG_MUTATION("ChangeFeedWriteMove", remoteVersion, m, data->thisServerID)
.detail("Range", range)
.detail("ChangeFeedID", rangeId);
}
}
}
data->storage.writeKeyValue(
KeyValueRef(changeFeedDurableKey(rangeId, remoteVersion),
changeFeedDurableValue(remoteResult[remoteLoc].mutations,
remoteResult[remoteLoc].knownCommittedVersion)));
++data->counters.kvSystemClearRanges;
changeFeedInfo->fetchVersion = std::max(changeFeedInfo->fetchVersion, remoteVersion);
if (firstVersion == invalidVersion) {
firstVersion = remoteVersion;
}
lastVersion = remoteVersion;
versionsFetched++;
} else {
CODE_PROBE(true, "Change feed ignoring write on move because it was popped concurrently");
if (MUTATION_TRACKING_ENABLED) {
for (auto& m : remoteResult[remoteLoc].mutations) {
DEBUG_MUTATION("ChangeFeedWriteMoveIgnore", remoteVersion, m, data->thisServerID)
.detail("Range", range)
.detail("ChangeFeedID", rangeId)
.detail("EmptyVersion", changeFeedInfo->emptyVersion);
}
}
if (versionsFetched > 0) {
ASSERT(firstVersion != invalidVersion);
ASSERT(lastVersion != invalidVersion);
data->storage.clearRange(
KeyRangeRef(changeFeedDurableKey(changeFeedInfo->id, firstVersion),
changeFeedDurableKey(changeFeedInfo->id, lastVersion + 1)));
++data->counters.kvSystemClearRanges;
firstVersion = invalidVersion;
lastVersion = invalidVersion;
versionsFetched = 0;
// ensure SS is at least caught up to this version, to maintain behavior with old fetch
ASSERT(remoteVersion <= data->version.get());
if (remoteVersion > changeFeedInfo->emptyVersion) {
if (MUTATION_TRACKING_ENABLED) {
for (auto& m : remoteResult[remoteLoc].mutations) {
DEBUG_MUTATION("ChangeFeedWriteMove", remoteVersion, m, data->thisServerID)
.detail("Range", range)
.detail("ChangeFeedID", rangeId);
}
}
remoteLoc++;
}
if (localVersion <= remoteVersion) {
// Do this once per wait instead of once per version for efficiency
data->fetchingChangeFeeds.insert(changeFeedInfo->id);
Standalone<MutationsAndVersionRef> _localResult = waitNext(localResults.getFuture());
localResult = _localResult;
data->storage.writeKeyValue(
KeyValueRef(changeFeedDurableKey(rangeId, remoteVersion),
changeFeedDurableValue(remoteResult[remoteLoc].mutations,
remoteResult[remoteLoc].knownCommittedVersion)));
++data->counters.kvSystemClearRanges;
changeFeedInfo->fetchVersion = std::max(changeFeedInfo->fetchVersion, remoteVersion);
if (firstVersion == invalidVersion) {
firstVersion = remoteVersion;
}
lastVersion = remoteVersion;
versionsFetched++;
} else {
CODE_PROBE(true, "Change feed ignoring write on move because it was popped concurrently");
if (MUTATION_TRACKING_ENABLED) {
for (auto& m : remoteResult[remoteLoc].mutations) {
DEBUG_MUTATION("ChangeFeedWriteMoveIgnore", remoteVersion, m, data->thisServerID)
.detail("Range", range)
.detail("ChangeFeedID", rangeId)
.detail("EmptyVersion", changeFeedInfo->emptyVersion);
}
}
if (versionsFetched > 0) {
ASSERT(firstVersion != invalidVersion);
ASSERT(lastVersion != invalidVersion);
data->storage.clearRange(
KeyRangeRef(changeFeedDurableKey(changeFeedInfo->id, firstVersion),
changeFeedDurableKey(changeFeedInfo->id, lastVersion + 1)));
++data->counters.kvSystemClearRanges;
firstVersion = invalidVersion;
lastVersion = invalidVersion;
versionsFetched = 0;
}
}
remoteLoc++;
}
// Do this once per wait instead of once per version for efficiency
data->fetchingChangeFeeds.insert(changeFeedInfo->id);