From 6f1fb719494f17b353e9a12d9651c9fca0bce5db Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Tue, 1 Mar 2022 10:13:20 -0800 Subject: [PATCH] Fixed a number of different pruning problems --- fdbclient/BlobGranuleCommon.h | 4 +- fdbserver/BlobManager.actor.cpp | 2 +- fdbserver/BlobWorker.actor.cpp | 5 +- .../workloads/BlobGranuleVerifier.actor.cpp | 86 +++++++++++++------ 4 files changed, 63 insertions(+), 34 deletions(-) diff --git a/fdbclient/BlobGranuleCommon.h b/fdbclient/BlobGranuleCommon.h index b33cbabb7b..4559f75819 100644 --- a/fdbclient/BlobGranuleCommon.h +++ b/fdbclient/BlobGranuleCommon.h @@ -77,14 +77,14 @@ struct BlobGranuleChunkRef { constexpr static FileIdentifier file_identifier = 865198; KeyRangeRef keyRange; Version includedVersion; - Version startVersion; + Version snapshotVersion; Optional snapshotFile; // not set if it's an incremental read VectorRef deltaFiles; GranuleDeltas newDeltas; template void serialize(Ar& ar) { - serializer(ar, keyRange, includedVersion, startVersion, snapshotFile, deltaFiles, newDeltas); + serializer(ar, keyRange, includedVersion, snapshotVersion, snapshotFile, deltaFiles, newDeltas); } }; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index a55118ec73..0ad0aaf53e 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -2450,7 +2450,7 @@ ACTOR Future monitorPruneKeys(Reference self) { pruneMap.insert(allKeys, std::make_pair(0, false)); try { // TODO: replace 10000 with a knob - state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, 10000)); + state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, BUGGIFY ? 1 : 10000)); if (pruneIntents.size()) { int rangeIdx = 0; for (; rangeIdx < pruneIntents.size(); ++rangeIdx) { diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index c51e1c5bbd..a005bca190 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -2256,7 +2256,6 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl state KeyRange chunkRange; state GranuleFiles chunkFiles; - state Version startVer; if (metadata->initialSnapshotVersion > req.readVersion) { // this is a time travel query, find previous granule @@ -2317,7 +2316,6 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl ASSERT(cur->endVersion > req.readVersion); ASSERT(cur->startVersion <= req.readVersion); - startVer = cur->startVersion; // lazily load files for old granule if not present chunkRange = cur->range; @@ -2352,7 +2350,6 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl ASSERT(chunkFiles.snapshotFiles.front().version <= req.readVersion); } else { // this is an active granule query - startVer = metadata->initialSnapshotVersion; loop { if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) { throw wrong_shard_server(); @@ -2408,7 +2405,6 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl BlobGranuleChunkRef chunk; // TODO change in V2 chunk.includedVersion = req.readVersion; - chunk.startVersion = startVer; chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end)); // handle snapshot files @@ -2434,6 +2430,7 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl BlobFileIndex snapshotF = chunkFiles.snapshotFiles[i]; chunk.snapshotFile = BlobFilePointerRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length); Version snapshotVersion = chunkFiles.snapshotFiles[i].version; + chunk.snapshotVersion = snapshotVersion; // handle delta files // cast this to an int so i going to -1 still compares properly diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index aaef5a468f..bbd58cd8cc 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -327,28 +327,28 @@ struct BlobGranuleVerifierWorkload : TestWorkload { // utility to prune at pruneVersion= with the flag ACTOR Future pruneAtVersion(Database cx, KeyRange range, Version version, bool force) { state Reference tr = makeReference(cx); + state Version commitVersion = 0; + state Key pruneKey; loop { try { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); Value pruneValue = blobGranulePruneValueFor(version, range, force); - - Key pruneKey = KeyRef(blobGranulePruneKeys.begin.withSuffix(std::string(14, '\x00'))); - int32_t pos = pruneKey.size() - 14; - pos = littleEndian32(pos); - uint8_t* data = mutateString(pruneKey); - memcpy(data + pruneKey.size() - sizeof(int32_t), &pos, sizeof(int32_t)); - tr->atomicOp(pruneKey, pruneValue, MutationRef::SetVersionstampedKey); + tr->atomicOp( + addVersionStampAtEnd(blobGranulePruneKeys.begin), pruneValue, MutationRef::SetVersionstampedKey); tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString()); + state Future> fTrVs = tr->getVersionstamp(); wait(tr->commit()); + Standalone vs = wait(fTrVs); + pruneKey = blobGranulePruneKeys.begin.withSuffix(vs); if (BGV_DEBUG) { printf("pruneAtVersion for range [%s-%s) at version %lld succeeded\n", range.begin.printable().c_str(), range.end.printable().c_str(), version); } - return Void(); + break; } catch (Error& e) { if (BGV_DEBUG) { printf("pruneAtVersion for range [%s-%s) at version %lld encountered error %s\n", @@ -360,6 +360,23 @@ struct BlobGranuleVerifierWorkload : TestWorkload { wait(tr->onError(e)); } } + tr->reset(); + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + Optional pruneVal = wait(tr->get(pruneKey)); + if (!pruneVal.present()) { + return Void(); + } + state Future watchFuture = tr->watch(pruneKey); + wait(tr->commit()); + wait(watchFuture); + } catch (Error& e) { + wait(tr->onError(e)); + } + } } ACTOR Future killBlobWorkers(Database cx, BlobGranuleVerifierWorkload* self) { @@ -403,11 +420,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload { } } - ACTOR Future verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) { + ACTOR Future verifyGranules(Database cx, BlobGranuleVerifierWorkload* self, bool allowPruning) { state double last = now(); state double endTime = last + self->testDuration; state std::map timeTravelChecks; state int64_t timeTravelChecksMemory = 0; + state Version prevPruneVersion = -1; + state UID dbgId = debugRandom()->randomUniqueID(); TraceEvent("BlobGranuleVerifierStart"); if (BGV_DEBUG) { @@ -429,33 +448,44 @@ struct BlobGranuleVerifierWorkload : TestWorkload { state OldRead oldRead = timeTravelIt->second; timeTravelChecksMemory -= oldRead.oldResult.expectedSize(); timeTravelIt = timeTravelChecks.erase(timeTravelIt); + if (prevPruneVersion == -1) { + prevPruneVersion = oldRead.v; + } // advance iterator before doing read, so if it gets error we don't retry it try { - state Version newPruneVersion = deterministicRandom()->randomInt64(1, oldRead.v); - dbgPruneVersion = std::max(dbgPruneVersion, newPruneVersion); - wait(self->pruneAtVersion(cx, oldRead.range, newPruneVersion, false)); + state Version newPruneVersion = 0; + state bool doPruning = + allowPruning && prevPruneVersion < oldRead.v && deterministicRandom()->random01() < 0.5; + if (doPruning) { + newPruneVersion = deterministicRandom()->randomInt64(prevPruneVersion, oldRead.v); + prevPruneVersion = std::max(prevPruneVersion, newPruneVersion); + dbgPruneVersion = prevPruneVersion; + wait(self->pruneAtVersion(cx, oldRead.range, newPruneVersion, false)); + } std::pair>> reReadResult = wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v)); self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false); self->timeTravelReads++; - wait(self->killBlobWorkers(cx, self)); - try { + if (doPruning) { + wait(self->killBlobWorkers(cx, self)); std::pair>> versionRead = - wait(self->readFromBlob(cx, self, oldRead.range, dbgPruneVersion)); - Version minStartVer = newPruneVersion; - for (auto& it : versionRead.second) { - minStartVer = std::min(minStartVer, it.startVersion); + wait(self->readFromBlob(cx, self, oldRead.range, prevPruneVersion)); + try { + Version minSnapshotVersion = newPruneVersion; + for (auto& it : versionRead.second) { + minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion); + } + std::pair>> versionRead = + wait(self->readFromBlob(cx, self, oldRead.range, minSnapshotVersion - 1)); + ASSERT(false); + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + ASSERT(e.code() == error_code_blob_granule_transaction_too_old); } - std::pair>> versionRead = - wait(self->readFromBlob(cx, self, oldRead.range, minStartVer - 1)); - ASSERT(false); - } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) { - throw; - } - ASSERT(e.code() == error_code_blob_granule_transaction_too_old); } // TODO: read at some version older than pruneVersion and make sure you get txn_too_old @@ -508,7 +538,9 @@ struct BlobGranuleVerifierWorkload : TestWorkload { clients.push_back(timeout(findGranules(cx, this), testDuration, Void())); for (int i = 0; i < threads; i++) { clients.push_back( - timeout(reportErrors(verifyGranules(cx, this), "BlobGranuleVerifier"), testDuration, Void())); + timeout(reportErrors(verifyGranules(cx, this, clientId == 0 && i == 0), "BlobGranuleVerifier"), + testDuration, + Void())); } return delay(testDuration); }