diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 856ee04593..31a8c31eb7 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -260,7 +260,10 @@ ACTOR Future getRate(UID myID, Reference> db, int64 } when ( wait( nextRequestTimer ) ) { nextRequestTimer = Never(); - bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE; + double nowTime = now(); + bool detailed = nowTime - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE; + for(auto& [tagName, cost] : *transactionTagCommitCostEst) + cost.existTime = nowTime - cost.existTime; reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply( GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter, diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index fd240b5bf4..be23c3af61 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -534,6 +534,7 @@ struct RatekeeperData { Database db; Map storageQueueInfo; + int validSS = 0, numBusySS = 0; Map tlogQueueInfo; std::map proxyInfo; @@ -578,6 +579,7 @@ struct RatekeeperData { SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH), autoThrottlingEnabled(false) { expiredTagThrottleCleanup = recurring([this](){ ThrottleApi::expire(this->db); }, SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL); + smoothMeanShardSize.reset(SERVER_KNOBS->MIN_SHARD_BYTES); } }; @@ -852,11 +854,14 @@ ACTOR Future monitorDDMetricsChanges(RatekeeperData *self, Referenceget().distributor.get().dataDistributorMetrics.getReply( GetDataDistributorMetricsRequest(normalKeys, std::numeric_limits::max(), true) ) ) ); if(reply.isError()) continue; - if(isFirstRep) { - self->smoothMeanShardSize.reset(reply.get().meanShardSize); - isFirstRep = false; + if(reply.get().meanShardSize > 0) { + if (isFirstRep) { + self->smoothMeanShardSize.reset(reply.get().meanShardSize); + isFirstRep = false; + } else self->smoothMeanShardSize.setTotal(reply.get().meanShardSize); } - else self->smoothMeanShardSize.setTotal(reply.get().meanShardSize); + if(deterministicRandom()->random01() < 0.01) + TraceEvent("RkMeanShardSize").detail("SmoothTotal", self->smoothMeanShardSize.smoothTotal()); }; } @@ -892,7 +897,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { limitReason_t limitReason = limitReason_t::unlimited; int sscount = 0; - + int writeSaturatedSSCount = 0; int64_t worstFreeSpaceStorageServer = std::numeric_limits::max(); int64_t worstStorageQueueStorageServer = 0; int64_t limitingStorageQueueStorageServer = 0; @@ -941,8 +946,14 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0); - if(limits->priority == TransactionPriority::DEFAULT && (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) { - tryAutoThrottleTag(self, ss); + if(limits->priority == TransactionPriority::DEFAULT){ + // write saturation + if(storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES && storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) + writeSaturatedSSCount ++; + // read saturation + if(storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) { + tryAutoThrottleTag(self, ss); + } } double inputRate = ss.smoothInputBytes.smoothRate(); @@ -995,6 +1006,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { ssReasons[ss.id] = ssLimitReason; } + self->validSS = sscount; + self->numBusySS = writeSaturatedSSCount; std::set>> ignoredMachines; for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) { @@ -1222,6 +1235,46 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { } } +void updateCommitCostEstimation(RatekeeperData* self, TransactionTagMap const& costEstimation) { + if(self->validSS <= 0) return; + int opsSum = 0; + double bytesSum = 0; + std::multimap, TransactionTag> costTagReverseIndex; + for(const auto& [tagName, costEst] : costEstimation) { + if(self->throttledTags.tagData.count(tagName) == 0) continue; + int ops = costEst.numClear + costEst.numAtomicWrite + costEst.numWrite; + opsSum += ops; + double bytes = costEst.bytesClearEst + costEst.bytesAtomicWrite + costEst.bytesWrite + costEst.numClearShards * self->smoothMeanShardSize.smoothTotal(); + bytesSum += bytes; + costTagReverseIndex.emplace(std::make_pair(-bytes, -ops), tagName); + } + + ASSERT(self->validSS > 0); + int throttledNum = std::max(1, (int)(self->throttledTags.tagData.size() * self->numBusySS / self->validSS)); + // calculate fractionalBusyness + for(auto& [byteOps, tagName] : costTagReverseIndex) { // descending order + if(throttledNum <= 0) break; + if(self->throttledTags.manualThrottledTags.count(tagName) > 0) continue; + double fractionalBusyness = 0.5 * -byteOps.first / (bytesSum+1) + 0.5 * -byteOps.second / (opsSum+1); + if(self->throttledTags.autoThrottledTags.count(tagName) > 0) { // has been throttled + self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness); + } + else { // new auto-throttled + double opsRate = -byteOps.second / (costEstimation.find(tagName)->second.existTime + 1); + if(fractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && opsRate > SERVER_KNOBS->MIN_TAG_COST) + { + Optional clientRate = self->throttledTags.autoThrottleTag(self->id, tagName, fractionalBusyness); + if(clientRate.present()) { + TagSet tags; + tags.addTag(tagName); + + self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION)); + } + } + } + throttledNum --; + } +} ACTOR Future configurationMonitor(RatekeeperData *self) { loop { state ReadYourWritesTransaction tr(self->db); @@ -1308,10 +1361,6 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference 0) { self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batchTransactions ); @@ -1325,6 +1374,9 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, ReferenceMETRIC_UPDATE_RATE; + // TODO process commitCostEstimation + updateCommitCostEstimation(&self, req.throttledTagCommitCostEst); + if(p.lastThrottledTagChangeId != self.throttledTagChangeId || now() < p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) { p.lastThrottledTagChangeId = self.throttledTagChangeId; p.lastTagPushTime = now(); diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index 43bbb233e7..71ec372ecc 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -84,6 +84,9 @@ struct TransactionCommitCostEstimation { uint64_t bytesAtomicWrite = 0; uint64_t bytesClearEst = 0; + double existTime; + TransactionCommitCostEstimation(): existTime(now()) {} + template void serialize(Ar& ar) { serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear, numClearShards);