diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 382b38cf18..6dce9351cb 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -993,6 +993,22 @@ struct GetStorageMetricsRequest { }; struct StorageQueuingMetricsReply { + struct TagInfo { + constexpr static FileIdentifier file_identifier = 4528694; + TransactionTag tag; + double rate{ 0.0 }; + double fractionalBusyness{ 0.0 }; + + TagInfo() = default; + TagInfo(TransactionTag const& tag, double rate, double fractionalBusyness) + : tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {} + + template + void serialize(Ar& ar) { + serializer(ar, tag, rate, fractionalBusyness); + } + }; + constexpr static FileIdentifier file_identifier = 7633366; double localTime; int64_t instanceID; // changes if bytesDurable and bytesInput reset @@ -1003,9 +1019,7 @@ struct StorageQueuingMetricsReply { double cpuUsage; double diskUsage; double localRateLimit; - Optional busiestTag; - double busiestTagFractionalBusyness; - double busiestTagRate; + std::vector busiestTags; template void serialize(Ar& ar) { @@ -1020,9 +1034,7 @@ struct StorageQueuingMetricsReply { cpuUsage, diskUsage, localRateLimit, - busiestTag, - busiestTagFractionalBusyness, - busiestTagRate); + busiestTags); } }; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 5cd5d148e3..2726c039fe 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -118,6 +118,8 @@ set(FDBSERVER_SRCS RestoreWorker.actor.h RestoreWorkerInterface.actor.cpp RestoreWorkerInterface.actor.h + RkTagThrottleCollection.cpp + RkTagThrottleCollection.h RocksDBCheckpointUtils.actor.cpp RocksDBCheckpointUtils.actor.h RoleLineage.actor.cpp @@ -152,6 +154,8 @@ set(FDBSERVER_SRCS TesterInterface.actor.h TLogInterface.h TLogServer.actor.cpp + TransactionTagCounter.cpp + TransactionTagCounter.h TSSMappingUtil.actor.cpp TSSMappingUtil.actor.h VersionedBTree.actor.cpp diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 9b8b62e5ac..94e41d0f52 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -157,32 +157,7 @@ public: ErrorOr reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor( StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? if (reply.present()) { - myQueueInfo->value.valid = true; - myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; - myQueueInfo->value.lastReply = reply.get(); - if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { - myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); - myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion); - myQueueInfo->value.smoothLatestVersion.reset(reply.get().version); - } else { - self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - - myQueueInfo->value.prevReply.bytesDurable); - myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); - myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion); - myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version); - } - - myQueueInfo->value.busiestReadTag = reply.get().busiestTag; - myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness; - myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate; + myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes); } else { if (myQueueInfo->value.valid) { TraceEvent("RkStorageServerDidNotRespond", self->id).detail("StorageServer", ssi.id()); @@ -210,24 +185,7 @@ public: ErrorOr reply = wait(tli.getQueuingMetrics.getReplyUnlessFailedFor( TLogQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? if (reply.present()) { - myQueueInfo->value.valid = true; - myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; - myQueueInfo->value.lastReply = reply.get(); - if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { - myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); - } else { - self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - - myQueueInfo->value.prevReply.bytesDurable); - myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); - } + myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes); } else { if (myQueueInfo->value.valid) { TraceEvent("RkTLogDidNotRespond", self->id).detail("TransactionLog", tli.id()); @@ -290,9 +248,7 @@ public: self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); self.addActor.send(self.monitorThrottlingChanges()); - Ratekeeper* selfPtr = &self; // let flow compiler capture self - self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); }, - SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + self.addActor.send(self.refreshStorageServerCommitCosts()); TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) @@ -412,6 +368,19 @@ public: return Void(); } + ACTOR static Future refreshStorageServerCommitCosts(Ratekeeper* self) { + state double lastBusiestCommitTagPick; + loop { + lastBusiestCommitTagPick = now(); + wait(delay(SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + double elapsed = now() - lastBusiestCommitTagPick; + // for each SS, select the busiest commit tag from ssTrTagCommitCost + for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) { + ssQueueInfo.refreshCommitCost(elapsed); + } + } + } + }; // class RatekeeperImpl Future Ratekeeper::configurationMonitor() { @@ -464,11 +433,8 @@ Ratekeeper::Ratekeeper(UID id, Database db) SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, - SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH), - lastBusiestCommitTagPick(0.0) { + SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH) { tagThrottler = std::make_unique(db, id); - expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); }, - SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL); } void Ratekeeper::updateCommitCostEstimation( @@ -478,9 +444,7 @@ void Ratekeeper::updateCommitCostEstimation( if (tagCostIt == costEstimation.end()) continue; for (const auto& [tagName, cost] : tagCostIt->second) { - it->value.tagCostEst[tagName] += cost; - it->value.totalWriteCosts += cost.getCostSum(); - it->value.totalWriteOps += cost.getOpsSum(); + it->value.addCommitCost(tagName, cost); } } } @@ -558,10 +522,10 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) { } } - int64_t storageQueue = ss.lastReply.bytesInput - ss.smoothDurableBytes.smoothTotal(); + int64_t storageQueue = ss.getStorageQueueBytes(); worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue); - int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal(); + int64_t storageDurabilityLag = ss.getDurabilityLag(); worstDurabilityLag = std::max(worstDurabilityLag, storageDurabilityLag); storageDurabilityLagReverseIndex.insert(std::make_pair(-1 * storageDurabilityLag, &ss)); @@ -575,7 +539,7 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) { double targetRateRatio = std::min((storageQueue - targetBytes + springBytes) / (double)springBytes, 2.0); if (limits->priority == TransactionPriority::DEFAULT) { - addActor.send(tagThrottler->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag)); + addActor.send(tagThrottler->tryUpdateAutoThrottling(ss)); } double inputRate = ss.smoothInputBytes.smoothRate(); @@ -763,11 +727,11 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) { auto& tl = it.value; if (!tl.valid) continue; - maxTLVer = std::max(maxTLVer, tl.lastReply.v); + maxTLVer = std::max(maxTLVer, tl.getLastCommittedVersion()); } if (minSSVer != std::numeric_limits::max() && maxTLVer != std::numeric_limits::min()) { - // writeToReadLatencyLimit: 0 = infinte speed; 1 = TL durable speed ; 2 = half TL durable speed + // writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed ; 2 = half TL durable speed writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference / 2) / (limits->maxVersionDifference / 4); worstVersionLag = std::max((Version)0, maxTLVer - minSSVer); @@ -966,54 +930,137 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) { } } -Future Ratekeeper::refreshStorageServerCommitCost() { - if (lastBusiestCommitTagPick == 0) { // the first call should be skipped - lastBusiestCommitTagPick = now(); - return Void(); - } - double elapsed = now() - lastBusiestCommitTagPick; - // for each SS, select the busiest commit tag from ssTrTagCommitCost - for (auto it = storageQueueInfo.begin(); it != storageQueueInfo.end(); ++it) { - it->value.busiestWriteTag.reset(); - TransactionTag busiestTag; - TransactionCommitCostEstimation maxCost; - double maxRate = 0, maxBusyness = 0; - for (const auto& [tag, cost] : it->value.tagCostEst) { - double rate = cost.getCostSum() / elapsed; - if (rate > maxRate) { - busiestTag = tag; - maxRate = rate; - maxCost = cost; - } - } - if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) { - it->value.busiestWriteTag = busiestTag; - // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps); - ASSERT(it->value.totalWriteCosts > 0); - maxBusyness = double(maxCost.getCostSum()) / it->value.totalWriteCosts; - it->value.busiestWriteTagFractionalBusyness = maxBusyness; - it->value.busiestWriteTagRate = maxRate; - } - - TraceEvent("BusiestWriteTag", it->key) - .detail("Elapsed", elapsed) - .detail("Tag", printable(busiestTag)) - .detail("TagOps", maxCost.getOpsSum()) - .detail("TagCost", maxCost.getCostSum()) - .detail("TotalCost", it->value.totalWriteCosts) - .detail("Reported", it->value.busiestWriteTag.present()) - .trackLatest(it->value.busiestWriteTagEventHolder->trackingKey); - - // reset statistics - it->value.tagCostEst.clear(); - it->value.totalWriteOps = 0; - it->value.totalWriteCosts = 0; - } - lastBusiestCommitTagPick = now(); - return Void(); +Future Ratekeeper::refreshStorageServerCommitCosts() { + return RatekeeperImpl::refreshStorageServerCommitCosts(this); } ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference const> dbInfo) { wait(Ratekeeper::run(rkInterf, dbInfo)); return Void(); } + +StorageQueueInfo::StorageQueueInfo(UID id, LocalityData locality) + : busiestWriteTagEventHolder(makeReference(id.toString() + "/BusiestWriteTag")), valid(false), + id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), + smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), + limitReason(limitReason_t::unlimited) { + // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo + lastReply.instanceID = -1; +} + +void StorageQueueInfo::addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost) { + tagCostEst[tagName] += cost; + totalWriteCosts += cost.getCostSum(); + totalWriteOps += cost.getOpsSum(); +} + +void StorageQueueInfo::update(StorageQueuingMetricsReply const& reply, Smoother& smoothTotalDurableBytes) { + valid = true; + auto prevReply = std::move(lastReply); + lastReply = reply; + if (prevReply.instanceID != reply.instanceID) { + smoothDurableBytes.reset(reply.bytesDurable); + verySmoothDurableBytes.reset(reply.bytesDurable); + smoothInputBytes.reset(reply.bytesInput); + smoothFreeSpace.reset(reply.storageBytes.available); + smoothTotalSpace.reset(reply.storageBytes.total); + smoothDurableVersion.reset(reply.durableVersion); + smoothLatestVersion.reset(reply.version); + } else { + smoothTotalDurableBytes.addDelta(reply.bytesDurable - prevReply.bytesDurable); + smoothDurableBytes.setTotal(reply.bytesDurable); + verySmoothDurableBytes.setTotal(reply.bytesDurable); + smoothInputBytes.setTotal(reply.bytesInput); + smoothFreeSpace.setTotal(reply.storageBytes.available); + smoothTotalSpace.setTotal(reply.storageBytes.total); + smoothDurableVersion.setTotal(reply.durableVersion); + smoothLatestVersion.setTotal(reply.version); + } + + busiestReadTags = reply.busiestTags; +} + +void StorageQueueInfo::refreshCommitCost(double elapsed) { + busiestWriteTags.clear(); + TransactionTag busiestTag; + TransactionCommitCostEstimation maxCost; + double maxRate = 0, maxBusyness = 0; + for (const auto& [tag, cost] : tagCostEst) { + double rate = cost.getCostSum() / elapsed; + if (rate > maxRate) { + busiestTag = tag; + maxRate = rate; + maxCost = cost; + } + } + if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) { + // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", totalWriteCost).detail("TotalWriteOps",totalWriteOps); + ASSERT_GT(totalWriteCosts, 0); + maxBusyness = double(maxCost.getCostSum()) / totalWriteCosts; + busiestWriteTags.emplace_back(busiestTag, maxRate, maxBusyness); + } + + TraceEvent("BusiestWriteTag", id) + .detail("Elapsed", elapsed) + .detail("Tag", printable(busiestTag)) + .detail("TagOps", maxCost.getOpsSum()) + .detail("TagCost", maxCost.getCostSum()) + .detail("TotalCost", totalWriteCosts) + .detail("Reported", !busiestWriteTags.empty()) + .trackLatest(busiestWriteTagEventHolder->trackingKey); + + // reset statistics + tagCostEst.clear(); + totalWriteOps = 0; + totalWriteCosts = 0; +} + +TLogQueueInfo::TLogQueueInfo(UID id) + : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), + smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) { + // FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from + // storageQueueInfO) + lastReply.instanceID = -1; +} + +void TLogQueueInfo::update(TLogQueuingMetricsReply const& reply, Smoother& smoothTotalDurableBytes) { + valid = true; + auto prevReply = std::move(lastReply); + lastReply = reply; + if (prevReply.instanceID != reply.instanceID) { + smoothDurableBytes.reset(reply.bytesDurable); + verySmoothDurableBytes.reset(reply.bytesDurable); + smoothInputBytes.reset(reply.bytesInput); + smoothFreeSpace.reset(reply.storageBytes.available); + smoothTotalSpace.reset(reply.storageBytes.total); + } else { + smoothTotalDurableBytes.addDelta(reply.bytesDurable - prevReply.bytesDurable); + smoothDurableBytes.setTotal(reply.bytesDurable); + verySmoothDurableBytes.setTotal(reply.bytesDurable); + smoothInputBytes.setTotal(reply.bytesInput); + smoothFreeSpace.setTotal(reply.storageBytes.available); + smoothTotalSpace.setTotal(reply.storageBytes.total); + } +} + +RatekeeperLimits::RatekeeperLimits(TransactionPriority priority, + std::string context, + int64_t storageTargetBytes, + int64_t storageSpringBytes, + int64_t logTargetBytes, + int64_t logSpringBytes, + double maxVersionDifference, + int64_t durabilityLagTargetVersions) + : tpsLimit(std::numeric_limits::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)), + reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes), + storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes), + maxVersionDifference(maxVersionDifference), + durabilityLagTargetVersions( + durabilityLagTargetVersions + + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not + // be durable on the storage servers + lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits::infinity()), priority(priority), + context(context), rkUpdateEventCacheHolder(makeReference("RkUpdate" + context)) {} diff --git a/fdbserver/Ratekeeper.h b/fdbserver/Ratekeeper.h index 8552eeb521..1fe0031c15 100644 --- a/fdbserver/Ratekeeper.h +++ b/fdbserver/Ratekeeper.h @@ -46,57 +46,45 @@ enum limitReason_t { limitReason_t_end }; -struct StorageQueueInfo { +class StorageQueueInfo { + uint64_t totalWriteCosts{ 0 }; + int totalWriteOps{ 0 }; + Reference busiestWriteTagEventHolder; + + // refresh periodically + TransactionTagMap tagCostEst; + +public: bool valid; UID id; LocalityData locality; StorageQueuingMetricsReply lastReply; - StorageQueuingMetricsReply prevReply; Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; Smoother smoothDurableVersion, smoothLatestVersion; Smoother smoothFreeSpace; Smoother smoothTotalSpace; limitReason_t limitReason; + std::vector busiestReadTags, busiestWriteTags; - Optional busiestReadTag, busiestWriteTag; - double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0; - double busiestReadTagRate = 0, busiestWriteTagRate = 0; - - Reference busiestWriteTagEventHolder; - - // refresh periodically - TransactionTagMap tagCostEst; - uint64_t totalWriteCosts = 0; - int totalWriteOps = 0; - - StorageQueueInfo(UID id, LocalityData locality) - : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), - limitReason(limitReason_t::unlimited), - busiestWriteTagEventHolder(makeReference(id.toString() + "/BusiestWriteTag")) { - // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo - lastReply.instanceID = -1; - } + StorageQueueInfo(UID id, LocalityData locality); + void refreshCommitCost(double elapsed); + int64_t getStorageQueueBytes() const { return lastReply.bytesInput - smoothDurableBytes.smoothTotal(); } + int64_t getDurabilityLag() const { return smoothLatestVersion.smoothTotal() - smoothDurableVersion.smoothTotal(); } + void update(StorageQueuingMetricsReply const&, Smoother& smoothTotalDurableBytes); + void addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost); }; struct TLogQueueInfo { + TLogQueuingMetricsReply lastReply; bool valid; UID id; - TLogQueuingMetricsReply lastReply; - TLogQueuingMetricsReply prevReply; Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; Smoother smoothFreeSpace; Smoother smoothTotalSpace; - TLogQueueInfo(UID id) - : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) { - // FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from - // storageQueueInfO) - lastReply.instanceID = -1; - } + + TLogQueueInfo(UID id); + Version getLastCommittedVersion() const { return lastReply.v; } + void update(TLogQueuingMetricsReply const& reply, Smoother& smoothTotalDurableBytes); }; struct RatekeeperLimits { @@ -126,17 +114,7 @@ struct RatekeeperLimits { int64_t logTargetBytes, int64_t logSpringBytes, double maxVersionDifference, - int64_t durabilityLagTargetVersions) - : tpsLimit(std::numeric_limits::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)), - reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes), - storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes), - maxVersionDifference(maxVersionDifference), - durabilityLagTargetVersions( - durabilityLagTargetVersions + - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not - // be durable on the storage servers - lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits::infinity()), priority(priority), - context(context), rkUpdateEventCacheHolder(makeReference("RkUpdate" + context)) {} + int64_t durabilityLagTargetVersions); }; class Ratekeeper { @@ -144,16 +122,12 @@ class Ratekeeper { // Differentiate from GrvProxyInfo in DatabaseContext.h struct GrvProxyInfo { - int64_t totalTransactions; - int64_t batchTransactions; - uint64_t lastThrottledTagChangeId; + int64_t totalTransactions{ 0 }; + int64_t batchTransactions{ 0 }; + uint64_t lastThrottledTagChangeId{ 0 }; - double lastUpdateTime; - double lastTagPushTime; - - GrvProxyInfo() - : totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0), - lastTagPushTime(0) {} + double lastUpdateTime{ 0.0 }; + double lastTagPushTime{ 0.0 }; }; UID id; @@ -181,16 +155,12 @@ class Ratekeeper { Deque actualTpsHistory; Optional remoteDC; - Future expiredTagThrottleCleanup; - - double lastBusiestCommitTagPick; - Ratekeeper(UID id, Database db); Future configurationMonitor(); void updateCommitCostEstimation(UIDTransactionTagMap const& costEstimation); void updateRate(RatekeeperLimits* limits); - Future refreshStorageServerCommitCost(); + Future refreshStorageServerCommitCosts(); Future monitorServerListChange(PromiseStream>> serverChanges); Future trackEachStorageServer(FutureStream>> serverChanges); diff --git a/fdbserver/RkTagThrottleCollection.cpp b/fdbserver/RkTagThrottleCollection.cpp new file mode 100644 index 0000000000..11c376f57d --- /dev/null +++ b/fdbserver/RkTagThrottleCollection.cpp @@ -0,0 +1,356 @@ +/* + * RkTagThrottleCollection.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/Knobs.h" +#include "fdbserver/RkTagThrottleCollection.h" + +double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional requestRate) { + if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) { + return limits.tpsRate; + } else { + return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal()); + } +} + +Optional RkTagThrottleCollection::RkTagThrottleData::updateAndGetClientRate(Optional requestRate) { + if (limits.expiration > now()) { + double targetRate = getTargetRate(requestRate); + if (targetRate == std::numeric_limits::max()) { + rateSet = false; + return targetRate; + } + if (!rateSet) { + rateSet = true; + clientRate.reset(targetRate); + } else { + clientRate.setTotal(targetRate); + } + + double rate = clientRate.smoothTotal(); + ASSERT_GE(rate, 0); + return rate; + } else { + TEST(true); // Get throttle rate for expired throttle + rateSet = false; + return Optional(); + } +} + +RkTagThrottleCollection::RkTagThrottleCollection(RkTagThrottleCollection&& other) { + autoThrottledTags = std::move(other.autoThrottledTags); + manualThrottledTags = std::move(other.manualThrottledTags); + tagData = std::move(other.tagData); +} + +RkTagThrottleCollection& RkTagThrottleCollection::RkTagThrottleCollection::operator=(RkTagThrottleCollection&& other) { + autoThrottledTags = std::move(other.autoThrottledTags); + manualThrottledTags = std::move(other.manualThrottledTags); + tagData = std::move(other.tagData); + return *this; +} + +double RkTagThrottleCollection::computeTargetTpsRate(double currentBusyness, + double targetBusyness, + double requestRate) { + ASSERT_GT(currentBusyness, 0); + + if (targetBusyness < 1) { + double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness); + return requestRate * targetFraction; + } else { + return std::numeric_limits::max(); + } +} + +Optional RkTagThrottleCollection::autoThrottleTag(UID id, + TransactionTag const& tag, + double fractionalBusyness, + Optional tpsRate, + Optional expiration) { + ASSERT(!tpsRate.present() || tpsRate.get() >= 0); + ASSERT(!expiration.present() || expiration.get() > now()); + + auto itr = autoThrottledTags.find(tag); + bool present = (itr != autoThrottledTags.end()); + if (!present) { + if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) { + TEST(true); // Reached auto-throttle limit + return Optional(); + } + + itr = autoThrottledTags.try_emplace(tag).first; + initializeTag(tag); + } else if (itr->second.limits.expiration <= now()) { + TEST(true); // Re-throttling expired tag that hasn't been cleaned up + present = false; + itr->second = RkTagThrottleData(); + } + + auto& throttle = itr->second; + + if (!tpsRate.present()) { + if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) { + tpsRate = std::numeric_limits::max(); + if (present) { + return Optional(); + } + } else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) { + TEST(true); // Tag auto-throttled too quickly + return Optional(); + } else { + tpsRate = computeTargetTpsRate(fractionalBusyness, + SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, + tagData[tag].requestRate.smoothRate()); + + if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) { + TEST(true); // Tag auto-throttle rate increase attempt while active + return Optional(); + } + + throttle.lastUpdated = now(); + if (tpsRate.get() < throttle.limits.tpsRate) { + throttle.lastReduced = now(); + } + } + } + if (!expiration.present()) { + expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION; + } + + ASSERT(tpsRate.present() && tpsRate.get() >= 0); + + throttle.limits.tpsRate = tpsRate.get(); + throttle.limits.expiration = expiration.get(); + + Optional clientRate = throttle.updateAndGetClientRate(getRequestRate(tag)); + + TraceEvent("RkSetAutoThrottle", id) + .detail("Tag", tag) + .detail("TargetRate", tpsRate.get()) + .detail("Expiration", expiration.get() - now()) + .detail("ClientRate", clientRate) + .detail("Created", now() - throttle.created) + .detail("LastUpdate", now() - throttle.lastUpdated) + .detail("LastReduced", now() - throttle.lastReduced); + + if (tpsRate.get() != std::numeric_limits::max()) { + return tpsRate.get(); + } else { + return Optional(); + } +} + +void RkTagThrottleCollection::manualThrottleTag(UID id, + TransactionTag const& tag, + TransactionPriority priority, + double tpsRate, + double expiration, + Optional const& oldLimits) { + ASSERT(tpsRate >= 0); + ASSERT(expiration > now()); + + auto& priorityThrottleMap = manualThrottledTags[tag]; + auto result = priorityThrottleMap.try_emplace(priority); + initializeTag(tag); + ASSERT(result.second); // Updating to the map is done by copying the whole map + + result.first->second.limits.tpsRate = tpsRate; + result.first->second.limits.expiration = expiration; + + if (!oldLimits.present()) { + TEST(true); // Transaction tag manually throttled + TraceEvent("RatekeeperAddingManualThrottle", id) + .detail("Tag", tag) + .detail("Rate", tpsRate) + .detail("Priority", transactionPriorityToString(priority)) + .detail("SecondsToExpiration", expiration - now()); + } else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) { + TEST(true); // Manual transaction tag throttle updated + TraceEvent("RatekeeperUpdatingManualThrottle", id) + .detail("Tag", tag) + .detail("Rate", tpsRate) + .detail("Priority", transactionPriorityToString(priority)) + .detail("SecondsToExpiration", expiration - now()); + } + + Optional clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag)); + ASSERT(clientRate.present()); +} + +Optional RkTagThrottleCollection::getManualTagThrottleLimits(TransactionTag const& tag, + TransactionPriority priority) { + auto itr = manualThrottledTags.find(tag); + if (itr != manualThrottledTags.end()) { + auto priorityItr = itr->second.find(priority); + if (priorityItr != itr->second.end()) { + return priorityItr->second.limits; + } + } + + return Optional(); +} + +PrioritizedTransactionTagMap RkTagThrottleCollection::getClientRates( + bool autoThrottlingEnabled) { + PrioritizedTransactionTagMap clientRates; + + for (auto tagItr = tagData.begin(); tagItr != tagData.end();) { + bool tagPresent = false; + + double requestRate = tagItr->second.requestRate.smoothRate(); + auto manualItr = manualThrottledTags.find(tagItr->first); + if (manualItr != manualThrottledTags.end()) { + Optional manualClientRate; + for (auto priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend()); + ++priority) { + auto priorityItr = manualItr->second.find(*priority); + if (priorityItr != manualItr->second.end()) { + Optional priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate); + if (!priorityClientRate.present()) { + TEST(true); // Manual priority throttle expired + priorityItr = manualItr->second.erase(priorityItr); + } else { + if (!manualClientRate.present() || manualClientRate.get().tpsRate > priorityClientRate.get()) { + manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), + priorityItr->second.limits.expiration); + } else { + TEST(true); // Manual throttle overriden by higher priority + } + + ++priorityItr; + } + } + + if (manualClientRate.present()) { + tagPresent = true; + TEST(true); // Using manual throttle + clientRates[*priority][tagItr->first] = manualClientRate.get(); + } + } + + if (manualItr->second.empty()) { + TEST(true); // All manual throttles expired + manualThrottledTags.erase(manualItr); + break; + } + } + + auto autoItr = autoThrottledTags.find(tagItr->first); + if (autoItr != autoThrottledTags.end()) { + Optional autoClientRate = autoItr->second.updateAndGetClientRate(requestRate); + if (autoClientRate.present()) { + double adjustedRate = autoClientRate.get(); + double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION - + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; + if (now() >= rampStartTime && adjustedRate != std::numeric_limits::max()) { + TEST(true); // Tag auto-throttle ramping up + + double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS; + if (targetBusyness == 0) { + targetBusyness = 0.01; + } + + double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; + adjustedRate = + computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate); + } + + tagPresent = true; + if (autoThrottlingEnabled) { + auto result = clientRates[TransactionPriority::DEFAULT].try_emplace( + tagItr->first, adjustedRate, autoItr->second.limits.expiration); + if (!result.second && result.first->second.tpsRate > adjustedRate) { + result.first->second = ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration); + } else { + TEST(true); // Auto throttle overriden by manual throttle + } + clientRates[TransactionPriority::BATCH][tagItr->first] = + ClientTagThrottleLimits(0, autoItr->second.limits.expiration); + } + } else { + ASSERT(autoItr->second.limits.expiration <= now()); + TEST(true); // Auto throttle expired + if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup + tagPresent = true; + } else { + autoThrottledTags.erase(autoItr); + } + } + } + + if (!tagPresent) { + TEST(true); // All tag throttles expired + tagItr = tagData.erase(tagItr); + } else { + ++tagItr; + } + } + + return clientRates; +} + +void RkTagThrottleCollection::addRequests(TransactionTag const& tag, int requests) { + if (requests > 0) { + TEST(true); // Requests reported for throttled tag + + auto tagItr = tagData.try_emplace(tag); + tagItr.first->second.requestRate.addDelta(requests); + + double requestRate = tagItr.first->second.requestRate.smoothRate(); + + auto autoItr = autoThrottledTags.find(tag); + if (autoItr != autoThrottledTags.end()) { + autoItr->second.updateAndGetClientRate(requestRate); + } + + auto manualItr = manualThrottledTags.find(tag); + if (manualItr != manualThrottledTags.end()) { + for (auto& [priority, tagThrottleData] : manualItr->second) { + tagThrottleData.updateAndGetClientRate(requestRate); + } + } + } +} + +Optional RkTagThrottleCollection::getRequestRate(TransactionTag const& tag) { + auto itr = tagData.find(tag); + if (itr != tagData.end()) { + return itr->second.requestRate.smoothRate(); + } + return Optional(); +} + +int64_t RkTagThrottleCollection::manualThrottleCount() const { + int64_t count = 0; + for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) { + count += itr->second.size(); + } + + return count; +} + +void RkTagThrottleCollection::updateBusyTagCount(TagThrottledReason reason) { + if (reason == TagThrottledReason::BUSY_READ) { + ++busyReadTagCount; + } else if (reason == TagThrottledReason::BUSY_WRITE) { + ++busyWriteTagCount; + } +} diff --git a/fdbserver/RkTagThrottleCollection.h b/fdbserver/RkTagThrottleCollection.h new file mode 100644 index 0000000000..35062cdb7c --- /dev/null +++ b/fdbserver/RkTagThrottleCollection.h @@ -0,0 +1,89 @@ +/* + * RkTagThrottleCollection.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "fdbclient/Knobs.h" +#include "fdbclient/TagThrottle.actor.h" +#include "fdbrpc/Smoother.h" + +class RkTagThrottleCollection : NonCopyable { + struct RkTagData { + Smoother requestRate; + RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} + }; + + struct RkTagThrottleData { + ClientTagThrottleLimits limits; + Smoother clientRate; + + // Only used by auto-throttles + double created = now(); + double lastUpdated = 0; + double lastReduced = now(); + bool rateSet = false; + + RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} + double getTargetRate(Optional requestRate); + Optional updateAndGetClientRate(Optional requestRate); + }; + + TransactionTagMap autoThrottledTags; + TransactionTagMap> manualThrottledTags; + TransactionTagMap tagData; + uint32_t busyReadTagCount = 0, busyWriteTagCount = 0; + + void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); } + static double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate); + Optional getRequestRate(TransactionTag const& tag); + +public: + RkTagThrottleCollection() = default; + RkTagThrottleCollection(RkTagThrottleCollection&& other); + RkTagThrottleCollection& operator=(RkTagThrottleCollection&& other); + + // Set or update an auto throttling limit for the specified tag and priority combination. + // Returns the TPS rate if the throttle is updated, otherwise returns an empty optional + Optional autoThrottleTag(UID id, + TransactionTag const& tag, + double fractionalBusyness, + Optional tpsRate = Optional(), + Optional expiration = Optional()); + + // Set or update a manual tps rate limit for the specified tag and priority combination + void manualThrottleTag(UID id, + TransactionTag const& tag, + TransactionPriority priority, + double tpsRate, + double expiration, + Optional const& oldLimits); + + Optional getManualTagThrottleLimits(TransactionTag const& tag, + TransactionPriority priority); + + PrioritizedTransactionTagMap getClientRates(bool autoThrottlingEnabled); + void addRequests(TransactionTag const& tag, int requests); + int64_t autoThrottleCount() const { return autoThrottledTags.size(); } + int64_t manualThrottleCount() const; + void updateBusyTagCount(TagThrottledReason); + auto getBusyReadTagCount() const { return busyReadTagCount; } + auto getBusyWriteTagCount() const { return busyWriteTagCount; } +}; diff --git a/fdbserver/TagThrottler.actor.cpp b/fdbserver/TagThrottler.actor.cpp index 5eaad8b072..d85e021544 100644 --- a/fdbserver/TagThrottler.actor.cpp +++ b/fdbserver/TagThrottler.actor.cpp @@ -13,369 +13,14 @@ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "fdbserver/TagThrottler.h" - -class RkTagThrottleCollection : NonCopyable { - struct RkTagData { - Smoother requestRate; - RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} - }; - - struct RkTagThrottleData { - ClientTagThrottleLimits limits; - Smoother clientRate; - - // Only used by auto-throttles - double created = now(); - double lastUpdated = 0; - double lastReduced = now(); - bool rateSet = false; - - RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} - - double getTargetRate(Optional requestRate) const { - if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) { - return limits.tpsRate; - } else { - return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal()); - } - } - - Optional updateAndGetClientRate(Optional requestRate) { - if (limits.expiration > now()) { - double targetRate = getTargetRate(requestRate); - if (targetRate == std::numeric_limits::max()) { - rateSet = false; - return targetRate; - } - if (!rateSet) { - rateSet = true; - clientRate.reset(targetRate); - } else { - clientRate.setTotal(targetRate); - } - - double rate = clientRate.smoothTotal(); - ASSERT(rate >= 0); - return rate; - } else { - TEST(true); // Get throttle rate for expired throttle - rateSet = false; - return Optional(); - } - } - }; - - void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); } - -public: - RkTagThrottleCollection() {} - - RkTagThrottleCollection(RkTagThrottleCollection&& other) { - autoThrottledTags = std::move(other.autoThrottledTags); - manualThrottledTags = std::move(other.manualThrottledTags); - tagData = std::move(other.tagData); - } - - void operator=(RkTagThrottleCollection&& other) { - autoThrottledTags = std::move(other.autoThrottledTags); - manualThrottledTags = std::move(other.manualThrottledTags); - tagData = std::move(other.tagData); - } - - double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) { - ASSERT(currentBusyness > 0); - - if (targetBusyness < 1) { - double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness); - return requestRate * targetFraction; - } else { - return std::numeric_limits::max(); - } - } - - // Returns the TPS rate if the throttle is updated, otherwise returns an empty optional - Optional autoThrottleTag(UID id, - TransactionTag const& tag, - double fractionalBusyness, - Optional tpsRate = Optional(), - Optional expiration = Optional()) { - ASSERT(!tpsRate.present() || tpsRate.get() >= 0); - ASSERT(!expiration.present() || expiration.get() > now()); - - auto itr = autoThrottledTags.find(tag); - bool present = (itr != autoThrottledTags.end()); - if (!present) { - if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) { - TEST(true); // Reached auto-throttle limit - return Optional(); - } - - itr = autoThrottledTags.try_emplace(tag).first; - initializeTag(tag); - } else if (itr->second.limits.expiration <= now()) { - TEST(true); // Re-throttling expired tag that hasn't been cleaned up - present = false; - itr->second = RkTagThrottleData(); - } - - auto& throttle = itr->second; - - if (!tpsRate.present()) { - if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) { - tpsRate = std::numeric_limits::max(); - if (present) { - return Optional(); - } - } else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) { - TEST(true); // Tag auto-throttled too quickly - return Optional(); - } else { - tpsRate = computeTargetTpsRate(fractionalBusyness, - SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, - tagData[tag].requestRate.smoothRate()); - - if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) { - TEST(true); // Tag auto-throttle rate increase attempt while active - return Optional(); - } - - throttle.lastUpdated = now(); - if (tpsRate.get() < throttle.limits.tpsRate) { - throttle.lastReduced = now(); - } - } - } - if (!expiration.present()) { - expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION; - } - - ASSERT(tpsRate.present() && tpsRate.get() >= 0); - - throttle.limits.tpsRate = tpsRate.get(); - throttle.limits.expiration = expiration.get(); - - Optional clientRate = throttle.updateAndGetClientRate(getRequestRate(tag)); - - TraceEvent("RkSetAutoThrottle", id) - .detail("Tag", tag) - .detail("TargetRate", tpsRate.get()) - .detail("Expiration", expiration.get() - now()) - .detail("ClientRate", clientRate) - .detail("Created", now() - throttle.created) - .detail("LastUpdate", now() - throttle.lastUpdated) - .detail("LastReduced", now() - throttle.lastReduced); - - if (tpsRate.get() != std::numeric_limits::max()) { - return tpsRate.get(); - } else { - return Optional(); - } - } - - void manualThrottleTag(UID id, - TransactionTag const& tag, - TransactionPriority priority, - double tpsRate, - double expiration, - Optional const& oldLimits) { - ASSERT(tpsRate >= 0); - ASSERT(expiration > now()); - - auto& priorityThrottleMap = manualThrottledTags[tag]; - auto result = priorityThrottleMap.try_emplace(priority); - initializeTag(tag); - ASSERT(result.second); // Updating to the map is done by copying the whole map - - result.first->second.limits.tpsRate = tpsRate; - result.first->second.limits.expiration = expiration; - - if (!oldLimits.present()) { - TEST(true); // Transaction tag manually throttled - TraceEvent("RatekeeperAddingManualThrottle", id) - .detail("Tag", tag) - .detail("Rate", tpsRate) - .detail("Priority", transactionPriorityToString(priority)) - .detail("SecondsToExpiration", expiration - now()); - } else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) { - TEST(true); // Manual transaction tag throttle updated - TraceEvent("RatekeeperUpdatingManualThrottle", id) - .detail("Tag", tag) - .detail("Rate", tpsRate) - .detail("Priority", transactionPriorityToString(priority)) - .detail("SecondsToExpiration", expiration - now()); - } - - Optional clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag)); - ASSERT(clientRate.present()); - } - - Optional getManualTagThrottleLimits(TransactionTag const& tag, - TransactionPriority priority) { - auto itr = manualThrottledTags.find(tag); - if (itr != manualThrottledTags.end()) { - auto priorityItr = itr->second.find(priority); - if (priorityItr != itr->second.end()) { - return priorityItr->second.limits; - } - } - - return Optional(); - } - - PrioritizedTransactionTagMap getClientRates(bool autoThrottlingEnabled) { - PrioritizedTransactionTagMap clientRates; - - for (auto tagItr = tagData.begin(); tagItr != tagData.end();) { - bool tagPresent = false; - - double requestRate = tagItr->second.requestRate.smoothRate(); - auto manualItr = manualThrottledTags.find(tagItr->first); - if (manualItr != manualThrottledTags.end()) { - Optional manualClientRate; - for (auto priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend()); - ++priority) { - auto priorityItr = manualItr->second.find(*priority); - if (priorityItr != manualItr->second.end()) { - Optional priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate); - if (!priorityClientRate.present()) { - TEST(true); // Manual priority throttle expired - priorityItr = manualItr->second.erase(priorityItr); - } else { - if (!manualClientRate.present() || - manualClientRate.get().tpsRate > priorityClientRate.get()) { - manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), - priorityItr->second.limits.expiration); - } else { - TEST(true); // Manual throttle overriden by higher priority - } - - ++priorityItr; - } - } - - if (manualClientRate.present()) { - tagPresent = true; - TEST(true); // Using manual throttle - clientRates[*priority][tagItr->first] = manualClientRate.get(); - } - } - - if (manualItr->second.empty()) { - TEST(true); // All manual throttles expired - manualThrottledTags.erase(manualItr); - break; - } - } - - auto autoItr = autoThrottledTags.find(tagItr->first); - if (autoItr != autoThrottledTags.end()) { - Optional autoClientRate = autoItr->second.updateAndGetClientRate(requestRate); - if (autoClientRate.present()) { - double adjustedRate = autoClientRate.get(); - double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION - - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; - if (now() >= rampStartTime && adjustedRate != std::numeric_limits::max()) { - TEST(true); // Tag auto-throttle ramping up - - double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS; - if (targetBusyness == 0) { - targetBusyness = 0.01; - } - - double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; - adjustedRate = - computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate); - } - - tagPresent = true; - if (autoThrottlingEnabled) { - auto result = clientRates[TransactionPriority::DEFAULT].try_emplace( - tagItr->first, adjustedRate, autoItr->second.limits.expiration); - if (!result.second && result.first->second.tpsRate > adjustedRate) { - result.first->second = - ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration); - } else { - TEST(true); // Auto throttle overriden by manual throttle - } - clientRates[TransactionPriority::BATCH][tagItr->first] = - ClientTagThrottleLimits(0, autoItr->second.limits.expiration); - } - } else { - ASSERT(autoItr->second.limits.expiration <= now()); - TEST(true); // Auto throttle expired - if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup - tagPresent = true; - } else { - autoThrottledTags.erase(autoItr); - } - } - } - - if (!tagPresent) { - TEST(true); // All tag throttles expired - tagItr = tagData.erase(tagItr); - } else { - ++tagItr; - } - } - - return clientRates; - } - - void addRequests(TransactionTag const& tag, int requests) { - if (requests > 0) { - TEST(true); // Requests reported for throttled tag - - auto tagItr = tagData.try_emplace(tag); - tagItr.first->second.requestRate.addDelta(requests); - - double requestRate = tagItr.first->second.requestRate.smoothRate(); - - auto autoItr = autoThrottledTags.find(tag); - if (autoItr != autoThrottledTags.end()) { - autoItr->second.updateAndGetClientRate(requestRate); - } - - auto manualItr = manualThrottledTags.find(tag); - if (manualItr != manualThrottledTags.end()) { - for (auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end(); - ++priorityItr) { - priorityItr->second.updateAndGetClientRate(requestRate); - } - } - } - } - - Optional getRequestRate(TransactionTag const& tag) { - auto itr = tagData.find(tag); - if (itr != tagData.end()) { - return itr->second.requestRate.smoothRate(); - } - return Optional(); - } - - int64_t autoThrottleCount() const { return autoThrottledTags.size(); } - - int64_t manualThrottleCount() const { - int64_t count = 0; - for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) { - count += itr->second.size(); - } - - return count; - } - - TransactionTagMap autoThrottledTags; - TransactionTagMap> manualThrottledTags; - TransactionTagMap tagData; - uint32_t busyReadTagCount = 0, busyWriteTagCount = 0; -}; +#include "fdbserver/RkTagThrottleCollection.h" class TagThrottlerImpl { Database db; @@ -383,6 +28,7 @@ class TagThrottlerImpl { RkTagThrottleCollection throttledTags; uint64_t throttledTagChangeId{ 0 }; bool autoThrottlingEnabled{ false }; + Future expiredTagThrottleCleanup; ACTOR static Future monitorThrottlingChanges(TagThrottlerImpl* self) { state bool committed = false; @@ -460,11 +106,7 @@ class TagThrottlerImpl { if (tagKey.throttleType == TagThrottleType::AUTO) { updatedTagThrottles.autoThrottleTag( self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime); - if (tagValue.reason == TagThrottledReason::BUSY_READ) { - updatedTagThrottles.busyReadTagCount++; - } else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) { - updatedTagThrottles.busyWriteTagCount++; - } + updatedTagThrottles.updateBusyTagCount(tagValue.reason); } else { updatedTagThrottles.manualThrottleTag(self->id, tag, @@ -495,16 +137,12 @@ class TagThrottlerImpl { } } - Optional autoThrottleTag(UID id, TransactionTag tag, double busyness) { - return throttledTags.autoThrottleTag(id, tag, busyness); - } - - Future tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) { + Future tryUpdateAutoThrottling(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) { // NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE // currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs. if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) { TEST(true); // Transaction tag auto-throttled - Optional clientRate = autoThrottleTag(id, tag, busyness); + Optional clientRate = throttledTags.autoThrottleTag(id, tag, busyness); if (clientRate.present()) { TagSet tags; tags.addTag(tag); @@ -524,7 +162,10 @@ class TagThrottlerImpl { } public: - TagThrottlerImpl(Database db, UID id) : db(db), id(id) {} + TagThrottlerImpl(Database db, UID id) : db(db), id(id) { + expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); }, + SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL); + } Future monitorThrottlingChanges() { return monitorThrottlingChanges(this); } void addRequests(TransactionTag tag, int count) { throttledTags.addRequests(tag, count); } @@ -533,28 +174,31 @@ public: return throttledTags.getClientRates(autoThrottlingEnabled); } int64_t autoThrottleCount() const { return throttledTags.autoThrottleCount(); } - uint32_t busyReadTagCount() const { return throttledTags.busyReadTagCount; } - uint32_t busyWriteTagCount() const { return throttledTags.busyWriteTagCount; } + uint32_t busyReadTagCount() const { return throttledTags.getBusyReadTagCount(); } + uint32_t busyWriteTagCount() const { return throttledTags.getBusyWriteTagCount(); } int64_t manualThrottleCount() const { return throttledTags.manualThrottleCount(); } bool isAutoThrottlingEnabled() const { return autoThrottlingEnabled; } - Future tryAutoThrottleTag(StorageQueueInfo const& ss, int64_t storageQueue, int64_t storageDurabilityLag) { + Future tryUpdateAutoThrottling(StorageQueueInfo const& ss) { // NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In // most of situation, this works. More indicators besides queue size and durability lag could be investigated in // the future + auto storageQueue = ss.getStorageQueueBytes(); + auto storageDurabilityLag = ss.getDurabilityLag(); if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) { - if (ss.busiestWriteTag.present()) { - return tryAutoThrottleTag(ss.busiestWriteTag.get(), - ss.busiestWriteTagRate, - ss.busiestWriteTagFractionalBusyness, - TagThrottledReason::BUSY_WRITE); + // TODO: Update once size is potentially > 1 + ASSERT_WE_THINK(ss.busiestWriteTags.size() <= 1); + ASSERT_WE_THINK(ss.busiestReadTags.size() <= 1); + for (const auto& busyWriteTag : ss.busiestWriteTags) { + return tryUpdateAutoThrottling(busyWriteTag.tag, + busyWriteTag.rate, + busyWriteTag.fractionalBusyness, + TagThrottledReason::BUSY_WRITE); } - if (ss.busiestReadTag.present()) { - return tryAutoThrottleTag(ss.busiestReadTag.get(), - ss.busiestReadTagRate, - ss.busiestReadTagFractionalBusyness, - TagThrottledReason::BUSY_READ); + for (const auto& busyReadTag : ss.busiestReadTags) { + return tryUpdateAutoThrottling( + busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ); } } return Void(); @@ -591,8 +235,6 @@ int64_t TagThrottler::manualThrottleCount() const { bool TagThrottler::isAutoThrottlingEnabled() const { return impl->isAutoThrottlingEnabled(); } -Future TagThrottler::tryAutoThrottleTag(StorageQueueInfo const& ss, - int64_t storageQueue, - int64_t storageDurabilityLag) { - return impl->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag); +Future TagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) { + return impl->tryUpdateAutoThrottling(ss); } diff --git a/fdbserver/TagThrottler.h b/fdbserver/TagThrottler.h index 29989a5083..69e3909c7d 100644 --- a/fdbserver/TagThrottler.h +++ b/fdbserver/TagThrottler.h @@ -29,14 +29,26 @@ class TagThrottler { public: TagThrottler(Database db, UID id); ~TagThrottler(); + + // Poll the system keyspace looking for updates made through the tag throttling API Future monitorThrottlingChanges(); + + // Increment the number of known requests associated with the specified tag void addRequests(TransactionTag tag, int count); + + // This throttled tag change ID is used to coordinate updates with the GRV proxies uint64_t getThrottledTagChangeId() const; + + // For each tag and priority combination, return the throughput limit and expiration time PrioritizedTransactionTagMap getClientRates(); + int64_t autoThrottleCount() const; uint32_t busyReadTagCount() const; uint32_t busyWriteTagCount() const; int64_t manualThrottleCount() const; bool isAutoThrottlingEnabled() const; - Future tryAutoThrottleTag(StorageQueueInfo const&, int64_t storageQueue, int64_t storageDurabilityLag); + + // Based on the busiest read and write tags in the provided storage queue info, update + // tag throttling limits. + Future tryUpdateAutoThrottling(StorageQueueInfo const&); }; diff --git a/fdbserver/TransactionTagCounter.cpp b/fdbserver/TransactionTagCounter.cpp new file mode 100644 index 0000000000..1f0a25c2cc --- /dev/null +++ b/fdbserver/TransactionTagCounter.cpp @@ -0,0 +1,67 @@ +/* + * TransactionTagCounter.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/TransactionTagCounter.h" +#include "flow/Trace.h" + +TransactionTagCounter::TransactionTagCounter(UID thisServerID) + : thisServerID(thisServerID), + busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")) {} + +void TransactionTagCounter::addRequest(Optional const& tags, int64_t bytes) { + if (tags.present()) { + TEST(true); // Tracking transaction tag in counter + double cost = costFunction(bytes); + for (auto& tag : tags.get()) { + int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())]; + count += cost; + if (count > busiestTagCount) { + busiestTagCount = count; + busiestTag = tag; + } + } + + intervalTotalSampledCount += cost; + } +} + +void TransactionTagCounter::startNewInterval() { + double elapsed = now() - intervalStart; + previousBusiestTags.clear(); + if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) { + double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed; + if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) { + previousBusiestTags.emplace_back(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount); + } + + TraceEvent("BusiestReadTag", thisServerID) + .detail("Elapsed", elapsed) + .detail("Tag", printable(busiestTag)) + .detail("TagCost", busiestTagCount) + .detail("TotalSampledCost", intervalTotalSampledCount) + .detail("Reported", !previousBusiestTags.empty()) + .trackLatest(busiestReadTagEventHolder->trackingKey); + } + + intervalCounts.clear(); + intervalTotalSampledCount = 0; + busiestTagCount = 0; + intervalStart = now(); +} diff --git a/fdbserver/TransactionTagCounter.h b/fdbserver/TransactionTagCounter.h new file mode 100644 index 0000000000..d520259c5c --- /dev/null +++ b/fdbserver/TransactionTagCounter.h @@ -0,0 +1,44 @@ +/* + * TransactionTagCounter.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "fdbclient/StorageServerInterface.h" +#include "fdbclient/TagThrottle.actor.h" +#include "fdbserver/Knobs.h" + +class TransactionTagCounter { + TransactionTagMap intervalCounts; + int64_t intervalTotalSampledCount = 0; + TransactionTag busiestTag; + int64_t busiestTagCount = 0; + double intervalStart = 0; + + std::vector previousBusiestTags; + UID thisServerID; + Reference busiestReadTagEventHolder; + +public: + TransactionTagCounter(UID thisServerID); + static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; } + void addRequest(Optional const& tags, int64_t bytes); + void startNewInterval(); + std::vector const& getBusiestTags() const { return previousBusiestTags; } +}; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index c90332a019..aaf33f3ee7 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -62,6 +62,7 @@ #include "fdbserver/ServerCheckpoint.actor.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/TLogInterface.h" +#include "fdbserver/TransactionTagCounter.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbrpc/sim_validation.h" @@ -849,78 +850,6 @@ public: return val; } - struct TransactionTagCounter { - struct TagInfo { - TransactionTag tag; - double rate; - double fractionalBusyness; - - TagInfo(TransactionTag const& tag, double rate, double fractionalBusyness) - : tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {} - }; - - TransactionTagMap intervalCounts; - int64_t intervalTotalSampledCount = 0; - TransactionTag busiestTag; - int64_t busiestTagCount = 0; - double intervalStart = 0; - - Optional previousBusiestTag; - - UID thisServerID; - - Reference busiestReadTagEventHolder; - - TransactionTagCounter(UID thisServerID) - : thisServerID(thisServerID), - busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")) {} - - int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; } - - void addRequest(Optional const& tags, int64_t bytes) { - if (tags.present()) { - TEST(true); // Tracking tag on storage server - double cost = costFunction(bytes); - for (auto& tag : tags.get()) { - int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())]; - count += cost; - if (count > busiestTagCount) { - busiestTagCount = count; - busiestTag = tag; - } - } - - intervalTotalSampledCount += cost; - } - } - - void startNewInterval() { - double elapsed = now() - intervalStart; - previousBusiestTag.reset(); - if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) { - double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed; - if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) { - previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount); - } - - TraceEvent("BusiestReadTag", thisServerID) - .detail("Elapsed", elapsed) - .detail("Tag", printable(busiestTag)) - .detail("TagCost", busiestTagCount) - .detail("TotalSampledCost", intervalTotalSampledCount) - .detail("Reported", previousBusiestTag.present()) - .trackLatest(busiestReadTagEventHolder->trackingKey); - } - - intervalCounts.clear(); - intervalTotalSampledCount = 0; - busiestTagCount = 0; - intervalStart = now(); - } - - Optional getBusiestTag() const { return previousBusiestTag; } - }; - TransactionTagCounter transactionTagCounter; Optional latencyBandConfig; @@ -4206,11 +4135,7 @@ void getQueuingMetrics(StorageServer* self, StorageQueuingMetricsRequest const& reply.diskUsage = self->diskUsage; reply.durableVersion = self->durableVersion.get(); - Optional busiestTag = self->transactionTagCounter.getBusiestTag(); - reply.busiestTag = busiestTag.map( - [](StorageServer::TransactionTagCounter::TagInfo tagInfo) { return tagInfo.tag; }); - reply.busiestTagFractionalBusyness = busiestTag.present() ? busiestTag.get().fractionalBusyness : 0.0; - reply.busiestTagRate = busiestTag.present() ? busiestTag.get().rate : 0.0; + reply.busiestTags = self->transactionTagCounter.getBusiestTags(); req.reply.send(reply); }