Set throttling ratio in GlobalTagThrottler::tryUpdateAutoThrottling
This commit is contained in:
parent
f41515e66b
commit
20ac60fb11
|
@ -527,6 +527,8 @@ public:
|
||||||
int64_t manualThrottleCount() const { return 0; }
|
int64_t manualThrottleCount() const { return 0; }
|
||||||
|
|
||||||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
|
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
|
||||||
|
throttlingRatios[ss.id] = ss.getThrottlingRatio(SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
|
||||||
|
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
|
||||||
for (const auto& busyReadTag : ss.busiestReadTags) {
|
for (const auto& busyReadTag : ss.busiestReadTags) {
|
||||||
throughput[ss.id][busyReadTag.tag].updateCost(busyReadTag.rate, OpType::READ);
|
throughput[ss.id][busyReadTag.tag].updateCost(busyReadTag.rate, OpType::READ);
|
||||||
}
|
}
|
||||||
|
@ -536,8 +538,6 @@ public:
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
void setThrottlingRatio(UID storageServerId, Optional<double> ratio) { throttlingRatios[storageServerId] = ratio; }
|
|
||||||
|
|
||||||
void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
|
void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
|
||||||
tagStatistics[tag].setQuota(tagQuotaValue);
|
tagStatistics[tag].setQuota(tagQuotaValue);
|
||||||
}
|
}
|
||||||
|
@ -580,10 +580,6 @@ Future<Void> GlobalTagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const&
|
||||||
return impl->tryUpdateAutoThrottling(ss);
|
return impl->tryUpdateAutoThrottling(ss);
|
||||||
}
|
}
|
||||||
|
|
||||||
void GlobalTagThrottler::setThrottlingRatio(UID storageServerId, Optional<double> ratio) {
|
|
||||||
return impl->setThrottlingRatio(storageServerId, ratio);
|
|
||||||
}
|
|
||||||
|
|
||||||
void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
|
void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
|
||||||
return impl->setQuota(tag, tagQuotaValue);
|
return impl->setQuota(tag, tagQuotaValue);
|
||||||
}
|
}
|
||||||
|
@ -645,18 +641,10 @@ public:
|
||||||
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
|
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
|
||||||
result.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness);
|
result.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness);
|
||||||
}
|
}
|
||||||
|
result.lastReply.bytesInput = ((totalReadCost.smoothRate() + totalWriteCost.smoothRate()) / targetCost) *
|
||||||
|
SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<double> getThrottlingRatio() const {
|
|
||||||
auto const springCost = 0.2 * targetCost;
|
|
||||||
auto const currentCost = totalReadCost.smoothRate() + totalWriteCost.smoothRate();
|
|
||||||
if (currentCost < targetCost - springCost) {
|
|
||||||
return {};
|
|
||||||
} else {
|
|
||||||
return std::max(0.0, ((targetCost + springCost) - currentCost) / springCost);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class StorageServerCollection {
|
class StorageServerCollection {
|
||||||
|
@ -693,14 +681,6 @@ public:
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<UID, Optional<double>> getThrottlingRatios() const {
|
|
||||||
std::map<UID, Optional<double>> result;
|
|
||||||
for (int i = 0; i < storageServers.size(); ++i) {
|
|
||||||
result[UID(i, i)] = storageServers[i].getThrottlingRatio();
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
|
ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
|
||||||
|
@ -766,10 +746,6 @@ ACTOR static Future<Void> updateGlobalTagThrottler(GlobalTagThrottler* globalTag
|
||||||
for (const auto& sq : storageQueueInfos) {
|
for (const auto& sq : storageQueueInfos) {
|
||||||
globalTagThrottler->tryUpdateAutoThrottling(sq);
|
globalTagThrottler->tryUpdateAutoThrottling(sq);
|
||||||
}
|
}
|
||||||
auto const throttlingRatios = storageServers->getThrottlingRatios();
|
|
||||||
for (const auto& [id, ratio] : throttlingRatios) {
|
|
||||||
globalTagThrottler->setThrottlingRatio(id, ratio);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1018,22 +1018,13 @@ void StorageQueueInfo::refreshCommitCost(double elapsed) {
|
||||||
totalWriteCosts = 0;
|
totalWriteCosts = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<double> StorageQueueInfo::getWriteQueueSizeLimitRatio(int64_t storageSpringBytes,
|
Optional<double> StorageQueueInfo::getThrottlingRatio(int64_t storageTargetBytes, int64_t storageSpringBytes) const {
|
||||||
int64_t storageTargetBytes) const {
|
|
||||||
auto const minFreeSpace =
|
|
||||||
std::max(SERVER_KNOBS->MIN_AVAILABLE_SPACE,
|
|
||||||
(int64_t)(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO * smoothTotalSpace.smoothTotal()));
|
|
||||||
auto const storageQueue = getStorageQueueBytes();
|
auto const storageQueue = getStorageQueueBytes();
|
||||||
auto const springBytes = std::max<int64_t>(
|
if (storageQueue < storageTargetBytes - storageSpringBytes) {
|
||||||
1, std::min<int64_t>(storageSpringBytes, (smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
|
|
||||||
auto const targetBytes =
|
|
||||||
std::max<int64_t>(1, std::min<int64_t>(storageTargetBytes, smoothFreeSpace.smoothTotal() - minFreeSpace));
|
|
||||||
auto const targetRateRatio = std::min((storageQueue - targetBytes + springBytes) / (double)springBytes, 2.0);
|
|
||||||
auto const inputRate = smoothInputBytes.smoothRate();
|
|
||||||
if (targetRateRatio > 0 && inputRate > 0) {
|
|
||||||
return verySmoothDurableBytes.smoothRate() / (inputRate * targetRateRatio);
|
|
||||||
} else {
|
|
||||||
return {};
|
return {};
|
||||||
|
} else {
|
||||||
|
return std::max(
|
||||||
|
0.0, static_cast<double>((storageTargetBytes + storageSpringBytes) - storageQueue) / storageSpringBytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ public:
|
||||||
void addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost);
|
void addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost);
|
||||||
|
|
||||||
// Determine the ratio (limit / current throughput) for throttling based on write queue size
|
// Determine the ratio (limit / current throughput) for throttling based on write queue size
|
||||||
Optional<double> getWriteQueueSizeLimitRatio(int64_t storageSpringBytes, int64_t storageTargetBytes) const;
|
Optional<double> getThrottlingRatio(int64_t storageTargetBytes, int64_t storageSpringBytes) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct TLogQueueInfo {
|
struct TLogQueueInfo {
|
||||||
|
|
|
@ -90,10 +90,6 @@ public:
|
||||||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
|
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
|
||||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
|
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
|
||||||
|
|
||||||
// Based on limiting storage queue size, set a ratio by which total throughput on the storage server needs to be
|
|
||||||
// adjusted
|
|
||||||
void setThrottlingRatio(UID storageServerId, Optional<double> ratio);
|
|
||||||
|
|
||||||
// Testing only:
|
// Testing only:
|
||||||
public:
|
public:
|
||||||
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
|
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
|
||||||
|
|
Loading…
Reference in New Issue