Ratekeeper throttles tpsLimit to 1 if it is unable to fetch the storage server list for a configurable amount of time.
This commit is contained in:
parent
b9e938972c
commit
406bcebdc4
|
@ -626,6 +626,9 @@ std::string getDateInfoString(StatusObjectReader statusObj, std::string key) {
|
|||
}
|
||||
|
||||
std::string getProcessAddressByServerID(StatusObjectReader processesMap, std::string serverID) {
|
||||
if(serverID == "")
|
||||
return "unknown";
|
||||
|
||||
for (auto proc : processesMap.obj()){
|
||||
try {
|
||||
StatusArray rolesArray = proc.second.get_obj()["roles"].get_array();
|
||||
|
|
|
@ -252,7 +252,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_durability_lag"
|
||||
"storage_server_durability_lag",
|
||||
"storage_server_list_fetch_failed"
|
||||
]
|
||||
},
|
||||
"description":"The database is not being saturated by the workload."
|
||||
|
@ -272,7 +273,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_durability_lag"
|
||||
"storage_server_durability_lag",
|
||||
"storage_server_list_fetch_failed"
|
||||
]
|
||||
},
|
||||
"description":"The database is not being saturated by the workload."
|
||||
|
|
|
@ -404,7 +404,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( INITIAL_DURABILITY_LAG_MULTIPLIER, 1.02 );
|
||||
init( DURABILITY_LAG_REDUCTION_RATE, 0.9999 );
|
||||
init( DURABILITY_LAG_INCREASE_RATE, 1.001 );
|
||||
|
||||
init( STORAGE_SERVER_LIST_FETCH_TIMEOUT, 20.0 );
|
||||
|
||||
//Storage Metrics
|
||||
init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 );
|
||||
init( STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, 1000.0 / STORAGE_METRICS_AVERAGE_INTERVAL ); // milliHz!
|
||||
|
|
|
@ -340,7 +340,9 @@ public:
|
|||
double INITIAL_DURABILITY_LAG_MULTIPLIER;
|
||||
double DURABILITY_LAG_REDUCTION_RATE;
|
||||
double DURABILITY_LAG_INCREASE_RATE;
|
||||
|
||||
|
||||
double STORAGE_SERVER_LIST_FETCH_TIMEOUT;
|
||||
|
||||
//Storage Metrics
|
||||
double STORAGE_METRICS_AVERAGE_INTERVAL;
|
||||
double STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
|
||||
|
|
|
@ -42,6 +42,7 @@ enum limitReason_t {
|
|||
log_server_min_free_space,
|
||||
log_server_min_free_space_ratio,
|
||||
storage_server_durability_lag,
|
||||
storage_server_list_fetch_failed,
|
||||
limitReason_t_end
|
||||
};
|
||||
|
||||
|
@ -58,7 +59,8 @@ const char* limitReasonName[] = {
|
|||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_durability_lag"
|
||||
"storage_server_durability_lag",
|
||||
"storage_server_list_fetch_failed"
|
||||
};
|
||||
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
|
||||
|
||||
|
@ -75,7 +77,8 @@ const char* limitReasonDesc[] = {
|
|||
"Storage server running out of space (approaching 5% limit).",
|
||||
"Log server running out of space (approaching 100MB limit).",
|
||||
"Log server running out of space (approaching 5% limit).",
|
||||
"Storage server durable version falling behind."
|
||||
"Storage server durable version falling behind.",
|
||||
"Unable to fetch storage server list."
|
||||
};
|
||||
|
||||
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size");
|
||||
|
@ -173,6 +176,7 @@ struct RatekeeperData {
|
|||
Int64MetricHandle actualTpsMetric;
|
||||
|
||||
double lastWarning;
|
||||
double lastSSListFetchedTimestamp;
|
||||
|
||||
RatekeeperLimits normalLimits;
|
||||
RatekeeperLimits batchLimits;
|
||||
|
@ -181,7 +185,7 @@ struct RatekeeperData {
|
|||
|
||||
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
|
||||
lastWarning(0),
|
||||
lastWarning(0), lastSSListFetchedTimestamp(now()),
|
||||
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH)
|
||||
{}
|
||||
|
@ -307,6 +311,7 @@ ACTOR Future<Void> trackEachStorageServer(
|
|||
}
|
||||
|
||||
ACTOR Future<Void> monitorServerListChange(
|
||||
RatekeeperData* self,
|
||||
Reference<AsyncVar<ServerDBInfo>> dbInfo,
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges) {
|
||||
state Database db = openDBOnServer(dbInfo, TaskPriority::Ratekeeper, true, true);
|
||||
|
@ -315,7 +320,9 @@ ACTOR Future<Void> monitorServerListChange(
|
|||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
|
||||
vector<std::pair<StorageServerInterface, ProcessClass>> results = wait(getServerListAndProcessClasses(&tr));
|
||||
self->lastSSListFetchedTimestamp = now();
|
||||
|
||||
std::map<UID, StorageServerInterface> newServers;
|
||||
for (int i = 0; i < results.size(); i++) {
|
||||
|
@ -646,6 +653,13 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
if (s.value.valid)
|
||||
totalDiskUsageBytes += s.value.lastReply.storageBytes.used;
|
||||
|
||||
if (now() - self->lastSSListFetchedTimestamp > SERVER_KNOBS->STORAGE_SERVER_LIST_FETCH_TIMEOUT) {
|
||||
limits->tpsLimit = 1;
|
||||
limitReason = limitReason_t::storage_server_list_fetch_failed;
|
||||
reasonID = UID();
|
||||
TraceEvent(SevWarnAlways, "RkSSListFetchTimeout").suppressFor(1.0);
|
||||
}
|
||||
|
||||
limits->tpsLimitMetric = std::min(limits->tpsLimit, 1e6);
|
||||
limits->reasonMetric = limitReason;
|
||||
|
||||
|
@ -654,7 +668,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
TraceEvent(name.c_str())
|
||||
.detail("TPSLimit", limits->tpsLimit)
|
||||
.detail("Reason", limitReason)
|
||||
.detail("ReasonServerID", reasonID)
|
||||
.detail("ReasonServerID", reasonID==UID() ? std::string() : Traceable<UID>::toString(reasonID))
|
||||
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
|
||||
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
|
||||
.detail("TPSBasis", actualTps)
|
||||
|
@ -715,7 +729,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
|||
self.addActor.send( configurationMonitor(dbInfo, &self.configuration) );
|
||||
|
||||
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
|
||||
self.addActor.send( monitorServerListChange(dbInfo, serverChanges) );
|
||||
self.addActor.send( monitorServerListChange(&self, dbInfo, serverChanges) );
|
||||
self.addActor.send( trackEachStorageServer(&self, serverChanges.getFuture()) );
|
||||
|
||||
TraceEvent("RkTLogQueueSizeParameters").detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG).detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG)
|
||||
|
|
Loading…
Reference in New Issue