Add StorageQueueInfo::refreshCommitCost method

This commit is contained in:
sfc-gh-tclinkenbeard 2022-03-02 19:16:33 -08:00
parent 455b75abca
commit 229f0cca8b
2 changed files with 54 additions and 49 deletions

View File

@ -288,9 +288,7 @@ public:
self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id()));
self.addActor.send(self.monitorThrottlingChanges()); self.addActor.send(self.monitorThrottlingChanges());
Ratekeeper* selfPtr = &self; // let flow compiler capture self self.addActor.send(self.refreshStorageServerCommitCosts());
self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); },
SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) TraceEvent("RkTLogQueueSizeParameters", rkInterf.id())
.detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG)
@ -410,6 +408,19 @@ public:
return Void(); return Void();
} }
ACTOR static Future<Void> refreshStorageServerCommitCosts(Ratekeeper* self) {
state double lastBusiestCommitTagPick;
loop {
lastBusiestCommitTagPick = now();
wait(delay(SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
double elapsed = now() - lastBusiestCommitTagPick;
// for each SS, select the busiest commit tag from ssTrTagCommitCost
for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) {
ssQueueInfo.refreshCommitCost(elapsed);
}
}
}
}; // class RatekeeperImpl }; // class RatekeeperImpl
Future<Void> Ratekeeper::configurationMonitor() { Future<Void> Ratekeeper::configurationMonitor() {
@ -964,52 +975,46 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
} }
} }
Future<Void> Ratekeeper::refreshStorageServerCommitCost() { Future<Void> Ratekeeper::refreshStorageServerCommitCosts() {
if (lastBusiestCommitTagPick == 0) { // the first call should be skipped return RatekeeperImpl::refreshStorageServerCommitCosts(this);
lastBusiestCommitTagPick = now();
return Void();
}
double elapsed = now() - lastBusiestCommitTagPick;
// for each SS, select the busiest commit tag from ssTrTagCommitCost
for (auto& [ssId, ssQueueInfo] : storageQueueInfo) {
ssQueueInfo.busiestWriteTags.clear();
TransactionTag busiestTag;
TransactionCommitCostEstimation maxCost;
double maxRate = 0, maxBusyness = 0;
for (const auto& [tag, cost] : ssQueueInfo.tagCostEst) {
double rate = cost.getCostSum() / elapsed;
if (rate > maxRate) {
busiestTag = tag;
maxRate = rate;
maxCost = cost;
}
}
if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) {
// TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps);
ASSERT_GT(ssQueueInfo.totalWriteCosts, 0);
maxBusyness = double(maxCost.getCostSum()) / ssQueueInfo.totalWriteCosts;
ssQueueInfo.busiestWriteTags.emplace_back(busiestTag, maxBusyness, maxRate);
}
TraceEvent("BusiestWriteTag", ssId)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagOps", maxCost.getOpsSum())
.detail("TagCost", maxCost.getCostSum())
.detail("TotalCost", ssQueueInfo.totalWriteCosts)
.detail("Reported", !ssQueueInfo.busiestWriteTags.empty())
.trackLatest(ssQueueInfo.busiestWriteTagEventHolder->trackingKey);
// reset statistics
ssQueueInfo.tagCostEst.clear();
ssQueueInfo.totalWriteOps = 0;
ssQueueInfo.totalWriteCosts = 0;
}
lastBusiestCommitTagPick = now();
return Void();
} }
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) { ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
wait(Ratekeeper::run(rkInterf, dbInfo)); wait(Ratekeeper::run(rkInterf, dbInfo));
return Void(); return Void();
} }
void StorageQueueInfo::refreshCommitCost(double elapsed) {
busiestWriteTags.clear();
TransactionTag busiestTag;
TransactionCommitCostEstimation maxCost;
double maxRate = 0, maxBusyness = 0;
for (const auto& [tag, cost] : tagCostEst) {
double rate = cost.getCostSum() / elapsed;
if (rate > maxRate) {
busiestTag = tag;
maxRate = rate;
maxCost = cost;
}
}
if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) {
// TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", totalWriteCost).detail("TotalWriteOps",totalWriteOps);
ASSERT_GT(totalWriteCosts, 0);
maxBusyness = double(maxCost.getCostSum()) / totalWriteCosts;
busiestWriteTags.emplace_back(busiestTag, maxBusyness, maxRate);
}
TraceEvent("BusiestWriteTag", id)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagOps", maxCost.getOpsSum())
.detail("TagCost", maxCost.getCostSum())
.detail("TotalCost", totalWriteCosts)
.detail("Reported", !busiestWriteTags.empty())
.trackLatest(busiestWriteTagEventHolder->trackingKey);
// reset statistics
tagCostEst.clear();
totalWriteOps = 0;
totalWriteCosts = 0;
}

View File

@ -76,6 +76,8 @@ struct StorageQueueInfo {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1; lastReply.instanceID = -1;
} }
void refreshCommitCost(double elapsed);
}; };
struct TLogQueueInfo { struct TLogQueueInfo {
@ -180,14 +182,12 @@ class Ratekeeper {
Future<Void> expiredTagThrottleCleanup; Future<Void> expiredTagThrottleCleanup;
double lastBusiestCommitTagPick;
Ratekeeper(UID id, Database db); Ratekeeper(UID id, Database db);
Future<Void> configurationMonitor(); Future<Void> configurationMonitor();
void updateCommitCostEstimation(UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation); void updateCommitCostEstimation(UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation);
void updateRate(RatekeeperLimits* limits); void updateRate(RatekeeperLimits* limits);
Future<Void> refreshStorageServerCommitCost(); Future<Void> refreshStorageServerCommitCosts();
Future<Void> monitorServerListChange(PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges); Future<Void> monitorServerListChange(PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);
Future<Void> trackEachStorageServer(FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges); Future<Void> trackEachStorageServer(FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);