Merge pull request #6126 from sfc-gh-etschannen/blob_integration

Fixed a few more change feed bugs
This commit is contained in:
Evan Tschannen 2021-12-07 21:02:19 -08:00 committed by GitHub
commit a47ce06e26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 12 deletions

View File

@ -7282,6 +7282,7 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
*begin = checkVersion + 1;
ASSERT(nextOut.back().version >= results->lastReturnedVersion.get());
results->mutations.send(nextOut);
wait(results->mutations.onEmpty());
if (nextOut.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(nextOut.back().version);
}
@ -7311,6 +7312,7 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
if (nextOut.size()) {
ASSERT(nextOut.back().version >= results->lastReturnedVersion.get());
results->mutations.send(nextOut);
wait(results->mutations.onEmpty());
if (nextOut.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(nextOut.back().version);
}
@ -7455,32 +7457,33 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
wait(results->mutations.onEmpty());
choose {
when(wait(cx->connectionFileChanged())) { break; }
when(ChangeFeedStreamReply rep = waitNext(results->streams[0].getFuture())) {
begin = rep.mutations.back().version + 1;
when(state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture())) {
begin = feedReply.mutations.back().version + 1;
// TODO REMOVE, for debugging
if (rep.mutations.back().version < results->lastReturnedVersion.get()) {
if (feedReply.mutations.back().version < results->lastReturnedVersion.get()) {
printf("out of order mutation for CF %s from (%d) %s! %lld < %lld\n",
rangeID.toString().substr(0, 6).c_str(),
results->storageData.size(),
results->storageData.empty()
? "????"
: results->storageData[0]->id.toString().substr(0, 4).c_str(),
rep.mutations.back().version,
feedReply.mutations.back().version,
results->lastReturnedVersion.get());
}
ASSERT(rep.mutations.back().version >= results->lastReturnedVersion.get());
ASSERT(feedReply.mutations.back().version >= results->lastReturnedVersion.get());
results->mutations.send(
Standalone<VectorRef<MutationsAndVersionRef>>(rep.mutations, rep.arena));
if (rep.mutations.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(rep.mutations.back().version);
Standalone<VectorRef<MutationsAndVersionRef>>(feedReply.mutations, feedReply.arena));
wait(results->mutations.onEmpty());
if (feedReply.mutations.back().version > results->lastReturnedVersion.get()) {
results->lastReturnedVersion.set(feedReply.mutations.back().version);
}
if (!atLatest && rep.atLatestVersion) {
if (!atLatest && feedReply.atLatestVersion) {
atLatest = true;
results->notAtLatest.set(0);
}
if (rep.minStreamVersion > results->storageData[0]->version.get()) {
results->storageData[0]->version.set(rep.minStreamVersion);
if (feedReply.minStreamVersion > results->storageData[0]->version.get()) {
results->storageData[0]->version.set(feedReply.minStreamVersion);
}
}
}

View File

@ -3929,7 +3929,8 @@ ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
}
remoteLoc++;
} else if (remoteResult[remoteLoc].version == localResult.version) {
if (remoteResult[remoteLoc].mutations.size()) {
if (remoteResult[remoteLoc].mutations.size() &&
remoteResult[remoteLoc].mutations.back().param1 != lastEpochEndPrivateKey) {
ASSERT(localResult.mutations.size());
remoteResult[remoteLoc].mutations.append(
remoteResult.arena(), localResult.mutations.begin(), localResult.mutations.size());