a better attempt at ratekeeper control of durability lag
This commit is contained in:
parent dc171b3eae
commit c5fb5494f5
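In short, the change below: updateRate() now records a rolling window of recently achieved TPS, builds a second reverse index of storage servers keyed by durability lag, and applies the durability-lag throttle inline on every tick. The separate updateVersionLagRate() pass introduced by the parent commit, together with the 60-second timer that drove it, is removed. Short distilled sketches of the new pieces follow the relevant hunks.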
@@ -169,8 +169,10 @@ struct RatekeeperData {
 	RatekeeperLimits normalLimits;
 	RatekeeperLimits batchLimits;
 
-	double durabilityLagLimit;
 	int64_t lastDurabilityLag;
+	double durabilityLagLimit;
+	Deque<double> actualTpsHistory;
 
 	RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
 		actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
@@ -347,10 +349,15 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 	self->actualTpsMetric = (int64_t)actualTps;
 	// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
 	actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
 
+	if(self->actualTpsHistory.size() > 600) {
+		self->actualTpsHistory.pop_front();
+	}
+	self->actualTpsHistory.push_back(actualTps);
+
-	limits->tpsLimit = self->durabilityLagLimit;
+	limits->tpsLimit = std::numeric_limits<double>::infinity();
 	UID reasonID = UID();
-	limitReason_t limitReason = self->durabilityLagLimit == std::numeric_limits<double>::infinity() ? limitReason_t::unlimited : limitReason_t::storage_server_durability_lag;
+	limitReason_t limitReason = limitReason_t::unlimited;
 
 	int sscount = 0;
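The Deque<double> added here keeps roughly the last 600 TPS samples; the controller later seeds its limit from the maximum over this window. A minimal standalone sketch of the same bounded-history idea, using std::deque rather than FDB's Deque (whose interface the diff only partially shows):

#include <algorithm>
#include <cstddef>
#include <deque>

// Bounded history of recent TPS samples; the oldest fall off the front.
struct TpsHistory {
    std::deque<double> samples;
    std::size_t cap = 600; // mirrors the 600-sample cap in the diff

    void record(double tps) {
        if (samples.size() > cap) samples.pop_front();
        samples.push_back(tps);
    }

    // Highest throughput actually achieved in the window; the controller
    // seeds its durability-lag limit at 1.02x this value.
    double maxRecent() const {
        double m = 0;
        for (double s : samples) m = std::max(m, s);
        return m;
    }
};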
@@ -362,6 +369,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 	double limitingStorageLocalLimit = 0;
 
 	std::multimap<double, StorageQueueInfo*> storageTpsLimitReverseIndex;
+	std::multimap<int64_t, StorageQueueInfo*> storageDurabilityLagReverseIndex;
 
 	std::map<UID, limitReason_t> ssReasons;
 
 	// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
@@ -393,6 +402,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 		int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal();
 		worstStorageDurabilityLagStorageServer = std::max(worstStorageDurabilityLagStorageServer, storageDurabilityLag);
+		storageDurabilityLagReverseIndex.insert(std::make_pair(-1*storageDurabilityLag, &ss));
 
 		auto& ssMetrics = self->healthMetrics.storageStats[ss.id];
 		ssMetrics.storageQueue = storageQueue;
 		ssMetrics.storageDurabilityLag = storageDurabilityLag;
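The key is negated on insert: std::multimap iterates in ascending key order, so storing -storageDurabilityLag lets the later scan visit storage servers from worst durability lag to least without a custom comparator.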
@@ -452,13 +463,6 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 		ssReasons[ss.id] = ssLimitReason;
 	}
 
-	if(worstStorageDurabilityLagStorageServer < 200e6) {
-		self->durabilityLagLimit = std::numeric_limits<double>::infinity();
-	}
-
-	self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
-	self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer;
-
 	std::set<Optional<Standalone<StringRef>>> ignoredMachines;
 	for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
 		if (ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
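Both this loop and the new durability-lag loop in the next hunk share a fault-tolerance pattern: the worst min(storageTeamSize - 1, MAX_MACHINES_FALLING_BEHIND) zones are deliberately ignored, so a few lagging machines (within what replication can absorb) cannot throttle the whole cluster. A distilled sketch of that selection; the SS type and names are illustrative, not FDB's:

#include <cstddef>
#include <map>
#include <set>
#include <string>

struct SS { std::string id; std::string zone; };

// Walk servers from worst lag to least (key is -lag, so ascending == worst
// first). The first zonesToIgnore distinct zones are written off as "allowed
// to fall behind"; the first server outside them gets to set the limit.
const SS* pickLimitingServer(const std::multimap<long long, const SS*>& byWorstFirst,
                             std::size_t zonesToIgnore) {
    std::set<std::string> ignoredZones;
    for (const auto& [negLag, ss] : byWorstFirst) {
        if (ignoredZones.size() < zonesToIgnore) {
            ignoredZones.insert(ss->zone); // tolerate this zone falling behind
            continue;
        }
        if (ignoredZones.count(ss->zone) > 0) continue; // zone already ignored
        return ss;
    }
    return nullptr; // every server sat in an ignored zone
}

Zones rather than individual servers are ignored because storage servers on the same machine tend to lag together.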
@@ -478,6 +482,44 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 		break;
 	}
 
+	int64_t limitingStorageDurabilityLagStorageServer = 0;
+
+	std::set<Optional<Standalone<StringRef>>> ignoredDurabilityLagMachines;
+	for (auto ss = storageDurabilityLagReverseIndex.begin(); ss != storageDurabilityLagReverseIndex.end(); ++ss) {
+		if (ignoredDurabilityLagMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
+			ignoredDurabilityLagMachines.insert(ss->second->locality.zoneId());
+			continue;
+		}
+		if (ignoredDurabilityLagMachines.count(ss->second->locality.zoneId()) > 0) {
+			continue;
+		}
+
+		limitingStorageDurabilityLagStorageServer = -1*ss->first;
+		if(limitingStorageDurabilityLagStorageServer > 200e6) {
+			if(self->durabilityLagLimit == std::numeric_limits<double>::infinity()) {
+				double maxTps = 0;
+				for(int i = 0; i < self->actualTpsHistory.size(); i++) {
+					maxTps = std::max(maxTps, self->actualTpsHistory[i]);
+				}
+				self->durabilityLagLimit = 1.02*maxTps;
+			}
+			if( limitingStorageDurabilityLagStorageServer > self->lastDurabilityLag ) {
+				self->durabilityLagLimit = 0.9999*self->durabilityLagLimit;
+			}
+			if(self->durabilityLagLimit < limits->tpsLimit) {
+				limits->tpsLimit = self->durabilityLagLimit;
+				limitReason = limitReason_t::storage_server_durability_lag;
+			}
+		} else {
+			self->durabilityLagLimit = std::numeric_limits<double>::infinity();
+		}
+		self->lastDurabilityLag = limitingStorageDurabilityLagStorageServer;
+		break;
+	}
+
+	self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
+	self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer;
+
 	double writeToReadLatencyLimit = 0;
 	Version worstVersionLag = 0;
 	Version limitingVersionLag = 0;
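Stripped of FDB plumbing, the control law added above: while the worst non-ignored durability lag exceeds 200e6 versions (roughly 200 seconds at the nominal 1e6 versions per second), the limit is seeded at 1.02x the best recently achieved TPS, then shaved by 0.01% on every tick in which the lag is still growing; once the lag falls back under the threshold the limit is released. A self-contained sketch of one tick; the LagController wrapper is hypothetical, the constants are from the diff:

#include <limits>

constexpr double kLagThreshold = 200e6;  // versions of durability lag
constexpr double kSeedFactor   = 1.02;   // engage just above achieved TPS
constexpr double kDecay        = 0.9999; // gentle multiplicative decrease

struct LagController {
    double durabilityLagLimit = std::numeric_limits<double>::infinity();
    long long lastLag = 0;

    // One updateRate() tick: returns the TPS cap this controller imposes.
    double tick(long long worstNonIgnoredLag, double maxRecentTps) {
        if (worstNonIgnoredLag > kLagThreshold) {
            if (durabilityLagLimit == std::numeric_limits<double>::infinity())
                durabilityLagLimit = kSeedFactor * maxRecentTps; // engage
            if (worstNonIgnoredLag > lastLag)
                durabilityLagLimit *= kDecay; // lag still growing: back off
        } else {
            durabilityLagLimit = std::numeric_limits<double>::infinity(); // recovered
        }
        lastLag = worstNonIgnoredLag;
        return durabilityLagLimit; // caller takes the min against other limits
    }
};

Seeding from achieved TPS rather than an arbitrary constant means the throttle engages near the workload's real operating point instead of slamming it.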
@@ -627,46 +669,6 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
 	}
 }
 
-void updateVersionLagRate(RatekeeperData* self) {
-	int64_t worstStorageDurabilityLagStorageServer = 0;
-	int64_t limitingStorageDurabilityLagStorageServer = 0;
-
-	std::multimap<int64_t, StorageQueueInfo*> storageTpsLimitReverseIndex;
-
-	// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
-	for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
-		auto& ss = i->value;
-		if (!ss.valid) continue;
-
-		int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal();
-		worstStorageDurabilityLagStorageServer = std::max(worstStorageDurabilityLagStorageServer, storageDurabilityLag);
-
-		storageTpsLimitReverseIndex.insert(std::make_pair(-1*storageDurabilityLag, &ss));
-	}
-
-	std::set<Optional<Standalone<StringRef>>> ignoredMachines;
-	for (auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) {
-		if (ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
-			ignoredMachines.insert(ss->second->locality.zoneId());
-			continue;
-		}
-		if (ignoredMachines.count(ss->second->locality.zoneId()) > 0) {
-			continue;
-		}
-
-		limitingStorageDurabilityLagStorageServer = -1*ss->first;
-		if(limitingStorageDurabilityLagStorageServer < 200e6) {
-			self->durabilityLagLimit = std::numeric_limits<double>::infinity();
-		} else if(self->durabilityLagLimit == std::numeric_limits<double>::infinity()) {
-			self->durabilityLagLimit = std::max( 1000.0, 0.95*self->smoothReleasedTransactions.smoothRate() );
-		} else if(limitingStorageDurabilityLagStorageServer > self->lastDurabilityLag) {
-			self->durabilityLagLimit = 0.95*self->durabilityLagLimit;
-		}
-		self->lastDurabilityLag = limitingStorageDurabilityLagStorageServer;
-		break;
-	}
-}
-
 ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo, DatabaseConfiguration* conf) {
 	state Database cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, true);
 	loop {
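For contrast with the function removed here: the old controller cut its limit by 5% at most once per 60-second timer tick and seeded from 0.95x the smoothed released-transaction rate, while the new one runs on every updateRate() tick and trims by only 0.01% per tick. Assuming updateRate fires every 0.1 s (the usual METRIC_UPDATE_RATE; an assumption, not stated in the diff), 0.9999^600 ≈ 0.94 per minute, so the average slope is similar, but the adjustment is smooth and driven by achieved TPS rather than released TPS. Folding the logic into updateRate also fixes an apparent oversight: the removed loop's condition referenced limits->tpsLimit, which was not in scope inside updateVersionLagRate.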
@@ -695,7 +697,6 @@ ACTOR Future<Void> configurationMonitor(Reference<AsyncVar<ServerDBInfo>> dbInfo
 ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
 	state RatekeeperData self;
 	state Future<Void> timeout = Void();
-	state Future<Void> versionLagTimeout = Void();
 	state std::vector<Future<Void>> tlogTrackers;
 	state std::vector<TLogInterface> tlogInterfs;
 	state Promise<Void> err;
@@ -736,10 +737,6 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
 			}
 			timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE);
 		}
-		when (wait( versionLagTimeout )) {
-			updateVersionLagRate(&self);
-			versionLagTimeout = delayJittered(60.0);
-		}
 		when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
 			GetRateInfoReply reply;