No RK throttling on blob workers if no blob ranges (#9425)
This commit is contained in:
parent
5befe6541e
commit
33c0b35ee6
|
@ -247,6 +247,21 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<bool> checkAnyBlobRanges(Database db) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
// FIXME: check if any active ranges. This still returns true if there are inactive ranges, but it
|
||||
// mostly serves its purpose to allow setting blob_granules_enabled=1 on a cluster that has no blob
|
||||
// workers currently.
|
||||
RangeResult anyData = wait(tr.getRange(blobRangeKeys, 1));
|
||||
return !anyData.empty();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorBlobWorkers(Ratekeeper* self, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
state std::vector<BlobWorkerInterface> blobWorkers;
|
||||
state int workerFetchCount = 0;
|
||||
|
@ -257,6 +272,7 @@ public:
|
|||
|
||||
loop {
|
||||
while (!self->configuration.blobGranulesEnabled) {
|
||||
// FIXME: clear blob worker state if granules were previously enabled?
|
||||
wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
|
||||
}
|
||||
|
||||
|
@ -267,8 +283,9 @@ public:
|
|||
(SERVER_KNOBS->METRIC_UPDATE_RATE * FLOW_KNOBS->DELAY_JITTER_OFFSET);
|
||||
if (++workerFetchCount == fetchAmount || blobWorkerDead) {
|
||||
workerFetchCount = 0;
|
||||
std::vector<BlobWorkerInterface> _blobWorkers = wait(getBlobWorkers(self->db, true, &grv));
|
||||
blobWorkers = _blobWorkers;
|
||||
state Future<bool> anyBlobRangesCheck = checkAnyBlobRanges(self->db);
|
||||
wait(store(blobWorkers, getBlobWorkers(self->db, true, &grv)));
|
||||
wait(store(self->anyBlobRanges, anyBlobRangesCheck));
|
||||
} else {
|
||||
grv = self->maxVersion;
|
||||
}
|
||||
|
@ -635,7 +652,7 @@ Ratekeeper::Ratekeeper(UID id, Database db)
|
|||
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
|
||||
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH,
|
||||
SERVER_KNOBS->TARGET_BW_LAG_BATCH),
|
||||
maxVersion(0), blobWorkerTime(now()), unblockedAssignmentTime(now()) {
|
||||
maxVersion(0), blobWorkerTime(now()), unblockedAssignmentTime(now()), anyBlobRanges(false) {
|
||||
if (SERVER_KNOBS->GLOBAL_TAG_THROTTLING) {
|
||||
tagThrottler = std::make_unique<GlobalTagThrottler>(db, id, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND);
|
||||
} else {
|
||||
|
@ -897,7 +914,7 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
|
|||
break;
|
||||
}
|
||||
|
||||
if (configuration.blobGranulesEnabled && SERVER_KNOBS->BW_THROTTLING_ENABLED) {
|
||||
if (configuration.blobGranulesEnabled && SERVER_KNOBS->BW_THROTTLING_ENABLED && anyBlobRanges) {
|
||||
Version lastBWVer = 0;
|
||||
auto lastIter = version_transactions.end();
|
||||
if (!blobWorkerVersionHistory.empty()) {
|
||||
|
|
|
@ -206,6 +206,7 @@ class Ratekeeper {
|
|||
std::map<Version, Ratekeeper::VersionInfo> version_transactions;
|
||||
std::map<Version, std::pair<double, Optional<double>>> version_recovery;
|
||||
Deque<std::pair<double, Version>> blobWorkerVersionHistory;
|
||||
bool anyBlobRanges;
|
||||
Optional<Key> remoteDC;
|
||||
|
||||
double getRecoveryDuration(Version ver) const {
|
||||
|
|
Loading…
Reference in New Issue