change all criteria to knobs
parent
2717cee1f9
commit
e9e11bf53b
@@ -159,6 +159,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( PRIORITY_SPLIT_SHARD, 950 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350;

     // Data distribution
+    init( READ_REBALANCE_CPU_THRESHOLD, 15.0 );
+    init( READ_REBALANCE_SRC_PARALLELISM, 5 );
+    init( READ_REBALANCE_SHARD_TOPK, 10 );
     init( RETRY_RELOCATESHARD_DELAY, 0.1 );
     init( DATA_DISTRIBUTION_FAILURE_REACTION_TIME, 60.0 ); if( randomize && BUGGIFY ) DATA_DISTRIBUTION_FAILURE_REACTION_TIME = 1.0;
     bool buggifySmallShards = randomize && BUGGIFY;
@@ -161,6 +161,10 @@ public:
     int PRIORITY_SPLIT_SHARD;

+    // Data distribution
+    double READ_REBALANCE_CPU_THRESHOLD; // read rebalance only happens if the source servers' CPU > threshold
+    int READ_REBALANCE_SRC_PARALLELISM; // the max number of times a server can become a source server within a certain interval
+    int READ_REBALANCE_SHARD_TOPK; // top k shards are returned for random selection in read rebalance

     double RETRY_RELOCATESHARD_DELAY;
     double DATA_DISTRIBUTION_FAILURE_REACTION_TIME;
     int MIN_SHARD_BYTES, SHARD_BYTES_RATIO, SHARD_BYTES_PER_SQRT_BYTES, MAX_SHARD_BYTES, KEY_SERVER_SHARD_BYTES;
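
Taken together, the two hunks above form the usual pair for introducing a server knob: a default assigned in ServerKnobs::initialize (first hunk) and a matching member declaration in the header (second hunk). With these defaults, read rebalancing only acts when a source server is above 15% CPU, considers at most the 10 hottest shards, and limits how often any one server is chosen as a source. A stripped-down, hypothetical illustration of that shape (not the real init(...) macro, which also registers the knob by name and may randomize it under BUGGIFY):

struct MiniServerKnobs {
    // Header half: member declarations.
    double READ_REBALANCE_CPU_THRESHOLD; // rebalance only if a source server's CPU exceeds this
    int READ_REBALANCE_SRC_PARALLELISM;  // max times a server is picked as a source per interval
    int READ_REBALANCE_SHARD_TOPK;       // number of hottest shards kept for random selection

    // .cpp half: defaults assigned in initialize().
    void initialize() {
        READ_REBALANCE_CPU_THRESHOLD = 15.0;
        READ_REBALANCE_SRC_PARALLELISM = 5;
        READ_REBALANCE_SHARD_TOPK = 10;
    }
};
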
@@ -1083,8 +1083,8 @@ struct DDQueueData {
     bool timeThrottle(const std::vector<UID>& ids) const {
         return std::any_of(ids.begin(), ids.end(), [this](const UID& id) {
             if (this->lastAsSource.count(id)) {
-                // TODO: set 5.0 as a knob
-                return (now() - this->lastAsSource.at(id)) * 5.0 < SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;
+                return (now() - this->lastAsSource.at(id)) * SERVER_KNOBS->READ_REBALANCE_SRC_PARALLELISM <
+                       SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;
             }
             return false;
         });
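
The rewritten condition turns READ_REBALANCE_SRC_PARALLELISM into a per-server rate limit: a server stays throttled as a rebalance source until at least STORAGE_METRICS_AVERAGE_INTERVAL / READ_REBALANCE_SRC_PARALLELISM seconds have passed since it last served as a source, so it can be picked at most that many times per averaging interval. A self-contained sketch of the same arithmetic, with assumed values standing in for the two knobs and for now():

#include <iostream>

// Hypothetical stand-ins for the knobs referenced above.
constexpr double STORAGE_METRICS_AVERAGE_INTERVAL = 120.0; // seconds (assumed value)
constexpr int READ_REBALANCE_SRC_PARALLELISM = 5;

// Same predicate shape as DDQueueData::timeThrottle applies per server:
// throttle while elapsed * parallelism is still below the averaging interval.
bool throttled(double secondsSinceLastAsSource) {
    return secondsSinceLastAsSource * READ_REBALANCE_SRC_PARALLELISM <
           STORAGE_METRICS_AVERAGE_INTERVAL;
}

int main() {
    std::cout << std::boolalpha;
    std::cout << throttled(10.0) << "\n"; // true:  10 * 5 = 50  < 120, still throttled
    std::cout << throttled(30.0) << "\n"; // false: 30 * 5 = 150 >= 120, eligible again
}
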
@@ -1545,18 +1545,19 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self,
         traceEvent->detail("SkipReason", "TeamTooSimilar");
         return false;
     }
-    // TODO: set 10 as a knob
     // randomly choose topK shards
-    int topK = std::min(int(0.1 * shards.size()), 10);
+    int topK = std::min(int(0.1 * shards.size()), SERVER_KNOBS->READ_REBALANCE_SHARD_TOPK);
     state Future<HealthMetrics> healthMetrics = self->cx->getHealthMetrics(true);
-    state GetTopKMetricsRequest req(shards, topK, (srcLoad - destLoad) / 10.0); // 1/(5 * 2)
+    state GetTopKMetricsRequest req(
+        shards, topK, (srcLoad - destLoad) / 2.0 / SERVER_KNOBS->READ_REBALANCE_SRC_PARALLELISM);
     req.comparator = [](const StorageMetrics& a, const StorageMetrics& b) {
         return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) >
                b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES);
     };
     state std::vector<StorageMetrics> metricsList = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req)));
     wait(ready(healthMetrics));
-    if (getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs()) < 25.0) { // 25%
+    if (getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs()) <
+        SERVER_KNOBS->READ_REBALANCE_CPU_THRESHOLD) { // 15.0 +- (0.3 * 15) < 20.0
         traceEvent->detail("SkipReason", "LowReadLoad");
         return false;
     }
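
This hunk replaces the remaining literals with the new knobs: the request asks for the top READ_REBALANCE_SHARD_TOPK shards (capped at 10% of the candidates) ranked by read density, i.e. bytesReadPerKSecond normalized by shard size with a MIN_SHARD_BYTES floor, and a shard only qualifies if its read load exceeds half of the source/destination load gap divided by READ_REBALANCE_SRC_PARALLELISM. A rough standalone sketch of that selection step, with hypothetical types and values in place of StorageMetrics and SERVER_KNOBS:

#include <algorithm>
#include <vector>

// Hypothetical stand-ins for the knobs used in the comparator and the topK cap.
constexpr int READ_REBALANCE_SHARD_TOPK = 10;
constexpr double MIN_SHARD_BYTES = 200000.0; // floor so tiny shards do not dominate the ratio

struct ShardMetrics { // simplified stand-in for StorageMetrics
    double bytes = 0;
    double bytesReadPerKSecond = 0;
};

// Read density: read throughput normalized by shard size, clamped by the byte floor.
double readDensity(const ShardMetrics& m) {
    return m.bytesReadPerKSecond / std::max(m.bytes, MIN_SHARD_BYTES);
}

// Keep only the hottest shards by read density, capped at 10% of the candidates and
// at READ_REBALANCE_SHARD_TOPK; the caller then picks one of them at random, which
// spreads repeated rebalance decisions across several hot shards instead of one.
std::vector<ShardMetrics> topKByReadDensity(std::vector<ShardMetrics> shards) {
    if (shards.empty())
        return shards;
    int topK = std::max(1, std::min(int(0.1 * shards.size()), READ_REBALANCE_SHARD_TOPK));
    std::partial_sort(shards.begin(), shards.begin() + topK, shards.end(),
                      [](const ShardMetrics& a, const ShardMetrics& b) {
                          return readDensity(a) > readDensity(b);
                      });
    shards.resize(topK);
    return shards;
}
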
@@ -1710,7 +1711,8 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
     state GetTeamRequest srcReq;
     state GetTeamRequest destReq;
     state TraceEvent traceEvent(eventName, self->distributorId);
-    traceEvent.suppressFor(5.0)
+    // FIXME: uncomment
+    traceEvent // .suppressFor(5.0)
         .detail("PollingInterval", rebalancePollingInterval)
         .detail("Rebalance", readRebalance ? "Read" : "Disk");

@@ -1719,7 +1721,6 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
         }

         try {
-            // FIXME: change back to BG_REBALANCE_SWITCH_CHECK_INTERVAL after test
             delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
             if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
                 tr.setOption(FDBTransactionOptions::LOCK_AWARE);
@@ -1838,7 +1839,10 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
     state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
     state bool moved = false;
     state TraceEvent traceEvent("BgDDMountainChopper_Old", self->distributorId);
-    traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
+    // FIXME: uncomment
+    traceEvent // .suppressFor(5.0)
+        .detail("PollingInterval", rebalancePollingInterval)
+        .detail("Rebalance", "Disk");

     if (*self->lastLimited > 0) {
         traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);
@@ -1961,7 +1965,10 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
     state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
     state bool moved = false;
     state TraceEvent traceEvent("BgDDValleyFiller_Old", self->distributorId);
-    traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
+    // FIXME: uncomment
+    traceEvent //.suppressFor(5.0)
+        .detail("PollingInterval", rebalancePollingInterval)
+        .detail("Rebalance", "Disk");

     if (*self->lastLimited > 0) {
         traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);