diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index b0bdf119cd..b27a7650ca 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6808,7 +6808,8 @@ Reference DatabaseContext::getStorageData(StorageServerIn } Version ChangeFeedData::getVersion() { - if (notAtLatest.get() == 0 && mutations.isEmpty()) { + // TODO uncomment? + if (notAtLatest.get() == 0 && mutations.isEmpty() /*& storageData.size() > 0*/) { Version v = storageData[0]->version.get(); for (int i = 1; i < storageData.size(); i++) { if (storageData[i]->version.get() < v) { @@ -6834,7 +6835,7 @@ ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) } } choose { - when(wait(lastReturned)) { return Void(); } + when(wait(lastReturned)) { break; } when(wait(waitForAll(allAtLeast))) { std::vector> onEmpty; if (!self->mutations.isEmpty()) { @@ -6846,14 +6847,14 @@ ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) } } if (!onEmpty.size()) { - return Void(); + break; } choose { when(wait(waitForAll(onEmpty))) { wait(delay(0)); - return Void(); + break; } - when(wait(lastReturned)) { return Void(); } + when(wait(lastReturned)) { break; } when(wait(self->refresh.getFuture())) {} when(wait(self->notAtLatest.onChange())) {} } @@ -6863,12 +6864,14 @@ ACTOR Future changeFeedWhenAtLatest(ChangeFeedData* self, Version version) } } else { choose { - when(wait(lastReturned)) { return Void(); } + when(wait(lastReturned)) { break; } when(wait(self->notAtLatest.onChange())) {} when(wait(self->refresh.getFuture())) {} } } } + ASSERT(self->getVersion() >= version); + return Void(); } Future ChangeFeedData::whenAtLeast(Version version) { diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 6f48522556..2a450bf5d7 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -98,15 +98,13 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted { uint64_t bufferedDeltaBytes = 0; // for client to know when it is safe to read a certain version and from where (check waitForVersion) - NotifiedVersion bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions) + Version bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions) Version pendingDeltaVersion = 0; // largest version in progress writing to s3/fdb NotifiedVersion durableDeltaVersion; // largest version persisted in s3/fdb NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots Version pendingSnapshotVersion = 0; Version initialSnapshotVersion = invalidVersion; - AsyncVar rollbackCount; - int64_t originalEpoch; int64_t originalSeqno; int64_t continueEpoch; @@ -118,6 +116,8 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted { Promise resumeSnapshot; + AsyncVar> activeCFData; + AssignBlobRangeRequest originalReq; void resume() { @@ -1011,6 +1011,14 @@ struct InFlightFile { : future(future), version(version), bytes(bytes), snapshot(snapshot) {} }; +static Reference newChangeFeedData(Version startVersion) { + // FIXME: should changeFeedStream guarantee that this is always set to begin-1 instead? + Reference r = makeReference(); + // TODO uncomment? + // r->lastReturnedVersion.set(startVersion); + return r; +} + static Version doGranuleRollback(Reference metadata, Version mutationVersion, Version rollbackVersion, @@ -1074,9 +1082,7 @@ static Version doGranuleRollback(Reference metadata, metadata->deltaArena = Arena(); metadata->currentDeltas = GranuleDeltas(); metadata->bufferedDeltaBytes = 0; - // Create new notified version so it doesn't go backwards. Do this before signaling the rollback counter so that - // the callers pick this new one up for the next waitForVersion - metadata->bufferedDeltaVersion = NotifiedVersion(cfRollbackVersion); + metadata->bufferedDeltaVersion = cfRollbackVersion; // Track that this rollback happened, since we have to re-read mutations up to the rollback // Add this rollback to in progress, and put all completed ones back in progress @@ -1115,9 +1121,7 @@ static Version doGranuleRollback(Reference metadata, // delete all deltas in rollback range, but we can optimize here to just skip the uncommitted mutations // directly and immediately pop the rollback out of inProgress - // Create new notified version so it doesn't go backwards. Do this before signaling the rollback counter so that - // the callers pick this new one up for the next waitForVersion - metadata->bufferedDeltaVersion = NotifiedVersion(rollbackVersion); + metadata->bufferedDeltaVersion = rollbackVersion; cfRollbackVersion = mutationVersion; } @@ -1128,8 +1132,6 @@ static Version doGranuleRollback(Reference metadata, cfRollbackVersion); } - metadata->rollbackCount.set(metadata->rollbackCount.get() + 1); - return cfRollbackVersion; } @@ -1139,8 +1141,6 @@ static Version doGranuleRollback(Reference metadata, ACTOR Future blobGranuleUpdateFiles(Reference bwData, Reference metadata, Future assignFuture) { - state Reference oldChangeFeedStream = makeReference(); - state Reference changeFeedStream = makeReference(); state std::deque inFlightFiles; state Future oldChangeFeedFuture; state Future changeFeedFuture; @@ -1237,26 +1237,29 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata->durableDeltaVersion.set(startVersion); metadata->pendingDeltaVersion = startVersion; - metadata->bufferedDeltaVersion.set(startVersion); + metadata->bufferedDeltaVersion = startVersion; committedVersion.set(startVersion); - ASSERT(metadata->readable.canBeSet()); - metadata->readable.send(Void()); - + metadata->activeCFData.set(newChangeFeedData(startVersion)); if (startState.parentGranule.present() && startVersion < startState.changeFeedStartVersion) { // read from parent change feed up until our new change feed is started readOldChangeFeed = true; - oldChangeFeedFuture = bwData->db->getChangeFeedStream(oldChangeFeedStream, + + oldChangeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(), oldCFKey.get(), startVersion + 1, startState.changeFeedStartVersion, metadata->keyRange); + } else { readOldChangeFeed = false; changeFeedFuture = bwData->db->getChangeFeedStream( - changeFeedStream, cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange); + metadata->activeCFData.get(), cfKey, startVersion + 1, MAX_VERSION, metadata->keyRange); } + ASSERT(metadata->readable.canBeSet()); + metadata->readable.send(Void()); + loop { // check outstanding snapshot/delta files for completion while (inFlightFiles.size() > 0) { @@ -1298,16 +1301,12 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, state Standalone> mutations; try { + state Standalone> _mutations = + waitNext(metadata->activeCFData.get()->mutations.getFuture()); + mutations = _mutations; if (readOldChangeFeed) { - // TODO efficient way to store next in mutations? - Standalone> oldMutations = - waitNext(oldChangeFeedStream->mutations.getFuture()); - mutations = oldMutations; ASSERT(mutations.back().version < startState.changeFeedStartVersion); } else { - Standalone> newMutations = - waitNext(changeFeedStream->mutations.getFuture()); - mutations = newMutations; ASSERT(mutations.front().version >= startState.changeFeedStartVersion); } } catch (Error& e) { @@ -1326,11 +1325,15 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, metadata->keyRange.begin.printable(), metadata->keyRange.end.printable(), startState.granuleID.toString(), - metadata->bufferedDeltaVersion.get()); + metadata->bufferedDeltaVersion); } - changeFeedFuture = bwData->db->getChangeFeedStream( - changeFeedStream, cfKey, startState.changeFeedStartVersion, MAX_VERSION, metadata->keyRange); + metadata->activeCFData.set(newChangeFeedData(startState.changeFeedStartVersion - 1)); + changeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(), + cfKey, + startState.changeFeedStartVersion, + MAX_VERSION, + metadata->keyRange); } // process mutations @@ -1339,7 +1342,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, // buffer mutations at this version. There should not be multiple MutationsAndVersionRef with the same // version - ASSERT(deltas.version > metadata->bufferedDeltaVersion.get()); + ASSERT(deltas.version > metadata->bufferedDeltaVersion); if (!deltas.mutations.empty()) { if (deltas.mutations.size() == 1 && deltas.mutations.back().param1 == lastEpochEndPrivateKey) { // Note rollbackVerision is durable, [rollbackVersion+1 - deltas.version] needs to be tossed @@ -1386,6 +1389,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, .detail("Version", deltas.version) .detail("RollbackVersion", rollbackVersion); } + Version cfRollbackVersion = doGranuleRollback(metadata, deltas.version, rollbackVersion, @@ -1394,27 +1398,29 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, rollbacksCompleted); // Reset change feeds to cfRollbackVersion + metadata->activeCFData.set(newChangeFeedData(cfRollbackVersion)); if (readOldChangeFeed) { // It shouldn't be possible to roll back across the parent/child feed boundary, // because the transaction creating the child change feed had to commit before we // got here. ASSERT(cfRollbackVersion < startState.changeFeedStartVersion); - oldChangeFeedStream = makeReference(); oldChangeFeedFuture = - bwData->db->getChangeFeedStream(oldChangeFeedStream, + bwData->db->getChangeFeedStream(metadata->activeCFData.get(), oldCFKey.get(), cfRollbackVersion + 1, startState.changeFeedStartVersion, metadata->keyRange); + } else { ASSERT(cfRollbackVersion > startState.changeFeedStartVersion); - changeFeedStream = makeReference(); - changeFeedFuture = bwData->db->getChangeFeedStream(changeFeedStream, + + changeFeedFuture = bwData->db->getChangeFeedStream(metadata->activeCFData.get(), cfKey, cfRollbackVersion + 1, MAX_VERSION, metadata->keyRange); } + justDidRollback = true; break; } @@ -1443,7 +1449,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, } // update buffered version and committed version - metadata->bufferedDeltaVersion.set(deltas.version); + metadata->bufferedDeltaVersion = deltas.version; Version knownNoRollbacksPast = std::min(deltas.version, deltas.knownCommittedVersion); if (knownNoRollbacksPast > committedVersion.get()) { // This is the only place it is safe to set committedVersion, as it has to come from the mutation @@ -1622,6 +1628,9 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, if (BW_DEBUG) { printf("BGUF got error %s\n", e.name()); } + // Free last change feed data + metadata->activeCFData.set(Reference()); + if (e.code() == error_code_operation_cancelled) { throw; } @@ -1786,13 +1795,15 @@ ACTOR Future waitForVersion(Reference metadata, Version v metadata->keyRange.end.printable().c_str(), v, metadata->readable.isSet() ? "T" : "F", - metadata->bufferedDeltaVersion.get(), + metadata->activeCFData.get()->getVersion(), metadata->pendingDeltaVersion, metadata->durableDeltaVersion.get(), metadata->pendingSnapshotVersion, metadata->durableSnapshotVersion.get());*/ - if (v <= metadata->bufferedDeltaVersion.get() && + ASSERT(metadata->activeCFData.get().isValid()); + + if (v <= metadata->activeCFData.get()->getVersion() && (v <= metadata->durableDeltaVersion.get() || metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) && (v <= metadata->durableSnapshotVersion.get() || @@ -1801,31 +1812,36 @@ ACTOR Future waitForVersion(Reference metadata, Version v } // wait for change feed version to catch up to ensure we have all data - if (metadata->bufferedDeltaVersion.get() < v) { - wait(metadata->bufferedDeltaVersion.whenAtLeast(v)); + if (metadata->activeCFData.get()->getVersion() < v) { + wait(metadata->activeCFData.get()->whenAtLeast(v)); + ASSERT(metadata->activeCFData.get()->getVersion() >= v); } // wait for any pending delta and snapshot files as of the moment the change feed version caught up. state Version pendingDeltaV = metadata->pendingDeltaVersion; state Version pendingSnapshotV = metadata->pendingSnapshotVersion; - ASSERT(pendingDeltaV <= metadata->bufferedDeltaVersion.get()); - if (pendingDeltaV > metadata->durableDeltaVersion.get()) { + // If there are mutations that are no longer buffered but have not been + // persisted to a delta file that are necessary for the query, wait for them + if (pendingDeltaV > metadata->durableDeltaVersion.get() && v > metadata->durableDeltaVersion.get()) { wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV)); + ASSERT(metadata->durableDeltaVersion.get() >= pendingDeltaV); } // This isn't strictly needed, but if we're in the process of re-snapshotting, we'd likely rather // return that snapshot file than the previous snapshot file and all its delta files. - if (pendingSnapshotV > metadata->durableSnapshotVersion.get()) { + if (pendingSnapshotV > metadata->durableSnapshotVersion.get() && v > metadata->durableSnapshotVersion.get()) { wait(metadata->durableSnapshotVersion.whenAtLeast(pendingSnapshotV)); + ASSERT(metadata->durableSnapshotVersion.get() >= pendingSnapshotV); } // There is a race here - we wait for pending delta files before this to finish, but while we do, we // kick off another delta file and roll the mutations. In that case, we must return the new delta // file instead of in memory mutations, so we wait for that delta file to complete - if (metadata->pendingDeltaVersion != pendingDeltaV) { - wait(metadata->durableDeltaVersion.whenAtLeast(pendingDeltaV + 1)); + if (metadata->pendingDeltaVersion > v) { + wait(metadata->durableDeltaVersion.whenAtLeast(v)); + ASSERT(metadata->durableDeltaVersion.get() >= v); } return Void(); @@ -1939,7 +1955,7 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData // lazily load files for old granule if not present chunkRange = cur->range; - if (cur->files.isError() || !cur->files.isValid()) { + if (!cur->files.isValid() || cur->files.isError()) { cur->files = loadHistoryFiles(bwData, cur->granuleID); } @@ -1971,6 +1987,9 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData } else { // this is an active granule query loop { + if (!metadata->activeCFData.get().isValid()) { + throw wrong_shard_server(); + } Future waitForVersionFuture = waitForVersion(metadata, req.readVersion); if (waitForVersionFuture.isReady()) { // didn't wait, so no need to check rollback stuff @@ -1978,19 +1997,15 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData } // rollback resets all of the version information, so we have to redo wait for // version on rollback - state int rollbackCount = metadata->rollbackCount.get(); choose { - when(wait(waitForVersionFuture)) {} - when(wait(metadata->rollbackCount.onChange())) {} + when(wait(waitForVersionFuture)) { break; } + when(wait(metadata->activeCFData.onChange())) {} when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); } } - - if (rollbackCount == metadata->rollbackCount.get()) { - break; - } else if (BW_REQUEST_DEBUG) { - fmt::print("[{0} - {1}) @ {2} hit rollback, restarting waitForVersion\n", - req.keyRange.begin.printable(), - req.keyRange.end.printable(), + if (BW_REQUEST_DEBUG && metadata->activeCFData.get().isValid()) { + fmt::print("{0} - {1}) @ {2} hit CF change, restarting waitForVersion\n", + req.keyRange.begin.printable().c_str(), + req.keyRange.end.printable().c_str(), req.readVersion); } } @@ -2091,6 +2106,7 @@ ACTOR Future handleBlobGranuleFileRequest(Reference bwData req.reply.send(rep); --bwData->stats.activeReadRequests; } catch (Error& e) { + // printf("Error in BGFRequest %s\n", e.name()); if (e.code() == error_code_operation_cancelled) { req.reply.sendError(wrong_shard_server()); throw; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 1b79cc8265..27ea0cd3dc 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -4654,6 +4654,7 @@ void changeServerKeys(StorageServer* data, } } } + data->keyChangeFeed.coalesce(f.second.contents()); auto feed = data->uidChangeFeed.find(f.first); if (feed != data->uidChangeFeed.end()) { feed->second->removing = true; @@ -4909,6 +4910,7 @@ private: } } } + data->keyChangeFeed.coalesce(feed->second->range.contents()); data->uidChangeFeed.erase(feed); } else { // must be pop or stop