retooling some waitForVersion stuff and adding asserts

This commit is contained in:
Josh Slocum 2021-12-02 14:52:16 -06:00
parent f43169cb7b
commit d85eb330e0
3 changed files with 20 additions and 11 deletions

View File

@ -6835,7 +6835,7 @@ ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version)
}
}
choose {
when(wait(lastReturned)) { return Void(); }
when(wait(lastReturned)) { break; }
when(wait(waitForAll(allAtLeast))) {
std::vector<Future<Void>> onEmpty;
if (!self->mutations.isEmpty()) {
@ -6847,14 +6847,14 @@ ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version)
}
}
if (!onEmpty.size()) {
return Void();
break;
}
choose {
when(wait(waitForAll(onEmpty))) {
wait(delay(0));
return Void();
break;
}
when(wait(lastReturned)) { return Void(); }
when(wait(lastReturned)) { break; }
when(wait(self->refresh.getFuture())) {}
when(wait(self->notAtLatest.onChange())) {}
}
@ -6864,12 +6864,14 @@ ACTOR Future<Void> changeFeedWhenAtLatest(ChangeFeedData* self, Version version)
}
} else {
choose {
when(wait(lastReturned)) { return Void(); }
when(wait(lastReturned)) { break; }
when(wait(self->notAtLatest.onChange())) {}
when(wait(self->refresh.getFuture())) {}
}
}
}
ASSERT(self->getVersion() >= version);
return Void();
}
Future<Void> ChangeFeedData::whenAtLeast(Version version) {

View File

@ -1814,29 +1814,34 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
// wait for change feed version to catch up to ensure we have all data
if (metadata->activeCFData.get()->getVersion() < v) {
wait(metadata->activeCFData.get()->whenAtLeast(v));
ASSERT(metadata->activeCFData.get()->getVersion() >= v);
}
// wait for any pending delta and snapshot files as of the moment the change feed version caught up.
state Version pendingDeltaV = metadata->pendingDeltaVersion;
state Version pendingSnapshotV = metadata->pendingSnapshotVersion;
// ASSERT(pendingDeltaV <= metadata->activeCFData.get()->getVersion());
if (pendingDeltaV > metadata->durableDeltaVersion.get()) {
// If there are mutations that are no longer buffered but have not been
// persisted to a delta file that are necessary for the query, wait for them
if (pendingDeltaV > metadata->durableDeltaVersion.get() && v > metadata->durableDeltaVersion.get()) {
wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV));
ASSERT(metadata->durableDeltaVersion.get() >= pendingDeltaV);
}
// This isn't strictly needed, but if we're in the process of re-snapshotting, we'd likely rather
// return that snapshot file than the previous snapshot file and all its delta files.
if (pendingSnapshotV > metadata->durableSnapshotVersion.get()) {
if (pendingSnapshotV > metadata->durableSnapshotVersion.get() && v > metadata->durableSnapshotVersion.get()) {
wait(metadata->durableSnapshotVersion.whenAtLeast(pendingSnapshotV));
ASSERT(metadata->durableSnapshotVersion.get() >= pendingSnapshotV);
}
// There is a race here - we wait for pending delta files before this to finish, but while we do, we
// kick off another delta file and roll the mutations. In that case, we must return the new delta
// file instead of in memory mutations, so we wait for that delta file to complete
if (metadata->pendingDeltaVersion != pendingDeltaV) {
wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV + 1));
if (metadata->pendingDeltaVersion > v) {
wait(metadata->durableDeltaVersion.whenAtLeast(v));
ASSERT(metadata->durableDeltaVersion.get() >= v);
}
return Void();
@ -2101,7 +2106,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData
req.reply.send(rep);
--bwData->stats.activeReadRequests;
} catch (Error& e) {
printf("Error in BGFRequest %s\n", e.name());
// printf("Error in BGFRequest %s\n", e.name());
if (e.code() == error_code_operation_cancelled) {
req.reply.sendError(wrong_shard_server());
throw;

View File

@ -4650,6 +4650,7 @@ void changeServerKeys(StorageServer* data,
}
}
}
data->keyChangeFeed.coalesce(f.second.contents());
auto feed = data->uidChangeFeed.find(f.first);
if (feed != data->uidChangeFeed.end()) {
feed->second->removing = true;
@ -4905,6 +4906,7 @@ private:
}
}
}
data->keyChangeFeed.coalesce(feed->second->range.contents());
data->uidChangeFeed.erase(feed);
} else {
// must be pop or stop