foundationdb/fdbserver/Ratekeeper.actor.cpp

1346 lines
53 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* Ratekeeper.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/WorkerInterface.actor.h"
2017-05-26 04:48:44 +08:00
#include "flow/IndexedSet.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Smoother.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/TagThrottle.h"
#include "fdbserver/Knobs.h"
2019-02-28 03:51:48 +08:00
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
enum limitReason_t {
unlimited, // TODO: rename to workload?
storage_server_write_queue_size, // 1
2017-05-26 04:48:44 +08:00
storage_server_write_bandwidth_mvcc,
storage_server_readable_behind,
log_server_mvcc_write_bandwidth,
log_server_write_queue, // 5
storage_server_min_free_space, // a storage server's normal limits are being reduced by low free space
storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
2017-05-26 04:48:44 +08:00
log_server_min_free_space,
log_server_min_free_space_ratio,
storage_server_durability_lag, // 10
storage_server_list_fetch_failed,
2017-05-26 04:48:44 +08:00
limitReason_t_end
};
int limitReasonEnd = limitReason_t_end;
const char* limitReasonName[] = {
"workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed"
2017-05-26 04:48:44 +08:00
};
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
// NOTE: This has a corresponding table in Script.cs (see RatekeeperReason graph)
// IF UPDATING THIS ARRAY, UPDATE SCRIPT.CS!
const char* limitReasonDesc[] = {
"Workload or read performance.",
"Storage server performance (storage queue).",
"Storage server MVCC memory.",
"Storage server version falling behind.",
"Log server MVCC memory.",
"Storage server performance (log queue).",
"Storage server running out of space (approaching 100MB limit).",
"Storage server running out of space (approaching 5% limit).",
"Log server running out of space (approaching 100MB limit).",
"Log server running out of space (approaching 5% limit).",
"Storage server durable version falling behind.",
"Unable to fetch storage server list."
2017-05-26 04:48:44 +08:00
};
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size");
struct StorageQueueInfo {
bool valid;
UID id;
LocalityData locality;
StorageQueuingMetricsReply lastReply;
StorageQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothDurableVersion, smoothLatestVersion;
2017-05-26 04:48:44 +08:00
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
2019-03-02 05:14:18 +08:00
limitReason_t limitReason;
Optional<TransactionTag> busiestTag;
double busiestTagFractionalBusyness;
double busiestTagRate;
Fix UBSAN report /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:86:8: runtime error: load of value 1231493777, which is not a valid value for type 'limitReason_t' #0 0x310e961 in StorageQueueInfo::StorageQueueInfo(StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:86 #1 0x310eacd in MapPair<UID, StorageQueueInfo>::MapPair<UID, StorageQueueInfo>(UID&&, StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/flow/IndexedSet.h:242 #2 0x310b35e in MapPair<std::decay<UID>::type, std::decay<StorageQueueInfo>::type> mapPair<UID, StorageQueueInfo>(UID&&, StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/flow/IndexedSet.h:258 #3 0x30a8b79 in a_body1 /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:195 #4 0x309b529 in TrackStorageServerQueueInfoActor /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:495 #5 0x309b9be in trackStorageServerQueueInfo(RatekeeperData* const&, StorageServerInterface const&) /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:194 #6 0x30cff63 in a_body1loopBody1when1cont1 /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:303 #7 0x30cd9da in a_body1loopBody1when1when1 /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:1170 #8 0x30ed4dd in a_callback_fire /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:1185 #9 0x30e6d81 in fire /home/anoyes/workspace/foundationdb/flow/flow.h:998 #10 0x4df0dc in void SAV<Void>::send<Void>(Void&&) /home/anoyes/workspace/foundationdb/flow/flow.h:447 #11 0x959891 in void Promise<Void>::send<Void>(Void&&) const /home/anoyes/workspace/foundationdb/flow/flow.h:778 #12 0x7b4b018 in Sim2::execTask(Sim2::Task&) (/home/anoyes/build/foundationdb/bin/fdbserver+0x7b4b018) #13 0x7bf9168 in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_body1loopBody1cont1(Void const&, int) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:979 #14 0x7be7b68 in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_body1loopBody1when1(Void const&, int) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5391 #15 0x7c329ff in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_callback_fire(ActorCallback<Sim2::RunLoopActor, 0, Void>*, Void) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5406 #16 0x7c1fc73 in ActorCallback<Sim2::RunLoopActor, 0, Void>::fire(Void const&) /home/anoyes/workspace/foundationdb/flow/flow.h:998 #17 0x4df0dc in void SAV<Void>::send<Void>(Void&&) /home/anoyes/workspace/foundationdb/flow/flow.h:447 #18 0x959891 in void Promise<Void>::send<Void>(Void&&) const /home/anoyes/workspace/foundationdb/flow/flow.h:778 #19 0x7fe74a4 in N2::PromiseTask::operator()() /home/anoyes/workspace/foundationdb/flow/Net2.actor.cpp:481 #20 0x7fb6ff7 in N2::Net2::run() /home/anoyes/workspace/foundationdb/flow/Net2.actor.cpp:657 #21 0x7b71bd3 in Sim2::_runActorState<Sim2::_runActor>::a_body1(int) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:989 #22 0x7b2ee51 in Sim2::_runActor::_runActor(Sim2* const&) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5608 #23 0x7b2f268 in Sim2::_run(Sim2* const&) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:987 #24 0x7b2f2c8 in Sim2::run() /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:996 #25 0x21040a6 in main /home/anoyes/workspace/foundationdb/fdbserver/fdbserver.actor.cpp:1793 #26 0x7f03492ba504 in __libc_start_main (/lib64/libc.so.6+0x22504) #27 0x464914 (/home/anoyes/build/foundationdb/bin/fdbserver+0x464914)
2019-12-04 04:45:34 +08:00
StorageQueueInfo(UID id, LocalityData locality)
: valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
Fix UBSAN report /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:86:8: runtime error: load of value 1231493777, which is not a valid value for type 'limitReason_t' #0 0x310e961 in StorageQueueInfo::StorageQueueInfo(StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:86 #1 0x310eacd in MapPair<UID, StorageQueueInfo>::MapPair<UID, StorageQueueInfo>(UID&&, StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/flow/IndexedSet.h:242 #2 0x310b35e in MapPair<std::decay<UID>::type, std::decay<StorageQueueInfo>::type> mapPair<UID, StorageQueueInfo>(UID&&, StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/flow/IndexedSet.h:258 #3 0x30a8b79 in a_body1 /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:195 #4 0x309b529 in TrackStorageServerQueueInfoActor /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:495 #5 0x309b9be in trackStorageServerQueueInfo(RatekeeperData* const&, StorageServerInterface const&) /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:194 #6 0x30cff63 in a_body1loopBody1when1cont1 /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:303 #7 0x30cd9da in a_body1loopBody1when1when1 /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:1170 #8 0x30ed4dd in a_callback_fire /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:1185 #9 0x30e6d81 in fire /home/anoyes/workspace/foundationdb/flow/flow.h:998 #10 0x4df0dc in void SAV<Void>::send<Void>(Void&&) /home/anoyes/workspace/foundationdb/flow/flow.h:447 #11 0x959891 in void Promise<Void>::send<Void>(Void&&) const /home/anoyes/workspace/foundationdb/flow/flow.h:778 #12 0x7b4b018 in Sim2::execTask(Sim2::Task&) (/home/anoyes/build/foundationdb/bin/fdbserver+0x7b4b018) #13 0x7bf9168 in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_body1loopBody1cont1(Void const&, int) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:979 #14 0x7be7b68 in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_body1loopBody1when1(Void const&, int) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5391 #15 0x7c329ff in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_callback_fire(ActorCallback<Sim2::RunLoopActor, 0, Void>*, Void) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5406 #16 0x7c1fc73 in ActorCallback<Sim2::RunLoopActor, 0, Void>::fire(Void const&) /home/anoyes/workspace/foundationdb/flow/flow.h:998 #17 0x4df0dc in void SAV<Void>::send<Void>(Void&&) /home/anoyes/workspace/foundationdb/flow/flow.h:447 #18 0x959891 in void Promise<Void>::send<Void>(Void&&) const /home/anoyes/workspace/foundationdb/flow/flow.h:778 #19 0x7fe74a4 in N2::PromiseTask::operator()() /home/anoyes/workspace/foundationdb/flow/Net2.actor.cpp:481 #20 0x7fb6ff7 in N2::Net2::run() /home/anoyes/workspace/foundationdb/flow/Net2.actor.cpp:657 #21 0x7b71bd3 in Sim2::_runActorState<Sim2::_runActor>::a_body1(int) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:989 #22 0x7b2ee51 in Sim2::_runActor::_runActor(Sim2* const&) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5608 #23 0x7b2f268 in Sim2::_run(Sim2* const&) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:987 #24 0x7b2f2c8 in Sim2::run() /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:996 #25 0x21040a6 in main /home/anoyes/workspace/foundationdb/fdbserver/fdbserver.actor.cpp:1793 #26 0x7f03492ba504 in __libc_start_main (/lib64/libc.so.6+0x22504) #27 0x464914 (/home/anoyes/build/foundationdb/bin/fdbserver+0x464914)
2019-12-04 04:45:34 +08:00
smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited), busiestTagFractionalBusyness(0),
busiestTagRate(0) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
2017-05-26 04:48:44 +08:00
lastReply.instanceID = -1;
}
};
struct TLogQueueInfo {
bool valid;
UID id;
TLogQueuingMetricsReply lastReply;
TLogQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
TLogQueueInfo(UID id) : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from storageQueueInfO)
2017-05-26 04:48:44 +08:00
lastReply.instanceID = -1;
}
};
class RkTagThrottleCollection : NonCopyable {
private:
struct RkTagData {
Smoother requestRate;
RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
};
struct RkTagThrottleData {
ClientTagThrottleLimits limits;
Smoother clientRate;
// Only used by auto-throttles
double created = now();
double lastUpdated = 0;
double lastReduced = now();
bool rateSet = false;
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
double getTargetRate(Optional<double> requestRate) {
if(limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) {
return limits.tpsRate;
}
else {
return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal());
}
}
Optional<double> updateAndGetClientRate(Optional<double> requestRate) {
if(limits.expiration > now()) {
double targetRate = getTargetRate(requestRate);
if(targetRate == std::numeric_limits<double>::max()) {
rateSet = false;
return targetRate;
}
if(!rateSet) {
rateSet = true;
clientRate.reset(targetRate);
}
else {
clientRate.setTotal(targetRate);
}
double rate = clientRate.smoothTotal();
ASSERT(rate >= 0);
return rate;
}
else {
TEST(true); // Get throttle rate for expired throttle
rateSet = false;
return Optional<double>();
}
}
};
void initializeTag(TransactionTag const& tag) {
tagData.try_emplace(tag);
}
public:
RkTagThrottleCollection() {}
RkTagThrottleCollection(RkTagThrottleCollection &&other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
void operator=(RkTagThrottleCollection &&other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) {
ASSERT(currentBusyness > 0);
if(targetBusyness < 1) {
double targetFraction = targetBusyness * (1-currentBusyness) / ((1-targetBusyness) * currentBusyness);
return requestRate * targetFraction;
}
else {
return std::numeric_limits<double>::max();
}
}
// Returns the TPS rate if the throttle is updated, otherwise returns an empty optional
2020-05-15 05:17:43 +08:00
Optional<double> autoThrottleTag(UID id, TransactionTag const& tag, double fractionalBusyness, Optional<double> tpsRate = Optional<double>(), Optional<double> expiration = Optional<double>()) {
ASSERT(!tpsRate.present() || tpsRate.get() >= 0);
ASSERT(!expiration.present() || expiration.get() > now());
auto itr = autoThrottledTags.find(tag);
bool present = (itr != autoThrottledTags.end());
if(!present) {
if(autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) {
TEST(true); // Reached auto-throttle limit
return Optional<double>();
}
itr = autoThrottledTags.try_emplace(tag).first;
initializeTag(tag);
}
else if(itr->second.limits.expiration <= now()) {
TEST(true); // Re-throttling expired tag that hasn't been cleaned up
present = false;
itr->second = RkTagThrottleData();
}
auto &throttle = itr->second;
if(!tpsRate.present()) {
if(now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
tpsRate = std::numeric_limits<double>::max();
if(present) {
return Optional<double>();
}
}
else if(now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
TEST(true); // Tag auto-throttled too quickly
return Optional<double>();
}
else {
tpsRate = computeTargetTpsRate(fractionalBusyness, SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, tagData[tag].requestRate.smoothRate());
if(throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) {
TEST(true); // Tag auto-throttle rate increase attempt while active
return Optional<double>();
}
throttle.lastUpdated = now();
if(tpsRate.get() < throttle.limits.tpsRate) {
throttle.lastReduced = now();
}
}
}
if(!expiration.present()) {
expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION;
}
ASSERT(tpsRate.present() && tpsRate.get() >= 0);
throttle.limits.tpsRate = tpsRate.get();
throttle.limits.expiration = expiration.get();
Optional<double> clientRate = throttle.updateAndGetClientRate(getRequestRate(tag));
TraceEvent("RkSetAutoThrottle", id)
.detail("Tag", tag)
.detail("TargetRate", tpsRate.get())
.detail("Expiration", expiration.get() - now())
.detail("ClientRate", clientRate)
.detail("Created", now()-throttle.created)
.detail("LastUpdate", now()-throttle.lastUpdated)
.detail("LastReduced", now()-throttle.lastReduced);
if(tpsRate.get() != std::numeric_limits<double>::max()) {
return tpsRate.get();
}
else {
return Optional<double>();
}
}
2020-05-15 05:17:43 +08:00
void manualThrottleTag(UID id, TransactionTag const& tag, TransactionPriority priority, double tpsRate,
double expiration, Optional<ClientTagThrottleLimits> const& oldLimits) {
ASSERT(tpsRate >= 0);
ASSERT(expiration > now());
auto &priorityThrottleMap = manualThrottledTags[tag];
auto result = priorityThrottleMap.try_emplace(priority);
initializeTag(tag);
ASSERT(result.second); // Updating to the map is done by copying the whole map
result.first->second.limits.tpsRate = tpsRate;
result.first->second.limits.expiration = expiration;
if(!oldLimits.present()) {
TEST(true); // Transaction tag manually throttled
2020-05-15 05:17:43 +08:00
TraceEvent("RatekeeperAddingManualThrottle", id)
2020-04-24 11:51:53 +08:00
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
2020-04-24 11:51:53 +08:00
}
else if(oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) {
TEST(true); // Manual transaction tag throttle updated
2020-05-15 05:17:43 +08:00
TraceEvent("RatekeeperUpdatingManualThrottle", id)
2020-04-24 11:51:53 +08:00
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now());
2020-04-24 11:51:53 +08:00
}
Optional<double> clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag));
ASSERT(clientRate.present());
}
Optional<ClientTagThrottleLimits> getManualTagThrottleLimits(TransactionTag const& tag, TransactionPriority priority) {
auto itr = manualThrottledTags.find(tag);
if(itr != manualThrottledTags.end()) {
auto priorityItr = itr->second.find(priority);
if(priorityItr != itr->second.end()) {
return priorityItr->second.limits;
}
}
return Optional<ClientTagThrottleLimits>();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
for(auto tagItr = tagData.begin(); tagItr != tagData.end();) {
bool tagPresent = false;
double requestRate = tagItr->second.requestRate.smoothRate();
auto manualItr = manualThrottledTags.find(tagItr->first);
if(manualItr != manualThrottledTags.end()) {
Optional<ClientTagThrottleLimits> manualClientRate;
2020-05-07 23:59:47 +08:00
for(auto priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend()); ++priority) {
auto priorityItr = manualItr->second.find(*priority);
if(priorityItr != manualItr->second.end()) {
Optional<double> priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate);
if(!priorityClientRate.present()) {
TEST(true); // Manual priority throttle expired
priorityItr = manualItr->second.erase(priorityItr);
}
else {
if(!manualClientRate.present() || manualClientRate.get().tpsRate > priorityClientRate.get()) {
manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), priorityItr->second.limits.expiration);
}
else {
TEST(true); // Manual throttle overriden by higher priority
}
++priorityItr;
}
}
if(manualClientRate.present()) {
2020-05-01 13:24:17 +08:00
tagPresent = true;
TEST(true); // Using manual throttle
clientRates[*priority][tagItr->first] = manualClientRate.get();
}
}
2020-05-01 13:24:17 +08:00
if(manualItr->second.empty()) {
TEST(true); // All manual throttles expired
2020-05-01 13:24:17 +08:00
manualThrottledTags.erase(manualItr);
break;
}
}
auto autoItr = autoThrottledTags.find(tagItr->first);
if(autoItr != autoThrottledTags.end()) {
Optional<double> autoClientRate = autoItr->second.updateAndGetClientRate(requestRate);
if(autoClientRate.present()) {
double adjustedRate = autoClientRate.get();
double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
if(now() >= rampStartTime && adjustedRate != std::numeric_limits<double>::max()) {
TEST(true); // Tag auto-throttle ramping up
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
if(targetBusyness == 0) {
targetBusyness = 0.01;
}
double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
adjustedRate = computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate);
}
tagPresent = true;
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(tagItr->first, adjustedRate, autoItr->second.limits.expiration);
if(!result.second && result.first->second.tpsRate > adjustedRate) {
result.first->second = ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
}
else {
TEST(true); // Auto throttle overriden by manual throttle
}
clientRates[TransactionPriority::BATCH][tagItr->first] = ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
else {
ASSERT(autoItr->second.limits.expiration <= now());
TEST(true); // Auto throttle expired
if(BUGGIFY) { // Temporarily extend the window between expiration and cleanup
tagPresent = true;
}
else {
autoThrottledTags.erase(autoItr);
}
}
}
if(!tagPresent) {
TEST(true); // All tag throttles expired
tagItr = tagData.erase(tagItr);
}
else {
++tagItr;
}
}
return clientRates;
}
void addRequests(TransactionTag const& tag, int requests) {
if(requests > 0) {
TEST(true); // Requests reported for throttled tag
auto tagItr = tagData.try_emplace(tag);
tagItr.first->second.requestRate.addDelta(requests);
double requestRate = tagItr.first->second.requestRate.smoothRate();
auto autoItr = autoThrottledTags.find(tag);
if(autoItr != autoThrottledTags.end()) {
autoItr->second.updateAndGetClientRate(requestRate);
}
auto manualItr = manualThrottledTags.find(tag);
if(manualItr != manualThrottledTags.end()) {
for(auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end(); ++priorityItr) {
priorityItr->second.updateAndGetClientRate(requestRate);
}
}
}
}
Optional<double> getRequestRate(TransactionTag const& tag) {
auto itr = tagData.find(tag);
if(itr != tagData.end()) {
return itr->second.requestRate.smoothRate();
}
return Optional<double>();
}
int64_t autoThrottleCount() const {
return autoThrottledTags.size();
}
int64_t manualThrottleCount() const {
int64_t count = 0;
for(auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) {
count += itr->second.size();
}
return count;
}
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
TransactionTagMap<std::map<TransactionPriority, RkTagThrottleData>> manualThrottledTags;
TransactionTagMap<RkTagData> tagData;
};
struct RatekeeperLimits {
double tpsLimit;
Int64MetricHandle tpsLimitMetric;
Int64MetricHandle reasonMetric;
int64_t storageTargetBytes;
int64_t storageSpringBytes;
int64_t logTargetBytes;
int64_t logSpringBytes;
double maxVersionDifference;
int64_t durabilityLagTargetVersions;
int64_t lastDurabilityLag;
double durabilityLagLimit;
TransactionPriority priority;
std::string context;
RatekeeperLimits(TransactionPriority priority, std::string context, int64_t storageTargetBytes, int64_t storageSpringBytes, int64_t logTargetBytes, int64_t logSpringBytes, double maxVersionDifference, int64_t durabilityLagTargetVersions) :
2020-05-01 13:24:17 +08:00
priority(priority),
tpsLimit(std::numeric_limits<double>::infinity()),
tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
reasonMetric(StringRef("Ratekeeper.Reason" + context)),
storageTargetBytes(storageTargetBytes),
storageSpringBytes(storageSpringBytes),
logTargetBytes(logTargetBytes),
logSpringBytes(logSpringBytes),
maxVersionDifference(maxVersionDifference),
durabilityLagTargetVersions(durabilityLagTargetVersions + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not be durable on the storage servers
durabilityLagLimit(std::numeric_limits<double>::infinity()),
lastDurabilityLag(0),
context(context)
{}
};
struct ProxyInfo {
int64_t totalTransactions;
int64_t batchTransactions;
uint64_t lastThrottledTagChangeId;
double lastUpdateTime;
2020-04-23 03:28:51 +08:00
double lastTagPushTime;
2019-03-02 06:06:47 +08:00
2020-04-23 03:28:51 +08:00
ProxyInfo() : totalTransactions(0), batchTransactions(0), lastUpdateTime(0), lastThrottledTagChangeId(0), lastTagPushTime(0) {}
};
struct RatekeeperData {
2020-05-15 05:17:43 +08:00
UID id;
Database db;
2017-05-26 04:48:44 +08:00
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, ProxyInfo> proxyInfo;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
2017-05-26 04:48:44 +08:00
DatabaseConfiguration configuration;
PromiseStream<Future<Void>> addActor;
2017-05-26 04:48:44 +08:00
Int64MetricHandle actualTpsMetric;
2017-05-26 04:48:44 +08:00
double lastWarning;
double lastSSListFetchedTimestamp;
2017-05-26 04:48:44 +08:00
RkTagThrottleCollection throttledTags;
uint64_t throttledTagChangeId;
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Deque<double> actualTpsHistory;
Optional<Key> remoteDC;
Future<Void> expiredTagThrottleCleanup;
bool autoThrottlingEnabled;
2020-05-15 05:17:43 +08:00
RatekeeperData(UID id, Database db)
: id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()),
throttledTagChangeId(0),
normalLimits(TransactionPriority::DEFAULT, "", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG,
SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE,
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
batchLimits(TransactionPriority::BATCH, "Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH,
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH,
SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
autoThrottlingEnabled(false) {
expiredTagThrottleCleanup = recurring([this](){ ThrottleApi::expire(this->db); }, SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL);
}
2017-05-26 04:48:44 +08:00
};
//SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageServerInterface ssi ) {
2017-05-26 04:48:44 +08:00
self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) );
state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
2020-05-15 05:17:43 +08:00
TraceEvent("RkTracking", self->id)
2020-05-15 05:54:38 +08:00
.detail("StorageServer", ssi.id());
2017-05-26 04:48:44 +08:00
try {
loop {
ErrorOr<StorageQueuingMetricsReply> reply = wait( ssi.getQueuingMetrics.getReplyUnlessFailedFor( StorageQueuingMetricsRequest(), 0, 0 ) ); // SOMEDAY: or tryGetReply?
if (reply.present()) {
myQueueInfo->value.valid = true;
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
myQueueInfo->value.lastReply = reply.get();
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.reset(reply.get().version);
2017-05-26 04:48:44 +08:00
} else {
self->smoothTotalDurableBytes.addDelta( reply.get().bytesDurable - myQueueInfo->value.prevReply.bytesDurable );
myQueueInfo->value.smoothDurableBytes.setTotal( reply.get().bytesDurable );
myQueueInfo->value.verySmoothDurableBytes.setTotal( reply.get().bytesDurable );
myQueueInfo->value.smoothInputBytes.setTotal( reply.get().bytesInput );
myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available );
myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total );
myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
2017-05-26 04:48:44 +08:00
}
myQueueInfo->value.busiestTag = reply.get().busiestTag;
myQueueInfo->value.busiestTagFractionalBusyness = reply.get().busiestTagFractionalBusyness;
myQueueInfo->value.busiestTagRate = reply.get().busiestTagRate;
2017-05-26 04:48:44 +08:00
} else {
if(myQueueInfo->value.valid) {
2020-05-15 05:17:43 +08:00
TraceEvent("RkStorageServerDidNotRespond", self->id)
2020-05-15 05:54:38 +08:00
.detail("StorageServer", ssi.id());
}
2017-05-26 04:48:44 +08:00
myQueueInfo->value.valid = false;
}
wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && IFailureMonitor::failureMonitor().onStateEqual(ssi.getQueuingMetrics.getEndpoint(), FailureStatus(false)));
2017-05-26 04:48:44 +08:00
}
} catch (...) {
// including cancellation
self->storageQueueInfo.erase( myQueueInfo );
throw;
}
}
ACTOR Future<Void> trackTLogQueueInfo( RatekeeperData* self, TLogInterface tli ) {
2017-05-26 04:48:44 +08:00
self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) );
state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
2020-05-15 05:17:43 +08:00
TraceEvent("RkTracking", self->id)
2020-05-15 05:54:38 +08:00
.detail("TransactionLog", tli.id());
2017-05-26 04:48:44 +08:00
try {
loop {
ErrorOr<TLogQueuingMetricsReply> reply = wait( tli.getQueuingMetrics.getReplyUnlessFailedFor( TLogQueuingMetricsRequest(), 0, 0 ) ); // SOMEDAY: or tryGetReply?
if (reply.present()) {
myQueueInfo->value.valid = true;
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
myQueueInfo->value.lastReply = reply.get();
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
} else {
self->smoothTotalDurableBytes.addDelta( reply.get().bytesDurable - myQueueInfo->value.prevReply.bytesDurable );
myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total);
}
} else {
if(myQueueInfo->value.valid) {
2020-05-15 05:17:43 +08:00
TraceEvent("RkTLogDidNotRespond", self->id)
2020-05-15 05:54:38 +08:00
.detail("TransactionLog", tli.id());
}
2017-05-26 04:48:44 +08:00
myQueueInfo->value.valid = false;
}
wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && IFailureMonitor::failureMonitor().onStateEqual(tli.getQueuingMetrics.getEndpoint(), FailureStatus(false)));
2017-05-26 04:48:44 +08:00
}
} catch (...) {
// including cancellation
self->tlogQueueInfo.erase( myQueueInfo );
throw;
}
}
ACTOR Future<Void> splitError( Future<Void> in, Promise<Void> errOut ) {
try {
wait( in );
2017-05-26 04:48:44 +08:00
return Void();
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled && !errOut.isSet())
errOut.sendError(e);
throw;
}
}
ACTOR Future<Void> trackEachStorageServer(
RatekeeperData* self,
2017-05-26 04:48:44 +08:00
FutureStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges )
{
state Map<UID, Future<Void>> actors;
state Promise<Void> err;
loop choose {
when (state std::pair< UID, Optional<StorageServerInterface> > change = waitNext(serverChanges) ) {
wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack
2017-05-26 04:48:44 +08:00
if (change.second.present()) {
auto& a = actors[ change.first ];
a = Future<Void>();
a = splitError( trackStorageServerQueueInfo(self, change.second.get()), err );
} else
actors.erase( change.first );
}
when (wait(err.getFuture())) {}
2017-05-26 04:48:44 +08:00
}
}
ACTOR Future<Void> monitorServerListChange(
RatekeeperData* self,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
state std::map<UID, StorageServerInterface> oldServers;
state Transaction tr(self->db);
loop {
try {
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
2019-03-13 02:34:16 +08:00
vector<std::pair<StorageServerInterface, ProcessClass>> results = wait(getServerListAndProcessClasses(&tr));
self->lastSSListFetchedTimestamp = now();
2019-03-13 02:34:16 +08:00
std::map<UID, StorageServerInterface> newServers;
for (int i = 0; i < results.size(); i++) {
const StorageServerInterface& ssi = results[i].first;
const UID serverId = ssi.id();
newServers[serverId] = ssi;
2019-03-13 02:34:16 +08:00
if (oldServers.count(serverId)) {
if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) {
serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
}
oldServers.erase(serverId);
} else {
serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
}
}
2019-03-13 02:34:16 +08:00
for (const auto& it : oldServers) {
serverChanges.send( std::make_pair(it.first, Optional<StorageServerInterface>()) );
}
oldServers.swap(newServers);
tr = Transaction(self->db);
2019-03-13 02:34:16 +08:00
wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
} catch(Error& e) {
wait( tr.onError(e) );
}
}
}
ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
state bool committed = false;
loop {
state ReadYourWritesTransaction tr(self->db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Future<Standalone<RangeResultRef>> throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY);
state Future<Optional<Value>> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey);
if(!committed) {
BinaryWriter limitWriter(Unversioned());
limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
tr.set(tagThrottleLimitKey, limitWriter.toValue());
}
wait(success(throttledTagKeys) && success(autoThrottlingEnabled));
if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("0")) {
TEST(true); // Auto-throttling disabled
if(self->autoThrottlingEnabled) {
2020-05-15 05:17:43 +08:00
TraceEvent("AutoTagThrottlingDisabled", self->id);
}
self->autoThrottlingEnabled = false;
}
else if(autoThrottlingEnabled.get().present() && autoThrottlingEnabled.get().get() == LiteralStringRef("1")) {
TEST(true); // Auto-throttling enabled
if(!self->autoThrottlingEnabled) {
2020-05-15 05:17:43 +08:00
TraceEvent("AutoTagThrottlingEnabled", self->id);
}
self->autoThrottlingEnabled = true;
}
else {
TEST(true); // Auto-throttling unspecified
if(autoThrottlingEnabled.get().present()) {
2020-05-15 05:17:43 +08:00
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id).detail("Value", autoThrottlingEnabled.get().get());
}
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
}
RkTagThrottleCollection updatedTagThrottles;
2020-05-15 05:17:43 +08:00
TraceEvent("RatekeeperReadThrottledTags", self->id).detail("NumThrottledTags", throttledTagKeys.get().size());
for(auto entry : throttledTagKeys.get()) {
TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key);
TagThrottleValue tagValue = TagThrottleValue::fromValue(entry.value);
ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported
if(tagValue.expirationTime == 0 || tagValue.expirationTime > now() + tagValue.initialDuration) {
TEST(true); // Converting tag throttle duration to absolute time
tagValue.expirationTime = now() + tagValue.initialDuration;
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValue()));
wr << tagValue;
state Value value = wr.toValue();
tr.set(entry.key, value);
}
if(tagValue.expirationTime > now()) {
TransactionTag tag = *tagKey.tags.begin();
Optional<ClientTagThrottleLimits> oldLimits = self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority);
if(tagKey.throttleType == TagThrottleType::AUTO) {
2020-05-15 05:17:43 +08:00
updatedTagThrottles.autoThrottleTag(self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
}
else {
2020-05-15 05:17:43 +08:00
updatedTagThrottles.manualThrottleTag(self->id, tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime, oldLimits);
}
}
}
self->throttledTags = std::move(updatedTagThrottles);
++self->throttledTagChangeId;
state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
wait(tr.commit());
committed = true;
wait(watchFuture);
2020-05-15 05:17:43 +08:00
TraceEvent("RatekeeperThrottleSignaled", self->id);
TEST(true); // Tag throttle changes detected
break;
} catch (Error& e) {
2020-05-15 05:17:43 +08:00
TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e);
wait(tr.onError(e));
}
}
}
}
void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST) {
TEST(true); // Transaction tag auto-throttled
2020-05-15 05:17:43 +08:00
Optional<double> clientRate = self->throttledTags.autoThrottleTag(self->id, ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
if(clientRate.present()) {
TagSet tags;
tags.addTag(ss.busiestTag.get());
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
}
}
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
2017-05-26 04:48:44 +08:00
//double controlFactor = ; // dt / eFoldingTime
double actualTps = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTps;
2017-05-26 04:48:44 +08:00
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
if(self->actualTpsHistory.size() > SERVER_KNOBS->MAX_TPS_HISTORY_SAMPLES) {
self->actualTpsHistory.pop_front();
}
self->actualTpsHistory.push_back(actualTps);
2017-05-26 04:48:44 +08:00
limits->tpsLimit = std::numeric_limits<double>::infinity();
2017-05-26 04:48:44 +08:00
UID reasonID = UID();
limitReason_t limitReason = limitReason_t::unlimited;
2017-05-26 04:48:44 +08:00
int sscount = 0;
int64_t worstFreeSpaceStorageServer = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueStorageServer = 0;
int64_t limitingStorageQueueStorageServer = 0;
int64_t worstDurabilityLag = 0;
2017-05-26 04:48:44 +08:00
std::multimap<double, StorageQueueInfo*> storageTpsLimitReverseIndex;
std::multimap<int64_t, StorageQueueInfo*> storageDurabilityLagReverseIndex;
std::map<UID, limitReason_t> ssReasons;
2017-05-26 04:48:44 +08:00
2019-03-02 05:14:18 +08:00
// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
2017-05-26 04:48:44 +08:00
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
auto& ss = i->value;
if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) continue;
2017-05-26 04:48:44 +08:00
++sscount;
limitReason_t ssLimitReason = limitReason_t::unlimited;
2017-05-26 04:48:44 +08:00
int64_t minFreeSpace = std::max(SERVER_KNOBS->MIN_AVAILABLE_SPACE, (int64_t)(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO * ss.smoothTotalSpace.smoothTotal()));
2017-05-26 04:48:44 +08:00
worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace);
2019-03-07 02:46:17 +08:00
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits->storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits->storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits->storageTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_AVAILABLE_SPACE) {
ssLimitReason = limitReason_t::storage_server_min_free_space;
2017-05-26 04:48:44 +08:00
} else {
ssLimitReason = limitReason_t::storage_server_min_free_space_ratio;
2017-05-26 04:48:44 +08:00
}
}
int64_t storageQueue = ss.lastReply.bytesInput - ss.smoothDurableBytes.smoothTotal();
worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue);
int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal();
worstDurabilityLag = std::max(worstDurabilityLag, storageDurabilityLag);
storageDurabilityLagReverseIndex.insert(std::make_pair(-1*storageDurabilityLag, &ss));
auto& ssMetrics = self->healthMetrics.storageStats[ss.id];
ssMetrics.storageQueue = storageQueue;
ssMetrics.storageDurabilityLag = storageDurabilityLag;
ssMetrics.cpuUsage = ss.lastReply.cpuUsage;
ssMetrics.diskUsage = ss.lastReply.diskUsage;
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
2017-05-26 04:48:44 +08:00
if(limits->priority == TransactionPriority::DEFAULT && (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
tryAutoThrottleTag(self, ss);
}
2017-05-26 04:48:44 +08:00
double inputRate = ss.smoothInputBytes.smoothRate();
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
2017-05-26 04:48:44 +08:00
/*if( deterministicRandom()->random01() < 0.1 ) {
std::string name = "RatekeeperUpdateRate" + limits.context;
TraceEvent(name, ss.id)
2017-05-26 04:48:44 +08:00
.detail("MinFreeSpace", minFreeSpace)
.detail("SpringBytes", springBytes)
.detail("TargetBytes", targetBytes)
.detail("SmoothTotalSpaceTotal", ss.smoothTotalSpace.smoothTotal())
.detail("SmoothFreeSpaceTotal", ss.smoothFreeSpace.smoothTotal())
.detail("LastReplyBytesInput", ss.lastReply.bytesInput)
.detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal())
.detail("TargetRateRatio", targetRateRatio)
.detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate())
.detail("ActualTPS", actualTps)
2017-05-26 04:48:44 +08:00
.detail("InputRate", inputRate)
.detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate())
.detail("B", b);
2017-05-26 04:48:44 +08:00
}*/
// Don't let any storage server use up its target bytes faster than its MVCC window!
double maxBytesPerSecond = (targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0);
double limitTps = std::min(actualTps * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ssLimitReason == limitReason_t::unlimited)
ssLimitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
2017-05-26 04:48:44 +08:00
if (targetRateRatio > 0 && inputRate > 0) {
ASSERT(inputRate != 0);
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
2017-05-26 04:48:44 +08:00
double x = smoothedRate / (inputRate * targetRateRatio);
double lim = actualTps * x;
if (lim < limitTps) {
limitTps = lim;
if (ssLimitReason == limitReason_t::unlimited || ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc) {
ssLimitReason = limitReason_t::storage_server_write_queue_size;
}
2019-03-14 12:27:23 +08:00
}
}
storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss));
2017-05-26 04:48:44 +08:00
2019-03-07 02:46:17 +08:00
if (limitTps < limits->tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
2017-05-26 04:48:44 +08:00
reasonID = ss.id;
2019-03-07 02:46:17 +08:00
limits->tpsLimit = limitTps;
limitReason = ssLimitReason;
2017-05-26 04:48:44 +08:00
}
ssReasons[ss.id] = ssLimitReason;
2017-05-26 04:48:44 +08:00
}
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
2019-03-07 02:46:17 +08:00
for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
if (ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
2017-05-26 04:48:44 +08:00
ignoredMachines.insert(ss->second->locality.zoneId());
continue;
}
2019-03-07 02:46:17 +08:00
if (ignoredMachines.count(ss->second->locality.zoneId()) > 0) {
2017-05-26 04:48:44 +08:00
continue;
}
limitingStorageQueueStorageServer = ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal();
2019-03-07 02:46:17 +08:00
limits->tpsLimit = ss->first;
reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
2019-03-07 02:46:17 +08:00
limitReason = ssReasons[reasonID];
2017-05-26 04:48:44 +08:00
break;
}
int64_t limitingDurabilityLag = 0;
std::set<Optional<Standalone<StringRef>>> ignoredDurabilityLagMachines;
for (auto ss = storageDurabilityLagReverseIndex.begin(); ss != storageDurabilityLagReverseIndex.end(); ++ss) {
if (ignoredDurabilityLagMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
ignoredDurabilityLagMachines.insert(ss->second->locality.zoneId());
continue;
}
if (ignoredDurabilityLagMachines.count(ss->second->locality.zoneId()) > 0) {
continue;
}
limitingDurabilityLag = -1*ss->first;
if(limitingDurabilityLag > limits->durabilityLagTargetVersions && self->actualTpsHistory.size() > SERVER_KNOBS->NEEDED_TPS_HISTORY_SAMPLES) {
if(limits->durabilityLagLimit == std::numeric_limits<double>::infinity()) {
double maxTps = 0;
for(int i = 0; i < self->actualTpsHistory.size(); i++) {
maxTps = std::max(maxTps, self->actualTpsHistory[i]);
}
limits->durabilityLagLimit = SERVER_KNOBS->INITIAL_DURABILITY_LAG_MULTIPLIER*maxTps;
}
if( limitingDurabilityLag > limits->lastDurabilityLag ) {
limits->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_REDUCTION_RATE*limits->durabilityLagLimit;
}
if(limits->durabilityLagLimit < limits->tpsLimit) {
limits->tpsLimit = limits->durabilityLagLimit;
limitReason = limitReason_t::storage_server_durability_lag;
}
} else if(limits->durabilityLagLimit != std::numeric_limits<double>::infinity() && limitingDurabilityLag > limits->durabilityLagTargetVersions - SERVER_KNOBS->DURABILITY_LAG_UNLIMITED_THRESHOLD) {
limits->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_INCREASE_RATE*limits->durabilityLagLimit;
} else {
limits->durabilityLagLimit = std::numeric_limits<double>::infinity();
}
limits->lastDurabilityLag = limitingDurabilityLag;
break;
}
self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
self->healthMetrics.worstStorageDurabilityLag = worstDurabilityLag;
2017-05-26 04:48:44 +08:00
double writeToReadLatencyLimit = 0;
Version worstVersionLag = 0;
Version limitingVersionLag = 0;
{
Version minSSVer = std::numeric_limits<Version>::max();
Version minLimitingSSVer = std::numeric_limits<Version>::max();
2019-03-07 02:46:17 +08:00
for (const auto& it : self->storageQueueInfo) {
auto& ss = it.value;
if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) continue;
2017-05-26 04:48:44 +08:00
minSSVer = std::min(minSSVer, ss.lastReply.version);
2017-05-26 04:48:44 +08:00
// Machines that ratekeeper isn't controlling can fall arbitrarily far behind
2019-03-07 02:46:17 +08:00
if (ignoredMachines.count(it.value.locality.zoneId()) == 0) {
minLimitingSSVer = std::min(minLimitingSSVer, ss.lastReply.version);
2017-05-26 04:48:44 +08:00
}
}
Version maxTLVer = std::numeric_limits<Version>::min();
2019-03-07 02:46:17 +08:00
for(const auto& it : self->tlogQueueInfo) {
auto& tl = it.value;
2017-05-26 04:48:44 +08:00
if (!tl.valid) continue;
maxTLVer = std::max(maxTLVer, tl.lastReply.v);
}
if (minSSVer != std::numeric_limits<Version>::max() && maxTLVer != std::numeric_limits<Version>::min()) {
// writeToReadLatencyLimit: 0 = infinte speed; 1 = TL durable speed ; 2 = half TL durable speed
writeToReadLatencyLimit =
((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference / 2) / (limits->maxVersionDifference / 4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer);
}
2017-05-26 04:48:44 +08:00
}
int64_t worstFreeSpaceTLog = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueTLog = 0;
int tlcount = 0;
2019-03-07 02:46:17 +08:00
for (auto& it : self->tlogQueueInfo) {
auto& tl = it.value;
2017-05-26 04:48:44 +08:00
if (!tl.valid) continue;
++tlcount;
limitReason_t tlogLimitReason = limitReason_t::log_server_write_queue;
int64_t minFreeSpace = std::max( SERVER_KNOBS->MIN_AVAILABLE_SPACE, (int64_t)(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO * tl.smoothTotalSpace.smoothTotal()));
2017-05-26 04:48:44 +08:00
worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace);
2019-03-07 02:46:17 +08:00
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits->logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits->logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits->logTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_AVAILABLE_SPACE) {
2017-05-26 04:48:44 +08:00
tlogLimitReason = limitReason_t::log_server_min_free_space;
} else {
tlogLimitReason = limitReason_t::log_server_min_free_space_ratio;
}
}
int64_t queue = tl.lastReply.bytesInput - tl.smoothDurableBytes.smoothTotal();
self->healthMetrics.tLogQueue[tl.id] = queue;
2017-05-26 04:48:44 +08:00
int64_t b = queue - targetBytes;
worstStorageQueueTLog = std::max(worstStorageQueueTLog, queue);
if( tl.lastReply.bytesInput - tl.lastReply.bytesDurable > tl.lastReply.storageBytes.free - minFreeSpace / 2 ) {
if(now() - self->lastWarning > 5.0) {
self->lastWarning = now();
2020-05-15 05:17:43 +08:00
TraceEvent(SevWarnAlways, "RkTlogMinFreeSpaceZero", self->id).detail("ReasonId", tl.id);
2017-05-26 04:48:44 +08:00
}
reasonID = tl.id;
limitReason = limitReason_t::log_server_min_free_space;
2019-03-07 02:46:17 +08:00
limits->tpsLimit = 0.0;
2017-05-26 04:48:44 +08:00
}
double targetRateRatio = std::min( ( b + springBytes ) / (double)springBytes, 2.0 );
if (writeToReadLatencyLimit > targetRateRatio){
targetRateRatio = writeToReadLatencyLimit;
tlogLimitReason = limitReason_t::storage_server_readable_behind;
}
double inputRate = tl.smoothInputBytes.smoothRate();
if (targetRateRatio > 0) {
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
2017-05-26 04:48:44 +08:00
double x = smoothedRate / (inputRate * targetRateRatio);
if (targetRateRatio < .75) //< FIXME: KNOB for 2.0
x = std::max(x, 0.95);
double lim = actualTps * x;
2019-03-07 02:46:17 +08:00
if (lim < limits->tpsLimit){
limits->tpsLimit = lim;
2017-05-26 04:48:44 +08:00
reasonID = tl.id;
limitReason = tlogLimitReason;
}
}
if (inputRate > 0) {
// Don't let any tlogs use up its target bytes faster than its MVCC window!
double x = ((targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)) / inputRate;
double lim = actualTps * x;
2019-03-07 02:46:17 +08:00
if (lim < limits->tpsLimit){
limits->tpsLimit = lim;
2017-05-26 04:48:44 +08:00
reasonID = tl.id;
limitReason = limitReason_t::log_server_mvcc_write_bandwidth;
}
}
}
self->healthMetrics.worstTLogQueue = worstStorageQueueTLog;
2019-03-07 02:46:17 +08:00
limits->tpsLimit = std::max(limits->tpsLimit, 0.0);
2017-05-26 04:48:44 +08:00
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
2019-03-07 02:46:17 +08:00
limits->tpsLimit = std::max(limits->tpsLimit, 100.0);
2017-05-26 04:48:44 +08:00
}
int64_t totalDiskUsageBytes = 0;
for (auto& t : self->tlogQueueInfo) {
if (t.value.valid) {
2017-05-26 04:48:44 +08:00
totalDiskUsageBytes += t.value.lastReply.storageBytes.used;
}
}
for (auto& s : self->storageQueueInfo) {
if (s.value.valid) {
2017-05-26 04:48:44 +08:00
totalDiskUsageBytes += s.value.lastReply.storageBytes.used;
}
}
2017-05-26 04:48:44 +08:00
if (now() - self->lastSSListFetchedTimestamp > SERVER_KNOBS->STORAGE_SERVER_LIST_FETCH_TIMEOUT) {
2019-07-19 03:32:35 +08:00
limits->tpsLimit = 0.0;
limitReason = limitReason_t::storage_server_list_fetch_failed;
reasonID = UID();
2020-05-15 05:17:43 +08:00
TraceEvent(SevWarnAlways, "RkSSListFetchTimeout", self->id).suppressFor(1.0);
}
2020-01-15 07:45:24 +08:00
else if(limits->tpsLimit == std::numeric_limits<double>::infinity()) {
limits->tpsLimit = SERVER_KNOBS->RATEKEEPER_DEFAULT_LIMIT;
}
2019-03-07 02:46:17 +08:00
limits->tpsLimitMetric = std::min(limits->tpsLimit, 1e6);
limits->reasonMetric = limitReason;
2017-05-26 04:48:44 +08:00
if (deterministicRandom()->random01() < 0.1) {
2019-03-07 02:46:17 +08:00
std::string name = "RkUpdate" + limits->context;
2020-05-15 05:17:43 +08:00
TraceEvent(name.c_str(), self->id)
2019-03-07 02:46:17 +08:00
.detail("TPSLimit", limits->tpsLimit)
2017-05-26 04:48:44 +08:00
.detail("Reason", limitReason)
.detail("ReasonServerID", reasonID==UID() ? std::string() : Traceable<UID>::toString(reasonID))
2017-05-26 04:48:44 +08:00
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTps)
2017-05-26 04:48:44 +08:00
.detail("StorageServers", sscount)
.detail("Proxies", self->proxyInfo.size())
2017-05-26 04:48:44 +08:00
.detail("TLogs", tlcount)
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
.detail("WorstStorageServerQueue", worstStorageQueueStorageServer)
.detail("LimitingStorageServerQueue", limitingStorageQueueStorageServer)
.detail("WorstTLogQueue", worstStorageQueueTLog)
.detail("TotalDiskUsageBytes", totalDiskUsageBytes)
.detail("WorstStorageServerVersionLag", worstVersionLag)
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
.detail("TagsAutoThrottled", self->throttledTags.autoThrottleCount())
.detail("TagsManuallyThrottled", self->throttledTags.manualThrottleCount())
.trackLatest(name);
2017-05-26 04:48:44 +08:00
}
}
ACTOR Future<Void> configurationMonitor(RatekeeperData *self) {
loop {
state ReadYourWritesTransaction tr(self->db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
self->configuration.fromKeyValues( (VectorRef<KeyValueRef>) results );
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey);
wait( tr.commit() );
wait( watchFuture );
break;
} catch (Error& e) {
wait( tr.onError(e) );
}
}
}
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
2020-05-15 05:17:43 +08:00
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true));
2017-05-26 04:48:44 +08:00
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;
state std::vector<TLogInterface> tlogInterfs;
state Promise<Void> err;
state Future<Void> collection = actorCollection( self.addActor.getFuture() );
2019-07-05 23:12:25 +08:00
TraceEvent("RatekeeperStarting", rkInterf.id());
self.addActor.send( waitFailureServer(rkInterf.waitFailure.getFuture()) );
self.addActor.send( configurationMonitor(&self) );
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
self.addActor.send( monitorServerListChange(&self, serverChanges) );
self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
self.addActor.send( traceRole(Role::RATEKEEPER, rkInterf.id()) );
2017-05-26 04:48:44 +08:00
self.addActor.send(monitorThrottlingChanges(&self));
2020-05-15 05:17:43 +08:00
TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()).detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
2017-05-26 04:48:44 +08:00
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
2020-05-15 05:17:43 +08:00
TraceEvent("RkStorageServerQueueSizeParameters", rkInterf.id()).detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER).detail("Spring", SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER).detail("EBrake", SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES)
2017-05-26 04:48:44 +08:00
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs();
for (int i = 0; i < tlogInterfs.size(); i++) {
2017-05-26 04:48:44 +08:00
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
}
2017-05-26 04:48:44 +08:00
self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId();
try {
state bool lastLimited = false;
loop choose {
when (wait( timeout )) {
updateRate(&self, &self.normalLimits);
updateRate(&self, &self.batchLimits);
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
2017-05-26 04:48:44 +08:00
double tooOld = now() - 1.0;
for(auto p=self.proxyInfo.begin(); p!=self.proxyInfo.end(); ) {
if (p->second.lastUpdateTime < tooOld)
p = self.proxyInfo.erase(p);
2017-05-26 04:48:44 +08:00
else
++p;
}
timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE);
}
when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
2017-05-26 04:48:44 +08:00
GetRateInfoReply reply;
auto& p = self.proxyInfo[ req.requesterID ];
2017-05-26 04:48:44 +08:00
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
if (p.totalTransactions > 0) {
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.totalTransactions );
for(auto tag : req.throttledTagCounts) {
self.throttledTags.addRequests(tag.first, tag.second);
}
2020-07-18 09:48:58 +08:00
// TODO process commitCostEstimation
// for (const auto &[tagName, cost] : req.throttledTagCommitCostEst) {
//
// }
}
if(p.batchTransactions > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batchTransactions );
}
2017-05-26 04:48:44 +08:00
p.totalTransactions = req.totalReleasedTransactions;
p.batchTransactions = req.batchReleasedTransactions;
p.lastUpdateTime = now();
2017-05-26 04:48:44 +08:00
reply.transactionRate = self.normalLimits.tpsLimit / self.proxyInfo.size();
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxyInfo.size();
2017-05-26 04:48:44 +08:00
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
2020-04-23 03:28:51 +08:00
if(p.lastThrottledTagChangeId != self.throttledTagChangeId || now() < p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) {
p.lastThrottledTagChangeId = self.throttledTagChangeId;
p.lastTagPushTime = now();
reply.throttledTags = self.throttledTags.getClientRates();
TEST(reply.throttledTags.present() && reply.throttledTags.get().size() > 0); // Returning tag throttles to a proxy
}
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
reply.healthMetrics.batchLimited = lastLimited;
2017-05-26 04:48:44 +08:00
req.reply.send( reply );
}
when (HaltRatekeeperRequest req = waitNext(rkInterf.haltRatekeeper.getFuture())) {
req.reply.send(Void());
TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID);
break;
}
when (wait(err.getFuture())) {}
when (wait(dbInfo->onChange())) {
if( tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs() ) {
tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs();
2017-05-26 04:48:44 +08:00
tlogTrackers = std::vector<Future<Void>>();
for( int i = 0; i < tlogInterfs.size(); i++ )
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
}
self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId();
2017-05-26 04:48:44 +08:00
}
when ( wait(collection) ) {
ASSERT(false);
throw internal_error();
}
2017-05-26 04:48:44 +08:00
}
}
catch (Error& err) {
2019-07-05 23:12:25 +08:00
TraceEvent("RatekeeperDied", rkInterf.id()).error(err, true);
}
return Void();
2017-05-26 04:48:44 +08:00
}