diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 0597418a3c..21deecada8 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -1182,6 +1182,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );
 	init( BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT, 1.5 );
 	init( BLOB_MANAGER_CONCURRENT_MERGE_CHECKS, 64 ); if( randomize && BUGGIFY ) BLOB_MANAGER_CONCURRENT_MERGE_CHECKS = 1 << deterministicRandom()->randomInt(0, 7);
+	init( BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING, true ); if( randomize && BUGGIFY ) BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING = false;
+	init( BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE, 2.0 ); if( randomize && BUGGIFY ) BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE = (1.0 + deterministicRandom()->random01() * 2);
+	init( BLOB_MANAGER_MEDIAN_ASSIGNMENT_MIN_SAMPLES_PER_WORKER, 3 );
+	init( BLOB_MANAGER_MEDIAN_ASSIGNMENT_MAX_SAMPLES_PER_WORKER, 10 );
 	init( BLOB_MANIFEST_BACKUP, false );
 	init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 600.0 );
 	init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0 );
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 14d39ba439..6852401958 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -1201,6 +1201,10 @@ public:
 	double BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX;
 	double BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT;
 	int BLOB_MANAGER_CONCURRENT_MERGE_CHECKS;
+	bool BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING;
+	double BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE;
+	int BLOB_MANAGER_MEDIAN_ASSIGNMENT_MIN_SAMPLES_PER_WORKER;
+	int BLOB_MANAGER_MEDIAN_ASSIGNMENT_MAX_SAMPLES_PER_WORKER;
	double BGCC_TIMEOUT;
 	double BGCC_MIN_INTERVAL;
 	bool BLOB_MANIFEST_BACKUP;
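For context (not part of the patch): with the defaults above and, say, four blob workers, median limiting only activates once at least 3 * 4 = 12 recent assignments have been recorded, the sliding window of recent assignments holds at most 10 * 4 = 40 samples, and a worker is skipped once its recent-assignment count exceeds max(1, 2.0 * median recent assignments per worker).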
diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp
index 57e33638dd..843abb30a9 100644
--- a/fdbserver/BlobManager.actor.cpp
+++ b/fdbserver/BlobManager.actor.cpp
@@ -225,8 +225,10 @@ struct RangeAssignment {
 // FIXME: namespace?
 struct BlobWorkerInfo {
     int numGranulesAssigned;
+    int recentGranulesAssigned;

-    BlobWorkerInfo(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {}
+    BlobWorkerInfo(int numGranulesAssigned = 0, int recentGranulesAssigned = 0)
+      : numGranulesAssigned(numGranulesAssigned), recentGranulesAssigned(recentGranulesAssigned) {}
 };

 // recover is when the BM assigns an ambiguously owned range on recovery
@@ -294,6 +296,8 @@ struct BlobManagerStats {
     Counter granulesFullyPurged;
     Counter granulesPartiallyPurged;
     Counter filesPurged;
+    Counter granulesHitMedianLimit;
+
     Future<Void> logger;
     int64_t activeMerges;
     int64_t blockedAssignments;
@@ -317,8 +321,9 @@ struct BlobManagerStats {
         ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc), ccTimeouts("CCTimeouts", cc),
         ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
         granulesFullyPurged("GranulesFullyPurged", cc), granulesPartiallyPurged("GranulesPartiallyPurged", cc),
-        filesPurged("FilesPurged", cc), activeMerges(0), blockedAssignments(0), lastFlushVersion(0),
-        lastMLogTruncationVersion(0), lastManifestSeqNo(0), lastManifestDumpTs(0), manifestSizeInBytes(0) {
+        filesPurged("FilesPurged", cc), granulesHitMedianLimit("GranulesHitMedianLimit", cc), activeMerges(0),
+        blockedAssignments(0), lastFlushVersion(0), lastMLogTruncationVersion(0), lastManifestSeqNo(0),
+        lastManifestDumpTs(0), manifestSizeInBytes(0) {
         specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
         specialCounter(cc, "Epoch", [epoch]() { return epoch; });
         specialCounter(cc, "ActiveMerges", [this]() { return this->activeMerges; });
@@ -390,6 +395,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
     std::unordered_map<UID, BlobWorkerInterface> workersById;
     std::unordered_map<UID, BlobWorkerInfo> workerStats; // mapping between workerID -> workerStats
+    std::deque<UID> recentBWAssignments;
     std::unordered_set<NetworkAddress> workerAddresses;
     std::unordered_set<UID> deadWorkers;
     std::unordered_map<UID, UID> workerAffinities;
@@ -845,7 +851,6 @@ ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData,
     }

     int minGranulesAssigned = INT_MAX;
-    std::vector<UID> eligibleWorkers;

     // because lowest number of granules worker(s) might not exactly have the lowest memory for various reasons, if we
     // got blob_worker_full as the error last time, sometimes just pick a random worker that wasn't the last one we
@@ -853,50 +858,90 @@ ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData,
     if (bmData->workerStats.size() >= 2 && previousFailure.present() &&
         previousFailure.get().second.code() == error_code_blob_worker_full && deterministicRandom()->coinflip()) {
         CODE_PROBE(true, "randomly picking worker due to blob_worker_full");
-        eligibleWorkers.reserve(bmData->workerStats.size());
+        std::vector<UID> randomWorkers;
+        randomWorkers.reserve(bmData->workerStats.size());
         for (auto& it : bmData->workerStats) {
             if (it.first != previousFailure.get().first) {
-                eligibleWorkers.push_back(it.first);
+                randomWorkers.push_back(it.first);
             }
         }
-        ASSERT(!eligibleWorkers.empty());
-        int randomIdx = deterministicRandom()->randomInt(0, eligibleWorkers.size());
+        ASSERT(!randomWorkers.empty());
+        int randomIdx = deterministicRandom()->randomInt(0, randomWorkers.size());
         if (BM_DEBUG) {
             fmt::print("picked worker {0} randomly since previous attempt got blob_worker_full\n",
-                       eligibleWorkers[randomIdx].toString().substr(0, 5));
+                       randomWorkers[randomIdx].toString().substr(0, 5));
         }
-        return eligibleWorkers[randomIdx];
+        return randomWorkers[randomIdx];
     }

+    // recent granules assigned
+    std::vector<int> medianCalc;
+    // UID, total granules assigned, recent granules assigned
+    std::vector<std::tuple<UID, int, int>> eligibleWorkers;
+
+    Optional<int> excludeIfRecentOver;
     for (auto const& worker : bmData->workerStats) {
         UID currId = worker.first;
-        int granulesAssigned = worker.second.numGranulesAssigned;

         // if previous attempt failed and that worker is still present, ignore it
         if (bmData->workerStats.size() >= 2 && previousFailure.present() && previousFailure.get().first == currId) {
             continue;
         }

+        eligibleWorkers.push_back({ currId, worker.second.numGranulesAssigned, worker.second.recentGranulesAssigned });
+        medianCalc.push_back(worker.second.recentGranulesAssigned);
+    }
+
+    if (SERVER_KNOBS->BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING && medianCalc.size() > 1 &&
+        bmData->recentBWAssignments.size() >=
+            SERVER_KNOBS->BLOB_MANAGER_MEDIAN_ASSIGNMENT_MIN_SAMPLES_PER_WORKER * bmData->workerStats.size()) {
+        CODE_PROBE(true, "blob manager enabling median assignment limiting");
+        // FIXME: make more efficient with quick select
+        std::sort(medianCalc.begin(), medianCalc.end());
+        // round down in case of even number of workers to be more conservative
+        int medianIdx = (medianCalc.size() - 1) / 2;
+        // protect against bad knob values
+        double multiplyFactor = std::max(1.0, SERVER_KNOBS->BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE);
+        excludeIfRecentOver = std::max(1, (int)(multiplyFactor * medianCalc[medianIdx]));
+    }
+
+    std::vector<UID> finalEligibleWorkers;
+    bool anyOverLimit = false;
+    for (auto& it : eligibleWorkers) {
+        UID currId = std::get<0>(it);
+        int granulesAssigned = std::get<1>(it);
+        int recentGranulesAssigned = std::get<2>(it);
+
+        if (excludeIfRecentOver.present() && recentGranulesAssigned > excludeIfRecentOver.get()) {
+            anyOverLimit = true;
+            continue;
+        }
+
         if (granulesAssigned <= minGranulesAssigned) {
             if (granulesAssigned < minGranulesAssigned) {
-                eligibleWorkers.clear();
+                finalEligibleWorkers.clear();
                 minGranulesAssigned = granulesAssigned;
             }
-            eligibleWorkers.emplace_back(currId);
+            finalEligibleWorkers.emplace_back(currId);
         }
     }

+    if (anyOverLimit) {
+        CODE_PROBE(true, "BM excluding BW due to median assignment algorithm");
+        ++bmData->stats.granulesHitMedianLimit;
+    }
+
     // pick a random worker out of the eligible workers
-    ASSERT(eligibleWorkers.size() > 0);
-    int idx = deterministicRandom()->randomInt(0, eligibleWorkers.size());
+    ASSERT(finalEligibleWorkers.size() > 0);
+    int idx = deterministicRandom()->randomInt(0, finalEligibleWorkers.size());
     if (BM_DEBUG) {
         fmt::print("picked worker {0}, which has a minimal number ({1}) of granules assigned\n",
-                   eligibleWorkers[idx].toString().substr(0, 5),
+                   finalEligibleWorkers[idx].toString().substr(0, 5),
                    minGranulesAssigned);
     }
-    return eligibleWorkers[idx];
+    return finalEligibleWorkers[idx];
 }

 // circular dependency between handleRangeAssign and doRangeAssignment
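Not part of the patch: a minimal standalone sketch of the exclusion threshold computed in the hunk above. computeRecentLimit() is a hypothetical helper used only for illustration; the real code inlines this logic in pickWorkerForAssign().

// Illustrative only, not FoundationDB code: derive the "exclude if recent assignments
// over" threshold the way the patch does, from per-worker recent assignment counts.
#include <algorithm>
#include <cstdio>
#include <vector>

int computeRecentLimit(std::vector<int> recentPerWorker, double allowance) {
    std::sort(recentPerWorker.begin(), recentPerWorker.end());
    // round down in case of an even number of workers, matching the patch
    int median = recentPerWorker[(recentPerWorker.size() - 1) / 2];
    // protect against allowance knob values below 1.0, as the patch does
    return std::max(1, (int)(std::max(1.0, allowance) * median));
}

int main() {
    // four workers with recent assignment counts 2, 3, 4 and 11: the median (rounded down) is 3,
    // so with the default allowance of 2.0 the limit is 6 and the busy worker (11) gets skipped
    int limit = computeRecentLimit({ 2, 3, 4, 11 }, 2.0);
    printf("exclude workers with more than %d recent assignments\n", limit);
    return 0;
}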
@@ -932,6 +977,25 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
             fmt::print("Chose BW {0} for seqno {1} in BM {2}\n", _workerId.toString(), seqNo, bmData->epoch);
         }
         workerID = _workerId;
+
+        if (SERVER_KNOBS->BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING) {
+            // this worker is guaranteed to be in the map because we just picked it from the map in the most recent
+            // now()
+            bmData->recentBWAssignments.push_back(workerID.get());
+            bmData->workerStats[workerID.get()].recentGranulesAssigned++;
+
+            if (bmData->recentBWAssignments.size() >=
+                SERVER_KNOBS->BLOB_MANAGER_MEDIAN_ASSIGNMENT_MAX_SAMPLES_PER_WORKER * bmData->workerStats.size()) {
+                UID workerIdToPop = bmData->recentBWAssignments.front();
+                bmData->recentBWAssignments.pop_front();
+                // worker could no longer exist now
+                auto it = bmData->workerStats.find(workerIdToPop);
+                if (it != bmData->workerStats.end()) {
+                    it->second.recentGranulesAssigned--;
+                }
+            }
+        }
+
         // We don't have to check for races with an overlapping assignment because it would insert over us in the
         // actor map, cancelling this actor before it got here
         bmData->workerAssignments.insert(assignment.keyRange, workerID.get());
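Also not part of the patch: a self-contained sketch of the sliding-window bookkeeping that the doRangeAssignment hunk above performs. RecentAssignmentWindow and its members are illustrative names, not identifiers from the FoundationDB code base.

// Standalone sketch (not FoundationDB code) of the bookkeeping above: a bounded deque of
// recent assignees plus a per-worker counter that is decremented when a sample falls out
// of the window, tolerating workers that have since disappeared from the stats map.
#include <cstdio>
#include <deque>
#include <string>
#include <unordered_map>

struct RecentAssignmentWindow {
    size_t maxSamples; // e.g. MAX_SAMPLES_PER_WORKER * number of workers
    std::deque<std::string> window;              // oldest assignment at the front
    std::unordered_map<std::string, int> recent; // workerId -> recent assignment count

    void record(const std::string& workerId) {
        window.push_back(workerId);
        recent[workerId]++;
        if (window.size() >= maxSamples) {
            std::string evicted = window.front();
            window.pop_front();
            // the evicted worker may have been removed from the stats map already,
            // which is why the patch checks find() before decrementing
            auto it = recent.find(evicted);
            if (it != recent.end()) {
                it->second--;
            }
        }
    }
};

int main() {
    RecentAssignmentWindow w{ /*maxSamples*/ 4 };
    for (const char* id : { "bw-a", "bw-a", "bw-b", "bw-a", "bw-c" }) {
        w.record(id);
    }
    for (const auto& [id, count] : w.recent) {
        printf("%s -> %d recent assignments\n", id.c_str(), count);
    }
    return 0;
}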