More blob granule operational stuff (#7783)

* More blob manager metrics

* making blobrange check command work for large ranges
Josh Slocum, 2022-08-03 18:11:25 -05:00 (committed by GitHub)
parent b9d156f0d9
commit 1cda8a2fc1
2 changed files with 83 additions and 14 deletions
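
As context for the second bullet: the check is driven from fdbcli, and before this change it issued a single readBlobGranules call over the entire requested range. Assuming the usual blobrange subcommand syntax (the exact form may vary by build), a check looks like:

    fdb> blobrange check <startkey> <endkey> [version]

With this commit the requested range is first resolved to its granule ranges, which are coalesced into chunks of at most 1000 granules and checked in parallel, so large ranges no longer funnel through one oversized read.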


@@ -99,20 +99,68 @@ ACTOR Future<Void> doBlobPurge(Database db, Key startKey, Key endKey, Optional<V
     return Void();
 }

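+// Reads all blob granules in keyRange at the requested version, retrying with
+// tr.onError on transient failures, and returns the read version that was used.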
+ACTOR Future<Version> checkBlobSubrange(Database db, KeyRange keyRange, Optional<Version> version) {
+    state Transaction tr(db);
+    state Version readVersionOut = invalidVersion;
+    loop {
+        try {
+            wait(success(tr.readBlobGranules(keyRange, 0, version, &readVersionOut)));
+            return readVersionOut;
+        } catch (Error& e) {
+            wait(tr.onError(e));
+        }
+    }
+}
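+
+// Checks that every blob granule in [startKey, endKey) is readable at the given
+// version: loads the granule ranges, coalesces them into contiguous chunks, and
+// reads the chunks in parallel via checkBlobSubrange.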
 ACTOR Future<Void> doBlobCheck(Database db, Key startKey, Key endKey, Optional<Version> version) {
     state Transaction tr(db);
     state Version readVersionOut = invalidVersion;
     state double elapsed = -timer_monotonic();
+    state KeyRange range = KeyRange(KeyRangeRef(startKey, endKey));
+    state Standalone<VectorRef<KeyRangeRef>> allRanges;
     loop {
         try {
-            wait(success(tr.readBlobGranules(KeyRange(KeyRangeRef(startKey, endKey)), 0, version, &readVersionOut)));
-            elapsed += timer_monotonic();
+            wait(store(allRanges, tr.getBlobGranuleRanges(range)));
             break;
         } catch (Error& e) {
             wait(tr.onError(e));
         }
     }
+
+    if (allRanges.empty()) {
+        fmt::print("ERROR: No blob ranges for [{0} - {1})\n", startKey.printable(), endKey.printable());
+        return Void();
+    }
+    fmt::print("Loaded {0} blob ranges to check\n", allRanges.size());
+
+    state std::vector<Future<Version>> checkParts;
+    // coalesce contiguous granule ranges into chunks of at most maxChunkSize,
+    // so each chunk can be read in its own transaction
+    int maxChunkSize = 1000;
+    KeyRange currentChunk;
+    int currentChunkSize = 0;
+    for (auto& it : allRanges) {
+        if (currentChunkSize == maxChunkSize) {
+            checkParts.push_back(checkBlobSubrange(db, currentChunk, version));
+            currentChunkSize = 0;
+        }
+        if (currentChunkSize == 0) {
+            currentChunk = it;
+        } else if (it.begin != currentChunk.end) {
+            fmt::print("ERROR: Blobrange check failed, gap in blob ranges from [{0} - {1})\n",
+                       currentChunk.end.printable(),
+                       it.begin.printable());
+            return Void();
+        } else {
+            currentChunk = KeyRangeRef(currentChunk.begin, it.end);
+        }
+        currentChunkSize++;
+    }
+    checkParts.push_back(checkBlobSubrange(db, currentChunk, version));
+
+    wait(waitForAll(checkParts));
+    readVersionOut = checkParts.back().get();
+    elapsed += timer_monotonic();
+
fmt::print("Blob check complete for [{0} - {1}) @ {2} in {3:.6f} seconds\n",
startKey.printable(),
endKey.printable(),
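
The coalescing loop above is the heart of the large-range fix and reads well in isolation. Below is a minimal standalone sketch of just that logic, using a hypothetical Range struct in place of KeyRangeRef (plain C++, not the actor code above):

    #include <cassert>
    #include <string>
    #include <vector>

    struct Range {
        std::string begin, end;
    };

    // Coalesces adjacent ranges into chunks of at most maxChunkSize.
    // Returns an empty vector if a gap is found between consecutive ranges.
    std::vector<Range> chunkRanges(const std::vector<Range>& ranges, int maxChunkSize) {
        std::vector<Range> chunks;
        Range current;
        int currentSize = 0;
        for (const Range& r : ranges) {
            if (currentSize == maxChunkSize) { // flush a full chunk
                chunks.push_back(current);
                currentSize = 0;
            }
            if (currentSize == 0) {
                current = r; // start a new chunk
            } else if (r.begin != current.end) {
                return {}; // ranges are not contiguous
            } else {
                current.end = r.end; // extend the current chunk
            }
            currentSize++;
        }
        if (currentSize > 0) {
            chunks.push_back(current); // flush the final partial chunk
        }
        return chunks;
    }

    int main() {
        std::vector<Range> granules = { { "a", "b" }, { "b", "c" }, { "c", "d" } };
        std::vector<Range> chunks = chunkRanges(granules, 2);
        assert(chunks.size() == 2); // {"a","c"} and {"c","d"}
        assert(chunks[0].begin == "a" && chunks[0].end == "c");
        return 0;
    }

The invariants match the actor version: consecutive granule ranges must tile the interval exactly (any gap aborts the check), and each emitted chunk covers at most maxChunkSize granules, which bounds the work done by any single checkBlobSubrange transaction.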


@@ -259,6 +259,7 @@ struct BlobManagerStats
     Counter granuleSplits;
     Counter granuleWriteHotSplits;
+    Counter granuleMerges;
     Counter ccGranulesChecked;
     Counter ccRowsChecked;
     Counter ccBytesChecked;
@@ -270,16 +271,27 @@ struct BlobManagerStats {
     Counter granulesPartiallyPurged;
     Counter filesPurged;
     Future<Void> logger;
+    int64_t activeMerges;

     // Current stats maintained for a given blob worker process
-    explicit BlobManagerStats(UID id, double interval, std::unordered_map<UID, BlobWorkerInterface>* workers)
+    explicit BlobManagerStats(UID id,
+                              double interval,
+                              int64_t epoch,
+                              std::unordered_map<UID, BlobWorkerInterface>* workers,
+                              std::unordered_map<Key, bool>* mergeHardBoundaries,
+                              std::unordered_map<Key, BlobGranuleMergeBoundary>* mergeBoundaries)
       : cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
-        granuleWriteHotSplits("GranuleWriteHotSplits", cc), ccGranulesChecked("CCGranulesChecked", cc),
-        ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc),
-        ccTimeouts("CCTimeouts", cc), ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
+        granuleWriteHotSplits("GranuleWriteHotSplits", cc), granuleMerges("GranuleMerges", cc),
+        ccGranulesChecked("CCGranulesChecked", cc), ccRowsChecked("CCRowsChecked", cc),
+        ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc), ccTimeouts("CCTimeouts", cc),
+        ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
         granulesFullyPurged("GranulesFullyPurged", cc), granulesPartiallyPurged("GranulesPartiallyPurged", cc),
-        filesPurged("FilesPurged", cc) {
+        filesPurged("FilesPurged", cc), activeMerges(0) {
         specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
+        specialCounter(cc, "Epoch", [epoch]() { return epoch; });
+        specialCounter(cc, "ActiveMerges", [this]() { return this->activeMerges; });
+        specialCounter(cc, "HardBoundaries", [mergeHardBoundaries]() { return mergeHardBoundaries->size(); });
+        specialCounter(cc, "SoftBoundaries", [mergeBoundaries]() { return mergeBoundaries->size(); });
         logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics");
     }
 };
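
The new metrics mix two idioms: plain Counters, which are monotonic totals (GranuleMerges), and specialCounters, which are gauges that re-sample a live value through a lambda at every logging interval (Epoch, ActiveMerges, and the two boundary-map sizes). A minimal sketch of the distinction, with hypothetical types rather than FDB's CounterCollection:

    #include <cstdint>
    #include <functional>
    #include <string>
    #include <vector>

    struct Gauge {
        std::string name;
        std::function<int64_t()> sample; // re-evaluated at every trace interval
    };

    struct Metrics {
        int64_t granuleMerges = 0; // monotonic: total merges ever started
        int64_t activeMerges = 0;  // live: merges in flight right now
        std::vector<Gauge> gauges;

        Metrics() {
            // like specialCounter(cc, "ActiveMerges", ...): capture `this` and
            // read the current value whenever metrics are emitted
            gauges.push_back({ "ActiveMerges", [this] { return activeMerges; } });
        }
    };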
@@ -364,18 +376,23 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
     Promise<Void> foundBlobWorkers;
     Promise<Void> doneRecovering;

-    int64_t epoch = -1;
+    int64_t epoch;
     int64_t seqNo = 1;

     Promise<Void> iAmReplaced;

-    BlobManagerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db, Optional<Key> dcId)
-      : id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById),
+    BlobManagerData(UID id,
+                    Reference<AsyncVar<ServerDBInfo> const> dbInfo,
+                    Database db,
+                    Optional<Key> dcId,
+                    int64_t epoch)
+      : id(id), db(db), dcId(dcId),
+        stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, epoch, &workersById, &mergeHardBoundaries, &mergeBoundaries),
         knownBlobRanges(false, normalKeys.end), tenantData(BGTenantMap(dbInfo)),
         mergeCandidates(MergeCandidateInfo(MergeCandidateUnknown), normalKeys.end),
         activeGranuleMerges(invalidVersion, normalKeys.end),
         concurrentMergeChecks(SERVER_KNOBS->BLOB_MANAGER_CONCURRENT_MERGE_CHECKS),
-        restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
+        restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0), epoch(epoch) {}

     // only initialize blob store if actually needed
     void initBStore() {
@@ -1874,6 +1891,7 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
                                        std::vector<UID> parentGranuleIDs,
                                        std::vector<Key> parentGranuleRanges,
                                        std::vector<Version> parentGranuleStartVersions) {
+    ++bmData->stats.activeMerges;

     // wait for BM to be fully recovered before starting actual merges
     wait(bmData->doneRecovering.getFuture());
@@ -1920,6 +1938,8 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
                        BoundaryEvaluation(bmData->epoch, seqnoForEval, BoundaryEvalType::MERGE, 0, 0));
     bmData->setMergeCandidate(mergeRange, MergeCandidateMerging);

+    --bmData->stats.activeMerges;
+
     return Void();
 }
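
One caveat worth noting on the ActiveMerges gauge: it is incremented at actor entry and decremented just before the normal return, so an error or actor cancellation in between would leave the gauge elevated until a new blob manager (and a fresh stats object) takes over. A generic RAII-style guard is one way to make such paired updates exit-safe; this is an illustrative sketch, not something this commit adds:

    #include <cstdint>

    // Increments a gauge on construction and decrements it when the scope
    // exits, including on early return or exception.
    struct GaugeGuard {
        int64_t& gauge;
        explicit GaugeGuard(int64_t& g) : gauge(g) { ++gauge; }
        ~GaugeGuard() { --gauge; }
        GaugeGuard(const GaugeGuard&) = delete;
        GaugeGuard& operator=(const GaugeGuard&) = delete;
    };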
@@ -1937,6 +1957,8 @@ ACTOR Future<Void> doMerge(Reference<BlobManagerData> bmData,
     }
     ranges.push_back(std::get<1>(toMerge.back()).end);

+    ++bmData->stats.granuleMerges;
+
     try {
         std::pair<UID, Version> persistMerge =
             wait(persistMergeGranulesStart(bmData, mergeRange, ids, ranges, startVersions));
@@ -4251,7 +4273,8 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
     state Reference<BlobManagerData> self =
         makeReference<BlobManagerData>(deterministicRandom()->randomUniqueID(),
                                        dbInfo,
                                        openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
-                                       bmInterf.locality.dcId());
+                                       bmInterf.locality.dcId(),
+                                       epoch);

     state Future<Void> collection = actorCollection(self->addActor.getFuture());
@@ -4260,8 +4283,6 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
     }
     TraceEvent("BlobManagerInit", bmInterf.id()).detail("Epoch", epoch).log();

-    self->epoch = epoch;
-
     // although we start the recruiter, we wait until existing workers are ack'd
     auto recruitBlobWorker = IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>::create(
         dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });
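
Net effect of the epoch plumbing: the manager's epoch is now fixed at construction time (passed through blobManager into BlobManagerData and on into BlobManagerStats) rather than assigned to self->epoch after the fact, so the Epoch specialCounter reports the correct value from the first BlobManagerMetrics trace onward.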