the durabilityLagLimit needs to be tracked separately for batch priority and normal priority
This commit is contained in:
parent
fef58e13a4
commit
b2b2e25324
|
@ -128,7 +128,10 @@ struct RatekeeperLimits {
|
|||
int64_t logTargetBytes;
|
||||
int64_t logSpringBytes;
|
||||
double maxVersionDifference;
|
||||
|
||||
int64_t durabilityLagTargetVersions;
|
||||
int64_t lastDurabilityLag;
|
||||
double durabilityLagLimit;
|
||||
|
||||
std::string context;
|
||||
|
||||
|
@ -142,6 +145,8 @@ struct RatekeeperLimits {
|
|||
logSpringBytes(logSpringBytes),
|
||||
maxVersionDifference(maxVersionDifference),
|
||||
durabilityLagTargetVersions(durabilityLagTargetVersions),
|
||||
durabilityLagLimit(std::numeric_limits<double>::infinity()),
|
||||
lastDurabilityLag(0),
|
||||
context(context)
|
||||
{}
|
||||
};
|
||||
|
@ -172,16 +177,13 @@ struct RatekeeperData {
|
|||
RatekeeperLimits normalLimits;
|
||||
RatekeeperLimits batchLimits;
|
||||
|
||||
int64_t lastDurabilityLag;
|
||||
double durabilityLagLimit;
|
||||
Deque<double> actualTpsHistory;
|
||||
|
||||
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
|
||||
lastWarning(0),
|
||||
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
|
||||
durabilityLagLimit(std::numeric_limits<double>::infinity()), lastDurabilityLag(0)
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH)
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -365,8 +367,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
|
||||
int64_t worstFreeSpaceStorageServer = std::numeric_limits<int64_t>::max();
|
||||
int64_t worstStorageQueueStorageServer = 0;
|
||||
int64_t worstStorageDurabilityLagStorageServer = 0;
|
||||
int64_t limitingStorageQueueStorageServer = 0;
|
||||
int64_t worstDurabilityLag = 0;
|
||||
double worstStorageLocalLimit = 0;
|
||||
double limitingStorageLocalLimit = 0;
|
||||
|
||||
|
@ -402,7 +404,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
worstStorageLocalLimit = std::min(worstStorageLocalLimit, ss.localRateLimit);
|
||||
|
||||
int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal();
|
||||
worstStorageDurabilityLagStorageServer = std::max(worstStorageDurabilityLagStorageServer, storageDurabilityLag);
|
||||
worstDurabilityLag = std::max(worstDurabilityLag, storageDurabilityLag);
|
||||
|
||||
storageDurabilityLagReverseIndex.insert(std::make_pair(-1*storageDurabilityLag, &ss));
|
||||
|
||||
|
@ -484,7 +486,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
break;
|
||||
}
|
||||
|
||||
int64_t limitingStorageDurabilityLagStorageServer = 0;
|
||||
int64_t limitingDurabilityLag = 0;
|
||||
|
||||
std::set<Optional<Standalone<StringRef>>> ignoredDurabilityLagMachines;
|
||||
for (auto ss = storageDurabilityLagReverseIndex.begin(); ss != storageDurabilityLagReverseIndex.end(); ++ss) {
|
||||
|
@ -496,31 +498,31 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
continue;
|
||||
}
|
||||
|
||||
limitingStorageDurabilityLagStorageServer = -1*ss->first;
|
||||
if(limitingStorageDurabilityLagStorageServer > limits->durabilityLagTargetVersions && self->actualTpsHistory.size() > SERVER_KNOBS->NEEDED_TPS_HISTORY_SAMPLES) {
|
||||
if(self->durabilityLagLimit == std::numeric_limits<double>::infinity()) {
|
||||
limitingDurabilityLag = -1*ss->first;
|
||||
if(limitingDurabilityLag > limits->durabilityLagTargetVersions && self->actualTpsHistory.size() > SERVER_KNOBS->NEEDED_TPS_HISTORY_SAMPLES) {
|
||||
if(limits->durabilityLagLimit == std::numeric_limits<double>::infinity()) {
|
||||
double maxTps = 0;
|
||||
for(int i = 0; i < self->actualTpsHistory.size(); i++) {
|
||||
maxTps = std::max(maxTps, self->actualTpsHistory[i]);
|
||||
}
|
||||
self->durabilityLagLimit = SERVER_KNOBS->INITIAL_DURABILITY_LAG_MULTIPLIER*maxTps;
|
||||
limits->durabilityLagLimit = SERVER_KNOBS->INITIAL_DURABILITY_LAG_MULTIPLIER*maxTps;
|
||||
}
|
||||
if( limitingStorageDurabilityLagStorageServer > self->lastDurabilityLag ) {
|
||||
self->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_REDUCTION_RATE*self->durabilityLagLimit;
|
||||
if( limitingDurabilityLag > limits->lastDurabilityLag ) {
|
||||
limits->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_REDUCTION_RATE*limits->durabilityLagLimit;
|
||||
}
|
||||
if(self->durabilityLagLimit < limits->tpsLimit) {
|
||||
limits->tpsLimit = self->durabilityLagLimit;
|
||||
if(limits->durabilityLagLimit < limits->tpsLimit) {
|
||||
limits->tpsLimit = limits->durabilityLagLimit;
|
||||
limitReason = limitReason_t::storage_server_durability_lag;
|
||||
}
|
||||
} else {
|
||||
self->durabilityLagLimit = std::numeric_limits<double>::infinity();
|
||||
limits->durabilityLagLimit = std::numeric_limits<double>::infinity();
|
||||
}
|
||||
self->lastDurabilityLag = limitingStorageDurabilityLagStorageServer;
|
||||
limits->lastDurabilityLag = limitingDurabilityLag;
|
||||
break;
|
||||
}
|
||||
|
||||
self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
|
||||
self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer;
|
||||
self->healthMetrics.worstStorageDurabilityLag = worstDurabilityLag;
|
||||
|
||||
double writeToReadLatencyLimit = 0;
|
||||
Version worstVersionLag = 0;
|
||||
|
@ -667,8 +669,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
.detail("TotalDiskUsageBytes", totalDiskUsageBytes)
|
||||
.detail("WorstStorageServerVersionLag", worstVersionLag)
|
||||
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
|
||||
.detail("WorstDurabilityLag", worstStorageDurabilityLagStorageServer)
|
||||
.detail("LimitingDurabilityLag", limitingStorageDurabilityLagStorageServer)
|
||||
.detail("WorstDurabilityLag", worstDurabilityLag)
|
||||
.detail("LimitingDurabilityLag", limitingDurabilityLag)
|
||||
.trackLatest(name.c_str());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue