From 330b2b48ec06664bd9a70e2157f0a12db4ff04b7 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Tue, 22 Feb 2022 12:00:09 -0800 Subject: [PATCH] improved file cleanup execution and testing --- fdbclient/BlobGranuleCommon.h | 3 +- fdbclient/ServerKnobs.cpp | 1 - fdbclient/ServerKnobs.h | 2 - fdbclient/SystemData.cpp | 10 +- fdbclient/SystemData.h | 5 +- fdbserver/BlobManager.actor.cpp | 292 ++++++------------ fdbserver/BlobWorker.actor.cpp | 4 + .../workloads/BlobGranuleVerifier.actor.cpp | 83 ++++- 8 files changed, 182 insertions(+), 218 deletions(-) diff --git a/fdbclient/BlobGranuleCommon.h b/fdbclient/BlobGranuleCommon.h index 7c44838c18..b33cbabb7b 100644 --- a/fdbclient/BlobGranuleCommon.h +++ b/fdbclient/BlobGranuleCommon.h @@ -77,13 +77,14 @@ struct BlobGranuleChunkRef { constexpr static FileIdentifier file_identifier = 865198; KeyRangeRef keyRange; Version includedVersion; + Version startVersion; Optional snapshotFile; // not set if it's an incremental read VectorRef deltaFiles; GranuleDeltas newDeltas; template void serialize(Ar& ar) { - serializer(ar, keyRange, includedVersion, snapshotFile, deltaFiles, newDeltas); + serializer(ar, keyRange, includedVersion, startVersion, snapshotFile, deltaFiles, newDeltas); } }; diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 3ed62aa08c..2c1e93f3c0 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -813,7 +813,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards || (randomize && BUGGIFY) ) { deterministicRandom()->random01() < 0.1 ? BG_SNAPSHOT_FILE_TARGET_BYTES /= 100 : BG_SNAPSHOT_FILE_TARGET_BYTES /= 10; } init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 ); init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 ); - init( BG_PRUNE_TIMEOUT, 60*60); init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0; init( BLOB_WORKER_REQUEST_TIMEOUT, 5.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_REQUEST_TIMEOUT = 1.0; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index cfd7fe4c59..7e41df3756 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -767,8 +767,6 @@ public: int BG_DELTA_FILE_TARGET_BYTES; int BG_DELTA_BYTES_BEFORE_COMPACT; - double BG_PRUNE_TIMEOUT; - double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure double BLOB_WORKER_REQUEST_TIMEOUT; // Blob Worker's server-side request timeout double BLOB_WORKERLIST_FETCH_INTERVAL; diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index eb892dff03..3ac30b8dc0 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1128,6 +1128,7 @@ const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), Litera const KeyRangeRef blobGranuleSplitBoundaryKeys(LiteralStringRef("\xff\x02/bgsb/"), LiteralStringRef("\xff\x02/bgsb0")); const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0")); const KeyRangeRef blobGranulePruneKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0")); +const KeyRangeRef blobGranuleVersionKeys(LiteralStringRef("\xff\x02/bgv/"), LiteralStringRef("\xff\x02/bgv0")); const KeyRef blobGranulePruneChangeKey = LiteralStringRef("\xff\x02/bgpChange"); const uint8_t BG_FILE_TYPE_DELTA = 'D'; @@ -1182,20 +1183,23 @@ std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(V return std::tuple(filename, offset, length); } -const Value blobGranulePruneValueFor(Version version, bool force) { +const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) { BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); wr << version; + wr << range; wr << force; return wr.toValue(); } -std::pair decodeBlobGranulePruneValue(ValueRef const& value) { +std::tuple decodeBlobGranulePruneValue(ValueRef const& value) { Version version; + KeyRange range; bool force; BinaryReader reader(value, IncludeVersion()); reader >> version; + reader >> range; reader >> force; - return std::pair(version, force); + return std::tuple(version, range, force); } const Value blobGranuleMappingValueFor(UID const& workerID) { diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index c76c2225a6..1a1b024064 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -560,6 +560,7 @@ extern const KeyRangeRef blobGranuleHistoryKeys; // \xff\x02/bgp/(start,end) = (version, force) extern const KeyRangeRef blobGranulePruneKeys; +extern const KeyRangeRef blobGranuleVersionKeys; extern const KeyRef blobGranulePruneChangeKey; const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion); @@ -569,8 +570,8 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID); const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length); std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value); -const Value blobGranulePruneValueFor(Version version, bool force); -std::pair decodeBlobGranulePruneValue(ValueRef const& value); +const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force); +std::tuple decodeBlobGranulePruneValue(ValueRef const& value); const Value blobGranuleMappingValueFor(UID const& workerID); UID decodeBlobGranuleMappingValue(ValueRef const& value); diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 687299d207..c0016b1f72 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -2290,15 +2290,11 @@ ACTOR Future partiallyDeleteGranule(Reference self, UID g * Once all this is done, we finally clear the pruneIntent key, if possible, to indicate we are done * processing this prune intent. */ -ACTOR Future pruneRange(Reference self, - KeyRef startKey, - KeyRef endKey, - Version pruneVersion, - bool force) { +ACTOR Future pruneRange(Reference self, KeyRangeRef range, Version pruneVersion, bool force) { if (BM_DEBUG) { fmt::print("pruneRange starting for range [{0} - {1}) @ pruneVersion={2}, force={3}\n", - startKey.printable(), - endKey.printable(), + range.begin.printable(), + range.end.printable(), pruneVersion, force); } @@ -2316,8 +2312,6 @@ ACTOR Future pruneRange(Reference self, state std::unordered_set, boost::hash>> visited; - state KeyRange range(KeyRangeRef(startKey, endKey)); // range for [startKey, endKey) - // find all active granules (that comprise the range) and add to the queue state KeyRangeMap::Ranges activeRanges = self->workerAssignments.intersectingRanges(range); @@ -2335,7 +2329,7 @@ ACTOR Future pruneRange(Reference self, } // assumption: prune boundaries must respect granule boundaries - if (activeRange.begin() < startKey || activeRange.end() > endKey) { + if (activeRange.begin() < range.begin || activeRange.end() > range.end) { continue; } @@ -2388,7 +2382,7 @@ ACTOR Future pruneRange(Reference self, // get the persisted history entry for this granule state Standalone currHistoryNode; - state KeyRef historyKey = blobGranuleHistoryKeyFor(currRange, startVersion); + state Key historyKey = blobGranuleHistoryKeyFor(currRange, startVersion); loop { try { Optional persistedHistory = wait(tr.get(historyKey)); @@ -2497,38 +2491,10 @@ ACTOR Future pruneRange(Reference self, // another pruneIntent that got written for this table while we were processing this one. // If that is the case, we should not clear the key. Otherwise, we can just clear the key. - tr.reset(); - if (BM_DEBUG) { - printf("About to clear prune intent\n"); - } - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - - state Key pruneIntentKey = blobGranulePruneKeys.begin.withSuffix(startKey); - state Optional pruneIntentValue = wait(tr.get(pruneIntentKey)); - ASSERT(pruneIntentValue.present()); - - Version currPruneVersion; - bool currForce; - std::tie(currPruneVersion, currForce) = decodeBlobGranulePruneValue(pruneIntentValue.get()); - - if (currPruneVersion == pruneVersion && currForce == force) { - tr.clear(pruneIntentKey.withPrefix(blobGranulePruneKeys.begin)); - wait(tr.commit()); - } - break; - } catch (Error& e) { - fmt::print("Attempt to clear prune intent got error {}\n", e.name()); - wait(tr.onError(e)); - } - } - if (BM_DEBUG) { fmt::print("Successfully pruned range [{0} - {1}) at pruneVersion={2}\n", - startKey.printable(), - endKey.printable(), + range.begin.printable(), + range.end.printable(), pruneVersion); } return Void(); @@ -2556,180 +2522,112 @@ ACTOR Future pruneRange(Reference self, */ ACTOR Future monitorPruneKeys(Reference self) { // setup bstore - try { - if (BM_DEBUG) { - fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str()); - } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); - if (BM_DEBUG) { - printf("BM constructed backup container\n"); - } - } catch (Error& e) { - if (BM_DEBUG) { - fmt::print("BM got backup container init error {0}\n", e.name()); - } - throw e; + if (BM_DEBUG) { + fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str()); + } + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + if (BM_DEBUG) { + printf("BM constructed backup container\n"); } - try { - state Value oldPruneWatchVal; + loop { + state Reference tr = makeReference(self->db); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + // Wait for the watch to change, or some time to expire (whichever comes first) + // before checking through the prune intents. We write a UID into the change key value + // so that we can still recognize when the watch key has been changed while we weren't + // monitoring it + + state Key lastPruneKey = blobGranulePruneKeys.begin; + loop { - state Reference tr = makeReference(self->db); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - // Wait for the watch to change, or some time to expire (whichever comes first) - // before checking through the prune intents. We write a UID into the change key value - // so that we can still recognize when the watch key has been changed while we weren't - // monitoring it - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - - state Optional newPruneWatchVal = wait(tr->get(blobGranulePruneChangeKey)); - - // if the value at the change key has changed, that means there is new work to do - if (newPruneWatchVal.present() && oldPruneWatchVal != newPruneWatchVal.get()) { - oldPruneWatchVal = newPruneWatchVal.get(); - if (BM_DEBUG) { - printf("the blobGranulePruneChangeKey changed\n"); + state std::vector> prunes; + state CoalescedKeyRangeMap> pruneMap; + pruneMap.insert(allKeys, std::make_pair(0, false)); + try { + // TODO: replace 10000 with a knob + state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, 10000)); + if (pruneIntents.size()) { + int rangeIdx = 0; + for (; rangeIdx < pruneIntents.size(); ++rangeIdx) { + Version pruneVersion; + KeyRange range; + bool force; + std::tie(pruneVersion, range, force) = + decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value); + auto ranges = pruneMap.intersectingRanges(range); + bool foundConflict = false; + for (auto it : ranges) { + if ((it.value().second && !force && it.value().first < pruneVersion) || + (!it.value().second && force && pruneVersion < it.value().first)) { + foundConflict = true; + break; + } } - - // TODO: debugging code, remove it - /* - if (newPruneWatchVal.get().toString().substr(0, 6) == "prune=") { - state Reference dummy = - makeReference(self->db); - loop { - try { - dummy->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - dummy->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - std::istringstream iss(newPruneWatchVal.get().toString().substr(6)); - Version version; - iss >> version; - dummy->set(blobGranulePruneKeys.begin.withSuffix(normalKeys.begin), - blobGranulePruneValueFor(version, false)); - wait(dummy->commit()); - break; - - } catch (Error& e) { - wait(dummy->onError(e)); - } - } + if (foundConflict) { + break; + } + pruneMap.insert(range, std::make_pair(pruneVersion, force)); + + fmt::print("about to prune range [{0} - {1}) @ {2}, force={3}\n", + range.begin.printable(), + range.end.printable(), + pruneVersion, + force ? "T" : "F"); + } + lastPruneKey = pruneIntents[rangeIdx - 1].key; + + for (auto it : pruneMap.ranges()) { + if (it.value().first > 0) { + prunes.emplace_back(pruneRange(self, it.range(), it.value().first, it.value().second)); } - */ - break; } - // otherwise, there are no changes and we should wait until the next change (or timeout) + // wait for this set of prunes to complete before starting the next ones since if we + // prune a range R at version V and while we are doing that, the time expires, we will + // end up trying to prune the same range again since the work isn't finished and the + // prunes will race + // + // TODO: this isn't that efficient though. Instead we could keep metadata as part of the + // BM's memory that tracks which prunes are active. Once done, we can mark that work as + // done. If the BM fails then all prunes will fail and so the next BM will have a clear + // set of metadata (i.e. no work in progress) so we will end up doing the work in the + // new BM + + wait(waitForAll(prunes)); + break; + } else { state Future watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey); wait(tr->commit()); - - if (BM_DEBUG) { - printf("monitorPruneKeys waiting for change or timeout\n"); - } - - choose { - when(wait(watchPruneIntentsChange)) { - if (BM_DEBUG) { - printf("monitorPruneKeys saw a change\n"); - } - tr->reset(); - } - when(wait(delay(SERVER_KNOBS->BG_PRUNE_TIMEOUT))) { - if (BM_DEBUG) { - printf("monitorPruneKeys got a timeout\n"); - } - break; - } - } - } catch (Error& e) { - wait(tr->onError(e)); - } - } - - tr->reset(); - - if (BM_DEBUG) { - printf("Looping over prune intents\n"); - } - - // loop through all prune intentions and do prune work accordingly - try { - state KeyRef beginKey = normalKeys.begin; - loop { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - - state std::vector> prunes; - try { - // TODO: replace 10000 with a knob - KeyRange nextRange(KeyRangeRef(beginKey, normalKeys.end)); - state RangeResult pruneIntents = wait(krmGetRanges( - tr, blobGranulePruneKeys.begin, nextRange, 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); - state Key lastEndKey; - - for (int rangeIdx = 0; rangeIdx < pruneIntents.size() - 1; ++rangeIdx) { - KeyRef rangeStartKey = pruneIntents[rangeIdx].key; - KeyRef rangeEndKey = pruneIntents[rangeIdx + 1].key; - lastEndKey = rangeEndKey; - if (pruneIntents[rangeIdx].value.size() == 0) { - continue; - } - KeyRange range(KeyRangeRef(rangeStartKey, rangeEndKey)); - Version pruneVersion; - bool force; - std::tie(pruneVersion, force) = decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value); - - fmt::print("about to prune range [{0} - {1}) @ {2}, force={3}\n", - rangeStartKey.printable(), - rangeEndKey.printable(), - pruneVersion, - force ? "T" : "F"); - prunes.emplace_back(pruneRange(self, rangeStartKey, rangeEndKey, pruneVersion, force)); - } - - // wait for this set of prunes to complete before starting the next ones since if we - // prune a range R at version V and while we are doing that, the time expires, we will - // end up trying to prune the same range again since the work isn't finished and the - // prunes will race - // - // TODO: this isn't that efficient though. Instead we could keep metadata as part of the - // BM's memory that tracks which prunes are active. Once done, we can mark that work as - // done. If the BM fails then all prunes will fail and so the next BM will have a clear - // set of metadata (i.e. no work in progress) so we will end up doing the work in the - // new BM - wait(waitForAll(prunes)); - - if (!pruneIntents.more) { - break; - } - - beginKey = lastEndKey; - } catch (Error& e) { - wait(tr->onError(e)); - } + wait(watchPruneIntentsChange); + tr->reset(); } } catch (Error& e) { - if (e.code() == error_code_actor_cancelled) { - throw e; - } - if (BM_DEBUG) { - fmt::print("monitorPruneKeys for BM {0} saw error {1}\n", self->id.toString(), e.name()); - } - // don't want to kill the blob manager for errors around pruning - TraceEvent("MonitorPruneKeysError", self->id).detail("Error", e.name()); - } - if (BM_DEBUG) { - printf("Done pruning current set of prune intents.\n"); + wait(tr->onError(e)); } } - } catch (Error& e) { + + tr->reset(); + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr->clear(KeyRangeRef(blobGranulePruneKeys.begin, keyAfter(lastPruneKey))); + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + if (BM_DEBUG) { - fmt::print("monitorPruneKeys got error {}\n", e.name()); + printf("Done pruning current set of prune intents.\n"); } - throw e; } } diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 9f81cff53b..d5f788724f 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -2230,6 +2230,7 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl state KeyRange chunkRange; state GranuleFiles chunkFiles; + state Version startVer; if (metadata->initialSnapshotVersion > req.readVersion) { // this is a time travel query, find previous granule @@ -2290,6 +2291,7 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl ASSERT(cur->endVersion > req.readVersion); ASSERT(cur->startVersion <= req.readVersion); + startVer = cur->startVersion; // lazily load files for old granule if not present chunkRange = cur->range; @@ -2324,6 +2326,7 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl ASSERT(chunkFiles.snapshotFiles.front().version <= req.readVersion); } else { // this is an active granule query + startVer = metadata->initialSnapshotVersion; loop { if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) { throw wrong_shard_server(); @@ -2379,6 +2382,7 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl BlobGranuleChunkRef chunk; // TODO change in V2 chunk.includedVersion = req.readVersion; + chunk.startVersion = startVer; chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end)); // handle snapshot files diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index 6767da00ef..7e7042c7cc 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -31,6 +31,7 @@ #include "fdbserver/Knobs.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" +#include "flow/Error.h" #include "flow/IRandom.h" #include "flow/genericactors.actor.h" @@ -328,19 +329,15 @@ struct BlobGranuleVerifierWorkload : TestWorkload { try { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - Optional oldPruneIntent = wait(tr->get(blobGranulePruneKeys.begin.withSuffix(range.begin))); - if (oldPruneIntent.present()) { - Version oldPruneVersion; - bool oldForce; - std::tie(oldPruneVersion, oldForce) = decodeBlobGranulePruneValue(oldPruneIntent.get()); - if (oldPruneVersion >= version) { - return Void(); - } - } + Value pruneValue = blobGranulePruneValueFor(version, range, force); - Value pruneValue = blobGranulePruneValueFor(version, force); - wait(krmSetRange(tr, blobGranulePruneKeys.begin, range, pruneValue)); + Key pruneKey = KeyRef(blobGranulePruneKeys.begin.withSuffix(std::string(14, '\x00'))); + int32_t pos = pruneKey.size() - 14; + pos = littleEndian32(pos); + uint8_t* data = mutateString(pruneKey); + memcpy(data + pruneKey.size() - sizeof(int32_t), &pos, sizeof(int32_t)); + tr->atomicOp(pruneKey, pruneValue, MutationRef::SetVersionstampedKey); tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString()); wait(tr->commit()); if (BGV_DEBUG) { @@ -363,11 +360,53 @@ struct BlobGranuleVerifierWorkload : TestWorkload { } } + ACTOR Future killBlobWorkers(Database cx, BlobGranuleVerifierWorkload* self) { + state Transaction tr(cx); + state std::set knownWorkers; + state bool first = true; + loop { + try { + RangeResult r = wait(tr.getRange(blobWorkerListKeys, CLIENT_KNOBS->TOO_MANY)); + + state std::vector haltIds; + state std::vector>> haltRequests; + for (auto& it : r) { + BlobWorkerInterface interf = decodeBlobWorkerListValue(it.value); + if (first) { + knownWorkers.insert(interf.id()); + } + if (knownWorkers.count(interf.id())) { + haltIds.push_back(interf.id()); + haltRequests.push_back(interf.haltBlobWorker.tryGetReply(HaltBlobWorkerRequest(1e6, UID()))); + } + } + first = false; + wait(waitForAll(haltRequests)); + bool allPresent = true; + for (int i = 0; i < haltRequests.size(); i++) { + if (haltRequests[i].get().present()) { + knownWorkers.erase(haltIds[i]); + } else { + allPresent = false; + } + } + if (allPresent) { + return Void(); + } else { + wait(delay(1.0)); + } + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + ACTOR Future verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) { state double last = now(); state double endTime = last + self->testDuration; state std::map timeTravelChecks; state int64_t timeTravelChecksMemory = 0; + state Version pruneVersion = 1; TraceEvent("BlobGranuleVerifierStart"); if (BGV_DEBUG) { @@ -392,12 +431,32 @@ struct BlobGranuleVerifierWorkload : TestWorkload { // advance iterator before doing read, so if it gets error we don't retry it try { - // TODO: before reading, prune at some version [0, readVersion) + state Version newPruneVersion = deterministicRandom()->randomInt64(1, oldRead.v); + pruneVersion = std::max(pruneVersion, newPruneVersion); + wait(self->pruneAtVersion(cx, oldRead.range, newPruneVersion, false)); std::pair>> reReadResult = wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v)); self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false); self->timeTravelReads++; + wait(self->killBlobWorkers(cx, self)); + try { + std::pair>> versionRead = + wait(self->readFromBlob(cx, self, oldRead.range, pruneVersion)); + Version minStartVer = newPruneVersion; + for (auto& it : versionRead.second) { + minStartVer = std::min(minStartVer, it.startVersion); + } + std::pair>> versionRead = + wait(self->readFromBlob(cx, self, oldRead.range, minStartVer - 1)); + ASSERT(false); + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + ASSERT(e.code() == error_code_blob_granule_transaction_too_old); + } + // TODO: read at some version older than pruneVersion and make sure you get txn_too_old // To achieve this, the BWs are going to have to recognize latest prune versions per granules } catch (Error& e) {