Fixed a number of different pruning problems

This commit is contained in:
Evan Tschannen 2022-03-01 10:13:20 -08:00
parent 8f8987ad67
commit 6f1fb71949
4 changed files with 63 additions and 34 deletions

View File

@ -77,14 +77,14 @@ struct BlobGranuleChunkRef {
constexpr static FileIdentifier file_identifier = 865198;
KeyRangeRef keyRange;
Version includedVersion;
Version startVersion;
Version snapshotVersion;
Optional<BlobFilePointerRef> snapshotFile; // not set if it's an incremental read
VectorRef<BlobFilePointerRef> deltaFiles;
GranuleDeltas newDeltas;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, includedVersion, startVersion, snapshotFile, deltaFiles, newDeltas);
serializer(ar, keyRange, includedVersion, snapshotVersion, snapshotFile, deltaFiles, newDeltas);
}
};

View File

@ -2450,7 +2450,7 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
pruneMap.insert(allKeys, std::make_pair<Version, bool>(0, false));
try {
// TODO: replace 10000 with a knob
state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, 10000));
state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, BUGGIFY ? 1 : 10000));
if (pruneIntents.size()) {
int rangeIdx = 0;
for (; rangeIdx < pruneIntents.size(); ++rangeIdx) {

View File

@ -2256,7 +2256,6 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
state KeyRange chunkRange;
state GranuleFiles chunkFiles;
state Version startVer;
if (metadata->initialSnapshotVersion > req.readVersion) {
// this is a time travel query, find previous granule
@ -2317,7 +2316,6 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
ASSERT(cur->endVersion > req.readVersion);
ASSERT(cur->startVersion <= req.readVersion);
startVer = cur->startVersion;
// lazily load files for old granule if not present
chunkRange = cur->range;
@ -2352,7 +2350,6 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
ASSERT(chunkFiles.snapshotFiles.front().version <= req.readVersion);
} else {
// this is an active granule query
startVer = metadata->initialSnapshotVersion;
loop {
if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) {
throw wrong_shard_server();
@ -2408,7 +2405,6 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
BlobGranuleChunkRef chunk;
// TODO change in V2
chunk.includedVersion = req.readVersion;
chunk.startVersion = startVer;
chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end));
// handle snapshot files
@ -2434,6 +2430,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
BlobFileIndex snapshotF = chunkFiles.snapshotFiles[i];
chunk.snapshotFile = BlobFilePointerRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length);
Version snapshotVersion = chunkFiles.snapshotFiles[i].version;
chunk.snapshotVersion = snapshotVersion;
// handle delta files
// cast this to an int so that the comparison still works when i goes to -1

View File

@ -327,28 +327,28 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// utility to prune <range> at pruneVersion=<version> with the <force> flag
ACTOR Future<Void> pruneAtVersion(Database cx, KeyRange range, Version version, bool force) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state Version commitVersion = 0;
state Key pruneKey;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Value pruneValue = blobGranulePruneValueFor(version, range, force);
Key pruneKey = KeyRef(blobGranulePruneKeys.begin.withSuffix(std::string(14, '\x00')));
int32_t pos = pruneKey.size() - 14;
pos = littleEndian32(pos);
uint8_t* data = mutateString(pruneKey);
memcpy(data + pruneKey.size() - sizeof(int32_t), &pos, sizeof(int32_t));
tr->atomicOp(pruneKey, pruneValue, MutationRef::SetVersionstampedKey);
tr->atomicOp(
addVersionStampAtEnd(blobGranulePruneKeys.begin), pruneValue, MutationRef::SetVersionstampedKey);
tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString());
state Future<Standalone<StringRef>> fTrVs = tr->getVersionstamp();
wait(tr->commit());
Standalone<StringRef> vs = wait(fTrVs);
pruneKey = blobGranulePruneKeys.begin.withSuffix(vs);
if (BGV_DEBUG) {
printf("pruneAtVersion for range [%s-%s) at version %lld succeeded\n",
range.begin.printable().c_str(),
range.end.printable().c_str(),
version);
}
return Void();
break;
} catch (Error& e) {
if (BGV_DEBUG) {
printf("pruneAtVersion for range [%s-%s) at version %lld encountered error %s\n",
@ -360,6 +360,23 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
wait(tr->onError(e));
}
}
tr->reset();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> pruneVal = wait(tr->get(pruneKey));
if (!pruneVal.present()) {
return Void();
}
state Future<Void> watchFuture = tr->watch(pruneKey);
wait(tr->commit());
wait(watchFuture);
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR Future<Void> killBlobWorkers(Database cx, BlobGranuleVerifierWorkload* self) {
@ -403,11 +420,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
}
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) {
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self, bool allowPruning) {
state double last = now();
state double endTime = last + self->testDuration;
state std::map<double, OldRead> timeTravelChecks;
state int64_t timeTravelChecksMemory = 0;
state Version prevPruneVersion = -1;
state UID dbgId = debugRandom()->randomUniqueID();
TraceEvent("BlobGranuleVerifierStart");
if (BGV_DEBUG) {
@ -429,33 +448,44 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state OldRead oldRead = timeTravelIt->second;
timeTravelChecksMemory -= oldRead.oldResult.expectedSize();
timeTravelIt = timeTravelChecks.erase(timeTravelIt);
if (prevPruneVersion == -1) {
prevPruneVersion = oldRead.v;
}
// advance the iterator before doing the read, so that if the read gets an error we don't retry it
try {
state Version newPruneVersion = deterministicRandom()->randomInt64(1, oldRead.v);
dbgPruneVersion = std::max(dbgPruneVersion, newPruneVersion);
wait(self->pruneAtVersion(cx, oldRead.range, newPruneVersion, false));
state Version newPruneVersion = 0;
state bool doPruning =
allowPruning && prevPruneVersion < oldRead.v && deterministicRandom()->random01() < 0.5;
if (doPruning) {
newPruneVersion = deterministicRandom()->randomInt64(prevPruneVersion, oldRead.v);
prevPruneVersion = std::max(prevPruneVersion, newPruneVersion);
dbgPruneVersion = prevPruneVersion;
wait(self->pruneAtVersion(cx, oldRead.range, newPruneVersion, false));
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
self->timeTravelReads++;
wait(self->killBlobWorkers(cx, self));
try {
if (doPruning) {
wait(self->killBlobWorkers(cx, self));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(self->readFromBlob(cx, self, oldRead.range, dbgPruneVersion));
Version minStartVer = newPruneVersion;
for (auto& it : versionRead.second) {
minStartVer = std::min(minStartVer, it.startVersion);
wait(self->readFromBlob(cx, self, oldRead.range, prevPruneVersion));
try {
Version minSnapshotVersion = newPruneVersion;
for (auto& it : versionRead.second) {
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(self->readFromBlob(cx, self, oldRead.range, minSnapshotVersion - 1));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(self->readFromBlob(cx, self, oldRead.range, minStartVer - 1));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
ASSERT(e.code() == error_code_blob_granule_transaction_too_old);
}
// TODO: read at some version older than pruneVersion and make sure you get txn_too_old
@ -508,7 +538,9 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
for (int i = 0; i < threads; i++) {
clients.push_back(
timeout(reportErrors(verifyGranules(cx, this), "BlobGranuleVerifier"), testDuration, Void()));
timeout(reportErrors(verifyGranules(cx, this, clientId == 0 && i == 0), "BlobGranuleVerifier"),
testDuration,
Void()));
}
return delay(testDuration);
}