enabling median assignment limiting (#10805)

parent b20dcf23a9
commit 3bdcbef465
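Summary (inferred from the diff below, which has no further description): pickWorkerForAssign previously picked at random among the blob workers with the fewest total granules assigned. With this change the blob manager also tracks a sliding window of recent assignments per worker and, once the window holds enough samples, excludes any worker whose recent-assignment count exceeds a configurable multiple (default 2x) of the per-worker median. Four new knobs gate and tune the behavior, a new GranulesHitMedianLimit counter records when the limit trips, and doRangeAssignment maintains the window.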
fdbclient/ServerKnobs.cpp
@@ -1182,6 +1182,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );
 	init( BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT, 1.5 );
 	init( BLOB_MANAGER_CONCURRENT_MERGE_CHECKS, 64 ); if( randomize && BUGGIFY ) BLOB_MANAGER_CONCURRENT_MERGE_CHECKS = 1 << deterministicRandom()->randomInt(0, 7);
+	init( BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING, true ); if( randomize && BUGGIFY ) BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING = false;
+	init( BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE, 2.0 ); if( randomize && BUGGIFY ) BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE = (1.0 + deterministicRandom()->random01() * 2);
+	init( BLOB_MANAGER_MEDIAN_ASSIGNMENT_MIN_SAMPLES_PER_WORKER, 3 );
+	init( BLOB_MANAGER_MEDIAN_ASSIGNMENT_MAX_SAMPLES_PER_WORKER, 10 );
 	init( BLOB_MANIFEST_BACKUP, false );
 	init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 600.0 );
 	init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0 );
fdbclient/include/fdbclient/ServerKnobs.h
@@ -1201,6 +1201,10 @@ public:
 	double BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX;
 	double BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT;
 	int BLOB_MANAGER_CONCURRENT_MERGE_CHECKS;
+	bool BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING;
+	double BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE;
+	int BLOB_MANAGER_MEDIAN_ASSIGNMENT_MIN_SAMPLES_PER_WORKER;
+	int BLOB_MANAGER_MEDIAN_ASSIGNMENT_MAX_SAMPLES_PER_WORKER;
 	double BGCC_TIMEOUT;
 	double BGCC_MIN_INTERVAL;
 	bool BLOB_MANIFEST_BACKUP;
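Reading the four knobs together with the BlobManager changes below: the limiter activates only when BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING is set and the recent-assignment window has accumulated at least MIN_SAMPLES_PER_WORKER samples per live worker; the window itself is capped at MAX_SAMPLES_PER_WORKER samples per worker; and ALLOWANCE is how far above the median a worker's recent count may sit before it is excluded (under BUGGIFY, simulation sometimes disables the feature outright and randomizes the allowance within [1.0, 3.0)). A minimal standalone sketch of the threshold arithmetic, using the same formulas as pickWorkerForAssign below; the sample counts are hypothetical, and this is plain C++, not FDB code:

	#include <algorithm>
	#include <cstdio>
	#include <vector>

	int main() {
		// Hypothetical recent-assignment counts for four workers.
		std::vector<int> recent = { 2, 3, 5, 9 };
		const double allowance = 2.0; // BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE default

		std::sort(recent.begin(), recent.end());
		// (size - 1) / 2 rounds down for an even count, the conservative choice in the diff
		int median = recent[(recent.size() - 1) / 2]; // -> 3
		// clamp the allowance to >= 1.0 and the threshold to >= 1, as the real code does
		int threshold = std::max(1, (int)(std::max(1.0, allowance) * median)); // -> 6

		// With these numbers, only the worker with 9 recent assignments is excluded.
		printf("median=%d threshold=%d\n", median, threshold);
		return 0;
	}

Because the allowance is clamped to at least 1.0, the threshold can never fall below the median, so the filter can only ever thin out the top half of workers.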
fdbserver/BlobManager.actor.cpp
@@ -225,8 +225,10 @@ struct RangeAssignment {
 // FIXME: namespace?
 struct BlobWorkerInfo {
 	int numGranulesAssigned;
+	int recentGranulesAssigned;
 
-	BlobWorkerInfo(int numGranulesAssigned = 0) : numGranulesAssigned(numGranulesAssigned) {}
+	BlobWorkerInfo(int numGranulesAssigned = 0, int recentGranulesAssigned = 0)
+	  : numGranulesAssigned(numGranulesAssigned), recentGranulesAssigned(recentGranulesAssigned) {}
 };
 
 // recover is when the BM assigns an ambiguously owned range on recovery
@@ -294,6 +296,8 @@ struct BlobManagerStats {
 	Counter granulesFullyPurged;
 	Counter granulesPartiallyPurged;
 	Counter filesPurged;
+	Counter granulesHitMedianLimit;
+
 	Future<Void> logger;
 	int64_t activeMerges;
 	int64_t blockedAssignments;
@@ -317,8 +321,9 @@ struct BlobManagerStats {
 	    ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc), ccTimeouts("CCTimeouts", cc),
 	    ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
 	    granulesFullyPurged("GranulesFullyPurged", cc), granulesPartiallyPurged("GranulesPartiallyPurged", cc),
-	    filesPurged("FilesPurged", cc), activeMerges(0), blockedAssignments(0), lastFlushVersion(0),
-	    lastMLogTruncationVersion(0), lastManifestSeqNo(0), lastManifestDumpTs(0), manifestSizeInBytes(0) {
+	    filesPurged("FilesPurged", cc), granulesHitMedianLimit("GranulesHitMedianLimit", cc), activeMerges(0),
+	    blockedAssignments(0), lastFlushVersion(0), lastMLogTruncationVersion(0), lastManifestSeqNo(0),
+	    lastManifestDumpTs(0), manifestSizeInBytes(0) {
 		specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
 		specialCounter(cc, "Epoch", [epoch]() { return epoch; });
 		specialCounter(cc, "ActiveMerges", [this]() { return this->activeMerges; });
@@ -390,6 +395,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
 
 	std::unordered_map<UID, BlobWorkerInterface> workersById;
 	std::unordered_map<UID, BlobWorkerInfo> workerStats; // mapping between workerID -> workerStats
+	std::deque<UID> recentBWAssignments;
 	std::unordered_set<NetworkAddress> workerAddresses;
 	std::unordered_set<UID> deadWorkers;
 	std::unordered_map<UID, UID> workerAffinities;
@@ -845,7 +851,6 @@ ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData,
 	}
 
 	int minGranulesAssigned = INT_MAX;
-	std::vector<UID> eligibleWorkers;
 
 	// because lowest number of granules worker(s) might not exactly have the lowest memory for various reasons, if we
 	// got blob_worker_full as the error last time, sometimes just pick a random worker that wasn't the last one we
@@ -853,50 +858,90 @@
 	if (bmData->workerStats.size() >= 2 && previousFailure.present() &&
 	    previousFailure.get().second.code() == error_code_blob_worker_full && deterministicRandom()->coinflip()) {
 		CODE_PROBE(true, "randomly picking worker due to blob_worker_full");
-		eligibleWorkers.reserve(bmData->workerStats.size());
+		std::vector<UID> randomWorkers;
+		randomWorkers.reserve(bmData->workerStats.size());
 		for (auto& it : bmData->workerStats) {
 			if (it.first != previousFailure.get().first) {
-				eligibleWorkers.push_back(it.first);
+				randomWorkers.push_back(it.first);
 			}
 		}
-		ASSERT(!eligibleWorkers.empty());
-		int randomIdx = deterministicRandom()->randomInt(0, eligibleWorkers.size());
+		ASSERT(!randomWorkers.empty());
+		int randomIdx = deterministicRandom()->randomInt(0, randomWorkers.size());
 		if (BM_DEBUG) {
 			fmt::print("picked worker {0} randomly since previous attempt got blob_worker_full\n",
-			           eligibleWorkers[randomIdx].toString().substr(0, 5));
+			           randomWorkers[randomIdx].toString().substr(0, 5));
 		}
 
-		return eligibleWorkers[randomIdx];
+		return randomWorkers[randomIdx];
 	}
 
+	// recent granules assigned
+	std::vector<int> medianCalc;
+	// UID, total granules assigned, recent granules assigned
+	std::vector<std::tuple<UID, int, int>> eligibleWorkers;
+
+	Optional<int> excludeIfRecentOver;
 	for (auto const& worker : bmData->workerStats) {
 		UID currId = worker.first;
-		int granulesAssigned = worker.second.numGranulesAssigned;
 
 		// if previous attempt failed and that worker is still present, ignore it
 		if (bmData->workerStats.size() >= 2 && previousFailure.present() && previousFailure.get().first == currId) {
 			continue;
 		}
 
+		eligibleWorkers.push_back({ currId, worker.second.numGranulesAssigned, worker.second.recentGranulesAssigned });
+		medianCalc.push_back(worker.second.recentGranulesAssigned);
+	}
+
+	if (SERVER_KNOBS->BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING && medianCalc.size() > 1 &&
+	    bmData->recentBWAssignments.size() >=
+	        SERVER_KNOBS->BLOB_MANAGER_MEDIAN_ASSIGNMENT_MIN_SAMPLES_PER_WORKER * bmData->workerStats.size()) {
+		CODE_PROBE(true, "blob manager enabling median assignment limiting");
+		// FIXME: make more efficient with quick select
+		std::sort(medianCalc.begin(), medianCalc.end());
+		// round down in case of even number of workers to be more conservative
+		int medianIdx = (medianCalc.size() - 1) / 2;
+		// protect against bad knob values
+		double multiplyFactor = std::max(1.0, SERVER_KNOBS->BLOB_MANAGER_MEDIAN_ASSIGNMENT_ALLOWANCE);
+		excludeIfRecentOver = std::max(1, (int)(multiplyFactor * medianCalc[medianIdx]));
+	}
+
+	std::vector<UID> finalEligibleWorkers;
+	bool anyOverLimit = false;
+	for (auto& it : eligibleWorkers) {
+		UID currId = std::get<0>(it);
+		int granulesAssigned = std::get<1>(it);
+		int recentGranulesAssigned = std::get<2>(it);
+
+		if (excludeIfRecentOver.present() && recentGranulesAssigned > excludeIfRecentOver.get()) {
+			anyOverLimit = true;
+			continue;
+		}
+
 		if (granulesAssigned <= minGranulesAssigned) {
 			if (granulesAssigned < minGranulesAssigned) {
-				eligibleWorkers.clear();
+				finalEligibleWorkers.clear();
 				minGranulesAssigned = granulesAssigned;
 			}
-			eligibleWorkers.emplace_back(currId);
+			finalEligibleWorkers.emplace_back(currId);
 		}
 	}
 
+	if (anyOverLimit) {
+		CODE_PROBE(true, "BM excluding BW due to median assignment algorithm");
+		++bmData->stats.granulesHitMedianLimit;
+	}
+
 	// pick a random worker out of the eligible workers
-	ASSERT(eligibleWorkers.size() > 0);
-	int idx = deterministicRandom()->randomInt(0, eligibleWorkers.size());
+	ASSERT(finalEligibleWorkers.size() > 0);
+	int idx = deterministicRandom()->randomInt(0, finalEligibleWorkers.size());
 	if (BM_DEBUG) {
 		fmt::print("picked worker {0}, which has a minimal number ({1}) of granules assigned\n",
-		           eligibleWorkers[idx].toString().substr(0, 5),
+		           finalEligibleWorkers[idx].toString().substr(0, 5),
 		           minGranulesAssigned);
 	}
 
-	return eligibleWorkers[idx];
+	return finalEligibleWorkers[idx];
 }
 
 // circular dependency between handleRangeAssign and doRangeAssignment
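Putting the new pickWorkerForAssign flow together as one standalone sketch (plain C++; UID, Optional, deterministicRandom, and the knobs are replaced with hypothetical stand-ins, and the blob_worker_full random fallback plus the minimum-sample gate are left out for brevity):

	#include <algorithm>
	#include <climits>
	#include <cstdlib>
	#include <optional>
	#include <vector>

	struct WorkerStat {
		int id;                     // stand-in for UID
		int numGranulesAssigned;    // total granules currently assigned
		int recentGranulesAssigned; // count within the sliding window
	};

	// Returns the id of the chosen worker; 'workers' must be non-empty.
	int pickWorker(const std::vector<WorkerStat>& workers, double allowance, bool limitingEnabled) {
		// median of recent-assignment counts across candidate workers
		std::optional<int> excludeIfRecentOver;
		std::vector<int> medianCalc;
		for (auto const& w : workers)
			medianCalc.push_back(w.recentGranulesAssigned);
		if (limitingEnabled && medianCalc.size() > 1) {
			std::sort(medianCalc.begin(), medianCalc.end());
			int median = medianCalc[(medianCalc.size() - 1) / 2]; // round down when even
			excludeIfRecentOver = std::max(1, (int)(std::max(1.0, allowance) * median));
		}

		// among workers under the recent-assignment limit, keep only the least loaded
		int minGranulesAssigned = INT_MAX;
		std::vector<int> eligible;
		for (auto const& w : workers) {
			if (excludeIfRecentOver && w.recentGranulesAssigned > *excludeIfRecentOver)
				continue; // the new median limiting step
			if (w.numGranulesAssigned <= minGranulesAssigned) {
				if (w.numGranulesAssigned < minGranulesAssigned) {
					eligible.clear();
					minGranulesAssigned = w.numGranulesAssigned;
				}
				eligible.push_back(w.id);
			}
		}

		// break remaining ties at random, like the real code
		return eligible[rand() % eligible.size()];
	}

The non-emptiness ASSERT in the real code holds for the same reason it does here: the threshold is at least the median of the candidates' recent counts, so the workers at or below the median always survive the filter.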
@@ -932,6 +977,25 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
 		fmt::print("Chose BW {0} for seqno {1} in BM {2}\n", _workerId.toString(), seqNo, bmData->epoch);
 	}
 	workerID = _workerId;
+
+	if (SERVER_KNOBS->BLOB_MANAGER_ENABLE_MEDIAN_ASSIGNMENT_LIMITING) {
+		// this worker is guaranteed to be in the map because we just picked it from the map in the most recent
+		// now()
+		bmData->recentBWAssignments.push_back(workerID.get());
+		bmData->workerStats[workerID.get()].recentGranulesAssigned++;
+
+		if (bmData->recentBWAssignments.size() >=
+		    SERVER_KNOBS->BLOB_MANAGER_MEDIAN_ASSIGNMENT_MAX_SAMPLES_PER_WORKER * bmData->workerStats.size()) {
+			UID workerIdToPop = bmData->recentBWAssignments.front();
+			bmData->recentBWAssignments.pop_front();
+			// worker could no longer exist now
+			auto it = bmData->workerStats.find(workerIdToPop);
+			if (it != bmData->workerStats.end()) {
+				it->second.recentGranulesAssigned--;
+			}
+		}
+	}
+
 	// We don't have to check for races with an overlapping assignment because it would insert over us in the
 	// actor map, cancelling this actor before it got here
 	bmData->workerAssignments.insert(assignment.keyRange, workerID.get());
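The window maintenance added to doRangeAssignment above, extracted into a standalone sketch (plain C++ with hypothetical types; maxSamplesPerWorker stands in for BLOB_MANAGER_MEDIAN_ASSIGNMENT_MAX_SAMPLES_PER_WORKER, and the real code keeps the tally inside workerStats rather than a separate map):

	#include <deque>
	#include <unordered_map>

	struct AssignmentWindow {
		std::deque<int> recent;                   // worker ids, oldest at the front
		std::unordered_map<int, int> recentCount; // per-worker recent-assignment tally

		// Record one assignment to 'workerId' given 'liveWorkers' current workers.
		void record(int workerId, size_t liveWorkers, size_t maxSamplesPerWorker) {
			recent.push_back(workerId);
			recentCount[workerId]++;
			// Cap the window at maxSamplesPerWorker entries per live worker so the
			// tally tracks recent behavior rather than all-time totals.
			if (recent.size() >= maxSamplesPerWorker * liveWorkers) {
				int oldest = recent.front();
				recent.pop_front();
				// the worker may have been removed since this entry was queued
				auto it = recentCount.find(oldest);
				if (it != recentCount.end()) {
					it->second--;
				}
			}
		}
	};

Capping the deque at maxSamplesPerWorker x liveWorkers bounds both memory and how far back "recent" reaches, and decrementing on expiry keeps each worker's tally consistent with the window even as workers join and leave.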
|
Loading…
Reference in New Issue