add write throttling

This commit is contained in:
Xiaoxi Wang 2020-07-28 03:49:47 +00:00
parent 48a0fb5154
commit 41a3e6c853
3 changed files with 70 additions and 12 deletions

View File

@ -260,7 +260,10 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
double nowTime = now();
bool detailed = nowTime - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
for(auto& [tagName, cost] : *transactionTagCommitCostEst)
cost.existTime = nowTime - cost.existTime;
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(
GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter,

View File

@ -534,6 +534,7 @@ struct RatekeeperData {
Database db;
Map<UID, StorageQueueInfo> storageQueueInfo;
int validSS = 0, numBusySS = 0;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, ProxyInfo> proxyInfo;
@ -578,6 +579,7 @@ struct RatekeeperData {
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
autoThrottlingEnabled(false) {
expiredTagThrottleCleanup = recurring([this](){ ThrottleApi::expire(this->db); }, SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL);
smoothMeanShardSize.reset(SERVER_KNOBS->MIN_SHARD_BYTES);
}
};
@ -852,11 +854,14 @@ ACTOR Future<Void> monitorDDMetricsChanges(RatekeeperData *self, Reference<Async
db->get().distributor.get().dataDistributorMetrics.getReply( GetDataDistributorMetricsRequest(normalKeys, std::numeric_limits<int>::max(), true) ) ) );
if(reply.isError()) continue;
if(isFirstRep) {
self->smoothMeanShardSize.reset(reply.get().meanShardSize);
isFirstRep = false;
if(reply.get().meanShardSize > 0) {
if (isFirstRep) {
self->smoothMeanShardSize.reset(reply.get().meanShardSize);
isFirstRep = false;
} else self->smoothMeanShardSize.setTotal(reply.get().meanShardSize);
}
else self->smoothMeanShardSize.setTotal(reply.get().meanShardSize);
if(deterministicRandom()->random01() < 0.01)
TraceEvent("RkMeanShardSize").detail("SmoothTotal", self->smoothMeanShardSize.smoothTotal());
};
}
@ -892,7 +897,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
limitReason_t limitReason = limitReason_t::unlimited;
int sscount = 0;
int writeSaturatedSSCount = 0;
int64_t worstFreeSpaceStorageServer = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueStorageServer = 0;
int64_t limitingStorageQueueStorageServer = 0;
@ -941,8 +946,14 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
if(limits->priority == TransactionPriority::DEFAULT && (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
tryAutoThrottleTag(self, ss);
if(limits->priority == TransactionPriority::DEFAULT){
// write saturation
if(storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES && storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)
writeSaturatedSSCount ++;
// read saturation
if(storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
tryAutoThrottleTag(self, ss);
}
}
double inputRate = ss.smoothInputBytes.smoothRate();
@ -995,6 +1006,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
ssReasons[ss.id] = ssLimitReason;
}
self->validSS = sscount;
self->numBusySS = writeSaturatedSSCount;
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
@ -1222,6 +1235,46 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
}
}
void updateCommitCostEstimation(RatekeeperData* self, TransactionTagMap<TransactionCommitCostEstimation> const& costEstimation) {
if(self->validSS <= 0) return;
int opsSum = 0;
double bytesSum = 0;
std::multimap<std::pair<double, int>, TransactionTag> costTagReverseIndex;
for(const auto& [tagName, costEst] : costEstimation) {
if(self->throttledTags.tagData.count(tagName) == 0) continue;
int ops = costEst.numClear + costEst.numAtomicWrite + costEst.numWrite;
opsSum += ops;
double bytes = costEst.bytesClearEst + costEst.bytesAtomicWrite + costEst.bytesWrite + costEst.numClearShards * self->smoothMeanShardSize.smoothTotal();
bytesSum += bytes;
costTagReverseIndex.emplace(std::make_pair(-bytes, -ops), tagName);
}
ASSERT(self->validSS > 0);
int throttledNum = std::max(1, (int)(self->throttledTags.tagData.size() * self->numBusySS / self->validSS));
// calculate fractionalBusyness
for(auto& [byteOps, tagName] : costTagReverseIndex) { // descending order
if(throttledNum <= 0) break;
if(self->throttledTags.manualThrottledTags.count(tagName) > 0) continue;
double fractionalBusyness = 0.5 * -byteOps.first / (bytesSum+1) + 0.5 * -byteOps.second / (opsSum+1);
if(self->throttledTags.autoThrottledTags.count(tagName) > 0) { // has been throttled
self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness);
}
else { // new auto-throttled
double opsRate = -byteOps.second / (costEstimation.find(tagName)->second.existTime + 1);
if(fractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && opsRate > SERVER_KNOBS->MIN_TAG_COST)
{
Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness);
if(clientRate.present()) {
TagSet tags;
tags.addTag(tagName);
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
}
}
}
throttledNum --;
}
}
ACTOR Future<Void> configurationMonitor(RatekeeperData *self) {
loop {
state ReadYourWritesTransaction tr(self->db);
@ -1308,10 +1361,6 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
for(auto tag : req.throttledTagCounts) {
self.throttledTags.addRequests(tag.first, tag.second);
}
// TODO process commitCostEstimation
// for (const auto &[tagName, cost] : req.throttledTagCommitCostEst) {
//
// }
}
if(p.batchTransactions > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batchTransactions );
@ -1325,6 +1374,9 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxyInfo.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
// TODO process commitCostEstimation
updateCommitCostEstimation(&self, req.throttledTagCommitCostEst);
if(p.lastThrottledTagChangeId != self.throttledTagChangeId || now() < p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) {
p.lastThrottledTagChangeId = self.throttledTagChangeId;
p.lastTagPushTime = now();

View File

@ -84,6 +84,9 @@ struct TransactionCommitCostEstimation {
uint64_t bytesAtomicWrite = 0;
uint64_t bytesClearEst = 0;
double existTime;
TransactionCommitCostEstimation(): existTime(now()) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear, numClearShards);