Merge pull request #6582 from sfc-gh-tclinkenbeard/global-tag-throttling2

Various `TagThrottler` enhancements
This commit is contained in:
Trevor Clinkenbeard 2022-03-23 12:54:30 -07:00 committed by GitHub
commit bdff100ef9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 800 additions and 632 deletions

View File

@ -993,6 +993,22 @@ struct GetStorageMetricsRequest {
}; };
struct StorageQueuingMetricsReply { struct StorageQueuingMetricsReply {
struct TagInfo {
constexpr static FileIdentifier file_identifier = 4528694;
TransactionTag tag;
double rate{ 0.0 };
double fractionalBusyness{ 0.0 };
TagInfo() = default;
TagInfo(TransactionTag const& tag, double rate, double fractionalBusyness)
: tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, tag, rate, fractionalBusyness);
}
};
constexpr static FileIdentifier file_identifier = 7633366; constexpr static FileIdentifier file_identifier = 7633366;
double localTime; double localTime;
int64_t instanceID; // changes if bytesDurable and bytesInput reset int64_t instanceID; // changes if bytesDurable and bytesInput reset
@ -1003,9 +1019,7 @@ struct StorageQueuingMetricsReply {
double cpuUsage; double cpuUsage;
double diskUsage; double diskUsage;
double localRateLimit; double localRateLimit;
Optional<TransactionTag> busiestTag; std::vector<TagInfo> busiestTags;
double busiestTagFractionalBusyness;
double busiestTagRate;
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
@ -1020,9 +1034,7 @@ struct StorageQueuingMetricsReply {
cpuUsage, cpuUsage,
diskUsage, diskUsage,
localRateLimit, localRateLimit,
busiestTag, busiestTags);
busiestTagFractionalBusyness,
busiestTagRate);
} }
}; };

View File

@ -118,6 +118,8 @@ set(FDBSERVER_SRCS
RestoreWorker.actor.h RestoreWorker.actor.h
RestoreWorkerInterface.actor.cpp RestoreWorkerInterface.actor.cpp
RestoreWorkerInterface.actor.h RestoreWorkerInterface.actor.h
RkTagThrottleCollection.cpp
RkTagThrottleCollection.h
RocksDBCheckpointUtils.actor.cpp RocksDBCheckpointUtils.actor.cpp
RocksDBCheckpointUtils.actor.h RocksDBCheckpointUtils.actor.h
RoleLineage.actor.cpp RoleLineage.actor.cpp
@ -152,6 +154,8 @@ set(FDBSERVER_SRCS
TesterInterface.actor.h TesterInterface.actor.h
TLogInterface.h TLogInterface.h
TLogServer.actor.cpp TLogServer.actor.cpp
TransactionTagCounter.cpp
TransactionTagCounter.h
TSSMappingUtil.actor.cpp TSSMappingUtil.actor.cpp
TSSMappingUtil.actor.h TSSMappingUtil.actor.h
VersionedBTree.actor.cpp VersionedBTree.actor.cpp

View File

@ -157,32 +157,7 @@ public:
ErrorOr<StorageQueuingMetricsReply> reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor( ErrorOr<StorageQueuingMetricsReply> reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor(
StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
if (reply.present()) { if (reply.present()) {
myQueueInfo->value.valid = true; myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes);
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
myQueueInfo->value.lastReply = reply.get();
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.reset(reply.get().version);
} else {
self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable -
myQueueInfo->value.prevReply.bytesDurable);
myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total);
myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
}
myQueueInfo->value.busiestReadTag = reply.get().busiestTag;
myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate;
} else { } else {
if (myQueueInfo->value.valid) { if (myQueueInfo->value.valid) {
TraceEvent("RkStorageServerDidNotRespond", self->id).detail("StorageServer", ssi.id()); TraceEvent("RkStorageServerDidNotRespond", self->id).detail("StorageServer", ssi.id());
@ -210,24 +185,7 @@ public:
ErrorOr<TLogQueuingMetricsReply> reply = wait(tli.getQueuingMetrics.getReplyUnlessFailedFor( ErrorOr<TLogQueuingMetricsReply> reply = wait(tli.getQueuingMetrics.getReplyUnlessFailedFor(
TLogQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? TLogQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
if (reply.present()) { if (reply.present()) {
myQueueInfo->value.valid = true; myQueueInfo->value.update(reply.get(), self->smoothTotalDurableBytes);
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
myQueueInfo->value.lastReply = reply.get();
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
} else {
self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable -
myQueueInfo->value.prevReply.bytesDurable);
myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total);
}
} else { } else {
if (myQueueInfo->value.valid) { if (myQueueInfo->value.valid) {
TraceEvent("RkTLogDidNotRespond", self->id).detail("TransactionLog", tli.id()); TraceEvent("RkTLogDidNotRespond", self->id).detail("TransactionLog", tli.id());
@ -290,9 +248,7 @@ public:
self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id()));
self.addActor.send(self.monitorThrottlingChanges()); self.addActor.send(self.monitorThrottlingChanges());
Ratekeeper* selfPtr = &self; // let flow compiler capture self self.addActor.send(self.refreshStorageServerCommitCosts());
self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); },
SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) TraceEvent("RkTLogQueueSizeParameters", rkInterf.id())
.detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG)
@ -412,6 +368,19 @@ public:
return Void(); return Void();
} }
ACTOR static Future<Void> refreshStorageServerCommitCosts(Ratekeeper* self) {
state double lastBusiestCommitTagPick;
loop {
lastBusiestCommitTagPick = now();
wait(delay(SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
double elapsed = now() - lastBusiestCommitTagPick;
// for each SS, select the busiest commit tag from ssTrTagCommitCost
for (auto& [ssId, ssQueueInfo] : self->storageQueueInfo) {
ssQueueInfo.refreshCommitCost(elapsed);
}
}
}
}; // class RatekeeperImpl }; // class RatekeeperImpl
Future<Void> Ratekeeper::configurationMonitor() { Future<Void> Ratekeeper::configurationMonitor() {
@ -464,11 +433,8 @@ Ratekeeper::Ratekeeper(UID id, Database db)
SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH,
SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH,
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH), SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH) {
lastBusiestCommitTagPick(0.0) {
tagThrottler = std::make_unique<TagThrottler>(db, id); tagThrottler = std::make_unique<TagThrottler>(db, id);
expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); },
SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL);
} }
void Ratekeeper::updateCommitCostEstimation( void Ratekeeper::updateCommitCostEstimation(
@ -478,9 +444,7 @@ void Ratekeeper::updateCommitCostEstimation(
if (tagCostIt == costEstimation.end()) if (tagCostIt == costEstimation.end())
continue; continue;
for (const auto& [tagName, cost] : tagCostIt->second) { for (const auto& [tagName, cost] : tagCostIt->second) {
it->value.tagCostEst[tagName] += cost; it->value.addCommitCost(tagName, cost);
it->value.totalWriteCosts += cost.getCostSum();
it->value.totalWriteOps += cost.getOpsSum();
} }
} }
} }
@ -558,10 +522,10 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
} }
} }
int64_t storageQueue = ss.lastReply.bytesInput - ss.smoothDurableBytes.smoothTotal(); int64_t storageQueue = ss.getStorageQueueBytes();
worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue); worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue);
int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal(); int64_t storageDurabilityLag = ss.getDurabilityLag();
worstDurabilityLag = std::max(worstDurabilityLag, storageDurabilityLag); worstDurabilityLag = std::max(worstDurabilityLag, storageDurabilityLag);
storageDurabilityLagReverseIndex.insert(std::make_pair(-1 * storageDurabilityLag, &ss)); storageDurabilityLagReverseIndex.insert(std::make_pair(-1 * storageDurabilityLag, &ss));
@ -575,7 +539,7 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
double targetRateRatio = std::min((storageQueue - targetBytes + springBytes) / (double)springBytes, 2.0); double targetRateRatio = std::min((storageQueue - targetBytes + springBytes) / (double)springBytes, 2.0);
if (limits->priority == TransactionPriority::DEFAULT) { if (limits->priority == TransactionPriority::DEFAULT) {
addActor.send(tagThrottler->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag)); addActor.send(tagThrottler->tryUpdateAutoThrottling(ss));
} }
double inputRate = ss.smoothInputBytes.smoothRate(); double inputRate = ss.smoothInputBytes.smoothRate();
@ -763,11 +727,11 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
auto& tl = it.value; auto& tl = it.value;
if (!tl.valid) if (!tl.valid)
continue; continue;
maxTLVer = std::max(maxTLVer, tl.lastReply.v); maxTLVer = std::max(maxTLVer, tl.getLastCommittedVersion());
} }
if (minSSVer != std::numeric_limits<Version>::max() && maxTLVer != std::numeric_limits<Version>::min()) { if (minSSVer != std::numeric_limits<Version>::max() && maxTLVer != std::numeric_limits<Version>::min()) {
// writeToReadLatencyLimit: 0 = infinte speed; 1 = TL durable speed ; 2 = half TL durable speed // writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed ; 2 = half TL durable speed
writeToReadLatencyLimit = writeToReadLatencyLimit =
((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference / 2) / (limits->maxVersionDifference / 4); ((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference / 2) / (limits->maxVersionDifference / 4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer); worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
@ -966,54 +930,137 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
} }
} }
Future<Void> Ratekeeper::refreshStorageServerCommitCost() { Future<Void> Ratekeeper::refreshStorageServerCommitCosts() {
if (lastBusiestCommitTagPick == 0) { // the first call should be skipped return RatekeeperImpl::refreshStorageServerCommitCosts(this);
lastBusiestCommitTagPick = now();
return Void();
}
double elapsed = now() - lastBusiestCommitTagPick;
// for each SS, select the busiest commit tag from ssTrTagCommitCost
for (auto it = storageQueueInfo.begin(); it != storageQueueInfo.end(); ++it) {
it->value.busiestWriteTag.reset();
TransactionTag busiestTag;
TransactionCommitCostEstimation maxCost;
double maxRate = 0, maxBusyness = 0;
for (const auto& [tag, cost] : it->value.tagCostEst) {
double rate = cost.getCostSum() / elapsed;
if (rate > maxRate) {
busiestTag = tag;
maxRate = rate;
maxCost = cost;
}
}
if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) {
it->value.busiestWriteTag = busiestTag;
// TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps);
ASSERT(it->value.totalWriteCosts > 0);
maxBusyness = double(maxCost.getCostSum()) / it->value.totalWriteCosts;
it->value.busiestWriteTagFractionalBusyness = maxBusyness;
it->value.busiestWriteTagRate = maxRate;
}
TraceEvent("BusiestWriteTag", it->key)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagOps", maxCost.getOpsSum())
.detail("TagCost", maxCost.getCostSum())
.detail("TotalCost", it->value.totalWriteCosts)
.detail("Reported", it->value.busiestWriteTag.present())
.trackLatest(it->value.busiestWriteTagEventHolder->trackingKey);
// reset statistics
it->value.tagCostEst.clear();
it->value.totalWriteOps = 0;
it->value.totalWriteCosts = 0;
}
lastBusiestCommitTagPick = now();
return Void();
} }
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) { ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
wait(Ratekeeper::run(rkInterf, dbInfo)); wait(Ratekeeper::run(rkInterf, dbInfo));
return Void(); return Void();
} }
StorageQueueInfo::StorageQueueInfo(UID id, LocalityData locality)
: busiestWriteTagEventHolder(makeReference<EventCacheHolder>(id.toString() + "/BusiestWriteTag")), valid(false),
id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
limitReason(limitReason_t::unlimited) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
}
void StorageQueueInfo::addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost) {
tagCostEst[tagName] += cost;
totalWriteCosts += cost.getCostSum();
totalWriteOps += cost.getOpsSum();
}
void StorageQueueInfo::update(StorageQueuingMetricsReply const& reply, Smoother& smoothTotalDurableBytes) {
valid = true;
auto prevReply = std::move(lastReply);
lastReply = reply;
if (prevReply.instanceID != reply.instanceID) {
smoothDurableBytes.reset(reply.bytesDurable);
verySmoothDurableBytes.reset(reply.bytesDurable);
smoothInputBytes.reset(reply.bytesInput);
smoothFreeSpace.reset(reply.storageBytes.available);
smoothTotalSpace.reset(reply.storageBytes.total);
smoothDurableVersion.reset(reply.durableVersion);
smoothLatestVersion.reset(reply.version);
} else {
smoothTotalDurableBytes.addDelta(reply.bytesDurable - prevReply.bytesDurable);
smoothDurableBytes.setTotal(reply.bytesDurable);
verySmoothDurableBytes.setTotal(reply.bytesDurable);
smoothInputBytes.setTotal(reply.bytesInput);
smoothFreeSpace.setTotal(reply.storageBytes.available);
smoothTotalSpace.setTotal(reply.storageBytes.total);
smoothDurableVersion.setTotal(reply.durableVersion);
smoothLatestVersion.setTotal(reply.version);
}
busiestReadTags = reply.busiestTags;
}
void StorageQueueInfo::refreshCommitCost(double elapsed) {
busiestWriteTags.clear();
TransactionTag busiestTag;
TransactionCommitCostEstimation maxCost;
double maxRate = 0, maxBusyness = 0;
for (const auto& [tag, cost] : tagCostEst) {
double rate = cost.getCostSum() / elapsed;
if (rate > maxRate) {
busiestTag = tag;
maxRate = rate;
maxCost = cost;
}
}
if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) {
// TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", totalWriteCost).detail("TotalWriteOps",totalWriteOps);
ASSERT_GT(totalWriteCosts, 0);
maxBusyness = double(maxCost.getCostSum()) / totalWriteCosts;
busiestWriteTags.emplace_back(busiestTag, maxRate, maxBusyness);
}
TraceEvent("BusiestWriteTag", id)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagOps", maxCost.getOpsSum())
.detail("TagCost", maxCost.getCostSum())
.detail("TotalCost", totalWriteCosts)
.detail("Reported", !busiestWriteTags.empty())
.trackLatest(busiestWriteTagEventHolder->trackingKey);
// reset statistics
tagCostEst.clear();
totalWriteOps = 0;
totalWriteCosts = 0;
}
TLogQueueInfo::TLogQueueInfo(UID id)
: valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from
// storageQueueInfO)
lastReply.instanceID = -1;
}
void TLogQueueInfo::update(TLogQueuingMetricsReply const& reply, Smoother& smoothTotalDurableBytes) {
valid = true;
auto prevReply = std::move(lastReply);
lastReply = reply;
if (prevReply.instanceID != reply.instanceID) {
smoothDurableBytes.reset(reply.bytesDurable);
verySmoothDurableBytes.reset(reply.bytesDurable);
smoothInputBytes.reset(reply.bytesInput);
smoothFreeSpace.reset(reply.storageBytes.available);
smoothTotalSpace.reset(reply.storageBytes.total);
} else {
smoothTotalDurableBytes.addDelta(reply.bytesDurable - prevReply.bytesDurable);
smoothDurableBytes.setTotal(reply.bytesDurable);
verySmoothDurableBytes.setTotal(reply.bytesDurable);
smoothInputBytes.setTotal(reply.bytesInput);
smoothFreeSpace.setTotal(reply.storageBytes.available);
smoothTotalSpace.setTotal(reply.storageBytes.total);
}
}
RatekeeperLimits::RatekeeperLimits(TransactionPriority priority,
std::string context,
int64_t storageTargetBytes,
int64_t storageSpringBytes,
int64_t logTargetBytes,
int64_t logSpringBytes,
double maxVersionDifference,
int64_t durabilityLagTargetVersions)
: tpsLimit(std::numeric_limits<double>::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes),
storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes),
maxVersionDifference(maxVersionDifference),
durabilityLagTargetVersions(
durabilityLagTargetVersions +
SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not
// be durable on the storage servers
lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits<double>::infinity()), priority(priority),
context(context), rkUpdateEventCacheHolder(makeReference<EventCacheHolder>("RkUpdate" + context)) {}

View File

@ -46,57 +46,45 @@ enum limitReason_t {
limitReason_t_end limitReason_t_end
}; };
struct StorageQueueInfo { class StorageQueueInfo {
uint64_t totalWriteCosts{ 0 };
int totalWriteOps{ 0 };
Reference<EventCacheHolder> busiestWriteTagEventHolder;
// refresh periodically
TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
public:
bool valid; bool valid;
UID id; UID id;
LocalityData locality; LocalityData locality;
StorageQueuingMetricsReply lastReply; StorageQueuingMetricsReply lastReply;
StorageQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothDurableVersion, smoothLatestVersion; Smoother smoothDurableVersion, smoothLatestVersion;
Smoother smoothFreeSpace; Smoother smoothFreeSpace;
Smoother smoothTotalSpace; Smoother smoothTotalSpace;
limitReason_t limitReason; limitReason_t limitReason;
std::vector<StorageQueuingMetricsReply::TagInfo> busiestReadTags, busiestWriteTags;
Optional<TransactionTag> busiestReadTag, busiestWriteTag; StorageQueueInfo(UID id, LocalityData locality);
double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0; void refreshCommitCost(double elapsed);
double busiestReadTagRate = 0, busiestWriteTagRate = 0; int64_t getStorageQueueBytes() const { return lastReply.bytesInput - smoothDurableBytes.smoothTotal(); }
int64_t getDurabilityLag() const { return smoothLatestVersion.smoothTotal() - smoothDurableVersion.smoothTotal(); }
Reference<EventCacheHolder> busiestWriteTagEventHolder; void update(StorageQueuingMetricsReply const&, Smoother& smoothTotalDurableBytes);
void addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost);
// refresh periodically
TransactionTagMap<TransactionCommitCostEstimation> tagCostEst;
uint64_t totalWriteCosts = 0;
int totalWriteOps = 0;
StorageQueueInfo(UID id, LocalityData locality)
: valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
limitReason(limitReason_t::unlimited),
busiestWriteTagEventHolder(makeReference<EventCacheHolder>(id.toString() + "/BusiestWriteTag")) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
}
}; };
struct TLogQueueInfo { struct TLogQueueInfo {
TLogQueuingMetricsReply lastReply;
bool valid; bool valid;
UID id; UID id;
TLogQueuingMetricsReply lastReply;
TLogQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothFreeSpace; Smoother smoothFreeSpace;
Smoother smoothTotalSpace; Smoother smoothTotalSpace;
TLogQueueInfo(UID id)
: valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), TLogQueueInfo(UID id);
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), Version getLastCommittedVersion() const { return lastReply.v; }
smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) { void update(TLogQueuingMetricsReply const& reply, Smoother& smoothTotalDurableBytes);
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from
// storageQueueInfO)
lastReply.instanceID = -1;
}
}; };
struct RatekeeperLimits { struct RatekeeperLimits {
@ -126,17 +114,7 @@ struct RatekeeperLimits {
int64_t logTargetBytes, int64_t logTargetBytes,
int64_t logSpringBytes, int64_t logSpringBytes,
double maxVersionDifference, double maxVersionDifference,
int64_t durabilityLagTargetVersions) int64_t durabilityLagTargetVersions);
: tpsLimit(std::numeric_limits<double>::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes),
storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes),
maxVersionDifference(maxVersionDifference),
durabilityLagTargetVersions(
durabilityLagTargetVersions +
SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not
// be durable on the storage servers
lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits<double>::infinity()), priority(priority),
context(context), rkUpdateEventCacheHolder(makeReference<EventCacheHolder>("RkUpdate" + context)) {}
}; };
class Ratekeeper { class Ratekeeper {
@ -144,16 +122,12 @@ class Ratekeeper {
// Differentiate from GrvProxyInfo in DatabaseContext.h // Differentiate from GrvProxyInfo in DatabaseContext.h
struct GrvProxyInfo { struct GrvProxyInfo {
int64_t totalTransactions; int64_t totalTransactions{ 0 };
int64_t batchTransactions; int64_t batchTransactions{ 0 };
uint64_t lastThrottledTagChangeId; uint64_t lastThrottledTagChangeId{ 0 };
double lastUpdateTime; double lastUpdateTime{ 0.0 };
double lastTagPushTime; double lastTagPushTime{ 0.0 };
GrvProxyInfo()
: totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0),
lastTagPushTime(0) {}
}; };
UID id; UID id;
@ -181,16 +155,12 @@ class Ratekeeper {
Deque<double> actualTpsHistory; Deque<double> actualTpsHistory;
Optional<Key> remoteDC; Optional<Key> remoteDC;
Future<Void> expiredTagThrottleCleanup;
double lastBusiestCommitTagPick;
Ratekeeper(UID id, Database db); Ratekeeper(UID id, Database db);
Future<Void> configurationMonitor(); Future<Void> configurationMonitor();
void updateCommitCostEstimation(UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation); void updateCommitCostEstimation(UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation);
void updateRate(RatekeeperLimits* limits); void updateRate(RatekeeperLimits* limits);
Future<Void> refreshStorageServerCommitCost(); Future<Void> refreshStorageServerCommitCosts();
Future<Void> monitorServerListChange(PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges); Future<Void> monitorServerListChange(PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);
Future<Void> trackEachStorageServer(FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges); Future<Void> trackEachStorageServer(FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges);

View File

@ -0,0 +1,356 @@
/*
* RkTagThrottleCollection.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
*
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/Knobs.h"
#include "fdbserver/RkTagThrottleCollection.h"
double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional<double> requestRate) {
if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) {
return limits.tpsRate;
} else {
return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal());
}
}
Optional<double> RkTagThrottleCollection::RkTagThrottleData::updateAndGetClientRate(Optional<double> requestRate) {
if (limits.expiration > now()) {
double targetRate = getTargetRate(requestRate);
if (targetRate == std::numeric_limits<double>::max()) {
rateSet = false;
return targetRate;
}
if (!rateSet) {
rateSet = true;
clientRate.reset(targetRate);
} else {
clientRate.setTotal(targetRate);
}
double rate = clientRate.smoothTotal();
ASSERT_GE(rate, 0);
return rate;
} else {
TEST(true); // Get throttle rate for expired throttle
rateSet = false;
return Optional<double>();
}
}
RkTagThrottleCollection::RkTagThrottleCollection(RkTagThrottleCollection&& other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
RkTagThrottleCollection& RkTagThrottleCollection::RkTagThrottleCollection::operator=(RkTagThrottleCollection&& other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
return *this;
}
double RkTagThrottleCollection::computeTargetTpsRate(double currentBusyness,
double targetBusyness,
double requestRate) {
ASSERT_GT(currentBusyness, 0);
if (targetBusyness < 1) {
double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness);
return requestRate * targetFraction;
} else {
return std::numeric_limits<double>::max();
}
}
Optional<double> RkTagThrottleCollection::autoThrottleTag(UID id,
TransactionTag const& tag,
double fractionalBusyness,
Optional<double> tpsRate,
Optional<double> expiration) {
ASSERT(!tpsRate.present() || tpsRate.get() >= 0);
ASSERT(!expiration.present() || expiration.get() > now());
auto itr = autoThrottledTags.find(tag);
bool present = (itr != autoThrottledTags.end());
if (!present) {
if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) {
TEST(true); // Reached auto-throttle limit
return Optional<double>();
}
itr = autoThrottledTags.try_emplace(tag).first;
initializeTag(tag);
} else if (itr->second.limits.expiration <= now()) {
TEST(true); // Re-throttling expired tag that hasn't been cleaned up
present = false;
itr->second = RkTagThrottleData();
}
auto& throttle = itr->second;
if (!tpsRate.present()) {
if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
tpsRate = std::numeric_limits<double>::max();
if (present) {
return Optional<double>();
}
} else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
TEST(true); // Tag auto-throttled too quickly
return Optional<double>();
} else {
tpsRate = computeTargetTpsRate(fractionalBusyness,
SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS,
tagData[tag].requestRate.smoothRate());
if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) {
TEST(true); // Tag auto-throttle rate increase attempt while active
return Optional<double>();
}
throttle.lastUpdated = now();
if (tpsRate.get() < throttle.limits.tpsRate) {
throttle.lastReduced = now();
}
}
}
if (!expiration.present()) {
expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION;
}
ASSERT(tpsRate.present() && tpsRate.get() >= 0);
throttle.limits.tpsRate = tpsRate.get();
throttle.limits.expiration = expiration.get();
Optional<double> clientRate = throttle.updateAndGetClientRate(getRequestRate(tag));
TraceEvent("RkSetAutoThrottle", id)
.detail("Tag", tag)
.detail("TargetRate", tpsRate.get())
.detail("Expiration", expiration.get() - now())
.detail("ClientRate", clientRate)
.detail("Created", now() - throttle.created)
.detail("LastUpdate", now() - throttle.lastUpdated)
.detail("LastReduced", now() - throttle.lastReduced);
if (tpsRate.get() != std::numeric_limits<double>::max()) {
return tpsRate.get();
} else {
return Optional<double>();
}
}
void RkTagThrottleCollection::manualThrottleTag(UID id,
TransactionTag const& tag,
TransactionPriority priority,
double tpsRate,
double expiration,
Optional<ClientTagThrottleLimits> const& oldLimits) {
ASSERT(tpsRate >= 0);
ASSERT(expiration > now());
auto& priorityThrottleMap = manualThrottledTags[tag];
auto result = priorityThrottleMap.try_emplace(priority);
initializeTag(tag);
ASSERT(result.second); // Updating to the map is done by copying the whole map
result.first->second.limits.tpsRate = tpsRate;
result.first->second.limits.expiration = expiration;
if (!oldLimits.present()) {
TEST(true); // Transaction tag manually throttled
TraceEvent("RatekeeperAddingManualThrottle", id)
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
} else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) {
TEST(true); // Manual transaction tag throttle updated
TraceEvent("RatekeeperUpdatingManualThrottle", id)
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
}
Optional<double> clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag));
ASSERT(clientRate.present());
}
Optional<ClientTagThrottleLimits> RkTagThrottleCollection::getManualTagThrottleLimits(TransactionTag const& tag,
TransactionPriority priority) {
auto itr = manualThrottledTags.find(tag);
if (itr != manualThrottledTags.end()) {
auto priorityItr = itr->second.find(priority);
if (priorityItr != itr->second.end()) {
return priorityItr->second.limits;
}
}
return Optional<ClientTagThrottleLimits>();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> RkTagThrottleCollection::getClientRates(
bool autoThrottlingEnabled) {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
for (auto tagItr = tagData.begin(); tagItr != tagData.end();) {
bool tagPresent = false;
double requestRate = tagItr->second.requestRate.smoothRate();
auto manualItr = manualThrottledTags.find(tagItr->first);
if (manualItr != manualThrottledTags.end()) {
Optional<ClientTagThrottleLimits> manualClientRate;
for (auto priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend());
++priority) {
auto priorityItr = manualItr->second.find(*priority);
if (priorityItr != manualItr->second.end()) {
Optional<double> priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate);
if (!priorityClientRate.present()) {
TEST(true); // Manual priority throttle expired
priorityItr = manualItr->second.erase(priorityItr);
} else {
if (!manualClientRate.present() || manualClientRate.get().tpsRate > priorityClientRate.get()) {
manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(),
priorityItr->second.limits.expiration);
} else {
TEST(true); // Manual throttle overriden by higher priority
}
++priorityItr;
}
}
if (manualClientRate.present()) {
tagPresent = true;
TEST(true); // Using manual throttle
clientRates[*priority][tagItr->first] = manualClientRate.get();
}
}
if (manualItr->second.empty()) {
TEST(true); // All manual throttles expired
manualThrottledTags.erase(manualItr);
break;
}
}
auto autoItr = autoThrottledTags.find(tagItr->first);
if (autoItr != autoThrottledTags.end()) {
Optional<double> autoClientRate = autoItr->second.updateAndGetClientRate(requestRate);
if (autoClientRate.present()) {
double adjustedRate = autoClientRate.get();
double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION -
SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
if (now() >= rampStartTime && adjustedRate != std::numeric_limits<double>::max()) {
TEST(true); // Tag auto-throttle ramping up
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
if (targetBusyness == 0) {
targetBusyness = 0.01;
}
double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
adjustedRate =
computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate);
}
tagPresent = true;
if (autoThrottlingEnabled) {
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(
tagItr->first, adjustedRate, autoItr->second.limits.expiration);
if (!result.second && result.first->second.tpsRate > adjustedRate) {
result.first->second = ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
} else {
TEST(true); // Auto throttle overriden by manual throttle
}
clientRates[TransactionPriority::BATCH][tagItr->first] =
ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
} else {
ASSERT(autoItr->second.limits.expiration <= now());
TEST(true); // Auto throttle expired
if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup
tagPresent = true;
} else {
autoThrottledTags.erase(autoItr);
}
}
}
if (!tagPresent) {
TEST(true); // All tag throttles expired
tagItr = tagData.erase(tagItr);
} else {
++tagItr;
}
}
return clientRates;
}
void RkTagThrottleCollection::addRequests(TransactionTag const& tag, int requests) {
if (requests > 0) {
TEST(true); // Requests reported for throttled tag
auto tagItr = tagData.try_emplace(tag);
tagItr.first->second.requestRate.addDelta(requests);
double requestRate = tagItr.first->second.requestRate.smoothRate();
auto autoItr = autoThrottledTags.find(tag);
if (autoItr != autoThrottledTags.end()) {
autoItr->second.updateAndGetClientRate(requestRate);
}
auto manualItr = manualThrottledTags.find(tag);
if (manualItr != manualThrottledTags.end()) {
for (auto& [priority, tagThrottleData] : manualItr->second) {
tagThrottleData.updateAndGetClientRate(requestRate);
}
}
}
}
Optional<double> RkTagThrottleCollection::getRequestRate(TransactionTag const& tag) {
auto itr = tagData.find(tag);
if (itr != tagData.end()) {
return itr->second.requestRate.smoothRate();
}
return Optional<double>();
}
int64_t RkTagThrottleCollection::manualThrottleCount() const {
int64_t count = 0;
for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) {
count += itr->second.size();
}
return count;
}
void RkTagThrottleCollection::updateBusyTagCount(TagThrottledReason reason) {
if (reason == TagThrottledReason::BUSY_READ) {
++busyReadTagCount;
} else if (reason == TagThrottledReason::BUSY_WRITE) {
++busyWriteTagCount;
}
}

View File

@ -0,0 +1,89 @@
/*
* RkTagThrottleCollection.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
*
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/Knobs.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbrpc/Smoother.h"
class RkTagThrottleCollection : NonCopyable {
struct RkTagData {
Smoother requestRate;
RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
};
struct RkTagThrottleData {
ClientTagThrottleLimits limits;
Smoother clientRate;
// Only used by auto-throttles
double created = now();
double lastUpdated = 0;
double lastReduced = now();
bool rateSet = false;
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
double getTargetRate(Optional<double> requestRate);
Optional<double> updateAndGetClientRate(Optional<double> requestRate);
};
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
TransactionTagMap<std::map<TransactionPriority, RkTagThrottleData>> manualThrottledTags;
TransactionTagMap<RkTagData> tagData;
uint32_t busyReadTagCount = 0, busyWriteTagCount = 0;
void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); }
static double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate);
Optional<double> getRequestRate(TransactionTag const& tag);
public:
RkTagThrottleCollection() = default;
RkTagThrottleCollection(RkTagThrottleCollection&& other);
RkTagThrottleCollection& operator=(RkTagThrottleCollection&& other);
// Set or update an auto throttling limit for the specified tag and priority combination.
// Returns the TPS rate if the throttle is updated, otherwise returns an empty optional
Optional<double> autoThrottleTag(UID id,
TransactionTag const& tag,
double fractionalBusyness,
Optional<double> tpsRate = Optional<double>(),
Optional<double> expiration = Optional<double>());
// Set or update a manual tps rate limit for the specified tag and priority combination
void manualThrottleTag(UID id,
TransactionTag const& tag,
TransactionPriority priority,
double tpsRate,
double expiration,
Optional<ClientTagThrottleLimits> const& oldLimits);
Optional<ClientTagThrottleLimits> getManualTagThrottleLimits(TransactionTag const& tag,
TransactionPriority priority);
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates(bool autoThrottlingEnabled);
void addRequests(TransactionTag const& tag, int requests);
int64_t autoThrottleCount() const { return autoThrottledTags.size(); }
int64_t manualThrottleCount() const;
void updateBusyTagCount(TagThrottledReason);
auto getBusyReadTagCount() const { return busyReadTagCount; }
auto getBusyWriteTagCount() const { return busyWriteTagCount; }
};

View File

@ -13,369 +13,14 @@
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
*
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
#include "fdbserver/TagThrottler.h" #include "fdbserver/TagThrottler.h"
#include "fdbserver/RkTagThrottleCollection.h"
class RkTagThrottleCollection : NonCopyable {
struct RkTagData {
Smoother requestRate;
RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
};
struct RkTagThrottleData {
ClientTagThrottleLimits limits;
Smoother clientRate;
// Only used by auto-throttles
double created = now();
double lastUpdated = 0;
double lastReduced = now();
bool rateSet = false;
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
double getTargetRate(Optional<double> requestRate) const {
if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) {
return limits.tpsRate;
} else {
return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal());
}
}
Optional<double> updateAndGetClientRate(Optional<double> requestRate) {
if (limits.expiration > now()) {
double targetRate = getTargetRate(requestRate);
if (targetRate == std::numeric_limits<double>::max()) {
rateSet = false;
return targetRate;
}
if (!rateSet) {
rateSet = true;
clientRate.reset(targetRate);
} else {
clientRate.setTotal(targetRate);
}
double rate = clientRate.smoothTotal();
ASSERT(rate >= 0);
return rate;
} else {
TEST(true); // Get throttle rate for expired throttle
rateSet = false;
return Optional<double>();
}
}
};
void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); }
public:
RkTagThrottleCollection() {}
RkTagThrottleCollection(RkTagThrottleCollection&& other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
void operator=(RkTagThrottleCollection&& other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) {
ASSERT(currentBusyness > 0);
if (targetBusyness < 1) {
double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness);
return requestRate * targetFraction;
} else {
return std::numeric_limits<double>::max();
}
}
// Returns the TPS rate if the throttle is updated, otherwise returns an empty optional
Optional<double> autoThrottleTag(UID id,
TransactionTag const& tag,
double fractionalBusyness,
Optional<double> tpsRate = Optional<double>(),
Optional<double> expiration = Optional<double>()) {
ASSERT(!tpsRate.present() || tpsRate.get() >= 0);
ASSERT(!expiration.present() || expiration.get() > now());
auto itr = autoThrottledTags.find(tag);
bool present = (itr != autoThrottledTags.end());
if (!present) {
if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) {
TEST(true); // Reached auto-throttle limit
return Optional<double>();
}
itr = autoThrottledTags.try_emplace(tag).first;
initializeTag(tag);
} else if (itr->second.limits.expiration <= now()) {
TEST(true); // Re-throttling expired tag that hasn't been cleaned up
present = false;
itr->second = RkTagThrottleData();
}
auto& throttle = itr->second;
if (!tpsRate.present()) {
if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
tpsRate = std::numeric_limits<double>::max();
if (present) {
return Optional<double>();
}
} else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
TEST(true); // Tag auto-throttled too quickly
return Optional<double>();
} else {
tpsRate = computeTargetTpsRate(fractionalBusyness,
SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS,
tagData[tag].requestRate.smoothRate());
if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) {
TEST(true); // Tag auto-throttle rate increase attempt while active
return Optional<double>();
}
throttle.lastUpdated = now();
if (tpsRate.get() < throttle.limits.tpsRate) {
throttle.lastReduced = now();
}
}
}
if (!expiration.present()) {
expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION;
}
ASSERT(tpsRate.present() && tpsRate.get() >= 0);
throttle.limits.tpsRate = tpsRate.get();
throttle.limits.expiration = expiration.get();
Optional<double> clientRate = throttle.updateAndGetClientRate(getRequestRate(tag));
TraceEvent("RkSetAutoThrottle", id)
.detail("Tag", tag)
.detail("TargetRate", tpsRate.get())
.detail("Expiration", expiration.get() - now())
.detail("ClientRate", clientRate)
.detail("Created", now() - throttle.created)
.detail("LastUpdate", now() - throttle.lastUpdated)
.detail("LastReduced", now() - throttle.lastReduced);
if (tpsRate.get() != std::numeric_limits<double>::max()) {
return tpsRate.get();
} else {
return Optional<double>();
}
}
void manualThrottleTag(UID id,
TransactionTag const& tag,
TransactionPriority priority,
double tpsRate,
double expiration,
Optional<ClientTagThrottleLimits> const& oldLimits) {
ASSERT(tpsRate >= 0);
ASSERT(expiration > now());
auto& priorityThrottleMap = manualThrottledTags[tag];
auto result = priorityThrottleMap.try_emplace(priority);
initializeTag(tag);
ASSERT(result.second); // Updating to the map is done by copying the whole map
result.first->second.limits.tpsRate = tpsRate;
result.first->second.limits.expiration = expiration;
if (!oldLimits.present()) {
TEST(true); // Transaction tag manually throttled
TraceEvent("RatekeeperAddingManualThrottle", id)
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
} else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) {
TEST(true); // Manual transaction tag throttle updated
TraceEvent("RatekeeperUpdatingManualThrottle", id)
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
}
Optional<double> clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag));
ASSERT(clientRate.present());
}
Optional<ClientTagThrottleLimits> getManualTagThrottleLimits(TransactionTag const& tag,
TransactionPriority priority) {
auto itr = manualThrottledTags.find(tag);
if (itr != manualThrottledTags.end()) {
auto priorityItr = itr->second.find(priority);
if (priorityItr != itr->second.end()) {
return priorityItr->second.limits;
}
}
return Optional<ClientTagThrottleLimits>();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates(bool autoThrottlingEnabled) {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
for (auto tagItr = tagData.begin(); tagItr != tagData.end();) {
bool tagPresent = false;
double requestRate = tagItr->second.requestRate.smoothRate();
auto manualItr = manualThrottledTags.find(tagItr->first);
if (manualItr != manualThrottledTags.end()) {
Optional<ClientTagThrottleLimits> manualClientRate;
for (auto priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend());
++priority) {
auto priorityItr = manualItr->second.find(*priority);
if (priorityItr != manualItr->second.end()) {
Optional<double> priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate);
if (!priorityClientRate.present()) {
TEST(true); // Manual priority throttle expired
priorityItr = manualItr->second.erase(priorityItr);
} else {
if (!manualClientRate.present() ||
manualClientRate.get().tpsRate > priorityClientRate.get()) {
manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(),
priorityItr->second.limits.expiration);
} else {
TEST(true); // Manual throttle overriden by higher priority
}
++priorityItr;
}
}
if (manualClientRate.present()) {
tagPresent = true;
TEST(true); // Using manual throttle
clientRates[*priority][tagItr->first] = manualClientRate.get();
}
}
if (manualItr->second.empty()) {
TEST(true); // All manual throttles expired
manualThrottledTags.erase(manualItr);
break;
}
}
auto autoItr = autoThrottledTags.find(tagItr->first);
if (autoItr != autoThrottledTags.end()) {
Optional<double> autoClientRate = autoItr->second.updateAndGetClientRate(requestRate);
if (autoClientRate.present()) {
double adjustedRate = autoClientRate.get();
double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION -
SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
if (now() >= rampStartTime && adjustedRate != std::numeric_limits<double>::max()) {
TEST(true); // Tag auto-throttle ramping up
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
if (targetBusyness == 0) {
targetBusyness = 0.01;
}
double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
adjustedRate =
computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate);
}
tagPresent = true;
if (autoThrottlingEnabled) {
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(
tagItr->first, adjustedRate, autoItr->second.limits.expiration);
if (!result.second && result.first->second.tpsRate > adjustedRate) {
result.first->second =
ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
} else {
TEST(true); // Auto throttle overriden by manual throttle
}
clientRates[TransactionPriority::BATCH][tagItr->first] =
ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
} else {
ASSERT(autoItr->second.limits.expiration <= now());
TEST(true); // Auto throttle expired
if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup
tagPresent = true;
} else {
autoThrottledTags.erase(autoItr);
}
}
}
if (!tagPresent) {
TEST(true); // All tag throttles expired
tagItr = tagData.erase(tagItr);
} else {
++tagItr;
}
}
return clientRates;
}
void addRequests(TransactionTag const& tag, int requests) {
if (requests > 0) {
TEST(true); // Requests reported for throttled tag
auto tagItr = tagData.try_emplace(tag);
tagItr.first->second.requestRate.addDelta(requests);
double requestRate = tagItr.first->second.requestRate.smoothRate();
auto autoItr = autoThrottledTags.find(tag);
if (autoItr != autoThrottledTags.end()) {
autoItr->second.updateAndGetClientRate(requestRate);
}
auto manualItr = manualThrottledTags.find(tag);
if (manualItr != manualThrottledTags.end()) {
for (auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end();
++priorityItr) {
priorityItr->second.updateAndGetClientRate(requestRate);
}
}
}
}
Optional<double> getRequestRate(TransactionTag const& tag) {
auto itr = tagData.find(tag);
if (itr != tagData.end()) {
return itr->second.requestRate.smoothRate();
}
return Optional<double>();
}
int64_t autoThrottleCount() const { return autoThrottledTags.size(); }
int64_t manualThrottleCount() const {
int64_t count = 0;
for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) {
count += itr->second.size();
}
return count;
}
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
TransactionTagMap<std::map<TransactionPriority, RkTagThrottleData>> manualThrottledTags;
TransactionTagMap<RkTagData> tagData;
uint32_t busyReadTagCount = 0, busyWriteTagCount = 0;
};
class TagThrottlerImpl { class TagThrottlerImpl {
Database db; Database db;
@ -383,6 +28,7 @@ class TagThrottlerImpl {
RkTagThrottleCollection throttledTags; RkTagThrottleCollection throttledTags;
uint64_t throttledTagChangeId{ 0 }; uint64_t throttledTagChangeId{ 0 };
bool autoThrottlingEnabled{ false }; bool autoThrottlingEnabled{ false };
Future<Void> expiredTagThrottleCleanup;
ACTOR static Future<Void> monitorThrottlingChanges(TagThrottlerImpl* self) { ACTOR static Future<Void> monitorThrottlingChanges(TagThrottlerImpl* self) {
state bool committed = false; state bool committed = false;
@ -460,11 +106,7 @@ class TagThrottlerImpl {
if (tagKey.throttleType == TagThrottleType::AUTO) { if (tagKey.throttleType == TagThrottleType::AUTO) {
updatedTagThrottles.autoThrottleTag( updatedTagThrottles.autoThrottleTag(
self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime); self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
if (tagValue.reason == TagThrottledReason::BUSY_READ) { updatedTagThrottles.updateBusyTagCount(tagValue.reason);
updatedTagThrottles.busyReadTagCount++;
} else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) {
updatedTagThrottles.busyWriteTagCount++;
}
} else { } else {
updatedTagThrottles.manualThrottleTag(self->id, updatedTagThrottles.manualThrottleTag(self->id,
tag, tag,
@ -495,16 +137,12 @@ class TagThrottlerImpl {
} }
} }
Optional<double> autoThrottleTag(UID id, TransactionTag tag, double busyness) { Future<Void> tryUpdateAutoThrottling(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) {
return throttledTags.autoThrottleTag(id, tag, busyness);
}
Future<Void> tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) {
// NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE // NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE
// currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs. // currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs.
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) { if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
TEST(true); // Transaction tag auto-throttled TEST(true); // Transaction tag auto-throttled
Optional<double> clientRate = autoThrottleTag(id, tag, busyness); Optional<double> clientRate = throttledTags.autoThrottleTag(id, tag, busyness);
if (clientRate.present()) { if (clientRate.present()) {
TagSet tags; TagSet tags;
tags.addTag(tag); tags.addTag(tag);
@ -524,7 +162,10 @@ class TagThrottlerImpl {
} }
public: public:
TagThrottlerImpl(Database db, UID id) : db(db), id(id) {} TagThrottlerImpl(Database db, UID id) : db(db), id(id) {
expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); },
SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL);
}
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); } Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
void addRequests(TransactionTag tag, int count) { throttledTags.addRequests(tag, count); } void addRequests(TransactionTag tag, int count) { throttledTags.addRequests(tag, count); }
@ -533,28 +174,31 @@ public:
return throttledTags.getClientRates(autoThrottlingEnabled); return throttledTags.getClientRates(autoThrottlingEnabled);
} }
int64_t autoThrottleCount() const { return throttledTags.autoThrottleCount(); } int64_t autoThrottleCount() const { return throttledTags.autoThrottleCount(); }
uint32_t busyReadTagCount() const { return throttledTags.busyReadTagCount; } uint32_t busyReadTagCount() const { return throttledTags.getBusyReadTagCount(); }
uint32_t busyWriteTagCount() const { return throttledTags.busyWriteTagCount; } uint32_t busyWriteTagCount() const { return throttledTags.getBusyWriteTagCount(); }
int64_t manualThrottleCount() const { return throttledTags.manualThrottleCount(); } int64_t manualThrottleCount() const { return throttledTags.manualThrottleCount(); }
bool isAutoThrottlingEnabled() const { return autoThrottlingEnabled; } bool isAutoThrottlingEnabled() const { return autoThrottlingEnabled; }
Future<Void> tryAutoThrottleTag(StorageQueueInfo const& ss, int64_t storageQueue, int64_t storageDurabilityLag) { Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
// NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In // NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In
// most of situation, this works. More indicators besides queue size and durability lag could be investigated in // most of situation, this works. More indicators besides queue size and durability lag could be investigated in
// the future // the future
auto storageQueue = ss.getStorageQueueBytes();
auto storageDurabilityLag = ss.getDurabilityLag();
if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) { storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
if (ss.busiestWriteTag.present()) { // TODO: Update once size is potentially > 1
return tryAutoThrottleTag(ss.busiestWriteTag.get(), ASSERT_WE_THINK(ss.busiestWriteTags.size() <= 1);
ss.busiestWriteTagRate, ASSERT_WE_THINK(ss.busiestReadTags.size() <= 1);
ss.busiestWriteTagFractionalBusyness, for (const auto& busyWriteTag : ss.busiestWriteTags) {
TagThrottledReason::BUSY_WRITE); return tryUpdateAutoThrottling(busyWriteTag.tag,
busyWriteTag.rate,
busyWriteTag.fractionalBusyness,
TagThrottledReason::BUSY_WRITE);
} }
if (ss.busiestReadTag.present()) { for (const auto& busyReadTag : ss.busiestReadTags) {
return tryAutoThrottleTag(ss.busiestReadTag.get(), return tryUpdateAutoThrottling(
ss.busiestReadTagRate, busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ);
ss.busiestReadTagFractionalBusyness,
TagThrottledReason::BUSY_READ);
} }
} }
return Void(); return Void();
@ -591,8 +235,6 @@ int64_t TagThrottler::manualThrottleCount() const {
bool TagThrottler::isAutoThrottlingEnabled() const { bool TagThrottler::isAutoThrottlingEnabled() const {
return impl->isAutoThrottlingEnabled(); return impl->isAutoThrottlingEnabled();
} }
Future<Void> TagThrottler::tryAutoThrottleTag(StorageQueueInfo const& ss, Future<Void> TagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
int64_t storageQueue, return impl->tryUpdateAutoThrottling(ss);
int64_t storageDurabilityLag) {
return impl->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag);
} }

View File

@ -29,14 +29,26 @@ class TagThrottler {
public: public:
TagThrottler(Database db, UID id); TagThrottler(Database db, UID id);
~TagThrottler(); ~TagThrottler();
// Poll the system keyspace looking for updates made through the tag throttling API
Future<Void> monitorThrottlingChanges(); Future<Void> monitorThrottlingChanges();
// Increment the number of known requests associated with the specified tag
void addRequests(TransactionTag tag, int count); void addRequests(TransactionTag tag, int count);
// This throttled tag change ID is used to coordinate updates with the GRV proxies
uint64_t getThrottledTagChangeId() const; uint64_t getThrottledTagChangeId() const;
// For each tag and priority combination, return the throughput limit and expiration time
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates(); PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates();
int64_t autoThrottleCount() const; int64_t autoThrottleCount() const;
uint32_t busyReadTagCount() const; uint32_t busyReadTagCount() const;
uint32_t busyWriteTagCount() const; uint32_t busyWriteTagCount() const;
int64_t manualThrottleCount() const; int64_t manualThrottleCount() const;
bool isAutoThrottlingEnabled() const; bool isAutoThrottlingEnabled() const;
Future<Void> tryAutoThrottleTag(StorageQueueInfo const&, int64_t storageQueue, int64_t storageDurabilityLag);
// Based on the busiest read and write tags in the provided storage queue info, update
// tag throttling limits.
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&);
}; };

View File

@ -0,0 +1,67 @@
/*
* TransactionTagCounter.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/TransactionTagCounter.h"
#include "flow/Trace.h"
TransactionTagCounter::TransactionTagCounter(UID thisServerID)
: thisServerID(thisServerID),
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
void TransactionTagCounter::addRequest(Optional<TagSet> const& tags, int64_t bytes) {
if (tags.present()) {
TEST(true); // Tracking transaction tag in counter
double cost = costFunction(bytes);
for (auto& tag : tags.get()) {
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
count += cost;
if (count > busiestTagCount) {
busiestTagCount = count;
busiestTag = tag;
}
}
intervalTotalSampledCount += cost;
}
}
void TransactionTagCounter::startNewInterval() {
double elapsed = now() - intervalStart;
previousBusiestTags.clear();
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) {
previousBusiestTags.emplace_back(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
}
TraceEvent("BusiestReadTag", thisServerID)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagCost", busiestTagCount)
.detail("TotalSampledCost", intervalTotalSampledCount)
.detail("Reported", !previousBusiestTags.empty())
.trackLatest(busiestReadTagEventHolder->trackingKey);
}
intervalCounts.clear();
intervalTotalSampledCount = 0;
busiestTagCount = 0;
intervalStart = now();
}

View File

@ -0,0 +1,44 @@
/*
* TransactionTagCounter.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbserver/Knobs.h"
class TransactionTagCounter {
TransactionTagMap<int64_t> intervalCounts;
int64_t intervalTotalSampledCount = 0;
TransactionTag busiestTag;
int64_t busiestTagCount = 0;
double intervalStart = 0;
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
UID thisServerID;
Reference<EventCacheHolder> busiestReadTagEventHolder;
public:
TransactionTagCounter(UID thisServerID);
static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
void addRequest(Optional<TagSet> const& tags, int64_t bytes);
void startNewInterval();
std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const { return previousBusiestTags; }
};

View File

@ -62,6 +62,7 @@
#include "fdbserver/ServerCheckpoint.actor.h" #include "fdbserver/ServerCheckpoint.actor.h"
#include "fdbserver/ServerDBInfo.h" #include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TLogInterface.h" #include "fdbserver/TLogInterface.h"
#include "fdbserver/TransactionTagCounter.h"
#include "fdbserver/WaitFailure.h" #include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/WorkerInterface.actor.h"
#include "fdbrpc/sim_validation.h" #include "fdbrpc/sim_validation.h"
@ -849,78 +850,6 @@ public:
return val; return val;
} }
struct TransactionTagCounter {
struct TagInfo {
TransactionTag tag;
double rate;
double fractionalBusyness;
TagInfo(TransactionTag const& tag, double rate, double fractionalBusyness)
: tag(tag), rate(rate), fractionalBusyness(fractionalBusyness) {}
};
TransactionTagMap<int64_t> intervalCounts;
int64_t intervalTotalSampledCount = 0;
TransactionTag busiestTag;
int64_t busiestTagCount = 0;
double intervalStart = 0;
Optional<TagInfo> previousBusiestTag;
UID thisServerID;
Reference<EventCacheHolder> busiestReadTagEventHolder;
TransactionTagCounter(UID thisServerID)
: thisServerID(thisServerID),
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
if (tags.present()) {
TEST(true); // Tracking tag on storage server
double cost = costFunction(bytes);
for (auto& tag : tags.get()) {
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
count += cost;
if (count > busiestTagCount) {
busiestTagCount = count;
busiestTag = tag;
}
}
intervalTotalSampledCount += cost;
}
}
void startNewInterval() {
double elapsed = now() - intervalStart;
previousBusiestTag.reset();
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) {
previousBusiestTag = TagInfo(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
}
TraceEvent("BusiestReadTag", thisServerID)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagCost", busiestTagCount)
.detail("TotalSampledCost", intervalTotalSampledCount)
.detail("Reported", previousBusiestTag.present())
.trackLatest(busiestReadTagEventHolder->trackingKey);
}
intervalCounts.clear();
intervalTotalSampledCount = 0;
busiestTagCount = 0;
intervalStart = now();
}
Optional<TagInfo> getBusiestTag() const { return previousBusiestTag; }
};
TransactionTagCounter transactionTagCounter; TransactionTagCounter transactionTagCounter;
Optional<LatencyBandConfig> latencyBandConfig; Optional<LatencyBandConfig> latencyBandConfig;
@ -4206,11 +4135,7 @@ void getQueuingMetrics(StorageServer* self, StorageQueuingMetricsRequest const&
reply.diskUsage = self->diskUsage; reply.diskUsage = self->diskUsage;
reply.durableVersion = self->durableVersion.get(); reply.durableVersion = self->durableVersion.get();
Optional<StorageServer::TransactionTagCounter::TagInfo> busiestTag = self->transactionTagCounter.getBusiestTag(); reply.busiestTags = self->transactionTagCounter.getBusiestTags();
reply.busiestTag = busiestTag.map<TransactionTag>(
[](StorageServer::TransactionTagCounter::TagInfo tagInfo) { return tagInfo.tag; });
reply.busiestTagFractionalBusyness = busiestTag.present() ? busiestTag.get().fractionalBusyness : 0.0;
reply.busiestTagRate = busiestTag.present() ? busiestTag.get().rate : 0.0;
req.reply.send(reply); req.reply.send(reply);
} }