diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index c0cc5c79ca..05b3e5dc95 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -159,6 +159,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( PRIORITY_SPLIT_SHARD, 950 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350; // Data distribution + init( READ_REBALANCE_CPU_THRESHOLD, 15.0 ); + init( READ_REBALANCE_SRC_PARALLELISM, 5 ); + init( READ_REBALANCE_SHARD_TOPK, 10 ); init( RETRY_RELOCATESHARD_DELAY, 0.1 ); init( DATA_DISTRIBUTION_FAILURE_REACTION_TIME, 60.0 ); if( randomize && BUGGIFY ) DATA_DISTRIBUTION_FAILURE_REACTION_TIME = 1.0; bool buggifySmallShards = randomize && BUGGIFY; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index ec7f9ab620..a976b5041d 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -161,6 +161,10 @@ public: int PRIORITY_SPLIT_SHARD; // Data distribution + double READ_REBALANCE_CPU_THRESHOLD; // read rebalance only happens if the source servers' CPU > threshold + int READ_REBALANCE_SRC_PARALLELISM; // the max number of times a server can become a source server within a certain interval + int READ_REBALANCE_SHARD_TOPK; // top k shards are returned for random selection in read rebalance + double RETRY_RELOCATESHARD_DELAY; double DATA_DISTRIBUTION_FAILURE_REACTION_TIME; int MIN_SHARD_BYTES, SHARD_BYTES_RATIO, SHARD_BYTES_PER_SQRT_BYTES, MAX_SHARD_BYTES, KEY_SERVER_SHARD_BYTES; diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 9cc1780bf2..5caef85278 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -1083,8 +1083,8 @@ struct DDQueueData { bool timeThrottle(const std::vector<UID>& ids) const { return std::any_of(ids.begin(), ids.end(), [this](const UID& id) { if (this->lastAsSource.count(id)) { - // TODO: set 5.0 as a knob - return (now() - this->lastAsSource.at(id)) * 5.0 < 
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL; + return (now() - this->lastAsSource.at(id)) * SERVER_KNOBS->READ_REBALANCE_SRC_PARALLELISM < + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL; } return false; }); @@ -1545,18 +1545,19 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueueData* self, traceEvent->detail("SkipReason", "TeamTooSimilar"); return false; } - // TODO: set 10 as a knob // randomly choose topK shards - int topK = std::min(int(0.1 * shards.size()), 10); + int topK = std::min(int(0.1 * shards.size()), SERVER_KNOBS->READ_REBALANCE_SHARD_TOPK); state Future<HealthMetrics> healthMetrics = self->cx->getHealthMetrics(true); - state GetTopKMetricsRequest req(shards, topK, (srcLoad - destLoad) / 10.0); // 1/(5 * 2) + state GetTopKMetricsRequest req( + shards, topK, (srcLoad - destLoad) / 2.0 / SERVER_KNOBS->READ_REBALANCE_SRC_PARALLELISM); req.comparator = [](const StorageMetrics& a, const StorageMetrics& b) { return a.bytesReadPerKSecond / std::max(a.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES) > b.bytesReadPerKSecond / std::max(b.bytes * 1.0, 1.0 * SERVER_KNOBS->MIN_SHARD_BYTES); }; state std::vector<StorageMetrics> metricsList = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req))); wait(ready(healthMetrics)); - if (getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs()) < 25.0) { // 25% + if (getWorstCpu(healthMetrics.get(), sourceTeam->getServerIDs()) < + SERVER_KNOBS->READ_REBALANCE_CPU_THRESHOLD) { // 15.0 +- (0.3 * 15) < 20.0 traceEvent->detail("SkipReason", "LowReadLoad"); return false; } @@ -1710,7 +1711,8 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex, state GetTeamRequest srcReq; state GetTeamRequest destReq; state TraceEvent traceEvent(eventName, self->distributorId); - traceEvent.suppressFor(5.0) + // FIXME: uncomment + traceEvent // .suppressFor(5.0) .detail("PollingInterval", rebalancePollingInterval) .detail("Rebalance", readRebalance ? 
"Read" : "Disk"); @@ -1719,7 +1721,6 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex, } try { - // FIXME: change back to BG_REBALANCE_SWITCH_CHECK_INTERVAL after test delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch); if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) { tr.setOption(FDBTransactionOptions::LOCK_AWARE); @@ -1838,7 +1839,10 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam; state bool moved = false; state TraceEvent traceEvent("BgDDMountainChopper_Old", self->distributorId); - traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk"); + // FIXME: uncomment + traceEvent // .suppressFor(5.0) + .detail("PollingInterval", rebalancePollingInterval) + .detail("Rebalance", "Disk"); if (*self->lastLimited > 0) { traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited); @@ -1961,7 +1965,10 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex) state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam; state bool moved = false; state TraceEvent traceEvent("BgDDValleyFiller_Old", self->distributorId); - traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk"); + // FIXME: uncomment + traceEvent //.suppressFor(5.0) + .detail("PollingInterval", rebalancePollingInterval) + .detail("Rebalance", "Disk"); if (*self->lastLimited > 0) { traceEvent.detail("SecondsSinceLastLimited", now() - *self->lastLimited);