Move all code from Ratekeeper.actor.cpp into RatekeeperData.actor.cpp

This commit is contained in:
sfc-gh-tclinkenbeard 2022-02-14 12:32:34 -08:00
parent 9beae3fb64
commit 687df447ce
3 changed files with 300 additions and 1617 deletions

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,39 @@
#include "fdbserver/RatekeeperData.h"
#include "flow/actorcompiler.h" // must be last include
const char* limitReasonName[] = { "workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed" };
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
int limitReasonEnd = limitReason_t_end;
// NOTE: This has a corresponding table in Script.cs (see RatekeeperReason graph)
// IF UPDATING THIS ARRAY, UPDATE SCRIPT.CS!
const char* limitReasonDesc[] = { "Workload or read performance.",
"Storage server performance (storage queue).",
"Storage server MVCC memory.",
"Storage server version falling behind.",
"Log server MVCC memory.",
"Storage server performance (log queue).",
"Storage server running out of space (approaching 100MB limit).",
"Storage server running out of space (approaching 5% limit).",
"Log server running out of space (approaching 100MB limit).",
"Log server running out of space (approaching 5% limit).",
"Storage server durable version falling behind.",
"Unable to fetch storage server list." };
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size");
ACTOR static Future<Void> splitError(Future<Void> in, Promise<Void> errOut) {
try {
wait(in);
@ -228,6 +261,258 @@ public:
}
}
ACTOR static Future<Void> monitorThrottlingChanges(RatekeeperData* self) {
state bool committed = false;
loop {
state ReadYourWritesTransaction tr(self->db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Future<RangeResult> throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY);
state Future<Optional<Value>> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey);
if (!committed) {
BinaryWriter limitWriter(Unversioned());
limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
tr.set(tagThrottleLimitKey, limitWriter.toValue());
}
wait(success(throttledTagKeys) && success(autoThrottlingEnabled));
if (autoThrottlingEnabled.get().present() &&
autoThrottlingEnabled.get().get() == LiteralStringRef("0")) {
TEST(true); // Auto-throttling disabled
if (self->autoThrottlingEnabled) {
TraceEvent("AutoTagThrottlingDisabled", self->id).log();
}
self->autoThrottlingEnabled = false;
} else if (autoThrottlingEnabled.get().present() &&
autoThrottlingEnabled.get().get() == LiteralStringRef("1")) {
TEST(true); // Auto-throttling enabled
if (!self->autoThrottlingEnabled) {
TraceEvent("AutoTagThrottlingEnabled", self->id).log();
}
self->autoThrottlingEnabled = true;
} else {
TEST(true); // Auto-throttling unspecified
if (autoThrottlingEnabled.get().present()) {
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id)
.detail("Value", autoThrottlingEnabled.get().get());
}
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
if (!committed)
tr.set(tagThrottleAutoEnabledKey,
LiteralStringRef(self->autoThrottlingEnabled ? "1" : "0"));
}
RkTagThrottleCollection updatedTagThrottles;
TraceEvent("RatekeeperReadThrottledTags", self->id)
.detail("NumThrottledTags", throttledTagKeys.get().size());
for (auto entry : throttledTagKeys.get()) {
TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key);
TagThrottleValue tagValue = TagThrottleValue::fromValue(entry.value);
ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported
if (tagValue.expirationTime == 0 ||
tagValue.expirationTime > now() + tagValue.initialDuration) {
TEST(true); // Converting tag throttle duration to absolute time
tagValue.expirationTime = now() + tagValue.initialDuration;
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValueReason()));
wr << tagValue;
state Value value = wr.toValue();
tr.set(entry.key, value);
}
if (tagValue.expirationTime > now()) {
TransactionTag tag = *tagKey.tags.begin();
Optional<ClientTagThrottleLimits> oldLimits =
self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority);
if (tagKey.throttleType == TagThrottleType::AUTO) {
updatedTagThrottles.autoThrottleTag(
self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
if (tagValue.reason == TagThrottledReason::BUSY_READ) {
updatedTagThrottles.busyReadTagCount++;
} else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) {
updatedTagThrottles.busyWriteTagCount++;
}
} else {
updatedTagThrottles.manualThrottleTag(self->id,
tag,
tagKey.priority,
tagValue.tpsRate,
tagValue.expirationTime,
oldLimits);
}
}
}
self->throttledTags = std::move(updatedTagThrottles);
++self->throttledTagChangeId;
state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
wait(tr.commit());
committed = true;
wait(watchFuture);
TraceEvent("RatekeeperThrottleSignaled", self->id).log();
TEST(true); // Tag throttle changes detected
break;
} catch (Error& e) {
TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e);
wait(tr.onError(e));
}
}
}
}
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state RatekeeperData self(rkInterf.id(),
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;
state std::vector<TLogInterface> tlogInterfs;
state Promise<Void> err;
state Future<Void> collection = actorCollection(self.addActor.getFuture());
TraceEvent("RatekeeperStarting", rkInterf.id());
self.addActor.send(waitFailureServer(rkInterf.waitFailure.getFuture()));
self.addActor.send(self.configurationMonitor());
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges;
self.addActor.send(self.monitorServerListChange(serverChanges));
self.addActor.send(self.trackEachStorageServer(serverChanges.getFuture()));
self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id()));
self.addActor.send(self.monitorThrottlingChanges());
RatekeeperData* selfPtr = &self; // let flow compiler capture self
self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); },
SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id())
.detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG)
.detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
.detail(
"Rate",
(SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) /
((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) +
2.0));
TraceEvent("RkStorageServerQueueSizeParameters", rkInterf.id())
.detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER)
.detail("Spring", SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)
.detail("EBrake", SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES)
.detail(
"Rate",
(SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) /
((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) +
2.0));
tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs();
tlogTrackers.reserve(tlogInterfs.size());
for (int i = 0; i < tlogInterfs.size(); i++) {
tlogTrackers.push_back(splitError(self.trackTLogQueueInfo(tlogInterfs[i]), err));
}
self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId();
try {
state bool lastLimited = false;
loop choose {
when(wait(timeout)) {
self.updateRate(&self.normalLimits);
self.updateRate(&self.batchLimits);
lastLimited = self.smoothReleasedTransactions.smoothRate() >
SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
double tooOld = now() - 1.0;
for (auto p = self.grvProxyInfo.begin(); p != self.grvProxyInfo.end();) {
if (p->second.lastUpdateTime < tooOld)
p = self.grvProxyInfo.erase(p);
else
++p;
}
timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE);
}
when(GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
GetRateInfoReply reply;
auto& p = self.grvProxyInfo[req.requesterID];
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.totalTransactions).detail("Delta", req.totalReleasedTransactions - p.totalTransactions);
if (p.totalTransactions > 0) {
self.smoothReleasedTransactions.addDelta(req.totalReleasedTransactions - p.totalTransactions);
for (auto tag : req.throttledTagCounts) {
self.throttledTags.addRequests(tag.first, tag.second);
}
}
if (p.batchTransactions > 0) {
self.smoothBatchReleasedTransactions.addDelta(req.batchReleasedTransactions -
p.batchTransactions);
}
p.totalTransactions = req.totalReleasedTransactions;
p.batchTransactions = req.batchReleasedTransactions;
p.lastUpdateTime = now();
reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size();
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.grvProxyInfo.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
if (p.lastThrottledTagChangeId != self.throttledTagChangeId ||
now() > p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) {
p.lastThrottledTagChangeId = self.throttledTagChangeId;
p.lastTagPushTime = now();
reply.throttledTags = self.throttledTags.getClientRates(self.autoThrottlingEnabled);
bool returningTagsToProxy =
reply.throttledTags.present() && reply.throttledTags.get().size() > 0;
TEST(returningTagsToProxy); // Returning tag throttles to a proxy
}
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
reply.healthMetrics.batchLimited = lastLimited;
req.reply.send(reply);
}
when(HaltRatekeeperRequest req = waitNext(rkInterf.haltRatekeeper.getFuture())) {
req.reply.send(Void());
TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID);
break;
}
when(ReportCommitCostEstimationRequest req =
waitNext(rkInterf.reportCommitCostEstimation.getFuture())) {
self.updateCommitCostEstimation(req.ssTrTagCommitCost);
req.reply.send(Void());
}
when(wait(err.getFuture())) {}
when(wait(dbInfo->onChange())) {
if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) {
tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs();
tlogTrackers = std::vector<Future<Void>>();
for (int i = 0; i < tlogInterfs.size(); i++)
tlogTrackers.push_back(splitError(self.trackTLogQueueInfo(tlogInterfs[i]), err));
}
self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId();
}
when(wait(collection)) {
ASSERT(false);
throw internal_error();
}
}
} catch (Error& err) {
TraceEvent("RatekeeperDied", rkInterf.id()).error(err, true);
}
return Void();
}
}; // class RatekeeperDataImpl
Future<Void> RatekeeperData::configurationMonitor() {
@ -252,6 +537,14 @@ Future<Void> RatekeeperData::trackTLogQueueInfo(TLogInterface tli) {
return RatekeeperDataImpl::trackTLogQueueInfo(this, tli);
}
Future<Void> RatekeeperData::monitorThrottlingChanges() {
return RatekeeperDataImpl::monitorThrottlingChanges(this);
}
Future<Void> RatekeeperData::run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return RatekeeperDataImpl::run(rkInterf, dbInfo);
}
RatekeeperData::RatekeeperData(UID id, Database db)
: id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
@ -863,3 +1156,8 @@ void RatekeeperData::tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQue
}
}
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
wait(RatekeeperData::run(rkInterf, dbInfo));
return Void();
}

View File

@ -563,6 +563,7 @@ struct RatekeeperData {
void tryAutoThrottleTag(TransactionTag, double rate, double busyness, TagThrottledReason);
void tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag);
Future<Void> monitorThrottlingChanges();
static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);
};