From 229f0cca8ba5f9fc4694d45e19c54b09e5f8ecd1 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Wed, 2 Mar 2022 19:16:33 -0800 Subject: [PATCH] Add StorageQueueInfo::refreshCommitCost method --- fdbserver/Ratekeeper.actor.cpp | 97 ++++++++++++++++++---------------- fdbserver/Ratekeeper.h | 6 +-- 2 files changed, 54 insertions(+), 49 deletions(-) diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 9eddcfc2ec..538bca997a 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -288,9 +288,7 @@ public: self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); self.addActor.send(self.monitorThrottlingChanges()); - Ratekeeper* selfPtr = &self; // let flow compiler capture self - self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); }, - SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + self.addActor.send(self.refreshStorageServerCommitCosts()); TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) @@ -410,6 +408,19 @@ public: return Void(); } + ACTOR static Future refreshStorageServerCommitCosts(Ratekeeper* self) { + state double lastBusiestCommitTagPick; + loop { + lastBusiestCommitTagPick = now(); + wait(delay(SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + double elapsed = now() - lastBusiestCommitTagPick; + // for each SS, select the busiest commit tag from ssTrTagCommitCost + for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) { + ssQueueInfo.refreshCommitCost(elapsed); + } + } + } + }; // class RatekeeperImpl Future Ratekeeper::configurationMonitor() { @@ -964,52 +975,46 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) { } } -Future Ratekeeper::refreshStorageServerCommitCost() { - if (lastBusiestCommitTagPick == 0) { // the first call should be skipped - lastBusiestCommitTagPick = now(); - return Void(); - } - double elapsed = now() - lastBusiestCommitTagPick; - // for each SS, select the busiest commit tag from ssTrTagCommitCost - for (auto& [ssId, ssQueueInfo] : storageQueueInfo) { - ssQueueInfo.busiestWriteTags.clear(); - TransactionTag busiestTag; - TransactionCommitCostEstimation maxCost; - double maxRate = 0, maxBusyness = 0; - for (const auto& [tag, cost] : ssQueueInfo.tagCostEst) { - double rate = cost.getCostSum() / elapsed; - if (rate > maxRate) { - busiestTag = tag; - maxRate = rate; - maxCost = cost; - } - } - if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) { - // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps); - ASSERT_GT(ssQueueInfo.totalWriteCosts, 0); - maxBusyness = double(maxCost.getCostSum()) / ssQueueInfo.totalWriteCosts; - ssQueueInfo.busiestWriteTags.emplace_back(busiestTag, maxBusyness, maxRate); - } - - TraceEvent("BusiestWriteTag", ssId) - .detail("Elapsed", elapsed) - .detail("Tag", printable(busiestTag)) - .detail("TagOps", maxCost.getOpsSum()) - .detail("TagCost", maxCost.getCostSum()) - .detail("TotalCost", ssQueueInfo.totalWriteCosts) - .detail("Reported", !ssQueueInfo.busiestWriteTags.empty()) - .trackLatest(ssQueueInfo.busiestWriteTagEventHolder->trackingKey); - - // reset statistics - ssQueueInfo.tagCostEst.clear(); - ssQueueInfo.totalWriteOps = 0; - ssQueueInfo.totalWriteCosts = 0; - } - lastBusiestCommitTagPick = now(); - return Void(); +Future Ratekeeper::refreshStorageServerCommitCosts() { + return RatekeeperImpl::refreshStorageServerCommitCosts(this); } ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference const> dbInfo) { wait(Ratekeeper::run(rkInterf, dbInfo)); return Void(); } + +void StorageQueueInfo::refreshCommitCost(double elapsed) { + busiestWriteTags.clear(); + TransactionTag busiestTag; + TransactionCommitCostEstimation maxCost; + double maxRate = 0, maxBusyness = 0; + for (const auto& [tag, cost] : tagCostEst) { + double rate = cost.getCostSum() / elapsed; + if (rate > maxRate) { + busiestTag = tag; + maxRate = rate; + maxCost = cost; + } + } + if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) { + // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", totalWriteCost).detail("TotalWriteOps",totalWriteOps); + ASSERT_GT(totalWriteCosts, 0); + maxBusyness = double(maxCost.getCostSum()) / totalWriteCosts; + busiestWriteTags.emplace_back(busiestTag, maxBusyness, maxRate); + } + + TraceEvent("BusiestWriteTag", id) + .detail("Elapsed", elapsed) + .detail("Tag", printable(busiestTag)) + .detail("TagOps", maxCost.getOpsSum()) + .detail("TagCost", maxCost.getCostSum()) + .detail("TotalCost", totalWriteCosts) + .detail("Reported", !busiestWriteTags.empty()) + .trackLatest(busiestWriteTagEventHolder->trackingKey); + + // reset statistics + tagCostEst.clear(); + totalWriteOps = 0; + totalWriteCosts = 0; +} diff --git a/fdbserver/Ratekeeper.h b/fdbserver/Ratekeeper.h index d463ac0e1d..97037a2a63 100644 --- a/fdbserver/Ratekeeper.h +++ b/fdbserver/Ratekeeper.h @@ -76,6 +76,8 @@ struct StorageQueueInfo { // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo lastReply.instanceID = -1; } + + void refreshCommitCost(double elapsed); }; struct TLogQueueInfo { @@ -180,14 +182,12 @@ class Ratekeeper { Future expiredTagThrottleCleanup; - double lastBusiestCommitTagPick; - Ratekeeper(UID id, Database db); Future configurationMonitor(); void updateCommitCostEstimation(UIDTransactionTagMap const& costEstimation); void updateRate(RatekeeperLimits* limits); - Future refreshStorageServerCommitCost(); + Future refreshStorageServerCommitCosts(); Future monitorServerListChange(PromiseStream>> serverChanges); Future trackEachStorageServer(FutureStream>> serverChanges);