More cleanup.

This commit is contained in:
Suraj Gupta 2021-12-03 18:22:10 -05:00 committed by Josh Slocum
parent 1d32da5ac6
commit e9d1f36cae
3 changed files with 20 additions and 123 deletions

View File

@ -7831,7 +7831,7 @@ Future<Void> DatabaseContext::popChangeFeedMutations(Key rangeID, Version versio
return popChangeFeedMutationsActor(Reference<DatabaseContext>::addRef(this), rangeID, version);
}
#define BG_REQUEST_DEBUG true
#define BG_REQUEST_DEBUG false
ACTOR Future<Void> getBlobGranuleRangesStreamActor(Reference<DatabaseContext> db,
PromiseStream<KeyRange> results,

View File

@ -215,7 +215,6 @@ struct BlobManagerData {
std::unordered_set<UID> deadWorkers;
KeyRangeMap<UID> workerAssignments;
KeyRangeMap<bool> knownBlobRanges;
KeyRangeMap<Version> prunesInProgress;
AsyncTrigger startRecruiting;
Debouncer restartRecruiting;

View File

@ -47,25 +47,6 @@
#define BW_DEBUG true
#define BW_REQUEST_DEBUG true
// represents a previous version of a granule, and optionally the files that compose it
struct GranuleHistoryEntry : NonCopyable, ReferenceCounted<GranuleHistoryEntry> {
KeyRange range;
UID granuleID;
Version startVersion; // version of the first snapshot
Version endVersion; // version of the last delta file
// load files lazily, and allows for clearing old cold-queried files to save memory
Future<GranuleFiles> files;
// FIXME: do skip pointers with single back-pointer and neighbor pointers
// Just parent reference for now (assumes no merging)
Reference<GranuleHistoryEntry> parentGranule;
GranuleHistoryEntry() : startVersion(invalidVersion), endVersion(invalidVersion) {}
GranuleHistoryEntry(KeyRange range, UID granuleID, Version startVersion, Version endVersion)
: range(range), granuleID(granuleID), startVersion(startVersion), endVersion(endVersion) {}
};
struct GranuleStartState {
UID granuleID;
Version changeFeedStartVersion;
@ -147,6 +128,25 @@ struct GranuleRangeMetadata {
// ~GranuleRangeMetadata() { printf("Destroying granule metadata\n"); }
};
// represents a previous version of a granule, and optionally the files that compose it
struct GranuleHistoryEntry : NonCopyable, ReferenceCounted<GranuleHistoryEntry> {
KeyRange range;
UID granuleID;
Version startVersion; // version of the first snapshot
Version endVersion; // version of the last delta file
// load files lazily, and allows for clearing old cold-queried files to save memory
Future<GranuleFiles> files;
// FIXME: do skip pointers with single back-pointer and neighbor pointers
// Just parent reference for now (assumes no merging)
Reference<GranuleHistoryEntry> parentGranule;
GranuleHistoryEntry() : startVersion(invalidVersion), endVersion(invalidVersion) {}
GranuleHistoryEntry(KeyRange range, UID granuleID, Version startVersion, Version endVersion)
: range(range), granuleID(granuleID), startVersion(startVersion), endVersion(endVersion) {}
};
struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
UID id;
Database db;
@ -2701,107 +2701,6 @@ ACTOR Future<Void> monitorRemoval(Reference<BlobWorkerData> bwData) {
}
}
ACTOR Future<Void> monitorPruneKeys(Reference<BlobWorkerData> self) {
try {
state Value oldPruneWatchVal;
loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Wait for the watch to change, or some time to expire (whichever comes first)
// before checking through the prune intents. We write a UID into the change key value
// so that we can still recognize when the watch key has been changed while we weren't
// monitoring it
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> newPruneWatchVal = wait(tr->get(blobGranulePruneChangeKey));
// if the value at the change key has changed, that means there is new work to do
if (newPruneWatchVal.present() && oldPruneWatchVal != newPruneWatchVal.get()) {
oldPruneWatchVal = newPruneWatchVal.get();
printf("old and new watch don't match\n");
break;
}
// otherwise, there are no changes and we should wait until the next change (or timeout)
state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey);
wait(tr->commit());
printf("About to wait for change or timeout\n");
choose {
when(wait(watchPruneIntentsChange)) { tr->reset(); }
when(wait(delay(SERVER_KNOBS->BG_PRUNE_TIMEOUT))) {
printf("bg prune timeouts\n");
break;
}
}
// wait(timeout(watchPruneIntentsChange, SERVER_KNOBS->BG_PRUNE_TIMEOUT, Void()));
} catch (Error& e) {
wait(tr->onError(e));
}
}
tr->reset();
// loop through all prune intentions and do prune work accordingly
state KeyRef beginKey = normalKeys.begin;
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state std::vector<Future<Void>> prunes;
try {
// TODO: replace 10000 with a knob
KeyRange nextRange(KeyRangeRef(beginKey, normalKeys.end));
state RangeResult pruneIntents = wait(krmGetRanges(
tr, blobGranulePruneKeys.begin, nextRange, 10000, GetRangeLimits::BYTE_LIMIT_UNLIMITED));
// TODO: would we miss a range [pruneIntents[9999], pruneIntents[10000]) because of the `more`?
// Or does `readThrough` take care of this? We also do this in recoverBlobManager
printf("pruneIntents.size()==%d\n", pruneIntents.size());
for (int rangeIdx = 0; rangeIdx < pruneIntents.size() - 1; ++rangeIdx) {
if (pruneIntents[rangeIdx].value.size() == 0) {
continue;
}
KeyRef rangeStartKey = pruneIntents[rangeIdx].key;
KeyRef rangeEndKey = pruneIntents[rangeIdx + 1].key;
KeyRange range(KeyRangeRef(rangeStartKey, rangeEndKey));
Version pruneVersion;
bool force;
std::tie(pruneVersion, force) = decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);
printf("about to prune range [%s-%s) @ %d, force=%s\n",
rangeStartKey.printable().c_str(),
rangeEndKey.printable().c_str(),
pruneVersion,
force ? "T" : "F");
// TODO: clear associated history
}
if (!pruneIntents.more) {
break;
}
beginKey = pruneIntents.readThrough.get();
} catch (Error& e) {
// TODO: other errors here from pruneRange?
wait(tr->onError(e));
}
}
printf("done pruning all ranges. looping back\n");
}
} catch (Error& e) {
if (BW_DEBUG) {
printf("monitorPruneKeys got error %s\n", e.name());
}
throw e;
}
}
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
ReplyPromise<InitializeBlobWorkerReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
@ -2847,7 +2746,6 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
recruitReply.send(rep);
self->addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture()));
self->addActor.send(monitorPruneKeys(self));
state Future<Void> selfRemoved = monitorRemoval(self);
TraceEvent("BlobWorkerInit", self->id);