Merge pull request #8774 from sfc-gh-xwang/feature/main/ppwLoadBalance
Make perpetual wiggle wait for byte load balance
commit a7a09d427a
@@ -245,7 +245,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( ALL_DATA_REMOVED_DELAY, 1.0 );
 	init( INITIAL_FAILURE_REACTION_DELAY, 30.0 ); if( randomize && BUGGIFY ) INITIAL_FAILURE_REACTION_DELAY = 0.0;
 	init( CHECK_TEAM_DELAY, 30.0 );
-	init( PERPETUAL_WIGGLE_DELAY, 50.0 );
+	// This is a safety knob to avoid busy spinning and the case where a small cluster doesn't have enough space when excluding and including too fast. The basic idea is to let PW wait for the re-included storage to take on data before wiggling the next one.
+	// This knob's ideal value would vary by cluster based on its size and disk type. In the meantime, the wiggle will also wait until the storage load is almost (85%) balanced.
+	init( PERPETUAL_WIGGLE_DELAY, 60 );
+	init( PERPETUAL_WIGGLE_SMALL_LOAD_RATIO, 10 );
+	init( PERPETUAL_WIGGLE_MIN_BYTES_BALANCE_RATIO, 0.85 );
 	init( PERPETUAL_WIGGLE_DISABLE_REMOVER, true );
 	init( LOG_ON_COMPLETION_DELAY, DD_QUEUE_LOGGING_INTERVAL );
 	init( BEST_TEAM_MAX_TEAM_TRIES, 10 );
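Taken together, the new knobs make the wiggler wait at least PERPETUAL_WIGGLE_DELAY seconds after a wiggle finishes and then hold off until the byte load across storage servers is close to balanced, unless the cluster's load is so small that the balance check is meaningless. A minimal sketch of that gate, assuming per-server load bytes and the cluster's average shard size are already known (plain C++, not flow actor code; the function and parameter names are illustrative):

    #include <algorithm>
    #include <cstdint>
    #include <numeric>
    #include <vector>

    // Sketch of the gating rule the three knobs express; not FoundationDB code.
    bool wiggleMayProceed(const std::vector<double>& loadBytesPerServer, int64_t avgShardBytes) {
        const double kSmallLoadRatio = 10;     // PERPETUAL_WIGGLE_SMALL_LOAD_RATIO
        const double kMinBalanceRatio = 0.85;  // PERPETUAL_WIGGLE_MIN_BYTES_BALANCE_RATIO

        double total = std::accumulate(loadBytesPerServer.begin(), loadBytesPerServer.end(), 0.0);
        double avg = total / loadBytesPerServer.size();
        if (total == 0 || avg < kSmallLoadRatio * avgShardBytes) {
            return true; // load is too small for the balance check to matter
        }
        double minLoad = *std::min_element(loadBytesPerServer.begin(), loadBytesPerServer.end());
        return minLoad / avg >= kMinBalanceRatio; // wiggle only once the load is nearly balanced
    }

The actual check lives in DDTeamCollection::loadBytesBalanceRatio() and perpetualStorageWiggleRest() below.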
@@ -186,9 +186,14 @@ public:
 	double METRIC_DELAY;
 	double ALL_DATA_REMOVED_DELAY;
 	double INITIAL_FAILURE_REACTION_DELAY;
-	double CHECK_TEAM_DELAY;
-	double PERPETUAL_WIGGLE_DELAY;
-	bool PERPETUAL_WIGGLE_DISABLE_REMOVER;
+	double CHECK_TEAM_DELAY; // Perpetual wiggle delay to check whether cluster teams are healthy
+	double PERPETUAL_WIGGLE_SMALL_LOAD_RATIO; // If the average load of a storage server is less than this ratio * average
+	                                          // shard bytes, the perpetual wiggle won't consider the available-space
+	                                          // load balance in the cluster
+	double PERPETUAL_WIGGLE_MIN_BYTES_BALANCE_RATIO; // Target min:average space load balance ratio after re-inclusion,
+	                                                 // before the perpetual wiggle will start the next wiggle
+	double PERPETUAL_WIGGLE_DELAY; // The max interval between the last wiggle finishing and the next wiggle starting
+	bool PERPETUAL_WIGGLE_DISABLE_REMOVER; // Whether starting the perpetual wiggle replaces the team remover
 	double LOG_ON_COMPLETION_DELAY;
 	int BEST_TEAM_MAX_TEAM_TRIES;
 	int BEST_TEAM_OPTION_COUNT;
@@ -1957,6 +1957,40 @@ public:
 		return Void();
 	}
 
+	ACTOR static Future<Void> perpetualStorageWiggleRest(DDTeamCollection* self) {
+		state bool takeRest = true;
+		state Promise<int64_t> avgShardBytes;
+		while (takeRest) {
+			// A minimal delay to avoid excluding and including SS too fast
+			wait(delay(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY));
+
+			avgShardBytes.reset();
+			self->getAverageShardBytes.send(avgShardBytes);
+			int64_t avgBytes = wait(avgShardBytes.getFuture());
+			double ratio = self->loadBytesBalanceRatio(avgBytes * SERVER_KNOBS->PERPETUAL_WIGGLE_SMALL_LOAD_RATIO);
+			bool imbalance = ratio < SERVER_KNOBS->PERPETUAL_WIGGLE_MIN_BYTES_BALANCE_RATIO;
+			CODE_PROBE(imbalance, "Perpetual Wiggle pause because cluster is imbalanced.");
+
+			// Take a rest if there are no other teams to place the wiggled data on, or if the load is imbalanced
+			takeRest = self->server_info.size() <= self->configuration.storageTeamSize ||
+			           self->machine_info.size() < self->configuration.storageTeamSize || imbalance;
+
+			// Log the extra delay and change the wiggler state
+			if (takeRest) {
+				self->storageWiggler->setWiggleState(StorageWiggler::PAUSE);
+				if (self->configuration.storageMigrationType == StorageMigrationType::GRADUAL) {
+					TraceEvent(SevWarn, "PerpetualStorageWiggleSleep", self->distributorId)
+					    .suppressFor(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY * 4)
+					    .detail("BytesBalanceRatio", ratio)
+					    .detail("ServerSize", self->server_info.size())
+					    .detail("MachineSize", self->machine_info.size())
+					    .detail("StorageTeamSize", self->configuration.storageTeamSize);
+				}
+			}
+		}
+		return Void();
+	}
+
 	ACTOR static Future<Void> perpetualStorageWiggleIterator(DDTeamCollection* teamCollection,
 	                                                         AsyncVar<bool>* stopSignal,
 	                                                         FutureStream<Void> finishStorageWiggleSignal) {
@@ -1964,24 +1998,9 @@ public:
 			choose {
 				when(wait(stopSignal->onChange())) {}
 				when(waitNext(finishStorageWiggleSignal)) {
-					state bool takeRest = true; // delay to avoid delete and update ServerList too frequently
-					while (takeRest) {
-						wait(delayJittered(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY));
-						// there must not have other teams to place wiggled data
-						takeRest =
-						    teamCollection->server_info.size() <= teamCollection->configuration.storageTeamSize ||
-						    teamCollection->machine_info.size() < teamCollection->configuration.storageTeamSize;
-						if (takeRest) {
-							teamCollection->storageWiggler->setWiggleState(StorageWiggler::PAUSE);
-							if (teamCollection->configuration.storageMigrationType == StorageMigrationType::GRADUAL) {
-								TraceEvent(SevWarn, "PerpetualStorageWiggleSleep", teamCollection->distributorId)
-								    .suppressFor(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY * 4)
-								    .detail("ServerSize", teamCollection->server_info.size())
-								    .detail("MachineSize", teamCollection->machine_info.size())
-								    .detail("StorageTeamSize", teamCollection->configuration.storageTeamSize);
-							}
-						}
-					}
+					// Delay to avoid deleting and updating the ServerList too frequently, which could result in a
+					// busy loop or over-utilize the disks of other active SS
+					wait(perpetualStorageWiggleRest(teamCollection));
 					wait(updateNextWigglingStorageID(teamCollection));
 				}
 			}
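The rest actor obtains the current average shard size via a request-reply round trip: it sends a fresh Promise<int64_t> down the new getAverageShardBytes PromiseStream and waits on that promise's future until the shard-tracking side fulfills it. A rough standard-library analogy of the idiom, assuming a single requester and responder (std::promise/std::future merely stand in for flow's Promise/Future; all names here are illustrative):

    #include <cstdint>
    #include <future>
    #include <queue>

    int main() {
        // Stand-in for PromiseStream<Promise<int64_t>> getAverageShardBytes.
        std::queue<std::promise<int64_t>> requests;

        // Requester side (what perpetualStorageWiggleRest does): send a promise, keep its future.
        std::promise<int64_t> request;
        std::future<int64_t> reply = request.get_future();
        requests.push(std::move(request));

        // Responder side (the shard-tracking end of the stream): answer with the value it tracks.
        requests.front().set_value(250'000'000); // hypothetical 250 MB average shard size
        requests.pop();

        // The rest loop then multiplies this value by PERPETUAL_WIGGLE_SMALL_LOAD_RATIO and
        // feeds it into loadBytesBalanceRatio().
        int64_t avgShardBytes = reply.get();
        return avgShardBytes > 0 ? 0 : 1;
    }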
@@ -3394,6 +3413,39 @@ Future<Void> DDTeamCollection::removeBadTeams() {
 	return DDTeamCollectionImpl::removeBadTeams(this);
 }
 
+double DDTeamCollection::loadBytesBalanceRatio(int64_t smallLoadThreshold) const {
+	double minLoadBytes = std::numeric_limits<double>::max();
+	double totalLoadBytes = 0;
+	int count = 0;
+	for (auto& [id, s] : server_info) {
+		// If a healthy SS doesn't have storage metrics, skip this round
+		if (server_status.get(s->getId()).isUnhealthy() || !s->metricsPresent()) {
+			TraceEvent(SevDebug, "LoadBytesBalanceRatioNoMetrics").detail("Server", id);
+			return 0;
+		}
+
+		double load = s->loadBytes();
+		totalLoadBytes += load;
+		++count;
+		minLoadBytes = std::min(minLoadBytes, load);
+	}
+
+	TraceEvent(SevDebug, "LoadBytesBalanceRatioMetrics")
+	    .detail("TotalLoad", totalLoadBytes)
+	    .detail("MinLoadBytes", minLoadBytes)
+	    .detail("SmallLoadThreshold", smallLoadThreshold)
+	    .detail("Count", count);
+
+	// Avoid division by zero
+	double avgLoad = totalLoadBytes / count;
+	if (totalLoadBytes == 0 || avgLoad < smallLoadThreshold) {
+		CODE_PROBE(true, "The cluster load is small enough to ignore load bytes balance.");
+		return 1;
+	}
+
+	return minLoadBytes / avgLoad;
+}
+
 Future<Void> DDTeamCollection::storageServerFailureTracker(TCServerInfo* server,
                                                            ServerStatus* status,
                                                            Version addedVersion) {
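For a rough feel of the new ratio (hypothetical numbers, not anything measured in this PR): three storage servers holding 400 GB, 500 GB, and 600 GB give avgLoad = 500 GB and minLoadBytes / avgLoad = 0.8, below the 0.85 default, so the wiggle keeps resting; if the average load were under smallLoadThreshold (10x the average shard size by default), the function would return 1 and the check would be skipped. A self-contained check of that arithmetic, mirroring the shape of loadBytesBalanceRatio() without FDB types:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    int main() {
        const double GB = 1e9;
        std::vector<double> loads = { 400 * GB, 500 * GB, 600 * GB }; // hypothetical per-SS load bytes

        double totalLoadBytes = 0, minLoadBytes = loads[0];
        for (double l : loads) {
            totalLoadBytes += l;
            minLoadBytes = std::min(minLoadBytes, l);
        }
        double avgLoad = totalLoadBytes / loads.size();

        const int64_t avgShardBytes = 250'000'000;             // made-up 250 MB average shard
        const int64_t smallLoadThreshold = avgShardBytes * 10; // PERPETUAL_WIGGLE_SMALL_LOAD_RATIO
        double ratio = (totalLoadBytes == 0 || avgLoad < smallLoadThreshold) ? 1.0 : minLoadBytes / avgLoad;

        std::printf("balance ratio = %.2f\n", ratio); // prints 0.80, below the 0.85 default threshold
        return 0;
    }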
@@ -3542,7 +3594,8 @@ DDTeamCollection::DDTeamCollection(DDTeamCollectionInitParams const& params)
     restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), healthyTeamCount(0),
     zeroHealthyTeams(params.zeroHealthyTeams), optimalTeamCount(0), zeroOptimalTeams(true), isTssRecruiting(false),
     includedDCs(params.includedDCs), otherTrackedDCs(params.otherTrackedDCs),
-    processingUnhealthy(params.processingUnhealthy), readyToStart(params.readyToStart),
+    processingUnhealthy(params.processingUnhealthy), getAverageShardBytes(params.getAverageShardBytes),
+    readyToStart(params.readyToStart),
     checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)), badTeamRemover(Void()),
     checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()), clearHealthyZoneFuture(true),
     medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO), lastMedianAvailableSpaceUpdate(0),
@@ -5104,7 +5157,8 @@ public:
 		                              makeReference<AsyncVar<bool>>(false),
 		                              PromiseStream<GetMetricsRequest>(),
 		                              Promise<UID>(),
-		                              PromiseStream<Promise<int>>() }));
+		                              PromiseStream<Promise<int>>(),
+		                              PromiseStream<Promise<int64_t>>() }));
 
 		for (int id = 1; id <= processCount; ++id) {
 			UID uid(id, 0);
@@ -5148,7 +5202,8 @@ public:
 		                              makeReference<AsyncVar<bool>>(false),
 		                              PromiseStream<GetMetricsRequest>(),
 		                              Promise<UID>(),
-		                              PromiseStream<Promise<int>>() }));
+		                              PromiseStream<Promise<int>>(),
+		                              PromiseStream<Promise<int64_t>>() }));
 
 		for (int id = 1; id <= processCount; id++) {
 			UID uid(id, 0);
@@ -723,7 +723,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
 		                                  processingWiggle,
 		                                  getShardMetrics,
 		                                  removeFailedServer,
-		                                  getUnhealthyRelocationCount });
+		                                  getUnhealthyRelocationCount,
+		                                  getAverageShardBytes });
 		teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
 		auto recruitStorage = IAsyncListener<RequestStream<RecruitStorageRequest>>::create(
 		    self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
@@ -744,7 +745,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
 		                                  processingWiggle,
 		                                  getShardMetrics,
 		                                  removeFailedServer,
-		                                  getUnhealthyRelocationCount });
+		                                  getUnhealthyRelocationCount,
+		                                  getAverageShardBytes });
 		teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
 		remoteTeamCollection->teamCollections = teamCollectionsPtrs;
 		actors.push_back(reportErrorsExcept(DDTeamCollection::run(remoteTeamCollection,
@@ -198,6 +198,7 @@ struct DDTeamCollectionInitParams {
 	PromiseStream<GetMetricsRequest> getShardMetrics;
 	Promise<UID> removeFailedServer;
 	PromiseStream<Promise<int>> getUnhealthyRelocationCount;
+	PromiseStream<Promise<int64_t>> getAverageShardBytes;
 };
 
 class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
@@ -235,6 +236,7 @@ protected:
 	Reference<AsyncVar<bool>> pauseWiggle;
 	Reference<AsyncVar<bool>> processingWiggle; // track whether wiggling relocation is being processed
 	PromiseStream<StorageWiggleValue> nextWiggleInfo;
+	PromiseStream<Promise<int64_t>> getAverageShardBytes;
 
 	std::vector<Reference<TCTeamInfo>> badTeams;
 	Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
@@ -463,6 +465,10 @@ protected:
 
 	Future<Void> waitForAllDataRemoved(UID serverID, Version addedVersion) const;
 
+	// Calculate minLoadBytes / avgLoadBytes among servers. An unhealthy server's load is considered 0. If the
+	// average load of each storage server is less than smallLoadThreshold, always return 1.
+	double loadBytesBalanceRatio(int64_t smallLoadThreshold) const;
+
 	// Create a transaction updating `perpetualStorageWiggleIDPrefix` to the next serverID according to a sorted
 	// wiggle_pq maintained by the wiggler.
 	Future<Void> updateNextWigglingStorageID();
@@ -208,7 +208,8 @@ struct PerpetualWiggleStatsWorkload : public TestWorkload {
 		                              makeReference<AsyncVar<bool>>(false),
 		                              PromiseStream<GetMetricsRequest>(),
 		                              Promise<UID>(),
-		                              PromiseStream<Promise<int>>() });
+		                              PromiseStream<Promise<int>>(),
+		                              PromiseStream<Promise<int64_t>>() });
 		tester.configuration.storageTeamSize = 3;
 		tester.configuration.perpetualStorageWiggleSpeed = 1;
 