diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index d7858d5ee6..3506137759 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7494,6 +7494,11 @@ ACTOR Future doCFMerge(Reference results, wait(delay(0)); ASSERT(results->mutations.isEmpty()); + // update lastReturned once the previous mutation has been consumed + if (*begin - 1 > results->lastReturnedVersion.get()) { + results->lastReturnedVersion.set(*begin - 1); + } + state int interfNum = 0; // TODO minor optimization - could make this just a vector of indexes if each MutationAndVersionStream remembered @@ -7711,11 +7716,18 @@ ACTOR Future doSingleCFStream(KeyRange range, ASSERT(results->streams.size() == 1); ASSERT(results->storageData.size() == 1); state bool atLatest = false; + + // wait for any previous mutations in stream to be consumed + wait(results->mutations.onEmpty()); + wait(delay(0)); + ASSERT(results->mutations.isEmpty()); + // update lastReturned once the previous mutation has been consumed + if (*begin - 1 > results->lastReturnedVersion.get()) { + results->lastReturnedVersion.set(*begin - 1); + } + loop { - // wait for any previous mutations in stream to be consumed - wait(results->mutations.onEmpty()); - wait(delay(0)); - ASSERT(results->mutations.isEmpty()); + state ChangeFeedStreamReply feedReply = waitNext(results->streams[0].getFuture()); *begin = feedReply.mutations.back().version + 1; diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 29a2a10666..6806a7cac7 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -1096,17 +1096,44 @@ static Version doGranuleRollback(Reference metadata, return cfRollbackVersion; } -ACTOR Future waitOnCFVersion(Reference metadata, Version cfVersion) { +#define DEBUG_WAIT_VERSION_COMMITTED false +ACTOR Future waitOnCFVersion(Reference metadata, + Version original /*TODO REMOVE, just for debugging*/, + Version waitVersion) { + if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: waiting for {1} \n", original, waitVersion); + } loop { try { + if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) { + if (metadata->activeCFData.get().isValid()) { + fmt::print( + "WVC {0}: WAL (currently {1})\n", original, metadata->activeCFData.get()->getVersion()); + } else { + fmt::print("WVC {0}: invalid\n", original, metadata->activeCFData.get()->getVersion()); + } + } // if not valid, we're about to be cancelled anyway - state Future atLeast = - metadata->activeCFData.get().isValid() ? metadata->activeCFData.get()->whenAtLeast(cfVersion) : Never(); + state Future atLeast = metadata->activeCFData.get().isValid() + ? metadata->activeCFData.get()->whenAtLeast(waitVersion) + : Never(); choose { - when(wait(atLeast)) { break; } - when(wait(metadata->activeCFData.onChange())) {} + when(wait(atLeast)) { + if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: got at least {1} \n", original, waitVersion); + } + break; + } + when(wait(metadata->activeCFData.onChange())) { + if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: cfOnChange \n", original); + } + } } } catch (Error& e) { + if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: got error {1} \n", original, e.name()); + } if (e.code() == error_code_operation_cancelled) { throw e; } @@ -1114,9 +1141,9 @@ ACTOR Future waitOnCFVersion(Reference metadata, Version // if waiting on a parent granule change feed and we change to the child, the parent will get end_of_stream, // which could cause this waiting whenAtLeast to get change_feed_cancelled. We should simply retry and wait // a bit, as blobGranuleUpdateFiles will switch to the new change feed - if (e.code() != error_code_change_feed_cancelled) { + if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) { if (BW_DEBUG) { - fmt::print("waitVersionCommitted WAL got unexpected error {}\n", e.name()); + fmt::print("WVC {0}: unexpected error {1}\n", original, e.name()); } throw e; } @@ -1124,9 +1151,13 @@ ACTOR Future waitOnCFVersion(Reference metadata, Version } } + if (DEBUG_BW_VERSION(original) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: got \n", original); + } + // sanity check to make sure whenAtLeast didn't return early - if (cfVersion > metadata->waitForVersionReturned) { - metadata->waitForVersionReturned = cfVersion; + if (waitVersion > metadata->waitForVersionReturned) { + metadata->waitForVersionReturned = waitVersion; } // stop after change feed callback @@ -1138,8 +1169,14 @@ ACTOR Future waitOnCFVersion(Reference metadata, Version ACTOR Future waitCommittedGrv(Reference bwData, Reference metadata, Version version) { + if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: grv start\n", version); + } // TODO REMOVE debugs if (version > bwData->grvVersion.get()) { + if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: getting grv\n", version); + } /*if (BW_DEBUG) { fmt::print("waitVersionCommitted waiting {0}\n", version); }*/ @@ -1154,7 +1191,10 @@ ACTOR Future waitCommittedGrv(Reference bwData, } Version grvVersion = bwData->grvVersion.get(); - wait(waitOnCFVersion(metadata, grvVersion)); + if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: got grv\n", version); + } + wait(waitOnCFVersion(metadata, version, grvVersion)); return Void(); } @@ -1163,8 +1203,14 @@ ACTOR Future waitVersionCommitted(Reference bwData, Version version) { // If GRV is way in the future, we know we can't roll back more than 5 seconds (or whatever this knob is set to) // worth of versions + if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: starting\n", version); + } wait(waitCommittedGrv(bwData, metadata, version) || - waitOnCFVersion(metadata, version + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)); + waitOnCFVersion(metadata, version, version + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)); + if (DEBUG_BW_VERSION(version) && DEBUG_WAIT_VERSION_COMMITTED) { + fmt::print("WVC {0}: done\n", version); + } if (version > metadata->knownCommittedVersion) { metadata->knownCommittedVersion = version; } @@ -1358,8 +1404,10 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, when(Standalone> _mutations = waitNext(metadata->activeCFData.get()->mutations.getFuture())) { /*if (DEBUG_BW_VERSION(metadata->bufferedDeltaVersion)) { - fmt::print("BW got mutations after ({0}): ({1})\n", + fmt::print("BW got mutations after ({0}): {1} - {2} ({3})\n", metadata->bufferedDeltaVersion, + _mutations.front().version, + _mutations.back().version, _mutations.size()); }*/ mutations = _mutations;