Fix range boundaries and clearing intents.
Parent: 63b7666f49
Commit: 2ccb3a4740
@@ -214,6 +214,7 @@ struct BlobManagerData {
    std::unordered_set<UID> deadWorkers;
    KeyRangeMap<UID> workerAssignments;
    KeyRangeMap<bool> knownBlobRanges;
    KeyRangeMap<Version> prunesInProgress;

    AsyncTrigger startRecruiting;
    Debouncer restartRecruiting;
@@ -1493,6 +1494,7 @@ ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId,
        }
    }

    printf("partial deletion: deleting %d files\n", deletions.size());
    wait(waitForAll(deletions));

    // delete metadata in FDB (deleted file keys)
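The file deletions above are issued as a batch of futures and then awaited together with waitForAll. A rough standard-library analogy of that pattern (not flow actor code; the file names and the deleteFile helper are made up for illustration):

#include <cstdio>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for deleting one blob file; in the real code this is an
// asynchronous blob-store delete returning a flow Future<Void>.
static void deleteFile(const std::string& name) {
    std::printf("deleting %s\n", name.c_str());
}

int main() {
    std::vector<std::string> filesToDelete = { "snapshot.1", "delta.2", "delta.3" };

    // Kick off all deletions, then wait for every one of them -- the same shape as
    // building `deletions` and calling wait(waitForAll(deletions)) above.
    std::vector<std::future<void>> deletions;
    for (const auto& f : filesToDelete)
        deletions.emplace_back(std::async(std::launch::async, deleteFile, f));

    std::printf("partial deletion: deleting %zu files\n", deletions.size());
    for (auto& d : deletions)
        d.get();
}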
@@ -1544,14 +1546,30 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end

    state KeyRangeMap<UID>::iterator activeRange;
    for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) {
        // only want to prune exact granules
        ASSERT(activeRange.begin() >= startKey && activeRange.end() < endKey);
        // assumption: prune boundaries must respect granule boundaries
        printf("looping over active range [%s-%s)=%s\n",
               activeRange.begin().printable().c_str(),
               activeRange.end().printable().c_str(),
               activeRange.value().toString().c_str());

        // ASSERT(activeRange.begin() >= startKey && activeRange.end() < endKey);
        if (activeRange.begin() < startKey || activeRange.end() > endKey) {
            continue;
        }

        loop {
            try {
                printf("fetching latest history for worker assignment [%s-%s)=%s\n",
                       activeRange.begin().printable().c_str(),
                       activeRange.end().printable().c_str(),
                       activeRange.value().toString().c_str());
                Optional<GranuleHistory> history = wait(getLatestGranuleHistory(&tr, activeRange.range()));
                ASSERT(history.present());
                historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
                // ASSERT(history.present());
                // TODO: can we tell from the krm that this range is not valid, so that we don't need to do a get
                if (history.present()) {
                    printf("pushing onto history queue\n");
                    historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
                }
                break;
            } catch (Error& e) {
                wait(tr.onError(e));
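The "range boundaries" half of the fix shows up here: the hard ASSERT on containment becomes an explicit check that simply skips granule ranges not fully inside [startKey, endKey). A minimal standalone sketch of that check, with plain strings standing in for KeyRef (the types and sample keys are illustrative, not the real API):

#include <cstdio>
#include <string>
#include <vector>

// Illustrative stand-in for one entry of the KeyRangeMap of active granule ranges.
struct ActiveRange {
    std::string begin;
    std::string end;
};

int main() {
    std::string startKey = "b", endKey = "m";
    std::vector<ActiveRange> activeRanges = { { "a", "c" }, { "b", "f" }, { "f", "m" }, { "l", "z" } };

    // Prune only granules fully contained in [startKey, endKey); skip the rest,
    // mirroring the `continue` that replaced the old ASSERT.
    for (const auto& r : activeRanges) {
        if (r.begin < startKey || r.end > endKey) {
            std::printf("skipping [%s-%s): not fully inside the prune range\n", r.begin.c_str(), r.end.c_str());
            continue;
        }
        std::printf("pruning [%s-%s)\n", r.begin.c_str(), r.end.c_str());
    }
}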
@@ -1559,6 +1577,7 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
        }
    }

    printf("starting to go through history queue\n");
    while (!historyEntryQueue.empty()) {
        // process the node at the front of the queue and remove it
        KeyRange currRange;
@@ -1617,41 +1636,51 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
    // race and we'll end up with unreachable nodes in the case of a crash

    state int i;
    printf("%d granules to fully delete\n", toFullyDelete.size());
    for (i = toFullyDelete.size() - 1; i >= 0; --i) {
        UID granuleId;
        KeyRef historyKey;
        std::tie(granuleId, historyKey) = toFullyDelete[i];
        // TODO: can possibly batch into a single txn
        // FIXME: consider batching into a single txn (need to take care of txn size limit)
        wait(fullyDeleteGranule(self, granuleId, historyKey));
    }

    // TODO: could possibly do the partial deletes in parallel?
    std::vector<Future<Void>> partialDeletions;
    printf("%d granules to partially delete\n", toPartiallyDelete.size());
    for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
        UID granuleId = toPartiallyDelete[i];
        wait(partiallyDeleteGranule(self, granuleId, pruneVersion));
        partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, pruneVersion));
    }

    // There could have been another pruneIntent that got written for this table while we
    // were processing this one. If that is the case, we should not clear the key. Otherwise,
    // we should clear the key to indicate the work is done.
    wait(waitForAll(partialDeletions));

    // TODO: reuse tr
    state Reference<ReadYourWritesTransaction> rywTr = makeReference<ReadYourWritesTransaction>(self->db);
    rywTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    rywTr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
    // Now that all the necessary granules and their files have been deleted, we can
    // clear the pruneIntent key to signify that the work is done. However, there could have been
    // another pruneIntent that got written for this table while we were processing this one.
    // If that is the case, we should not clear the key. Otherwise, we can just clear the key.

    tr.reset();
    loop {
        try {
            state RangeResult pruneIntent =
                wait(krmGetRanges(rywTr, blobGranulePruneKeys.begin, range, 1, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
            ASSERT(pruneIntent.size() == 1);
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

            if (decodeBlobGranulePruneValue(pruneIntent[0].value).first == pruneVersion) {
                rywTr->clear(pruneIntent[0].key);
                wait(rywTr->commit());
            state Key pruneIntentKey = blobGranulePruneKeys.begin.withSuffix(startKey);
            state Optional<Value> pruneIntentValue = wait(tr.get(pruneIntentKey));
            ASSERT(pruneIntentValue.present());

            Version currPruneVersion;
            bool currForce;
            std::tie(currPruneVersion, currForce) = decodeBlobGranulePruneValue(pruneIntentValue.get());

            if (currPruneVersion == pruneVersion && currForce == force) {
                tr.clear(pruneIntentKey.withPrefix(blobGranulePruneKeys.begin));
                wait(tr.commit());
            }
            break;
        } catch (Error& e) {
            wait(rywTr->onError(e));
            printf("pruneRange got error %s\n", e.name());
            wait(tr.onError(e));
        }
    }

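The "clearing intents" half of the fix: once the prune work is done, the prune intent key is re-read and cleared only if its decoded (version, force) pair still matches the prune that was just performed, so an intent written concurrently is left for a later pass. A minimal standalone sketch of that conditional clear, with a plain map standing in for the FDB keyspace (all names illustrative):

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>
#include <utility>

using PruneIntent = std::pair<int64_t, bool>; // (pruneVersion, force)

// Clear the intent only if it is still the one we just processed; a newer intent
// written while we were pruning must stay so it gets processed on a later pass.
void clearIfUnchanged(std::map<std::string, PruneIntent>& intents,
                      const std::string& startKey,
                      int64_t prunedVersion,
                      bool prunedForce) {
    auto it = intents.find(startKey);
    if (it == intents.end()) {
        return;
    }
    if (it->second.first == prunedVersion && it->second.second == prunedForce) {
        intents.erase(it); // work is done, signal completion by clearing the key
        std::printf("cleared intent for %s\n", startKey.c_str());
    } else {
        std::printf("newer intent for %s, leaving it in place\n", startKey.c_str());
    }
}

int main() {
    std::map<std::string, PruneIntent> intents = { { "tableA", { 100, false } } };
    clearIfUnchanged(intents, "tableA", 100, false); // matches: cleared
    intents["tableA"] = { 250, true };               // a new intent arrives meanwhile
    clearIfUnchanged(intents, "tableA", 100, false); // stale: left in place
}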
@@ -1661,6 +1690,8 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
/*
TODO: We need to revoke range from BW so that it doesn't try to add to a granule that we dropped
Will SnowTram reuse table IDs; do we unhybridize a range once it's been revoked/dropped?

Or can we possibly make the BW give it up upon seeing a change in the watch?
*/

/*
@@ -1685,31 +1716,71 @@ Will SnowTram reuse table IDs; do we unhybridize a range once it's been revoked/
*/
ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
    try {
        state Value oldPruneWatchVal;
        loop {
            state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

            // Wait for the watch to change, or some time to expire (whichever comes first)
            // before checking through the prune intents

            // TODO: pruneIntent written, watch triggered, prune work scheduled, before work is done another prune
            // intent is written, then we'd wait a whole timeout to do the next pruneintent
            // Use a UID to prevent this
            // before checking through the prune intents. We write a UID into the change key value
            // so that we can still recognize when the watch key has been changed while we weren't
            // monitoring it
            loop {
                try {
                    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                    tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

                    Optional<Value> newPruneWatchVal = wait(tr->get(blobGranulePruneChangeKey));

                    // if the value at the change key has changed, that means there is new work to do
                    if (newPruneWatchVal.present() && oldPruneWatchVal != newPruneWatchVal.get()) {
                        oldPruneWatchVal = newPruneWatchVal.get();
                        printf("old and new watch don't match\n");
                        break;
                    }

                    // otherwise, there are no changes and we should wait until the next change (or timeout)
                    state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey);
                    wait(tr->commit());
                    wait(timeout(watchPruneIntentsChange, SERVER_KNOBS->BG_PRUNE_TIMEOUT, Void()));
                    break;
                    choose {
                        when(wait(watchPruneIntentsChange)) { tr->reset(); }
                        when(wait(delay(SERVER_KNOBS->BG_PRUNE_TIMEOUT))) {
                            printf("bg prune timeouts\n");
                            break;
                        }
                    }
                    // wait(timeout(watchPruneIntentsChange, SERVER_KNOBS->BG_PRUNE_TIMEOUT, Void()));
                } catch (Error& e) {
                    wait(tr->onError(e));
                }
            }

            tr->reset();

            loop {
                try {
                    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                    tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
                    Version dummyV = wait(tr->getReadVersion());
                    Value dummyValue = blobGranulePruneValueFor(dummyV, false);
                    wait(krmSetRange(tr, blobGranulePruneKeys.begin, normalKeys, dummyValue));
                    wait(tr->commit());
                    break;
                } catch (Error& e) {
                    printf("dummy txn saw error %s\n", e.name());
                    wait(tr->onError(e));
                }
            }

            tr->reset();

            // loop through all prune intentions and do prune work accordingly
            state KeyRef beginKey = normalKeys.begin;
            loop {
                tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

                state std::vector<Future<Void>> prunes;
                try {
                    // TODO: replace 10000 with a knob
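Here the single wait(timeout(...)) on the watch is replaced by a choose between the watch on blobGranulePruneChangeKey and a delay of BG_PRUNE_TIMEOUT, and the last seen change-key value is cached so a change made while not watching is still noticed. Flow's choose/when has no direct standard C++ equivalent, but the "wake on event or timeout, whichever comes first" shape can be sketched with std::future (names below are illustrative):

#include <chrono>
#include <cstdio>
#include <future>
#include <thread>

int main() {
    // Stand-in for the watch on the prune change key: a future that becomes
    // ready when "someone wrote a new prune intent".
    std::promise<void> changeSignal;
    std::future<void> watchPruneIntentsChange = changeSignal.get_future();

    // Simulate another actor writing a prune intent after one second.
    std::thread writer([&] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        changeSignal.set_value();
    });

    // Equivalent shape of choose { when(watch) {...} when(delay(BG_PRUNE_TIMEOUT)) {...} }:
    // whichever fires first decides what happens next.
    const auto kPruneTimeout = std::chrono::seconds(5); // stand-in for BG_PRUNE_TIMEOUT
    if (watchPruneIntentsChange.wait_for(kPruneTimeout) == std::future_status::ready) {
        std::printf("watch fired: re-read the change key and compare to the cached value\n");
    } else {
        std::printf("timed out: scan the prune intents anyway\n");
    }

    writer.join();
}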
@@ -1719,12 +1790,14 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {

                    // TODO: would we miss a range [pruneIntents[9999], pruneIntents[10000]) because of the `more`?
                    // Or does `readThrough` take care of this? We also do this in recoverBlobManager
                    printf("pruneIntents.size()==%d\n", pruneIntents.size());
                    for (int rangeIdx = 0; rangeIdx < pruneIntents.size() - 1; ++rangeIdx) {
                        if (pruneIntents[rangeIdx].value.size() == 0) {
                            continue;
                        }
                        KeyRef rangeStartKey = pruneIntents[rangeIdx].key;
                        KeyRef rangeEndKey = pruneIntents[rangeIdx + 1].key;
                        KeyRange range(KeyRangeRef(rangeStartKey, rangeEndKey));
                        Version pruneVersion;
                        bool force;
                        std::tie(pruneVersion, force) = decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);
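krmGetRanges returns consecutive boundary keys, so each prune intent covers [pruneIntents[i].key, pruneIntents[i+1].key) and boundaries with an empty value carry no intent and are skipped. A small sketch of walking boundary pairs that way, with plain structs standing in for RangeResult (keys and values are illustrative):

#include <cstdio>
#include <string>
#include <vector>

// Illustrative stand-in for one boundary returned by a krm range read: the key marks
// where a range starts, the value describes it (empty = no intent recorded there).
struct Boundary {
    std::string key;
    std::string value;
};

int main() {
    std::vector<Boundary> pruneIntents = {
        { "a", "v100" }, { "f", "" }, { "m", "v200" }, { "~", "" } // last key only closes the final range
    };

    // Each range is [key[i], key[i+1]); skip boundaries with no intent recorded.
    for (size_t rangeIdx = 0; rangeIdx + 1 < pruneIntents.size(); ++rangeIdx) {
        if (pruneIntents[rangeIdx].value.empty()) {
            continue;
        }
        std::printf("prune intent over [%s-%s): %s\n",
                    pruneIntents[rangeIdx].key.c_str(),
                    pruneIntents[rangeIdx + 1].key.c_str(),
                    pruneIntents[rangeIdx].value.c_str());
    }
}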
@@ -1736,11 +1809,26 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
                        // fails... One way to prevent this is to not iterate over the prunes until the last iteration
                        // is done (i.e waitForAll)
                        //
                        // ErrorOr would prob be what we need here

                        /*
                        auto currPrunes = self->prunesInProgress.intersectingRanges(range);
                        int count = 0;
                        for (auto currPrune : currPrunes) {
                            count++;
                            if (currPrune.value() == pruneVersion) {
                            }
                        }
                        ASSERT(currPrunes.() <= 1);
                        */

                        printf("about to prune range [%s-%s) @ %d, force=%s\n",
                               rangeStartKey.printable().c_str(),
                               rangeEndKey.printable().c_str(),
                               pruneVersion,
                               force ? "T" : "F");
                        prunes.emplace_back(pruneRange(self, rangeStartKey, rangeEndKey, pruneVersion, force));

                        // TODO: maybe clear the key here if pruneRange succeeded
                        // TODO: maybe clear the key here if pruneRange succeeded, but then we'd have to wait here
                    }

                    // wait for this set of prunes to complete before starting the next ones since if we prune
@@ -1763,6 +1851,7 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
                    wait(tr->onError(e));
                }
            }
            printf("done pruning all ranges. looping back\n");
        }
    } catch (Error& e) {
        if (BM_DEBUG) {
@@ -1205,6 +1205,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
            metadata->durableSnapshotVersion.set(startState.blobFilesToSnapshot.get().snapshotFiles.back().version);
        } else {
            ASSERT(startState.previousDurableVersion == invalidVersion);
            printf("About to dump initial snapshot\n");
            BlobFileIndex fromFDB = wait(dumpInitialSnapshotFromFDB(bwData, metadata, startState.granuleID));
            newSnapshotFile = fromFDB;
            ASSERT(startState.changeFeedStartVersion <= fromFDB.version);