Better granule conversion (#7787)

* better check for granule-ification

* Handling blob granule initial split too large

* Re-evaluating split size if too large, even if read doesn't get transaction_too_old

* reworked to have blob worker propose split key

* New GranuleStatusReply to avoid seqno issue stream side effects

* Handling retries on reevaluateInitialSplit properly

* Waiting for stream to be initialized

* Checking reevaluate split for additional split points beyond proposed

* Fixing more races in reevaluate initial split

* properly handling cleaning up old change feed after split re-evaluate

* fixing granule conversion bug with hard boundaries

* fixing clear and merge check race with cycle test

* refactor missed knob check for clearAndMerge

* Fixing formatting

* review comments and improving large range conversion

* fixing typo

* more formatting
This commit is contained in:
Josh Slocum 2022-08-05 18:12:17 -05:00 committed by GitHub
parent cd0020ccaf
commit f866ffc36b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 426 additions and 65 deletions

View File

@ -172,6 +172,7 @@ struct GranuleStatusReply : public ReplyPromiseStreamReply {
KeyRange granuleRange;
bool doSplit;
bool writeHotSplit;
bool initialSplitTooBig;
int64_t continueEpoch;
int64_t continueSeqno;
UID granuleID;
@ -180,11 +181,13 @@ struct GranuleStatusReply : public ReplyPromiseStreamReply {
bool mergeCandidate;
int64_t originalEpoch;
int64_t originalSeqno;
Optional<Key> proposedSplitKey;
GranuleStatusReply() {}
explicit GranuleStatusReply(KeyRange range,
bool doSplit,
bool writeHotSplit,
bool initialSplitTooBig,
int64_t continueEpoch,
int64_t continueSeqno,
UID granuleID,
@ -193,11 +196,15 @@ struct GranuleStatusReply : public ReplyPromiseStreamReply {
bool mergeCandidate,
int64_t originalEpoch,
int64_t originalSeqno)
: granuleRange(range), doSplit(doSplit), writeHotSplit(writeHotSplit), continueEpoch(continueEpoch),
continueSeqno(continueSeqno), granuleID(granuleID), startVersion(startVersion), blockedVersion(blockedVersion),
mergeCandidate(mergeCandidate), originalEpoch(originalEpoch), originalSeqno(originalSeqno) {}
: granuleRange(range), doSplit(doSplit), writeHotSplit(writeHotSplit), initialSplitTooBig(initialSplitTooBig),
continueEpoch(continueEpoch), continueSeqno(continueSeqno), granuleID(granuleID), startVersion(startVersion),
blockedVersion(blockedVersion), mergeCandidate(mergeCandidate), originalEpoch(originalEpoch),
originalSeqno(originalSeqno) {}
int expectedSize() const { return sizeof(GranuleStatusReply) + granuleRange.expectedSize(); }
int expectedSize() const {
return sizeof(GranuleStatusReply) + granuleRange.expectedSize() +
(proposedSplitKey.present() ? proposedSplitKey.get().expectedSize() : 0);
}
template <class Ar>
void serialize(Ar& ar) {
@ -207,6 +214,7 @@ struct GranuleStatusReply : public ReplyPromiseStreamReply {
granuleRange,
doSplit,
writeHotSplit,
initialSplitTooBig,
continueEpoch,
continueSeqno,
granuleID,
@ -214,7 +222,8 @@ struct GranuleStatusReply : public ReplyPromiseStreamReply {
blockedVersion,
mergeCandidate,
originalEpoch,
originalSeqno);
originalSeqno,
proposedSplitKey);
}
};

View File

@ -1012,22 +1012,21 @@ ACTOR Future<Void> writeInitialGranuleMapping(Reference<BlobManagerData> bmData,
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
wait(checkManagerLock(tr, bmData));
while (i + j < splitPoints.keys.size() - 1 && j < transactionChunkSize) {
state KeyRangeRef splitRange = KeyRangeRef(splitPoints.keys[i + j], splitPoints.keys[i + j + 1]);
// set to empty UID - no worker assigned yet
wait(krmSetRange(tr,
blobGranuleMappingKeys.begin,
KeyRangeRef(splitPoints.keys[i + j], splitPoints.keys[i + j + 1]),
blobGranuleMappingValueFor(UID())));
// Update BlobGranuleMergeBoundary.
if (splitPoints.boundaries.count(splitRange.begin)) {
tr->set(blobGranuleMergeBoundaryKeyFor(splitRange.begin),
blobGranuleMergeBoundaryValueFor(splitPoints.boundaries[splitRange.begin]));
// Instead of doing a krmSetRange for each granule, because it does a read-modify-write, we do one
// krmSetRange for the whole batch, and then just individual sets for each intermediate boundary This
// does one read per transaction instead of N serial reads per transaction
state int endIdx = std::min(i + transactionChunkSize, (int)(splitPoints.keys.size() - 1));
wait(krmSetRange(tr,
blobGranuleMappingKeys.begin,
KeyRangeRef(splitPoints.keys[i], splitPoints.keys[endIdx]),
blobGranuleMappingValueFor(UID())));
for (j = 0; i + j < endIdx; j++) {
if (splitPoints.boundaries.count(splitPoints.keys[i + j])) {
tr->set(blobGranuleMergeBoundaryKeyFor(splitPoints.keys[i + j]),
blobGranuleMergeBoundaryValueFor(splitPoints.boundaries[splitPoints.keys[i + j]]));
}
j++;
tr->set(splitPoints.keys[i + j].withPrefix(blobGranuleMappingKeys.begin),
blobGranuleMappingValueFor(UID()));
}
wait(tr->commit());
@ -1119,6 +1118,11 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
throw internal_error();
}
// TODO better way to do this!
bmData->mergeHardBoundaries.clear();
for (auto& it : results) {
bmData->mergeHardBoundaries[it.key] = true;
}
ar.dependsOn(results.arena());
VectorRef<KeyRangeRef> rangesToAdd;
@ -1150,11 +1154,6 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
ra.keyRange = range;
ra.revoke = RangeRevokeData(true); // dispose=true
handleRangeAssign(bmData, ra);
bmData->mergeHardBoundaries.erase(range.begin);
if (bmData->knownBlobRanges.containedRanges(singleKeyRange(range.end)).empty()) {
bmData->mergeHardBoundaries[range.end] = true;
}
}
state std::vector<Future<BlobGranuleSplitPoints>> splitFutures;
@ -1162,10 +1161,6 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
for (KeyRangeRef range : rangesToAdd) {
TraceEvent("ClientBlobRangeAdded", bmData->id).detail("Range", range);
splitFutures.push_back(splitRange(bmData, range, false, true));
if (bmData->knownBlobRanges.containedRanges(singleKeyRange(range.begin)).empty()) {
bmData->mergeHardBoundaries[range.begin] = true;
}
}
for (auto f : splitFutures) {
@ -1256,6 +1251,230 @@ static void downsampleSplit(const Standalone<VectorRef<KeyRef>>& splits,
}
}
ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobManagerData> bmData,
UID currentWorkerId,
KeyRange granuleRange,
UID granuleID,
int64_t epoch,
int64_t seqno,
Key proposedSplitKey) {
CODE_PROBE(true, "BM re-evaluating initial split too big");
if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big from {3} @ ({4}, {5})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
currentWorkerId.toString().substr(0, 5),
epoch,
seqno);
fmt::print("Proposed split (2):\n");
fmt::print(" {0}\n", granuleRange.begin.printable());
fmt::print(" {0}\n", proposedSplitKey.printable());
fmt::print(" {0}\n", granuleRange.end.printable());
}
TraceEvent("BMCheckInitialSplitTooBig", bmData->id)
.detail("Epoch", bmData->epoch)
.detail("Granule", granuleRange)
.detail("ProposedSplitKey", proposedSplitKey);
// calculate new split targets speculatively assuming split is too large and current worker still owns it
ASSERT(granuleRange.begin < proposedSplitKey);
ASSERT(proposedSplitKey < granuleRange.end);
state Future<BlobGranuleSplitPoints> fSplitFirst =
splitRange(bmData, KeyRangeRef(granuleRange.begin, proposedSplitKey), false, true);
state Future<BlobGranuleSplitPoints> fSplitSecond =
splitRange(bmData, KeyRangeRef(proposedSplitKey, granuleRange.end), false, true);
state Standalone<VectorRef<KeyRef>> newRanges;
BlobGranuleSplitPoints splitFirst = wait(fSplitFirst);
ASSERT(splitFirst.keys.size() >= 2);
ASSERT(splitFirst.keys.front() == granuleRange.begin);
ASSERT(splitFirst.keys.back() == proposedSplitKey);
for (int i = 0; i < splitFirst.keys.size(); i++) {
newRanges.push_back_deep(newRanges.arena(), splitFirst.keys[i]);
}
BlobGranuleSplitPoints splitSecond = wait(fSplitSecond);
ASSERT(splitSecond.keys.size() >= 2);
ASSERT(splitSecond.keys.front() == proposedSplitKey);
ASSERT(splitSecond.keys.back() == granuleRange.end);
// i=1 to skip proposedSplitKey, since above already added it
for (int i = 1; i < splitSecond.keys.size(); i++) {
newRanges.push_back_deep(newRanges.arena(), splitSecond.keys[i]);
}
if (BM_DEBUG) {
fmt::print("Re-evaluated split ({0}:\n", newRanges.size());
for (auto& it : newRanges) {
fmt::print(" {0}\n", it.printable());
}
}
// redo key alignment on full set of split points
// FIXME: only need to align propsedSplitKey in the middle
state BlobGranuleSplitPoints finalSplit = wait(alignKeys(bmData, granuleRange, newRanges));
ASSERT(finalSplit.keys.size() > 2);
if (BM_DEBUG) {
fmt::print("Aligned split ({0}:\n", finalSplit.keys.size());
for (auto& it : finalSplit.keys) {
fmt::print(" {0}{1}\n", it.printable(), finalSplit.boundaries.count(it) ? " *" : "");
}
}
// Check lock to see if lock is still the specified epoch and seqno, and there are no files for the granule.
// If either of these are false, some other worker now has the granule. if there are files, it already succeeded at
// a split. if not, and it fails too, it will retry and get back here
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
state Key lockKey = blobGranuleLockKeyFor(granuleRange);
state bool retried = false;
loop {
try {
tr->setOption(FDBTransactionOptions::Option::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::Option::ACCESS_SYSTEM_KEYS);
// make sure we're still manager when this transaction gets committed
wait(checkManagerLock(tr, bmData));
// this adds a read conflict range, so if another granule concurrently commits a file, we will retry and see
// that
KeyRange range = blobGranuleFileKeyRangeFor(granuleID);
RangeResult granuleFiles = wait(tr->getRange(range, 1));
if (!granuleFiles.empty()) {
CODE_PROBE(true, "split too big was eventually solved by another worker");
if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: solved by another worker\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
}
return Void();
}
Optional<Value> prevLockValue = wait(tr->get(lockKey));
ASSERT(prevLockValue.present());
std::tuple<int64_t, int64_t, UID> prevOwner = decodeBlobGranuleLockValue(prevLockValue.get());
int64_t prevOwnerEpoch = std::get<0>(prevOwner);
int64_t prevOwnerSeqno = std::get<1>(prevOwner);
UID prevGranuleID = std::get<2>(prevOwner);
if (prevOwnerEpoch != epoch || prevOwnerSeqno != seqno || prevGranuleID != granuleID) {
if (retried && prevOwnerEpoch == bmData->epoch && prevGranuleID == granuleID &&
prevOwnerSeqno == std::numeric_limits<int64_t>::max()) {
// owner didn't change, last iteration of this transaction just succeeded but threw an error.
CODE_PROBE(true, "split too big adjustment succeeded after retry");
break;
}
CODE_PROBE(true, "split too big was since moved to another worker");
if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: moved to another worker\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
fmt::print("Epoch: Prev {0}, Cur {1}\n", prevOwnerEpoch, epoch);
fmt::print("Seqno: Prev {0}, Cur {1}\n", prevOwnerSeqno, seqno);
fmt::print("GranuleID: Prev {0}, Cur {1}\n",
prevGranuleID.toString().substr(0, 6),
granuleID.toString().substr(0, 6));
}
return Void();
}
if (prevOwnerEpoch > bmData->epoch) {
if (BM_DEBUG) {
fmt::print("BM {0} found a higher epoch {1} for granule lock of [{2} - {3})\n",
bmData->epoch,
prevOwnerEpoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
}
if (bmData->iAmReplaced.canBeSet()) {
bmData->iAmReplaced.send(Void());
}
return Void();
}
// The lock check above *should* handle this, but just be sure, also make sure that this granule wasn't
// already split in the granule mapping
RangeResult existingRanges = wait(
krmGetRanges(tr, blobGranuleMappingKeys.begin, granuleRange, 3, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
if (existingRanges.size() > 2 || existingRanges.more) {
CODE_PROBE(true, "split too big was already re-split");
if (BM_DEBUG) {
fmt::print("BM {0} re-evaluating initial split [{1} - {2}) too big: already split\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
for (auto& it : existingRanges) {
fmt::print(" {0}\n", it.key.printable());
}
}
return Void();
}
// Set lock to max value for this manager, so other reassignments can't race with this transaction
// and existing owner can't modify it further.
tr->set(lockKey, blobGranuleLockValueFor(bmData->epoch, std::numeric_limits<int64_t>::max(), granuleID));
// set new ranges
state int i;
for (i = 0; i < finalSplit.keys.size() - 1; i++) {
wait(krmSetRange(tr,
blobGranuleMappingKeys.begin,
KeyRangeRef(finalSplit.keys[i], finalSplit.keys[i + 1]),
blobGranuleMappingValueFor(UID())));
if (finalSplit.boundaries.count(finalSplit.keys[i])) {
tr->set(blobGranuleMergeBoundaryKeyFor(finalSplit.keys[i]),
blobGranuleMergeBoundaryValueFor(finalSplit.boundaries[finalSplit.keys[i]]));
}
}
// Need to destroy the old change feed for the no longer needed feed, otherwise it will leak
// This has to be a non-ryw transaction for the change feed destroy mutations to propagate properly
// TODO: fix this better! (privatize change feed key clear)
wait(updateChangeFeed(&tr->getTransaction(),
granuleIDToCFKey(granuleID),
ChangeFeedStatus::CHANGE_FEED_DESTROY,
granuleRange));
retried = true;
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
// transaction committed, send updated range assignments. Even if there is only one range still, we need to revoke
// it and re-assign it to cancel the old granule and retry
CODE_PROBE(true, "BM successfully changed initial split too big");
RangeAssignment raRevoke;
raRevoke.isAssign = false;
raRevoke.keyRange = granuleRange;
raRevoke.revoke = RangeRevokeData(false); // not a dispose
handleRangeAssign(bmData, raRevoke);
for (int i = 0; i < finalSplit.keys.size() - 1; i++) {
// reassign new range and do handover of previous range
RangeAssignment raAssignSplit;
raAssignSplit.isAssign = true;
raAssignSplit.keyRange = KeyRangeRef(finalSplit.keys[i], finalSplit.keys[i + 1]);
raAssignSplit.assign = RangeAssignmentData();
// don't care who this range gets assigned to
handleRangeAssign(bmData, raAssignSplit);
}
if (BM_DEBUG) {
fmt::print("BM {0} Re-splitting initial range [{1} - {2}) into {3} granules done\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
finalSplit.keys.size() - 1);
}
return Void();
}
ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
UID currentWorkerId,
KeyRange granuleRange,
@ -1344,9 +1563,10 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
granuleRange.end.printable(),
splitPoints.keys.size() - 1);
for (int i = 0; i < splitPoints.keys.size(); i++) {
fmt::print(" {}:{}\n",
fmt::print(" {0}:{1}{2}\n",
(i < newGranuleIDs.size() ? newGranuleIDs[i] : UID()).toString().substr(0, 6).c_str(),
splitPoints.keys[i].printable());
splitPoints.keys[i].printable(),
splitPoints.boundaries.count(splitPoints.keys[i]) ? " *" : "");
}
}
@ -1800,7 +2020,7 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
// Clear existing merge boundaries.
tr->clear(blobGranuleMergeBoundaryKeyFor(parentRange.begin));
// This has to be
// This has to be a non-ryw transaction for the change feed destroy mutations to propagate properly
// TODO: fix this better! (privatize change feed key clear)
wait(updateChangeFeed(&tr->getTransaction(),
granuleIDToCFKey(parentGranuleIDs[parentIdx]),
@ -2283,7 +2503,9 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
rep.continueSeqno,
bwInterf.id().toString(),
rep.doSplit ? "split" : (rep.mergeCandidate ? "merge" : ""),
rep.mergeCandidate ? "" : (rep.writeHotSplit ? "hot" : "normal"));
rep.mergeCandidate
? ""
: (rep.writeHotSplit ? "hot" : (rep.initialSplitTooBig ? "toobig" : "normal")));
}
ASSERT(rep.doSplit || rep.mergeCandidate);
@ -2449,14 +2671,25 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
newEval.inProgress = maybeSplitRange(bmData,
bwInterf.id(),
rep.granuleRange,
rep.granuleID,
rep.startVersion,
rep.writeHotSplit,
rep.originalEpoch,
rep.originalSeqno);
if (rep.initialSplitTooBig) {
ASSERT(rep.proposedSplitKey.present());
newEval.inProgress = reevaluateInitialSplit(bmData,
bwInterf.id(),
rep.granuleRange,
rep.granuleID,
rep.originalEpoch,
rep.originalSeqno,
rep.proposedSplitKey.get());
} else {
newEval.inProgress = maybeSplitRange(bmData,
bwInterf.id(),
rep.granuleRange,
rep.granuleID,
rep.startVersion,
rep.writeHotSplit,
rep.originalEpoch,
rep.originalSeqno);
}
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
}

View File

@ -741,6 +741,13 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
}
}
ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobWorkerData> bwData,
UID granuleID,
KeyRange keyRange,
int64_t epoch,
int64_t seqno,
Key proposedSplitKey);
ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
Reference<BlobConnectionProvider> bstore,
KeyRange keyRange,
@ -749,33 +756,59 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
int64_t seqno,
Version version,
PromiseStream<RangeResult> rows,
bool createGranuleHistory) {
bool initialSnapshot) {
state std::string fileName = randomBGFilename(bwData->id, granuleID, version, ".snapshot");
state Standalone<GranuleSnapshot> snapshot;
state int64_t bytesRead = 0;
state bool injectTooBig = initialSnapshot && g_network->isSimulated() && BUGGIFY_WITH_PROB(0.1);
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
loop {
try {
if (initialSnapshot && snapshot.size() > 1 &&
(injectTooBig || bytesRead >= 3 * SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES)) {
// throw transaction too old either on injection for simulation, or if snapshot would be too large now
throw transaction_too_old();
}
RangeResult res = waitNext(rows.getFuture());
snapshot.arena().dependsOn(res.arena());
snapshot.append(snapshot.arena(), res.begin(), res.size());
bytesRead += res.expectedSize();
wait(yield(TaskPriority::BlobWorkerUpdateStorage));
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
break;
}
throw e;
// if we got transaction_too_old naturally, have lower threshold for re-evaluating (2xlimit)
if (initialSnapshot && snapshot.size() > 1 && e.code() == error_code_transaction_too_old &&
(injectTooBig || bytesRead >= 2 * SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES)) {
// idle this actor, while we tell the manager this is too big and to re-evaluate granules and revoke us
if (BW_DEBUG) {
fmt::print("Granule [{0} - {1}) re-evaluating snapshot after {2} bytes ({3} limit) {4}\n",
keyRange.begin.printable(),
keyRange.end.printable(),
bytesRead,
SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES,
injectTooBig ? "(injected)" : "");
}
wait(reevaluateInitialSplit(
bwData, granuleID, keyRange, epoch, seqno, snapshot[snapshot.size() / 2].key));
ASSERT(false);
} else {
throw e;
}
}
}
wait(delay(0, TaskPriority::BlobWorkerUpdateStorage));
if (BW_DEBUG) {
fmt::print("Granule [{0} - {1}) read {2} snapshot rows\n",
fmt::print("Granule [{0} - {1}) read {2} snapshot rows ({3} bytes)\n",
keyRange.begin.printable(),
keyRange.end.printable(),
snapshot.size());
snapshot.size(),
bytesRead);
}
if (g_network->isSimulated()) {
@ -850,7 +883,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta);
tr->set(snapshotFileKey, snapshotFileValue);
// create granule history at version if this is a new granule with the initial dump from FDB
if (createGranuleHistory) {
if (initialSnapshot) {
Key historyKey = blobGranuleHistoryKeyFor(keyRange, version);
Standalone<BlobGranuleHistoryValue> historyValue;
historyValue.granuleID = granuleID;
@ -917,7 +950,6 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
state FlowLock::Releaser holdingDVL(bwData->initialSnapshotLock);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
state int64_t bytesRead = 0;
state int retries = 0;
state Version lastReadVersion = invalidVersion;
state Version readVersion = invalidVersion;
@ -956,12 +988,11 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
throw e;
}
if (BW_DEBUG) {
fmt::print("Dumping snapshot {0} from FDB for [{1} - {2}) got error {3} after {4} bytes\n",
fmt::print("Dumping snapshot {0} from FDB for [{1} - {2}) got error {3}\n",
retries + 1,
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
e.name(),
bytesRead);
e.name());
}
state Error err = e;
if (e.code() == error_code_server_overloaded) {
@ -976,7 +1007,6 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
.error(err)
.detail("Granule", metadata->keyRange)
.detail("Count", retries);
bytesRead = 0;
lastReadVersion = readVersion;
// Pop change feed up to readVersion, because that data will be before the next snapshot
// Do this to prevent a large amount of CF data from accumulating if we have consecutive failures to
@ -1196,6 +1226,7 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
bwData->currentManagerStatusStream.get().send(GranuleStatusReply(metadata->keyRange,
true,
writeHot,
false,
statusEpoch,
statusSeqno,
granuleID,
@ -1259,6 +1290,64 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
return reSnapshotIdx;
}
// wait indefinitely to tell manager to re-evaluate this split, until the granule is revoked
ACTOR Future<Void> reevaluateInitialSplit(Reference<BlobWorkerData> bwData,
UID granuleID,
KeyRange keyRange,
int64_t epoch,
int64_t seqno,
Key proposedSplitKey) {
// wait for first stream to be initialized
while (!bwData->statusStreamInitialized) {
wait(bwData->currentManagerStatusStream.onChange());
}
loop {
try {
// wait for manager stream to become ready, and send a message
loop {
choose {
when(wait(bwData->currentManagerStatusStream.get().onReady())) { break; }
when(wait(bwData->currentManagerStatusStream.onChange())) {}
}
}
GranuleStatusReply reply(keyRange,
true,
false,
true,
epoch,
seqno,
granuleID,
invalidVersion,
invalidVersion,
false,
epoch,
seqno);
reply.proposedSplitKey = proposedSplitKey;
bwData->currentManagerStatusStream.get().send(reply);
// if a new manager appears, also tell it about this granule being splittable, or retry after a certain
// amount of time of not hearing back
wait(success(timeout(bwData->currentManagerStatusStream.onChange(), 10.0)));
wait(delay(0));
CODE_PROBE(true, "Blob worker re-sending initialsplit too big");
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
CODE_PROBE(true, "Blob worker re-sending merge candidate to manager after not error/not hearing back");
// if we got broken promise while waiting, the old stream was killed, so we don't need to wait
// on change, just retry
if (e.code() == error_code_broken_promise) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} else {
wait(bwData->currentManagerStatusStream.onChange());
}
}
}
}
ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
Reference<GranuleMetadata> metadata,
UID granuleID,
@ -1291,7 +1380,6 @@ ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
wait(bwData->currentManagerStatusStream.onChange());
}
// FIXME: after a certain amount of retries/time, we may want to re-check anyway
state double sendTimeGiveUp = now() + SERVER_KNOBS->BG_MERGE_CANDIDATE_THRESHOLD_SECONDS / 2.0;
loop {
try {
@ -1310,6 +1398,7 @@ ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
}
bwData->currentManagerStatusStream.get().send(GranuleStatusReply(metadata->keyRange,
false,
false,
false,
metadata->continueEpoch,

View File

@ -2297,6 +2297,11 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
}
rangeIds[it->id] = std::tuple(it->range, it->emptyVersion, stopVersion, it->metadataVersion);
} else if (it->destroyed && it->metadataVersion > metadataWaitVersion) {
// if we communicate the lack of a change feed because it's destroying, ensure the feed destroy isn't
// rolled back first
CODE_PROBE(true, "Overlapping Change Feeds ensuring destroy isn't rolled back");
metadataWaitVersion = it->metadataVersion;
}
}
}

View File

@ -49,8 +49,6 @@
*/
struct BlobGranuleVerifierWorkload : TestWorkload {
bool doSetup;
double minDelay;
double maxDelay;
double testDuration;
double timeTravelLimit;
uint64_t timeTravelBufferSize;
@ -65,7 +63,9 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
int64_t purges = 0;
std::vector<Future<Void>> clients;
bool enablePurging;
bool initAtEnd;
bool strictPurgeChecking;
bool clearAndMergeCheck;
DatabaseConfiguration config;
@ -74,9 +74,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
BlobGranuleVerifierWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
doSetup = !clientId; // only do this on the "first" client
// FIXME: don't do the delay in setup, as that delays the start of all workloads
minDelay = getOption(options, LiteralStringRef("minDelay"), 0.0);
maxDelay = getOption(options, LiteralStringRef("maxDelay"), 0.0);
testDuration = getOption(options, LiteralStringRef("testDuration"), 120.0);
timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), testDuration);
timeTravelBufferSize = getOption(options, LiteralStringRef("timeTravelBufferSize"), 100000000);
@ -87,6 +84,15 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// won't be cleaned up.
strictPurgeChecking =
getOption(options, LiteralStringRef("strictPurgeChecking"), false /*sharedRandomNumber % 2 == 0*/);
sharedRandomNumber /= 10;
// randomly some tests write data first and then turn on blob granules later, to test conversion of existing DB
initAtEnd = !enablePurging && sharedRandomNumber % 10 == 0;
sharedRandomNumber /= 10;
clearAndMergeCheck = getOption(options, LiteralStringRef("clearAndMergeCheck"), sharedRandomNumber % 10 == 0);
sharedRandomNumber /= 10;
ASSERT(threads >= 1);
if (BGV_DEBUG) {
@ -110,9 +116,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// FIXME: run the actual FDBCLI command instead of copy/pasting its implementation
// Sets the whole user keyspace to be blobified
ACTOR Future<Void> setUpBlobRange(Database cx, Future<Void> waitForStart) {
ACTOR Future<Void> setUpBlobRange(Database cx) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
wait(waitForStart);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -142,11 +147,9 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
wait(success(ManagementAPI::changeConfig(cx.getReference(), "blob_granules_enabled=1", true)));
double initialDelay = deterministicRandom()->random01() * (self->maxDelay - self->minDelay) + self->minDelay;
if (BGV_DEBUG) {
printf("BGW setup initial delay of %.3f\n", initialDelay);
if (!self->initAtEnd) {
wait(self->setUpBlobRange(cx));
}
wait(self->setUpBlobRange(cx, delay(initialDelay)));
return Void();
}
@ -430,6 +433,11 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
ACTOR Future<bool> _check(Database cx, BlobGranuleVerifierWorkload* self) {
// check error counts, and do an availability check at the end
if (self->doSetup && self->initAtEnd) {
// FIXME: this doesn't check the data contents post-conversion, just that it finishes successfully
wait(self->setUpBlobRange(cx));
}
state Transaction tr(cx);
state Version readVersion = wait(self->doGrv(&tr));
state Version startReadVersion = readVersion;
@ -446,8 +454,21 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state Future<Void> rangeFetcher = self->findGranules(cx, self);
loop {
wait(self->granuleRanges.onChange());
// wait until entire keyspace has granules
if (!self->granuleRanges.get().empty()) {
break;
bool haveAll = true;
if (self->granuleRanges.get().front().begin != normalKeys.begin ||
self->granuleRanges.get().back().end != normalKeys.end) {
haveAll = false;
}
for (int i = 0; haveAll && i < self->granuleRanges.get().size() - 1; i++) {
if (self->granuleRanges.get()[i].end != self->granuleRanges.get()[i + 1].begin) {
haveAll = false;
}
}
if (haveAll) {
break;
}
}
}
rangeFetcher.cancel();
@ -535,7 +556,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// FIXME: if doPurging was set, possibly do one last purge here, and verify it succeeds with no errors
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && deterministicRandom()->random01() < 0.1) {
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && self->clearAndMergeCheck) {
CODE_PROBE(true, "BGV clearing database and awaiting merge");
wait(clearAndAwaitMerge(cx, normalKeys));
}

View File

@ -20,6 +20,8 @@ testTitle = 'BlobGranuleMoveVerifyCycle'
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 60.0
# cycle does its own workload checking, don't want clear racing with its checking
clearAndMergeCheck = false
[[test.workload]]
testName = 'RandomClogging'

View File

@ -18,6 +18,8 @@ testTitle = 'BlobGranuleVerifyCycle'
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 60.0
# cycle does its own workload checking, don't want clear racing with its checking
clearAndMergeCheck = false
[[test.workload]]
testName = 'RandomClogging'