From 1ee0b16bfae9dcd6f2889d96e001230ea7b5784e Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 9 Dec 2021 14:19:00 -0600 Subject: [PATCH] Fixed bug in merge cursor whenAtLeast --- fdbclient/DatabaseContext.h | 1 + fdbclient/NativeAPI.actor.cpp | 57 ++++++++++++++++++++++++---------- fdbserver/BlobWorker.actor.cpp | 4 +++ flow/error_definitions.h | 2 +- 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 859e4d9212..2f47524f2f 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -174,6 +174,7 @@ struct ChangeFeedData : ReferenceCounted { std::vector> storageData; AsyncVar notAtLatest; Promise refresh; + Version maxSeenVersion; ChangeFeedData() : notAtLatest(1) {} }; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index fcbdccec98..764c87189f 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7063,16 +7063,17 @@ Reference DatabaseContext::getStorageData(StorageServerIn } Version ChangeFeedData::getVersion() { - // TODO uncomment? - if (notAtLatest.get() == 0 && mutations.isEmpty() /*& storageData.size() > 0*/) { - Version v = storageData[0]->version.get(); - for (int i = 1; i < storageData.size(); i++) { - if (storageData[i]->version.get() < v) { - v = storageData[i]->version.get(); - } - } - return std::max(v, lastReturnedVersion.get()); + // FIXME: add back in smarter version check later + /*if (notAtLatest.get() == 0 && mutations.isEmpty()) { + Version v = storageData[0]->version.get(); + for (int i = 1; i < storageData.size(); i++) { + if (storageData[i]->version.get() < v) { + v = storageData[i]->version.get(); + } + } + return std::max(v, lastReturnedVersion.get()); } + */ return lastReturnedVersion.get(); } @@ -7131,16 +7132,31 @@ ACTOR Future changeFeedWaitLatest(ChangeFeedData* self, Version version) { } // then, wait for client to have consumed up through version - while (!self->mutations.isEmpty()) { + if (self->maxSeenVersion >= version) { + // merge cursor has something buffered but has not yet sent it to self->mutations, just wait for + // lastReturnedVersion if (DEBUG_CF_WAIT_VERSION == version) { - fmt::print("CFW {0}) WaitLatest: waiting for client onEmpty\n", version); + fmt::print("CFW {0}) WaitLatest: maxSeenVersion -> waiting lastReturned\n", version); } - wait(self->mutations.onEmpty()); - wait(delay(0)); - } - if (DEBUG_CF_WAIT_VERSION == version) { - fmt::print("CFW {0}) WaitLatest: done\n", version); + wait(self->lastReturnedVersion.whenAtLeast(version)); + + if (DEBUG_CF_WAIT_VERSION == version) { + fmt::print("CFW {0}) WaitLatest: maxSeenVersion -> got lastReturned\n", version); + } + } else { + // all mutations <= version are in self->mutations, wait for empty + while (!self->mutations.isEmpty()) { + if (DEBUG_CF_WAIT_VERSION == version) { + fmt::print("CFW {0}) WaitLatest: waiting for client onEmpty\n", version); + } + wait(self->mutations.onEmpty()); + wait(delay(0)); + } + + if (DEBUG_CF_WAIT_VERSION == version) { + fmt::print("CFW {0}) WaitLatest: done\n", version); + } } return Void(); @@ -7242,6 +7258,9 @@ ACTOR Future singleChangeFeedStream(StorageServerInterface interf, } resultLoc++; } + if (rep.mutations.back().version > feedData->maxSeenVersion) { + feedData->maxSeenVersion = rep.mutations.back().version; + } nextVersion = rep.mutations.back().version + 1; if (!atLatestVersion && rep.atLatestVersion) { @@ -7317,6 +7336,7 @@ ACTOR Future mergeChangeFeedStream(Reference db, db->changeFeedUpdaters.erase(it->id); } } + results->maxSeenVersion = invalidVersion; results->storageData.clear(); Promise refresh = results->refresh; results->refresh = Promise(); @@ -7355,6 +7375,10 @@ ACTOR Future mergeChangeFeedStream(Reference db, if (DEBUG_CF_VERSION(nextOut.back().version)) { fmt::print("CFNA (merged): {0} (1)\n", nextOut.back().version); } + + if (nextOut.back().version < results->lastReturnedVersion.get()) { + printf("ERROR: merge cursor pushing next out <= lastReturnedVersion"); + } ASSERT(nextOut.back().version >= results->lastReturnedVersion.get()); results->mutations.send(nextOut); wait(results->mutations.onEmpty()); @@ -7532,6 +7556,7 @@ ACTOR Future getChangeFeedStreamActor(Reference db, } results->streams.push_back(interf.changeFeedStream.getReplyStream(req)); + results->maxSeenVersion = invalidVersion; results->storageData.clear(); results->storageData.push_back(db->getStorageData(interf)); Promise refresh = results->refresh; diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index c453d3c301..5e2e191946 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -1889,6 +1889,10 @@ ACTOR Future waitForVersion(Reference metadata, Version v metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) && (v <= metadata->durableSnapshotVersion.get() || metadata->durableSnapshotVersion.get() == metadata->pendingSnapshotVersion)) { + // TODO REMOVE debugging + if (v > metadata->waitForVersionReturned) { + metadata->waitForVersionReturned = v; + } if (v == DEBUG_BW_WAIT_VERSION) { fmt::print("{0}) already done\n", v); } diff --git a/flow/error_definitions.h b/flow/error_definitions.h index 5cd156ccdb..88def0477e 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -169,7 +169,7 @@ ERROR( quick_get_key_values_has_more, 2033, "One of the mapped range queries is ERROR( quick_get_value_miss, 2034, "Found a mapped key that is not served in the same SS" ) ERROR( quick_get_key_values_miss, 2035, "Found a mapped range that is not served in the same SS" ) ERROR( blob_granule_no_ryw, 2036, "Blob Granule Read Transactions must be specified as ryw-disabled" ) -ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read Transactions must be specified as ryw-disabled" ) +ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read was not materialized" ) ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" ) ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )