Fixed bug in merge cursor whenAtLeast

This commit is contained in:
Josh Slocum 2021-12-09 14:19:00 -06:00
parent 46ac726700
commit 1ee0b16bfa
4 changed files with 47 additions and 17 deletions

View File

@ -174,6 +174,7 @@ struct ChangeFeedData : ReferenceCounted<ChangeFeedData> {
std::vector<Reference<ChangeFeedStorageData>> storageData;
AsyncVar<int> notAtLatest;
Promise<Void> refresh;
Version maxSeenVersion;
ChangeFeedData() : notAtLatest(1) {}
};

View File

@ -7063,16 +7063,17 @@ Reference<ChangeFeedStorageData> DatabaseContext::getStorageData(StorageServerIn
}
Version ChangeFeedData::getVersion() {
// TODO uncomment?
if (notAtLatest.get() == 0 && mutations.isEmpty() /*& storageData.size() > 0*/) {
Version v = storageData[0]->version.get();
for (int i = 1; i < storageData.size(); i++) {
if (storageData[i]->version.get() < v) {
v = storageData[i]->version.get();
}
}
return std::max(v, lastReturnedVersion.get());
// FIXME: add back in smarter version check later
/*if (notAtLatest.get() == 0 && mutations.isEmpty()) {
Version v = storageData[0]->version.get();
for (int i = 1; i < storageData.size(); i++) {
if (storageData[i]->version.get() < v) {
v = storageData[i]->version.get();
}
}
return std::max(v, lastReturnedVersion.get());
}
*/
return lastReturnedVersion.get();
}
@ -7131,16 +7132,31 @@ ACTOR Future<Void> changeFeedWaitLatest(ChangeFeedData* self, Version version) {
}
// then, wait for client to have consumed up through version
while (!self->mutations.isEmpty()) {
if (self->maxSeenVersion >= version) {
// merge cursor has something buffered but has not yet sent it to self->mutations, just wait for
// lastReturnedVersion
if (DEBUG_CF_WAIT_VERSION == version) {
fmt::print("CFW {0}) WaitLatest: waiting for client onEmpty\n", version);
fmt::print("CFW {0}) WaitLatest: maxSeenVersion -> waiting lastReturned\n", version);
}
wait(self->mutations.onEmpty());
wait(delay(0));
}
if (DEBUG_CF_WAIT_VERSION == version) {
fmt::print("CFW {0}) WaitLatest: done\n", version);
wait(self->lastReturnedVersion.whenAtLeast(version));
if (DEBUG_CF_WAIT_VERSION == version) {
fmt::print("CFW {0}) WaitLatest: maxSeenVersion -> got lastReturned\n", version);
}
} else {
// all mutations <= version are in self->mutations, wait for empty
while (!self->mutations.isEmpty()) {
if (DEBUG_CF_WAIT_VERSION == version) {
fmt::print("CFW {0}) WaitLatest: waiting for client onEmpty\n", version);
}
wait(self->mutations.onEmpty());
wait(delay(0));
}
if (DEBUG_CF_WAIT_VERSION == version) {
fmt::print("CFW {0}) WaitLatest: done\n", version);
}
}
return Void();
@ -7242,6 +7258,9 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf,
}
resultLoc++;
}
if (rep.mutations.back().version > feedData->maxSeenVersion) {
feedData->maxSeenVersion = rep.mutations.back().version;
}
nextVersion = rep.mutations.back().version + 1;
if (!atLatestVersion && rep.atLatestVersion) {
@ -7317,6 +7336,7 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
db->changeFeedUpdaters.erase(it->id);
}
}
results->maxSeenVersion = invalidVersion;
results->storageData.clear();
Promise<Void> refresh = results->refresh;
results->refresh = Promise<Void>();
@ -7355,6 +7375,10 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
if (DEBUG_CF_VERSION(nextOut.back().version)) {
fmt::print("CFNA (merged): {0} (1)\n", nextOut.back().version);
}
if (nextOut.back().version < results->lastReturnedVersion.get()) {
printf("ERROR: merge cursor pushing next out <= lastReturnedVersion");
}
ASSERT(nextOut.back().version >= results->lastReturnedVersion.get());
results->mutations.send(nextOut);
wait(results->mutations.onEmpty());
@ -7532,6 +7556,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
}
results->streams.push_back(interf.changeFeedStream.getReplyStream(req));
results->maxSeenVersion = invalidVersion;
results->storageData.clear();
results->storageData.push_back(db->getStorageData(interf));
Promise<Void> refresh = results->refresh;

View File

@ -1889,6 +1889,10 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&
(v <= metadata->durableSnapshotVersion.get() ||
metadata->durableSnapshotVersion.get() == metadata->pendingSnapshotVersion)) {
// TODO REMOVE debugging
if (v > metadata->waitForVersionReturned) {
metadata->waitForVersionReturned = v;
}
if (v == DEBUG_BW_WAIT_VERSION) {
fmt::print("{0}) already done\n", v);
}

View File

@ -169,7 +169,7 @@ ERROR( quick_get_key_values_has_more, 2033, "One of the mapped range queries is
ERROR( quick_get_value_miss, 2034, "Found a mapped key that is not served in the same SS" )
ERROR( quick_get_key_values_miss, 2035, "Found a mapped range that is not served in the same SS" )
ERROR( blob_granule_no_ryw, 2036, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read was not materialized" )
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )