From d85eb330e0400a8b5ee5854d1ba89f62ae68422d Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 2 Dec 2021 14:52:16 -0600 Subject: [PATCH] retooling some waitForVersion stuff and adding asserts --- fdbclient/NativeAPI.actor.cpp | 12 +++++++----- fdbserver/BlobWorker.actor.cpp | 17 +++++++++++------ fdbserver/storageserver.actor.cpp | 2 ++ 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 6b985714da..b27a7650ca 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6835,7 +6835,7 @@ ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) } } choose { - when(wait(lastReturned)) { return Void(); } + when(wait(lastReturned)) { break; } when(wait(waitForAll(allAtLeast))) { std::vector> onEmpty; if (!self->mutations.isEmpty()) { @@ -6847,14 +6847,14 @@ ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) } } if (!onEmpty.size()) { - return Void(); + break; } choose { when(wait(waitForAll(onEmpty))) { wait(delay(0)); - return Void(); + break; } - when(wait(lastReturned)) { return Void(); } + when(wait(lastReturned)) { break; } when(wait(self->refresh.getFuture())) {} when(wait(self->notAtLatest.onChange())) {} } @@ -6864,12 +6864,14 @@ ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) } } else { choose { - when(wait(lastReturned)) { return Void(); } + when(wait(lastReturned)) { break; } when(wait(self->notAtLatest.onChange())) {} when(wait(self->refresh.getFuture())) {} } } } + ASSERT(self->getVersion() >= version); + return Void(); } Future ChangeFeedData::whenAtLeast(Version version) { diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index e63cc3c2dc..2a450bf5d7 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -1814,29 +1814,34 @@ ACTOR Future waitForVersion(Reference metadata, Version v // wait for change feed version to catch up to ensure we have all data if (metadata->activeCFData.get()->getVersion() < v) { wait(metadata->activeCFData.get()->whenAtLeast(v)); + ASSERT(metadata->activeCFData.get()->getVersion() >= v); } // wait for any pending delta and snapshot files as of the moment the change feed version caught up. state Version pendingDeltaV = metadata->pendingDeltaVersion; state Version pendingSnapshotV = metadata->pendingSnapshotVersion; - // ASSERT(pendingDeltaV <= metadata->activeCFData.get()->getVersion()); - if (pendingDeltaV > metadata->durableDeltaVersion.get()) { + // If there are mutations that are no longer buffered but have not been + // persisted to a delta file that are necessary for the query, wait for them + if (pendingDeltaV > metadata->durableDeltaVersion.get() && v > metadata->durableDeltaVersion.get()) { wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV)); + ASSERT(metadata->durableDeltaVersion.get() >= pendingDeltaV); } // This isn't strictly needed, but if we're in the process of re-snapshotting, we'd likely rather // return that snapshot file than the previous snapshot file and all its delta files. - if (pendingSnapshotV > metadata->durableSnapshotVersion.get()) { + if (pendingSnapshotV > metadata->durableSnapshotVersion.get() && v > metadata->durableSnapshotVersion.get()) { wait(metadata->durableSnapshotVersion.whenAtLeast(pendingSnapshotV)); + ASSERT(metadata->durableSnapshotVersion.get() >= pendingSnapshotV); } // There is a race here - we wait for pending delta files before this to finish, but while we do, we // kick off another delta file and roll the mutations. In that case, we must return the new delta // file instead of in memory mutations, so we wait for that delta file to complete - if (metadata->pendingDeltaVersion != pendingDeltaV) { - wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV + 1)); + if (metadata->pendingDeltaVersion > v) { + wait(metadata->durableDeltaVersion.whenAtLeast(v)); + ASSERT(metadata->durableDeltaVersion.get() >= v); } return Void(); @@ -2101,7 +2106,7 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData req.reply.send(rep); --bwData->stats.activeReadRequests; } catch (Error& e) { - printf("Error in BGFRequest %s\n", e.name()); + // printf("Error in BGFRequest %s\n", e.name()); if (e.code() == error_code_operation_cancelled) { req.reply.sendError(wrong_shard_server()); throw; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index cc48631ead..f1bcf67927 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -4650,6 +4650,7 @@ void changeServerKeys(StorageServer* data, } } } + data->keyChangeFeed.coalesce(f.second.contents()); auto feed = data->uidChangeFeed.find(f.first); if (feed != data->uidChangeFeed.end()) { feed->second->removing = true; @@ -4905,6 +4906,7 @@ private: } } } + data->keyChangeFeed.coalesce(feed->second->range.contents()); data->uidChangeFeed.erase(feed); } else { // must be pop or stop