experimental slow control on durability lag
This commit is contained in:
parent
a0203457d9
commit
e85c05c906
|
@ -41,6 +41,7 @@ enum limitReason_t {
|
|||
storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
|
||||
log_server_min_free_space,
|
||||
log_server_min_free_space_ratio,
|
||||
storage_server_durability_lag,
|
||||
limitReason_t_end
|
||||
};
|
||||
|
||||
|
@ -56,7 +57,8 @@ const char* limitReasonName[] = {
|
|||
"storage_server_min_free_space",
|
||||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio"
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_durability_lag"
|
||||
};
|
||||
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
|
||||
|
||||
|
@ -72,7 +74,8 @@ const char* limitReasonDesc[] = {
|
|||
"Storage server running out of space (approaching 100MB limit).",
|
||||
"Storage server running out of space (approaching 5% limit).",
|
||||
"Log server running out of space (approaching 100MB limit).",
|
||||
"Log server running out of space (approaching 5% limit)."
|
||||
"Log server running out of space (approaching 5% limit).",
|
||||
"Storage server is overwhelmed by read workload"
|
||||
};
|
||||
|
||||
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size");
|
||||
|
@ -166,12 +169,15 @@ struct RatekeeperData {
|
|||
|
||||
RatekeeperLimits normalLimits;
|
||||
RatekeeperLimits batchLimits;
|
||||
double durabilityLagLimit;
|
||||
int64_t lastDurabilityLag;
|
||||
|
||||
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
|
||||
lastWarning(0),
|
||||
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE),
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH)
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH),
|
||||
durabilityLagLimit(std::numeric_limits<double>::infinity()), lastDurabilityLag(0)
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -342,9 +348,9 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
|
||||
actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
|
||||
|
||||
limits->tpsLimit = std::numeric_limits<double>::infinity();
|
||||
limits->tpsLimit = self->durabilityLagLimit;
|
||||
UID reasonID = UID();
|
||||
limitReason_t limitReason = limitReason_t::unlimited;
|
||||
limitReason_t limitReason = self->durabilityLagLimit == std::numeric_limits<double>::infinity() ? limitReason_t::unlimited : limitReasion_t::storage_server_durability_lag;
|
||||
|
||||
int sscount = 0;
|
||||
|
||||
|
@ -446,6 +452,10 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
ssReasons[ss.id] = ssLimitReason;
|
||||
}
|
||||
|
||||
if(worstStorageDurabilityLagStorageServer < 200e6) {
|
||||
self->durabilityLagLimit = std::numeric_limits<double>::infinity();
|
||||
}
|
||||
|
||||
self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
|
||||
self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer;
|
||||
|
||||
|
@ -617,6 +627,46 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
}
|
||||
}
|
||||
|
||||
void updateVersionLagRate(RatekeeperData* self) {
|
||||
int64_t worstStorageDurabilityLagStorageServer = 0;
|
||||
int64_t limitingStorageDurabilityLagStorageServer = 0;
|
||||
|
||||
std::multimap<int64_t, StorageQueueInfo*> storageTpsLimitReverseIndex;
|
||||
|
||||
// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
|
||||
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
|
||||
auto& ss = i->value;
|
||||
if (!ss.valid) continue;
|
||||
|
||||
int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal();
|
||||
worstStorageDurabilityLagStorageServer = std::max(worstStorageDurabilityLagStorageServer, storageDurabilityLag);
|
||||
|
||||
storageTpsLimitReverseIndex.insert(std::make_pair(-1*storageDurabilityLag, &ss));
|
||||
}
|
||||
|
||||
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
|
||||
for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
|
||||
if (ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
|
||||
ignoredMachines.insert(ss->second->locality.zoneId());
|
||||
continue;
|
||||
}
|
||||
if (ignoredMachines.count(ss->second->locality.zoneId()) > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
limitingStorageDurabilityLagStorageServer = -1*ss->first;
|
||||
if(limitingStorageDurabilityLagStorageServer < 200e6) {
|
||||
self->durabilityLagLimit = std::numeric_limits<double>::infinity();
|
||||
} else if(self->durabilityLagLimit == std::numeric_limits<double>::infinity()) {
|
||||
self->durabilityLagLimit = std::max( 1000.0, 0.95*self->smoothReleasedTransactions.smoothRate() );
|
||||
} else if(limitingStorageDurabilityLagStorageServer > self->lastDurabilityLag) {
|
||||
self->durabilityLagLimit = 0.95*self->durabilityLagLimit;
|
||||
}
|
||||
self->lastDurabilityLag = limitingStorageDurabilityLagStorageServer;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo, DatabaseConfiguration* conf) {
|
||||
state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
|
||||
loop {
|
||||
|
@ -645,6 +695,7 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
|
|||
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
|
||||
state RatekeeperData self;
|
||||
state Future<Void> timeout = Void();
|
||||
state Future<Void> versionLagTimeout = Void();
|
||||
state std::vector<Future<Void>> tlogTrackers;
|
||||
state std::vector<TLogInterface> tlogInterfs;
|
||||
state Promise<Void> err;
|
||||
|
@ -685,6 +736,10 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
|||
}
|
||||
timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE);
|
||||
}
|
||||
when (wait( versionLagTimeout )) {
|
||||
updateVersionLagRate(&self);
|
||||
versionLagTimeout = delayJittered(60.0);
|
||||
}
|
||||
when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
|
||||
GetRateInfoReply reply;
|
||||
|
||||
|
|
Loading…
Reference in New Issue