improved file cleanup execution and testing

This commit is contained in:
Evan Tschannen 2022-02-22 12:00:09 -08:00
parent e75ab5f3a0
commit 330b2b48ec
8 changed files with 182 additions and 218 deletions

View File

@ -77,13 +77,14 @@ struct BlobGranuleChunkRef {
constexpr static FileIdentifier file_identifier = 865198; constexpr static FileIdentifier file_identifier = 865198;
KeyRangeRef keyRange; KeyRangeRef keyRange;
Version includedVersion; Version includedVersion;
Version startVersion;
Optional<BlobFilePointerRef> snapshotFile; // not set if it's an incremental read Optional<BlobFilePointerRef> snapshotFile; // not set if it's an incremental read
VectorRef<BlobFilePointerRef> deltaFiles; VectorRef<BlobFilePointerRef> deltaFiles;
GranuleDeltas newDeltas; GranuleDeltas newDeltas;
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, keyRange, includedVersion, snapshotFile, deltaFiles, newDeltas); serializer(ar, keyRange, includedVersion, startVersion, snapshotFile, deltaFiles, newDeltas);
} }
}; };

View File

@ -813,7 +813,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards || (randomize && BUGGIFY) ) { deterministicRandom()->random01() < 0.1 ? BG_SNAPSHOT_FILE_TARGET_BYTES /= 100 : BG_SNAPSHOT_FILE_TARGET_BYTES /= 10; } init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards || (randomize && BUGGIFY) ) { deterministicRandom()->random01() < 0.1 ? BG_SNAPSHOT_FILE_TARGET_BYTES /= 100 : BG_SNAPSHOT_FILE_TARGET_BYTES /= 10; }
init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 ); init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 );
init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 ); init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 );
init( BG_PRUNE_TIMEOUT, 60*60);
init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0; init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0;
init( BLOB_WORKER_REQUEST_TIMEOUT, 5.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_REQUEST_TIMEOUT = 1.0; init( BLOB_WORKER_REQUEST_TIMEOUT, 5.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_REQUEST_TIMEOUT = 1.0;

View File

@ -767,8 +767,6 @@ public:
int BG_DELTA_FILE_TARGET_BYTES; int BG_DELTA_FILE_TARGET_BYTES;
int BG_DELTA_BYTES_BEFORE_COMPACT; int BG_DELTA_BYTES_BEFORE_COMPACT;
double BG_PRUNE_TIMEOUT;
double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure
double BLOB_WORKER_REQUEST_TIMEOUT; // Blob Worker's server-side request timeout double BLOB_WORKER_REQUEST_TIMEOUT; // Blob Worker's server-side request timeout
double BLOB_WORKERLIST_FETCH_INTERVAL; double BLOB_WORKERLIST_FETCH_INTERVAL;

View File

@ -1128,6 +1128,7 @@ const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), Litera
const KeyRangeRef blobGranuleSplitBoundaryKeys(LiteralStringRef("\xff\x02/bgsb/"), LiteralStringRef("\xff\x02/bgsb0")); const KeyRangeRef blobGranuleSplitBoundaryKeys(LiteralStringRef("\xff\x02/bgsb/"), LiteralStringRef("\xff\x02/bgsb0"));
const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0")); const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0"));
const KeyRangeRef blobGranulePruneKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0")); const KeyRangeRef blobGranulePruneKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0"));
const KeyRangeRef blobGranuleVersionKeys(LiteralStringRef("\xff\x02/bgv/"), LiteralStringRef("\xff\x02/bgv0"));
const KeyRef blobGranulePruneChangeKey = LiteralStringRef("\xff\x02/bgpChange"); const KeyRef blobGranulePruneChangeKey = LiteralStringRef("\xff\x02/bgpChange");
const uint8_t BG_FILE_TYPE_DELTA = 'D'; const uint8_t BG_FILE_TYPE_DELTA = 'D';
@ -1182,20 +1183,23 @@ std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(V
return std::tuple(filename, offset, length); return std::tuple(filename, offset, length);
} }
const Value blobGranulePruneValueFor(Version version, bool force) { const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << version; wr << version;
wr << range;
wr << force; wr << force;
return wr.toValue(); return wr.toValue();
} }
std::pair<Version, bool> decodeBlobGranulePruneValue(ValueRef const& value) { std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value) {
Version version; Version version;
KeyRange range;
bool force; bool force;
BinaryReader reader(value, IncludeVersion()); BinaryReader reader(value, IncludeVersion());
reader >> version; reader >> version;
reader >> range;
reader >> force; reader >> force;
return std::pair(version, force); return std::tuple(version, range, force);
} }
const Value blobGranuleMappingValueFor(UID const& workerID) { const Value blobGranuleMappingValueFor(UID const& workerID) {

View File

@ -560,6 +560,7 @@ extern const KeyRangeRef blobGranuleHistoryKeys;
// \xff\x02/bgp/(start,end) = (version, force) // \xff\x02/bgp/(start,end) = (version, force)
extern const KeyRangeRef blobGranulePruneKeys; extern const KeyRangeRef blobGranulePruneKeys;
extern const KeyRangeRef blobGranuleVersionKeys;
extern const KeyRef blobGranulePruneChangeKey; extern const KeyRef blobGranulePruneChangeKey;
const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion); const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion);
@ -569,8 +570,8 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID);
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length); const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length);
std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value); std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranulePruneValueFor(Version version, bool force); const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force);
std::pair<Version, bool> decodeBlobGranulePruneValue(ValueRef const& value); std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value);
const Value blobGranuleMappingValueFor(UID const& workerID); const Value blobGranuleMappingValueFor(UID const& workerID);
UID decodeBlobGranuleMappingValue(ValueRef const& value); UID decodeBlobGranuleMappingValue(ValueRef const& value);

View File

@ -2290,15 +2290,11 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
* Once all this is done, we finally clear the pruneIntent key, if possible, to indicate we are done * Once all this is done, we finally clear the pruneIntent key, if possible, to indicate we are done
* processing this prune intent. * processing this prune intent.
*/ */
ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range, Version pruneVersion, bool force) {
KeyRef startKey,
KeyRef endKey,
Version pruneVersion,
bool force) {
if (BM_DEBUG) { if (BM_DEBUG) {
fmt::print("pruneRange starting for range [{0} - {1}) @ pruneVersion={2}, force={3}\n", fmt::print("pruneRange starting for range [{0} - {1}) @ pruneVersion={2}, force={3}\n",
startKey.printable(), range.begin.printable(),
endKey.printable(), range.end.printable(),
pruneVersion, pruneVersion,
force); force);
} }
@ -2316,8 +2312,6 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self,
state std::unordered_set<std::pair<const uint8_t*, Version>, boost::hash<std::pair<const uint8_t*, Version>>> state std::unordered_set<std::pair<const uint8_t*, Version>, boost::hash<std::pair<const uint8_t*, Version>>>
visited; visited;
state KeyRange range(KeyRangeRef(startKey, endKey)); // range for [startKey, endKey)
// find all active granules (that comprise the range) and add to the queue // find all active granules (that comprise the range) and add to the queue
state KeyRangeMap<UID>::Ranges activeRanges = self->workerAssignments.intersectingRanges(range); state KeyRangeMap<UID>::Ranges activeRanges = self->workerAssignments.intersectingRanges(range);
@ -2335,7 +2329,7 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self,
} }
// assumption: prune boundaries must respect granule boundaries // assumption: prune boundaries must respect granule boundaries
if (activeRange.begin() < startKey || activeRange.end() > endKey) { if (activeRange.begin() < range.begin || activeRange.end() > range.end) {
continue; continue;
} }
@ -2388,7 +2382,7 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self,
// get the persisted history entry for this granule // get the persisted history entry for this granule
state Standalone<BlobGranuleHistoryValue> currHistoryNode; state Standalone<BlobGranuleHistoryValue> currHistoryNode;
state KeyRef historyKey = blobGranuleHistoryKeyFor(currRange, startVersion); state Key historyKey = blobGranuleHistoryKeyFor(currRange, startVersion);
loop { loop {
try { try {
Optional<Value> persistedHistory = wait(tr.get(historyKey)); Optional<Value> persistedHistory = wait(tr.get(historyKey));
@ -2497,38 +2491,10 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self,
// another pruneIntent that got written for this table while we were processing this one. // another pruneIntent that got written for this table while we were processing this one.
// If that is the case, we should not clear the key. Otherwise, we can just clear the key. // If that is the case, we should not clear the key. Otherwise, we can just clear the key.
tr.reset();
if (BM_DEBUG) {
printf("About to clear prune intent\n");
}
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Key pruneIntentKey = blobGranulePruneKeys.begin.withSuffix(startKey);
state Optional<Value> pruneIntentValue = wait(tr.get(pruneIntentKey));
ASSERT(pruneIntentValue.present());
Version currPruneVersion;
bool currForce;
std::tie(currPruneVersion, currForce) = decodeBlobGranulePruneValue(pruneIntentValue.get());
if (currPruneVersion == pruneVersion && currForce == force) {
tr.clear(pruneIntentKey.withPrefix(blobGranulePruneKeys.begin));
wait(tr.commit());
}
break;
} catch (Error& e) {
fmt::print("Attempt to clear prune intent got error {}\n", e.name());
wait(tr.onError(e));
}
}
if (BM_DEBUG) { if (BM_DEBUG) {
fmt::print("Successfully pruned range [{0} - {1}) at pruneVersion={2}\n", fmt::print("Successfully pruned range [{0} - {1}) at pruneVersion={2}\n",
startKey.printable(), range.begin.printable(),
endKey.printable(), range.end.printable(),
pruneVersion); pruneVersion);
} }
return Void(); return Void();
@ -2556,180 +2522,112 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self,
*/ */
ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) { ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
// setup bstore // setup bstore
try { if (BM_DEBUG) {
if (BM_DEBUG) { fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str());
fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str()); }
} self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); if (BM_DEBUG) {
if (BM_DEBUG) { printf("BM constructed backup container\n");
printf("BM constructed backup container\n");
}
} catch (Error& e) {
if (BM_DEBUG) {
fmt::print("BM got backup container init error {0}\n", e.name());
}
throw e;
} }
try { loop {
state Value oldPruneWatchVal; state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Wait for the watch to change, or some time to expire (whichever comes first)
// before checking through the prune intents. We write a UID into the change key value
// so that we can still recognize when the watch key has been changed while we weren't
// monitoring it
state Key lastPruneKey = blobGranulePruneKeys.begin;
loop { loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Wait for the watch to change, or some time to expire (whichever comes first) state std::vector<Future<Void>> prunes;
// before checking through the prune intents. We write a UID into the change key value state CoalescedKeyRangeMap<std::pair<Version, bool>> pruneMap;
// so that we can still recognize when the watch key has been changed while we weren't pruneMap.insert(allKeys, std::make_pair<Version, bool>(0, false));
// monitoring it try {
loop { // TODO: replace 10000 with a knob
try { state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, 10000));
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); if (pruneIntents.size()) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); int rangeIdx = 0;
for (; rangeIdx < pruneIntents.size(); ++rangeIdx) {
state Optional<Value> newPruneWatchVal = wait(tr->get(blobGranulePruneChangeKey)); Version pruneVersion;
KeyRange range;
// if the value at the change key has changed, that means there is new work to do bool force;
if (newPruneWatchVal.present() && oldPruneWatchVal != newPruneWatchVal.get()) { std::tie(pruneVersion, range, force) =
oldPruneWatchVal = newPruneWatchVal.get(); decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);
if (BM_DEBUG) { auto ranges = pruneMap.intersectingRanges(range);
printf("the blobGranulePruneChangeKey changed\n"); bool foundConflict = false;
for (auto it : ranges) {
if ((it.value().second && !force && it.value().first < pruneVersion) ||
(!it.value().second && force && pruneVersion < it.value().first)) {
foundConflict = true;
break;
}
} }
if (foundConflict) {
// TODO: debugging code, remove it break;
/* }
if (newPruneWatchVal.get().toString().substr(0, 6) == "prune=") { pruneMap.insert(range, std::make_pair(pruneVersion, force));
state Reference<ReadYourWritesTransaction> dummy =
makeReference<ReadYourWritesTransaction>(self->db); fmt::print("about to prune range [{0} - {1}) @ {2}, force={3}\n",
loop { range.begin.printable(),
try { range.end.printable(),
dummy->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); pruneVersion,
dummy->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); force ? "T" : "F");
std::istringstream iss(newPruneWatchVal.get().toString().substr(6)); }
Version version; lastPruneKey = pruneIntents[rangeIdx - 1].key;
iss >> version;
dummy->set(blobGranulePruneKeys.begin.withSuffix(normalKeys.begin), for (auto it : pruneMap.ranges()) {
blobGranulePruneValueFor(version, false)); if (it.value().first > 0) {
wait(dummy->commit()); prunes.emplace_back(pruneRange(self, it.range(), it.value().first, it.value().second));
break;
} catch (Error& e) {
wait(dummy->onError(e));
}
}
} }
*/
break;
} }
// otherwise, there are no changes and we should wait until the next change (or timeout) // wait for this set of prunes to complete before starting the next ones since if we
// prune a range R at version V and while we are doing that, the time expires, we will
// end up trying to prune the same range again since the work isn't finished and the
// prunes will race
//
// TODO: this isn't that efficient though. Instead we could keep metadata as part of the
// BM's memory that tracks which prunes are active. Once done, we can mark that work as
// done. If the BM fails then all prunes will fail and so the next BM will have a clear
// set of metadata (i.e. no work in progress) so we will end up doing the work in the
// new BM
wait(waitForAll(prunes));
break;
} else {
state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey); state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey);
wait(tr->commit()); wait(tr->commit());
wait(watchPruneIntentsChange);
if (BM_DEBUG) { tr->reset();
printf("monitorPruneKeys waiting for change or timeout\n");
}
choose {
when(wait(watchPruneIntentsChange)) {
if (BM_DEBUG) {
printf("monitorPruneKeys saw a change\n");
}
tr->reset();
}
when(wait(delay(SERVER_KNOBS->BG_PRUNE_TIMEOUT))) {
if (BM_DEBUG) {
printf("monitorPruneKeys got a timeout\n");
}
break;
}
}
} catch (Error& e) {
wait(tr->onError(e));
}
}
tr->reset();
if (BM_DEBUG) {
printf("Looping over prune intents\n");
}
// loop through all prune intentions and do prune work accordingly
try {
state KeyRef beginKey = normalKeys.begin;
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state std::vector<Future<Void>> prunes;
try {
// TODO: replace 10000 with a knob
KeyRange nextRange(KeyRangeRef(beginKey, normalKeys.end));
state RangeResult pruneIntents = wait(krmGetRanges(
tr, blobGranulePruneKeys.begin, nextRange, 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
state Key lastEndKey;
for (int rangeIdx = 0; rangeIdx < pruneIntents.size() - 1; ++rangeIdx) {
KeyRef rangeStartKey = pruneIntents[rangeIdx].key;
KeyRef rangeEndKey = pruneIntents[rangeIdx + 1].key;
lastEndKey = rangeEndKey;
if (pruneIntents[rangeIdx].value.size() == 0) {
continue;
}
KeyRange range(KeyRangeRef(rangeStartKey, rangeEndKey));
Version pruneVersion;
bool force;
std::tie(pruneVersion, force) = decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);
fmt::print("about to prune range [{0} - {1}) @ {2}, force={3}\n",
rangeStartKey.printable(),
rangeEndKey.printable(),
pruneVersion,
force ? "T" : "F");
prunes.emplace_back(pruneRange(self, rangeStartKey, rangeEndKey, pruneVersion, force));
}
// wait for this set of prunes to complete before starting the next ones since if we
// prune a range R at version V and while we are doing that, the time expires, we will
// end up trying to prune the same range again since the work isn't finished and the
// prunes will race
//
// TODO: this isn't that efficient though. Instead we could keep metadata as part of the
// BM's memory that tracks which prunes are active. Once done, we can mark that work as
// done. If the BM fails then all prunes will fail and so the next BM will have a clear
// set of metadata (i.e. no work in progress) so we will end up doing the work in the
// new BM
wait(waitForAll(prunes));
if (!pruneIntents.more) {
break;
}
beginKey = lastEndKey;
} catch (Error& e) {
wait(tr->onError(e));
}
} }
} catch (Error& e) { } catch (Error& e) {
if (e.code() == error_code_actor_cancelled) { wait(tr->onError(e));
throw e;
}
if (BM_DEBUG) {
fmt::print("monitorPruneKeys for BM {0} saw error {1}\n", self->id.toString(), e.name());
}
// don't want to kill the blob manager for errors around pruning
TraceEvent("MonitorPruneKeysError", self->id).detail("Error", e.name());
}
if (BM_DEBUG) {
printf("Done pruning current set of prune intents.\n");
} }
} }
} catch (Error& e) {
tr->reset();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->clear(KeyRangeRef(blobGranulePruneKeys.begin, keyAfter(lastPruneKey)));
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
if (BM_DEBUG) { if (BM_DEBUG) {
fmt::print("monitorPruneKeys got error {}\n", e.name()); printf("Done pruning current set of prune intents.\n");
} }
throw e;
} }
} }

View File

@ -2230,6 +2230,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
state KeyRange chunkRange; state KeyRange chunkRange;
state GranuleFiles chunkFiles; state GranuleFiles chunkFiles;
state Version startVer;
if (metadata->initialSnapshotVersion > req.readVersion) { if (metadata->initialSnapshotVersion > req.readVersion) {
// this is a time travel query, find previous granule // this is a time travel query, find previous granule
@ -2290,6 +2291,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
ASSERT(cur->endVersion > req.readVersion); ASSERT(cur->endVersion > req.readVersion);
ASSERT(cur->startVersion <= req.readVersion); ASSERT(cur->startVersion <= req.readVersion);
startVer = cur->startVersion;
// lazily load files for old granule if not present // lazily load files for old granule if not present
chunkRange = cur->range; chunkRange = cur->range;
@ -2324,6 +2326,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
ASSERT(chunkFiles.snapshotFiles.front().version <= req.readVersion); ASSERT(chunkFiles.snapshotFiles.front().version <= req.readVersion);
} else { } else {
// this is an active granule query // this is an active granule query
startVer = metadata->initialSnapshotVersion;
loop { loop {
if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) { if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) {
throw wrong_shard_server(); throw wrong_shard_server();
@ -2379,6 +2382,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
BlobGranuleChunkRef chunk; BlobGranuleChunkRef chunk;
// TODO change in V2 // TODO change in V2
chunk.includedVersion = req.readVersion; chunk.includedVersion = req.readVersion;
chunk.startVersion = startVer;
chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end)); chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end));
// handle snapshot files // handle snapshot files

View File

@ -31,6 +31,7 @@
#include "fdbserver/Knobs.h" #include "fdbserver/Knobs.h"
#include "fdbserver/TesterInterface.actor.h" #include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h" #include "fdbserver/workloads/workloads.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h" #include "flow/IRandom.h"
#include "flow/genericactors.actor.h" #include "flow/genericactors.actor.h"
@ -328,19 +329,15 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
try { try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> oldPruneIntent = wait(tr->get(blobGranulePruneKeys.begin.withSuffix(range.begin)));
if (oldPruneIntent.present()) { Value pruneValue = blobGranulePruneValueFor(version, range, force);
Version oldPruneVersion;
bool oldForce;
std::tie(oldPruneVersion, oldForce) = decodeBlobGranulePruneValue(oldPruneIntent.get());
if (oldPruneVersion >= version) {
return Void();
}
}
Value pruneValue = blobGranulePruneValueFor(version, force); Key pruneKey = KeyRef(blobGranulePruneKeys.begin.withSuffix(std::string(14, '\x00')));
wait(krmSetRange(tr, blobGranulePruneKeys.begin, range, pruneValue)); int32_t pos = pruneKey.size() - 14;
pos = littleEndian32(pos);
uint8_t* data = mutateString(pruneKey);
memcpy(data + pruneKey.size() - sizeof(int32_t), &pos, sizeof(int32_t));
tr->atomicOp(pruneKey, pruneValue, MutationRef::SetVersionstampedKey);
tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString()); tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString());
wait(tr->commit()); wait(tr->commit());
if (BGV_DEBUG) { if (BGV_DEBUG) {
@ -363,11 +360,53 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
} }
} }
ACTOR Future<Void> killBlobWorkers(Database cx, BlobGranuleVerifierWorkload* self) {
state Transaction tr(cx);
state std::set<UID> knownWorkers;
state bool first = true;
loop {
try {
RangeResult r = wait(tr.getRange(blobWorkerListKeys, CLIENT_KNOBS->TOO_MANY));
state std::vector<UID> haltIds;
state std::vector<Future<ErrorOr<Void>>> haltRequests;
for (auto& it : r) {
BlobWorkerInterface interf = decodeBlobWorkerListValue(it.value);
if (first) {
knownWorkers.insert(interf.id());
}
if (knownWorkers.count(interf.id())) {
haltIds.push_back(interf.id());
haltRequests.push_back(interf.haltBlobWorker.tryGetReply(HaltBlobWorkerRequest(1e6, UID())));
}
}
first = false;
wait(waitForAll(haltRequests));
bool allPresent = true;
for (int i = 0; i < haltRequests.size(); i++) {
if (haltRequests[i].get().present()) {
knownWorkers.erase(haltIds[i]);
} else {
allPresent = false;
}
}
if (allPresent) {
return Void();
} else {
wait(delay(1.0));
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) { ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) {
state double last = now(); state double last = now();
state double endTime = last + self->testDuration; state double endTime = last + self->testDuration;
state std::map<double, OldRead> timeTravelChecks; state std::map<double, OldRead> timeTravelChecks;
state int64_t timeTravelChecksMemory = 0; state int64_t timeTravelChecksMemory = 0;
state Version pruneVersion = 1;
TraceEvent("BlobGranuleVerifierStart"); TraceEvent("BlobGranuleVerifierStart");
if (BGV_DEBUG) { if (BGV_DEBUG) {
@ -392,12 +431,32 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// advance iterator before doing read, so if it gets error we don't retry it // advance iterator before doing read, so if it gets error we don't retry it
try { try {
// TODO: before reading, prune at some version [0, readVersion) state Version newPruneVersion = deterministicRandom()->randomInt64(1, oldRead.v);
pruneVersion = std::max(pruneVersion, newPruneVersion);
wait(self->pruneAtVersion(cx, oldRead.range, newPruneVersion, false));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult = std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v)); wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false); self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
self->timeTravelReads++; self->timeTravelReads++;
wait(self->killBlobWorkers(cx, self));
try {
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(self->readFromBlob(cx, self, oldRead.range, pruneVersion));
Version minStartVer = newPruneVersion;
for (auto& it : versionRead.second) {
minStartVer = std::min(minStartVer, it.startVersion);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(self->readFromBlob(cx, self, oldRead.range, minStartVer - 1));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
}
// TODO: read at some version older than pruneVersion and make sure you get txn_too_old // TODO: read at some version older than pruneVersion and make sure you get txn_too_old
// To achieve this, the BWs are going to have to recognize latest prune versions per granules // To achieve this, the BWs are going to have to recognize latest prune versions per granules
} catch (Error& e) { } catch (Error& e) {