Fix: the ratekeeper’s control algorithm would oscillate when limited by the local ratekeeper
This commit is contained in:
parent
1b939d5208
commit
92b32855ca
|
@ -250,7 +250,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_read_load"
|
||||
"storage_server_durability_lag"
|
||||
]
|
||||
},
|
||||
"description":"The database is not being saturated by the workload."
|
||||
|
@ -270,7 +270,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_read_load"
|
||||
"storage_server_durability_lag"
|
||||
]
|
||||
},
|
||||
"description":"The database is not being saturated by the workload."
|
||||
|
|
|
@ -371,8 +371,11 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 500e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
|
||||
init( SPRING_BYTES_STORAGE_SERVER_BATCH, 50e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
|
||||
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
|
||||
init( STORAGE_DURABILITY_LAG_SOFT_MAX, 20e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_SOFT_MAX = 10e6;
|
||||
init( STORAGE_DURABILITY_LAG_HARD_MAX, 200e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_HARD_MAX = 100e6;
|
||||
init( TARGET_VERSIONS_PER_STORAGE_SERVER, 20e6 ); if( smallStorageTarget ) TARGET_VERSIONS_PER_STORAGE_SERVER = 10e6;
|
||||
init( SPRING_VERSIONS_STORAGE_SERVER, 2e6 ); if( smallStorageTarget ) SPRING_VERSIONS_STORAGE_SERVER = 1e6;
|
||||
init( TARGET_VERSIONS_PER_STORAGE_SERVER_BATCH, 10e6 ); if( smallStorageTarget ) TARGET_VERSIONS_PER_STORAGE_SERVER_BATCH = 5e6;
|
||||
init( SPRING_VERSIONS_STORAGE_SERVER_BATCH, 1e6 ); if( smallStorageTarget ) SPRING_VERSIONS_STORAGE_SERVER_BATCH = 5e5;
|
||||
|
||||
bool smallTlogTarget = randomize && BUGGIFY;
|
||||
init( TARGET_BYTES_PER_TLOG, 2400e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
|
||||
|
|
|
@ -311,6 +311,11 @@ public:
|
|||
int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
|
||||
int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
|
||||
|
||||
int64_t TARGET_VERSIONS_PER_STORAGE_SERVER;
|
||||
int64_t SPRING_VERSIONS_STORAGE_SERVER;
|
||||
int64_t TARGET_VERSIONS_PER_STORAGE_SERVER_BATCH;
|
||||
int64_t SPRING_VERSIONS_STORAGE_SERVER_BATCH;
|
||||
|
||||
int64_t TARGET_BYTES_PER_TLOG;
|
||||
int64_t SPRING_BYTES_TLOG;
|
||||
int64_t TARGET_BYTES_PER_TLOG_BATCH;
|
||||
|
@ -346,7 +351,6 @@ public:
|
|||
int FETCH_KEYS_PARALLELISM_BYTES;
|
||||
int BUGGIFY_BLOCK_BYTES;
|
||||
int64_t STORAGE_HARD_LIMIT_BYTES;
|
||||
int64_t STORAGE_DURABILITY_LAG_SOFT_MAX;
|
||||
int64_t STORAGE_DURABILITY_LAG_HARD_MAX;
|
||||
int STORAGE_COMMIT_BYTES;
|
||||
double STORAGE_COMMIT_INTERVAL;
|
||||
|
|
|
@ -41,7 +41,7 @@ enum limitReason_t {
|
|||
storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
|
||||
log_server_min_free_space,
|
||||
log_server_min_free_space_ratio,
|
||||
storage_server_read_load,
|
||||
storage_server_durability_lag,
|
||||
limitReason_t_end
|
||||
};
|
||||
|
||||
|
@ -58,7 +58,7 @@ const char* limitReasonName[] = {
|
|||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_read_load"
|
||||
"storage_server_durability_lag"
|
||||
};
|
||||
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
|
||||
|
||||
|
@ -128,10 +128,12 @@ struct RatekeeperLimits {
|
|||
int64_t logTargetBytes;
|
||||
int64_t logSpringBytes;
|
||||
double maxVersionDifference;
|
||||
Version storageTargetVersions;
|
||||
Version storageSpringVersions;
|
||||
|
||||
std::string context;
|
||||
|
||||
RatekeeperLimits(std::string context, int64_t storageTargetBytes, int64_t storageSpringBytes, int64_t logTargetBytes, int64_t logSpringBytes, double maxVersionDifference) :
|
||||
RatekeeperLimits(std::string context, int64_t storageTargetBytes, int64_t storageSpringBytes, int64_t logTargetBytes, int64_t logSpringBytes, double maxVersionDifference, Version storageTargetVersions, Version storageSpringVersions) :
|
||||
tpsLimit(std::numeric_limits<double>::infinity()),
|
||||
tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
|
||||
reasonMetric(StringRef("Ratekeeper.Reason" + context)),
|
||||
|
@ -140,6 +142,8 @@ struct RatekeeperLimits {
|
|||
logTargetBytes(logTargetBytes),
|
||||
logSpringBytes(logSpringBytes),
|
||||
maxVersionDifference(maxVersionDifference),
|
||||
storageTargetVersions(storageTargetVersions),
|
||||
storageSpringVersions(storageSpringVersions),
|
||||
context(context)
|
||||
{}
|
||||
};
|
||||
|
@ -173,8 +177,8 @@ struct RatekeeperData {
|
|||
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
|
||||
lastWarning(0),
|
||||
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE),
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH)
|
||||
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, SERVER_KNOBS->TARGET_VERSIONS_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_VERSIONS_STORAGE_SERVER),
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_VERSIONS_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_VERSIONS_STORAGE_SERVER_BATCH)
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -396,8 +400,10 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
ssMetrics.cpuUsage = ss.lastReply.cpuUsage;
|
||||
ssMetrics.diskUsage = ss.lastReply.diskUsage;
|
||||
|
||||
int64_t b = storageQueue - targetBytes;
|
||||
double targetRateRatio = std::min(( b + springBytes ) / (double)springBytes, 2.0);
|
||||
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
|
||||
double versionTargetRateRatio = std::min(( storageDurabilityLag - limits->storageTargetVersions + limits->storageSpringVersions ) / (double)limits->storageSpringVersions, 2.0);
|
||||
bool versionLimited = versionTargetRateRatio < targetRateRatio;
|
||||
targetRateRatio = std::min(targetRateRatio, versionTargetRateRatio);
|
||||
|
||||
double inputRate = ss.smoothInputBytes.smoothRate();
|
||||
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
|
||||
|
@ -433,16 +439,9 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
double lim = actualTps * x;
|
||||
if (lim < limitTps) {
|
||||
limitTps = lim;
|
||||
if (ssLimitReason == limitReason_t::unlimited || ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc)
|
||||
ssLimitReason = limitReason_t::storage_server_write_queue_size;
|
||||
}
|
||||
}
|
||||
|
||||
if (ss.localRateLimit < 0.99) {
|
||||
auto lim = double(self->actualTpsMetric) * ss.localRateLimit;
|
||||
if (lim < limitTps) {
|
||||
limitTps = lim;
|
||||
ssLimitReason = limitReason_t::storage_server_read_load;
|
||||
if (ssLimitReason == limitReason_t::unlimited || ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc) {
|
||||
ssLimitReason = versionLimited ? limitReasons_t::storage_server_durability_lag : limitReason_t::storage_server_write_queue_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -314,8 +314,8 @@ public:
|
|||
double res;
|
||||
if (versionLag >= SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX) {
|
||||
res = 0.0;
|
||||
} else if (versionLag > SERVER_KNOBS->STORAGE_DURABILITY_LAG_SOFT_MAX) {
|
||||
res = 1.0 - (double(versionLag) / double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX));
|
||||
} else if (versionLag > SERVER_KNOBS->TARGET_VERSIONS_PER_STORAGE_SERVER) {
|
||||
res = 1.0 - (double(versionLag - SERVER_KNOBS->TARGET_VERSIONS_PER_STORAGE_SERVER) / double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX-SERVER_KNOBS->TARGET_VERSIONS_PER_STORAGE_SERVER));
|
||||
} else {
|
||||
res = 1.0;
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ struct LocalRatekeeperWorkload : TestWorkload {
|
|||
double expectedRateLimit = 1.0;
|
||||
if (durabilityLag >= SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX) {
|
||||
expectedRateLimit = 0.0;
|
||||
} else if (durabilityLag > SERVER_KNOBS->STORAGE_DURABILITY_LAG_SOFT_MAX) {
|
||||
} else if (durabilityLag > SERVER_KNOBS->TARGET_VERSIONS_PER_STORAGE_SERVER) {
|
||||
expectedRateLimit = 1.0 - double(durabilityLag) / double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX);
|
||||
}
|
||||
if (expectedRateLimit < metrics.localRateLimit - 0.01 || expectedRateLimit > metrics.localRateLimit + 0.01) {
|
||||
|
@ -118,7 +118,7 @@ struct LocalRatekeeperWorkload : TestWorkload {
|
|||
g_simulator.disableFor(format("%s/updateStorage", ssi.id().toString().c_str()), now() + self->blockWritesFor);
|
||||
state Future<Void> done = delay(self->blockWritesFor);
|
||||
// not much will happen until the storage goes over the soft limit
|
||||
wait(delay(double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_SOFT_MAX/1e6)));
|
||||
wait(delay(double(SERVER_KNOBS->TARGET_VERSIONS_PER_STORAGE_SERVER/1e6)));
|
||||
wait(testStorage(self, cx, ssi) || done);
|
||||
return Void();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue