foundationdb/fdbserver/Ratekeeper.actor.cpp

809 lines
34 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* Ratekeeper.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/IndexedSet.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Smoother.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbserver/Knobs.h"
2019-02-28 03:51:48 +08:00
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
enum limitReason_t {
unlimited, // TODO: rename to workload?
storage_server_write_queue_size,
storage_server_write_bandwidth_mvcc,
storage_server_readable_behind,
log_server_mvcc_write_bandwidth,
log_server_write_queue,
storage_server_min_free_space, // a storage server's normal limits are being reduced by low free space
storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
log_server_min_free_space,
log_server_min_free_space_ratio,
storage_server_durability_lag,
storage_server_list_fetch_failed,
2017-05-26 04:48:44 +08:00
limitReason_t_end
};
int limitReasonEnd = limitReason_t_end;
const char* limitReasonName[] = {
"workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio",
"storage_server_durability_lag",
"storage_server_list_fetch_failed"
2017-05-26 04:48:44 +08:00
};
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
// NOTE: This has a corresponding table in Script.cs (see RatekeeperReason graph)
// IF UPDATING THIS ARRAY, UPDATE SCRIPT.CS!
const char* limitReasonDesc[] = {
"Workload or read performance.",
"Storage server performance (storage queue).",
"Storage server MVCC memory.",
"Storage server version falling behind.",
"Log server MVCC memory.",
"Storage server performance (log queue).",
"Storage server running out of space (approaching 100MB limit).",
"Storage server running out of space (approaching 5% limit).",
"Log server running out of space (approaching 100MB limit).",
"Log server running out of space (approaching 5% limit).",
"Storage server durable version falling behind.",
"Unable to fetch storage server list."
2017-05-26 04:48:44 +08:00
};
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size");
struct StorageQueueInfo {
bool valid;
UID id;
LocalityData locality;
StorageQueuingMetricsReply lastReply;
StorageQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother verySmoothDurableVersion, smoothLatestVersion;
2017-05-26 04:48:44 +08:00
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
2019-03-02 05:14:18 +08:00
limitReason_t limitReason;
Fix UBSAN report /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:86:8: runtime error: load of value 1231493777, which is not a valid value for type 'limitReason_t' #0 0x310e961 in StorageQueueInfo::StorageQueueInfo(StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:86 #1 0x310eacd in MapPair<UID, StorageQueueInfo>::MapPair<UID, StorageQueueInfo>(UID&&, StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/flow/IndexedSet.h:242 #2 0x310b35e in MapPair<std::decay<UID>::type, std::decay<StorageQueueInfo>::type> mapPair<UID, StorageQueueInfo>(UID&&, StorageQueueInfo&&) /home/anoyes/workspace/foundationdb/flow/IndexedSet.h:258 #3 0x30a8b79 in a_body1 /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:195 #4 0x309b529 in TrackStorageServerQueueInfoActor /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:495 #5 0x309b9be in trackStorageServerQueueInfo(RatekeeperData* const&, StorageServerInterface const&) /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:194 #6 0x30cff63 in a_body1loopBody1when1cont1 /home/anoyes/workspace/foundationdb/fdbserver/Ratekeeper.actor.cpp:303 #7 0x30cd9da in a_body1loopBody1when1when1 /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:1170 #8 0x30ed4dd in a_callback_fire /home/anoyes/build/foundationdb/fdbserver/Ratekeeper.actor.g.cpp:1185 #9 0x30e6d81 in fire /home/anoyes/workspace/foundationdb/flow/flow.h:998 #10 0x4df0dc in void SAV<Void>::send<Void>(Void&&) /home/anoyes/workspace/foundationdb/flow/flow.h:447 #11 0x959891 in void Promise<Void>::send<Void>(Void&&) const /home/anoyes/workspace/foundationdb/flow/flow.h:778 #12 0x7b4b018 in Sim2::execTask(Sim2::Task&) (/home/anoyes/build/foundationdb/bin/fdbserver+0x7b4b018) #13 0x7bf9168 in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_body1loopBody1cont1(Void const&, int) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:979 #14 0x7be7b68 in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_body1loopBody1when1(Void const&, int) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5391 #15 0x7c329ff in Sim2::RunLoopActorState<Sim2::RunLoopActor>::a_callback_fire(ActorCallback<Sim2::RunLoopActor, 0, Void>*, Void) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5406 #16 0x7c1fc73 in ActorCallback<Sim2::RunLoopActor, 0, Void>::fire(Void const&) /home/anoyes/workspace/foundationdb/flow/flow.h:998 #17 0x4df0dc in void SAV<Void>::send<Void>(Void&&) /home/anoyes/workspace/foundationdb/flow/flow.h:447 #18 0x959891 in void Promise<Void>::send<Void>(Void&&) const /home/anoyes/workspace/foundationdb/flow/flow.h:778 #19 0x7fe74a4 in N2::PromiseTask::operator()() /home/anoyes/workspace/foundationdb/flow/Net2.actor.cpp:481 #20 0x7fb6ff7 in N2::Net2::run() /home/anoyes/workspace/foundationdb/flow/Net2.actor.cpp:657 #21 0x7b71bd3 in Sim2::_runActorState<Sim2::_runActor>::a_body1(int) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:989 #22 0x7b2ee51 in Sim2::_runActor::_runActor(Sim2* const&) /home/anoyes/build/foundationdb/fdbrpc/sim2.actor.g.cpp:5608 #23 0x7b2f268 in Sim2::_run(Sim2* const&) /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:987 #24 0x7b2f2c8 in Sim2::run() /home/anoyes/workspace/foundationdb/fdbrpc/sim2.actor.cpp:996 #25 0x21040a6 in main /home/anoyes/workspace/foundationdb/fdbserver/fdbserver.actor.cpp:1793 #26 0x7f03492ba504 in __libc_start_main (/lib64/libc.so.6+0x22504) #27 0x464914 (/home/anoyes/build/foundationdb/bin/fdbserver+0x464914)
2019-12-04 04:45:34 +08:00
StorageQueueInfo(UID id, LocalityData locality)
: valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
verySmoothDurableVersion(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
2017-05-26 04:48:44 +08:00
lastReply.instanceID = -1;
}
};
struct TLogQueueInfo {
bool valid;
UID id;
TLogQueuingMetricsReply lastReply;
TLogQueuingMetricsReply prevReply;
Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
TLogQueueInfo(UID id) : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) {
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from storageQueueInfO)
2017-05-26 04:48:44 +08:00
lastReply.instanceID = -1;
}
};
struct RatekeeperLimits {
double tpsLimit;
Int64MetricHandle tpsLimitMetric;
Int64MetricHandle reasonMetric;
int64_t storageTargetBytes;
int64_t storageSpringBytes;
int64_t logTargetBytes;
int64_t logSpringBytes;
double maxVersionDifference;
int64_t durabilityLagTargetVersions;
int64_t lastDurabilityLag;
double durabilityLagLimit;
std::string context;
RatekeeperLimits(std::string context, int64_t storageTargetBytes, int64_t storageSpringBytes, int64_t logTargetBytes, int64_t logSpringBytes, double maxVersionDifference, int64_t durabilityLagTargetVersions) :
tpsLimit(std::numeric_limits<double>::infinity()),
tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
reasonMetric(StringRef("Ratekeeper.Reason" + context)),
storageTargetBytes(storageTargetBytes),
storageSpringBytes(storageSpringBytes),
logTargetBytes(logTargetBytes),
logSpringBytes(logSpringBytes),
maxVersionDifference(maxVersionDifference),
durabilityLagTargetVersions(durabilityLagTargetVersions + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not be durable on the storage servers
durabilityLagLimit(std::numeric_limits<double>::infinity()),
lastDurabilityLag(0),
context(context)
{}
};
struct TransactionCounts {
int64_t total;
int64_t batch;
2019-03-02 06:06:47 +08:00
double time;
TransactionCounts() : total(0), batch(0), time(0) {}
};
struct RatekeeperData {
2017-05-26 04:48:44 +08:00
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
2019-03-02 06:06:47 +08:00
std::map<UID, TransactionCounts> proxy_transactionCounts;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
2017-05-26 04:48:44 +08:00
DatabaseConfiguration configuration;
PromiseStream<Future<Void>> addActor;
2017-05-26 04:48:44 +08:00
Int64MetricHandle actualTpsMetric;
2017-05-26 04:48:44 +08:00
double lastWarning;
double lastSSListFetchedTimestamp;
2017-05-26 04:48:44 +08:00
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Deque<double> actualTpsHistory;
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
2017-05-26 04:48:44 +08:00
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
lastWarning(0), lastSSListFetchedTimestamp(now()),
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH)
2017-05-26 04:48:44 +08:00
{}
};
//SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageServerInterface ssi ) {
2017-05-26 04:48:44 +08:00
self->storageQueueInfo.insert( mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality) ) );
state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
TraceEvent("RkTracking", ssi.id());
try {
loop {
ErrorOr<StorageQueuingMetricsReply> reply = wait( ssi.getQueuingMetrics.getReplyUnlessFailedFor( StorageQueuingMetricsRequest(), 0, 0 ) ); // SOMEDAY: or tryGetReply?
if (reply.present()) {
myQueueInfo->value.valid = true;
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
myQueueInfo->value.lastReply = reply.get();
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
myQueueInfo->value.verySmoothDurableVersion.reset(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.reset(reply.get().version);
2017-05-26 04:48:44 +08:00
} else {
self->smoothTotalDurableBytes.addDelta( reply.get().bytesDurable - myQueueInfo->value.prevReply.bytesDurable );
myQueueInfo->value.smoothDurableBytes.setTotal( reply.get().bytesDurable );
myQueueInfo->value.verySmoothDurableBytes.setTotal( reply.get().bytesDurable );
myQueueInfo->value.smoothInputBytes.setTotal( reply.get().bytesInput );
myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available );
myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total );
myQueueInfo->value.verySmoothDurableVersion.setTotal(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
2017-05-26 04:48:44 +08:00
}
} else {
if(myQueueInfo->value.valid) {
TraceEvent("RkStorageServerDidNotRespond", ssi.id());
}
2017-05-26 04:48:44 +08:00
myQueueInfo->value.valid = false;
}
wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && IFailureMonitor::failureMonitor().onStateEqual(ssi.getQueuingMetrics.getEndpoint(), FailureStatus(false)));
2017-05-26 04:48:44 +08:00
}
} catch (...) {
// including cancellation
self->storageQueueInfo.erase( myQueueInfo );
throw;
}
}
ACTOR Future<Void> trackTLogQueueInfo( RatekeeperData* self, TLogInterface tli ) {
2017-05-26 04:48:44 +08:00
self->tlogQueueInfo.insert( mapPair(tli.id(), TLogQueueInfo(tli.id()) ) );
state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
TraceEvent("RkTracking", tli.id());
try {
loop {
ErrorOr<TLogQueuingMetricsReply> reply = wait( tli.getQueuingMetrics.getReplyUnlessFailedFor( TLogQueuingMetricsRequest(), 0, 0 ) ); // SOMEDAY: or tryGetReply?
if (reply.present()) {
myQueueInfo->value.valid = true;
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
myQueueInfo->value.lastReply = reply.get();
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
} else {
self->smoothTotalDurableBytes.addDelta( reply.get().bytesDurable - myQueueInfo->value.prevReply.bytesDurable );
myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable);
myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total);
}
} else {
if(myQueueInfo->value.valid) {
TraceEvent("RkTLogDidNotRespond", tli.id());
}
2017-05-26 04:48:44 +08:00
myQueueInfo->value.valid = false;
}
wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && IFailureMonitor::failureMonitor().onStateEqual(tli.getQueuingMetrics.getEndpoint(), FailureStatus(false)));
2017-05-26 04:48:44 +08:00
}
} catch (...) {
// including cancellation
self->tlogQueueInfo.erase( myQueueInfo );
throw;
}
}
ACTOR Future<Void> splitError( Future<Void> in, Promise<Void> errOut ) {
try {
wait( in );
2017-05-26 04:48:44 +08:00
return Void();
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled && !errOut.isSet())
errOut.sendError(e);
throw;
}
}
ACTOR Future<Void> trackEachStorageServer(
RatekeeperData* self,
2017-05-26 04:48:44 +08:00
FutureStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges )
{
state Map<UID, Future<Void>> actors;
state Promise<Void> err;
loop choose {
when (state std::pair< UID, Optional<StorageServerInterface> > change = waitNext(serverChanges) ) {
wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack
2017-05-26 04:48:44 +08:00
if (change.second.present()) {
auto& a = actors[ change.first ];
a = Future<Void>();
a = splitError( trackStorageServerQueueInfo(self, change.second.get()), err );
} else
actors.erase( change.first );
}
when (wait(err.getFuture())) {}
2017-05-26 04:48:44 +08:00
}
}
ACTOR Future<Void> monitorServerListChange(
RatekeeperData* self,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
state Database db = openDBOnServer(dbInfo, TaskPriority::Ratekeeper, true, true);
state std::map<UID, StorageServerInterface> oldServers;
state Transaction tr(db);
loop {
try {
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
2019-03-13 02:34:16 +08:00
vector<std::pair<StorageServerInterface, ProcessClass>> results = wait(getServerListAndProcessClasses(&tr));
self->lastSSListFetchedTimestamp = now();
2019-03-13 02:34:16 +08:00
std::map<UID, StorageServerInterface> newServers;
for (int i = 0; i < results.size(); i++) {
const StorageServerInterface& ssi = results[i].first;
const UID serverId = ssi.id();
newServers[serverId] = ssi;
2019-03-13 02:34:16 +08:00
if (oldServers.count(serverId)) {
if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) {
serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
}
oldServers.erase(serverId);
} else {
serverChanges.send( std::make_pair(serverId, Optional<StorageServerInterface>(ssi)) );
}
}
2019-03-13 02:34:16 +08:00
for (const auto& it : oldServers) {
serverChanges.send( std::make_pair(it.first, Optional<StorageServerInterface>()) );
}
oldServers.swap(newServers);
tr = Transaction(db);
wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY));
} catch(Error& e) {
wait( tr.onError(e) );
}
}
}
2019-03-07 02:46:17 +08:00
void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
2017-05-26 04:48:44 +08:00
//double controlFactor = ; // dt / eFoldingTime
double actualTps = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTps;
2017-05-26 04:48:44 +08:00
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
if(self->actualTpsHistory.size() > SERVER_KNOBS->MAX_TPS_HISTORY_SAMPLES) {
self->actualTpsHistory.pop_front();
}
self->actualTpsHistory.push_back(actualTps);
2017-05-26 04:48:44 +08:00
limits->tpsLimit = std::numeric_limits<double>::infinity();
2017-05-26 04:48:44 +08:00
UID reasonID = UID();
limitReason_t limitReason = limitReason_t::unlimited;
2017-05-26 04:48:44 +08:00
int sscount = 0;
int64_t worstFreeSpaceStorageServer = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueStorageServer = 0;
int64_t limitingStorageQueueStorageServer = 0;
int64_t worstDurabilityLag = 0;
2017-05-26 04:48:44 +08:00
std::multimap<double, StorageQueueInfo*> storageTpsLimitReverseIndex;
std::multimap<int64_t, StorageQueueInfo*> storageDurabilityLagReverseIndex;
std::map<UID, limitReason_t> ssReasons;
2017-05-26 04:48:44 +08:00
2019-03-02 05:14:18 +08:00
// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
2017-05-26 04:48:44 +08:00
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
auto& ss = i->value;
if (!ss.valid) continue;
++sscount;
limitReason_t ssLimitReason = limitReason_t::unlimited;
2017-05-26 04:48:44 +08:00
int64_t minFreeSpace = std::max(SERVER_KNOBS->MIN_FREE_SPACE, (int64_t)(SERVER_KNOBS->MIN_FREE_SPACE_RATIO * ss.smoothTotalSpace.smoothTotal()));
worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace);
2019-03-07 02:46:17 +08:00
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits->storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits->storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits->storageTargetBytes) {
2017-05-26 04:48:44 +08:00
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
ssLimitReason = limitReason_t::storage_server_min_free_space;
2017-05-26 04:48:44 +08:00
} else {
ssLimitReason = limitReason_t::storage_server_min_free_space_ratio;
2017-05-26 04:48:44 +08:00
}
}
int64_t storageQueue = ss.lastReply.bytesInput - ss.smoothDurableBytes.smoothTotal();
worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue);
int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.verySmoothDurableVersion.smoothTotal();
worstDurabilityLag = std::max(worstDurabilityLag, storageDurabilityLag);
storageDurabilityLagReverseIndex.insert(std::make_pair(-1*storageDurabilityLag, &ss));
auto& ssMetrics = self->healthMetrics.storageStats[ss.id];
ssMetrics.storageQueue = storageQueue;
ssMetrics.storageDurabilityLag = storageDurabilityLag;
ssMetrics.cpuUsage = ss.lastReply.cpuUsage;
ssMetrics.diskUsage = ss.lastReply.diskUsage;
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
2017-05-26 04:48:44 +08:00
double inputRate = ss.smoothInputBytes.smoothRate();
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
2017-05-26 04:48:44 +08:00
/*if( deterministicRandom()->random01() < 0.1 ) {
std::string name = "RatekeeperUpdateRate" + limits.context;
TraceEvent(name, ss.id)
2017-05-26 04:48:44 +08:00
.detail("MinFreeSpace", minFreeSpace)
.detail("SpringBytes", springBytes)
.detail("TargetBytes", targetBytes)
.detail("SmoothTotalSpaceTotal", ss.smoothTotalSpace.smoothTotal())
.detail("SmoothFreeSpaceTotal", ss.smoothFreeSpace.smoothTotal())
.detail("LastReplyBytesInput", ss.lastReply.bytesInput)
.detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal())
.detail("TargetRateRatio", targetRateRatio)
.detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate())
.detail("ActualTPS", actualTps)
2017-05-26 04:48:44 +08:00
.detail("InputRate", inputRate)
.detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate())
.detail("B", b);
2017-05-26 04:48:44 +08:00
}*/
// Don't let any storage server use up its target bytes faster than its MVCC window!
double maxBytesPerSecond = (targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0);
double limitTps = std::min(actualTps * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ssLimitReason == limitReason_t::unlimited)
ssLimitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
2017-05-26 04:48:44 +08:00
if (targetRateRatio > 0 && inputRate > 0) {
ASSERT(inputRate != 0);
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
2017-05-26 04:48:44 +08:00
double x = smoothedRate / (inputRate * targetRateRatio);
double lim = actualTps * x;
if (lim < limitTps) {
limitTps = lim;
if (ssLimitReason == limitReason_t::unlimited || ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc) {
ssLimitReason = limitReason_t::storage_server_write_queue_size;
}
2019-03-14 12:27:23 +08:00
}
}
storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss));
2017-05-26 04:48:44 +08:00
2019-03-07 02:46:17 +08:00
if (limitTps < limits->tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
2017-05-26 04:48:44 +08:00
reasonID = ss.id;
2019-03-07 02:46:17 +08:00
limits->tpsLimit = limitTps;
limitReason = ssLimitReason;
2017-05-26 04:48:44 +08:00
}
ssReasons[ss.id] = ssLimitReason;
2017-05-26 04:48:44 +08:00
}
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
2019-03-07 02:46:17 +08:00
for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
if (ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
2017-05-26 04:48:44 +08:00
ignoredMachines.insert(ss->second->locality.zoneId());
continue;
}
2019-03-07 02:46:17 +08:00
if (ignoredMachines.count(ss->second->locality.zoneId()) > 0) {
2017-05-26 04:48:44 +08:00
continue;
}
limitingStorageQueueStorageServer = ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal();
2019-03-07 02:46:17 +08:00
limits->tpsLimit = ss->first;
reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
2019-03-07 02:46:17 +08:00
limitReason = ssReasons[reasonID];
2017-05-26 04:48:44 +08:00
break;
}
int64_t limitingDurabilityLag = 0;
std::set<Optional<Standalone<StringRef>>> ignoredDurabilityLagMachines;
for (auto ss = storageDurabilityLagReverseIndex.begin(); ss != storageDurabilityLagReverseIndex.end(); ++ss) {
if (ignoredDurabilityLagMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
ignoredDurabilityLagMachines.insert(ss->second->locality.zoneId());
continue;
}
if (ignoredDurabilityLagMachines.count(ss->second->locality.zoneId()) > 0) {
continue;
}
limitingDurabilityLag = -1*ss->first;
if(limitingDurabilityLag > limits->durabilityLagTargetVersions && self->actualTpsHistory.size() > SERVER_KNOBS->NEEDED_TPS_HISTORY_SAMPLES) {
if(limits->durabilityLagLimit == std::numeric_limits<double>::infinity()) {
double maxTps = 0;
for(int i = 0; i < self->actualTpsHistory.size(); i++) {
maxTps = std::max(maxTps, self->actualTpsHistory[i]);
}
limits->durabilityLagLimit = SERVER_KNOBS->INITIAL_DURABILITY_LAG_MULTIPLIER*maxTps;
}
if( limitingDurabilityLag > limits->lastDurabilityLag ) {
limits->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_REDUCTION_RATE*limits->durabilityLagLimit;
}
if(limits->durabilityLagLimit < limits->tpsLimit) {
limits->tpsLimit = limits->durabilityLagLimit;
limitReason = limitReason_t::storage_server_durability_lag;
}
} else if(limits->durabilityLagLimit != std::numeric_limits<double>::infinity() && limitingDurabilityLag > limits->durabilityLagTargetVersions - SERVER_KNOBS->DURABILITY_LAG_UNLIMITED_THRESHOLD) {
limits->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_INCREASE_RATE*limits->durabilityLagLimit;
} else {
limits->durabilityLagLimit = std::numeric_limits<double>::infinity();
}
limits->lastDurabilityLag = limitingDurabilityLag;
break;
}
self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
self->healthMetrics.worstStorageDurabilityLag = worstDurabilityLag;
2017-05-26 04:48:44 +08:00
double writeToReadLatencyLimit = 0;
Version worstVersionLag = 0;
Version limitingVersionLag = 0;
{
Version minSSVer = std::numeric_limits<Version>::max();
Version minLimitingSSVer = std::numeric_limits<Version>::max();
2019-03-07 02:46:17 +08:00
for (const auto& it : self->storageQueueInfo) {
auto& ss = it.value;
2017-05-26 04:48:44 +08:00
if (!ss.valid) continue;
minSSVer = std::min(minSSVer, ss.lastReply.version);
2017-05-26 04:48:44 +08:00
// Machines that ratekeeper isn't controlling can fall arbitrarily far behind
2019-03-07 02:46:17 +08:00
if (ignoredMachines.count(it.value.locality.zoneId()) == 0) {
minLimitingSSVer = std::min(minLimitingSSVer, ss.lastReply.version);
2017-05-26 04:48:44 +08:00
}
}
Version maxTLVer = std::numeric_limits<Version>::min();
2019-03-07 02:46:17 +08:00
for(const auto& it : self->tlogQueueInfo) {
auto& tl = it.value;
2017-05-26 04:48:44 +08:00
if (!tl.valid) continue;
maxTLVer = std::max(maxTLVer, tl.lastReply.v);
}
if (minSSVer != std::numeric_limits<Version>::max() && maxTLVer != std::numeric_limits<Version>::min()) {
// writeToReadLatencyLimit: 0 = infinte speed; 1 = TL durable speed ; 2 = half TL durable speed
writeToReadLatencyLimit =
((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference / 2) / (limits->maxVersionDifference / 4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer);
}
2017-05-26 04:48:44 +08:00
}
int64_t worstFreeSpaceTLog = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueTLog = 0;
int tlcount = 0;
2019-03-07 02:46:17 +08:00
for (auto& it : self->tlogQueueInfo) {
auto& tl = it.value;
2017-05-26 04:48:44 +08:00
if (!tl.valid) continue;
++tlcount;
limitReason_t tlogLimitReason = limitReason_t::log_server_write_queue;
int64_t minFreeSpace = std::max( SERVER_KNOBS->MIN_FREE_SPACE, (int64_t)(SERVER_KNOBS->MIN_FREE_SPACE_RATIO * tl.smoothTotalSpace.smoothTotal()));
worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace);
2019-03-07 02:46:17 +08:00
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits->logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits->logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits->logTargetBytes) {
2017-05-26 04:48:44 +08:00
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
tlogLimitReason = limitReason_t::log_server_min_free_space;
} else {
tlogLimitReason = limitReason_t::log_server_min_free_space_ratio;
}
}
int64_t queue = tl.lastReply.bytesInput - tl.smoothDurableBytes.smoothTotal();
self->healthMetrics.tLogQueue[tl.id] = queue;
2017-05-26 04:48:44 +08:00
int64_t b = queue - targetBytes;
worstStorageQueueTLog = std::max(worstStorageQueueTLog, queue);
if( tl.lastReply.bytesInput - tl.lastReply.bytesDurable > tl.lastReply.storageBytes.free - minFreeSpace / 2 ) {
if(now() - self->lastWarning > 5.0) {
self->lastWarning = now();
TraceEvent(SevWarnAlways, "RkTlogMinFreeSpaceZero").detail("ReasonId", tl.id);
2017-05-26 04:48:44 +08:00
}
reasonID = tl.id;
limitReason = limitReason_t::log_server_min_free_space;
2019-03-07 02:46:17 +08:00
limits->tpsLimit = 0.0;
2017-05-26 04:48:44 +08:00
}
double targetRateRatio = std::min( ( b + springBytes ) / (double)springBytes, 2.0 );
if (writeToReadLatencyLimit > targetRateRatio){
targetRateRatio = writeToReadLatencyLimit;
tlogLimitReason = limitReason_t::storage_server_readable_behind;
}
double inputRate = tl.smoothInputBytes.smoothRate();
if (targetRateRatio > 0) {
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
2017-05-26 04:48:44 +08:00
double x = smoothedRate / (inputRate * targetRateRatio);
if (targetRateRatio < .75) //< FIXME: KNOB for 2.0
x = std::max(x, 0.95);
double lim = actualTps * x;
2019-03-07 02:46:17 +08:00
if (lim < limits->tpsLimit){
limits->tpsLimit = lim;
2017-05-26 04:48:44 +08:00
reasonID = tl.id;
limitReason = tlogLimitReason;
}
}
if (inputRate > 0) {
// Don't let any tlogs use up its target bytes faster than its MVCC window!
double x = ((targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)) / inputRate;
double lim = actualTps * x;
2019-03-07 02:46:17 +08:00
if (lim < limits->tpsLimit){
limits->tpsLimit = lim;
2017-05-26 04:48:44 +08:00
reasonID = tl.id;
limitReason = limitReason_t::log_server_mvcc_write_bandwidth;
}
}
}
self->healthMetrics.worstTLogQueue = worstStorageQueueTLog;
2019-03-07 02:46:17 +08:00
limits->tpsLimit = std::max(limits->tpsLimit, 0.0);
2017-05-26 04:48:44 +08:00
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
2019-03-07 02:46:17 +08:00
limits->tpsLimit = std::max(limits->tpsLimit, 100.0);
2017-05-26 04:48:44 +08:00
}
int64_t totalDiskUsageBytes = 0;
for(auto & t : self->tlogQueueInfo)
if (t.value.valid)
totalDiskUsageBytes += t.value.lastReply.storageBytes.used;
for(auto & s : self->storageQueueInfo)
if (s.value.valid)
totalDiskUsageBytes += s.value.lastReply.storageBytes.used;
if (now() - self->lastSSListFetchedTimestamp > SERVER_KNOBS->STORAGE_SERVER_LIST_FETCH_TIMEOUT) {
2019-07-19 03:32:35 +08:00
limits->tpsLimit = 0.0;
limitReason = limitReason_t::storage_server_list_fetch_failed;
reasonID = UID();
TraceEvent(SevWarnAlways, "RkSSListFetchTimeout").suppressFor(1.0);
}
2019-03-07 02:46:17 +08:00
limits->tpsLimitMetric = std::min(limits->tpsLimit, 1e6);
limits->reasonMetric = limitReason;
2017-05-26 04:48:44 +08:00
if (deterministicRandom()->random01() < 0.1) {
2019-03-07 02:46:17 +08:00
std::string name = "RkUpdate" + limits->context;
TraceEvent(name.c_str())
2019-03-07 02:46:17 +08:00
.detail("TPSLimit", limits->tpsLimit)
2017-05-26 04:48:44 +08:00
.detail("Reason", limitReason)
.detail("ReasonServerID", reasonID==UID() ? std::string() : Traceable<UID>::toString(reasonID))
2017-05-26 04:48:44 +08:00
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTps)
2017-05-26 04:48:44 +08:00
.detail("StorageServers", sscount)
2019-03-02 06:06:47 +08:00
.detail("Proxies", self->proxy_transactionCounts.size())
2017-05-26 04:48:44 +08:00
.detail("TLogs", tlcount)
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
.detail("WorstStorageServerQueue", worstStorageQueueStorageServer)
.detail("LimitingStorageServerQueue", limitingStorageQueueStorageServer)
.detail("WorstTLogQueue", worstStorageQueueTLog)
.detail("TotalDiskUsageBytes", totalDiskUsageBytes)
.detail("WorstStorageServerVersionLag", worstVersionLag)
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
.trackLatest(name.c_str());
2017-05-26 04:48:44 +08:00
}
}
ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo, DatabaseConfiguration* conf) {
state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
loop {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Standalone<RangeResultRef> results = wait( tr.getRange( configKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !results.more && results.size() < CLIENT_KNOBS->TOO_MANY );
conf->fromKeyValues( (VectorRef<KeyValueRef>) results );
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey);
wait( tr.commit() );
wait( watchFuture );
break;
} catch (Error& e) {
wait( tr.onError(e) );
}
}
}
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
state RatekeeperData self;
2017-05-26 04:48:44 +08:00
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;
state std::vector<TLogInterface> tlogInterfs;
state Promise<Void> err;
state Future<Void> collection = actorCollection( self.addActor.getFuture() );
2019-07-05 23:12:25 +08:00
TraceEvent("RatekeeperStarting", rkInterf.id());
self.addActor.send( waitFailureServer(rkInterf.waitFailure.getFuture()) );
self.addActor.send( configurationMonitor(dbInfo, &self.configuration) );
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
self.addActor.send( monitorServerListChange(&self, dbInfo, serverChanges) );
self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
2017-05-26 04:48:44 +08:00
TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
TraceEvent("RkStorageServerQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER).detail("Spring", SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER).detail("EBrake", SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES)
.detail("Rate", (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0));
tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs();
2017-05-26 04:48:44 +08:00
for( int i = 0; i < tlogInterfs.size(); i++ )
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
try {
state bool lastLimited = false;
loop choose {
when (wait( timeout )) {
2019-03-07 02:46:17 +08:00
updateRate(&self, &self.normalLimits);
updateRate(&self, &self.batchLimits);
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
2017-05-26 04:48:44 +08:00
double tooOld = now() - 1.0;
2019-03-02 06:06:47 +08:00
for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) {
if (p->second.time < tooOld)
p = self.proxy_transactionCounts.erase(p);
2017-05-26 04:48:44 +08:00
else
++p;
}
timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE);
}
when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
2017-05-26 04:48:44 +08:00
GetRateInfoReply reply;
2019-03-02 06:06:47 +08:00
auto& p = self.proxy_transactionCounts[ req.requesterID ];
2017-05-26 04:48:44 +08:00
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
2019-03-02 06:06:47 +08:00
if (p.total > 0) {
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.total );
}
2019-03-02 06:06:47 +08:00
if(p.batch > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batch );
}
2017-05-26 04:48:44 +08:00
2019-03-02 06:06:47 +08:00
p.total = req.totalReleasedTransactions;
p.batch = req.batchReleasedTransactions;
p.time = now();
2017-05-26 04:48:44 +08:00
2019-03-02 06:06:47 +08:00
reply.transactionRate = self.normalLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
2017-05-26 04:48:44 +08:00
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
reply.healthMetrics.batchLimited = lastLimited;
2017-05-26 04:48:44 +08:00
req.reply.send( reply );
}
when (HaltRatekeeperRequest req = waitNext(rkInterf.haltRatekeeper.getFuture())) {
req.reply.send(Void());
TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID);
break;
}
when (wait(err.getFuture())) {}
when (wait(dbInfo->onChange())) {
if( tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs() ) {
tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs();
2017-05-26 04:48:44 +08:00
tlogTrackers = std::vector<Future<Void>>();
for( int i = 0; i < tlogInterfs.size(); i++ )
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
}
}
when ( wait(collection) ) {
ASSERT(false);
throw internal_error();
}
2017-05-26 04:48:44 +08:00
}
}
catch (Error& err) {
2019-07-05 23:12:25 +08:00
TraceEvent("RatekeeperDied", rkInterf.id()).error(err, true);
}
return Void();
2017-05-26 04:48:44 +08:00
}