Write-Traffic-Aware Load Balancer (#11189)

* write traffic aware load balancer

* address comments

* address comments
Zhe Wang 2024-02-11 15:05:41 -08:00 committed by GitHub
parent 9d8d52cbb7
commit b699a7e4f7
6 changed files with 72 additions and 39 deletions

@@ -346,8 +346,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ENFORCE_SHARD_COUNT_PER_TEAM, false ); if( randomize && BUGGIFY ) ENFORCE_SHARD_COUNT_PER_TEAM = true;
init( DESIRED_MAX_SHARDS_PER_TEAM, 1000 ); if( randomize && BUGGIFY ) DESIRED_MAX_SHARDS_PER_TEAM = 10;
init( ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION, false ); if( randomize && BUGGIFY ) ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION = true;
init( DD_TARGET_STORAGE_QUEUE_SIZE, TARGET_BYTES_PER_STORAGE_SERVER*0.35 ); if( randomize && BUGGIFY ) DD_TARGET_STORAGE_QUEUE_SIZE = TARGET_BYTES_PER_STORAGE_SERVER*0.035;
init( ENABLE_REBALANCE_STORAGE_QUEUE, false ); if( randomize && BUGGIFY ) ENABLE_REBALANCE_STORAGE_QUEUE = true;
init( DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE, 0.5 ); if( randomize && BUGGIFY ) DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE = deterministicRandom()->random01();
init( ENABLE_REBALANCE_STORAGE_QUEUE, false ); if( randomize && BUGGIFY ) ENABLE_REBALANCE_STORAGE_QUEUE = true;
init( REBALANCE_STORAGE_QUEUE_LONG_BYTES, TARGET_BYTES_PER_STORAGE_SERVER*0.15); if( randomize && BUGGIFY ) REBALANCE_STORAGE_QUEUE_LONG_BYTES = TARGET_BYTES_PER_STORAGE_SERVER*0.05;
init( REBALANCE_STORAGE_QUEUE_SHORT_BYTES, TARGET_BYTES_PER_STORAGE_SERVER*0.05); if( randomize && BUGGIFY ) REBALANCE_STORAGE_QUEUE_SHORT_BYTES = TARGET_BYTES_PER_STORAGE_SERVER*0.025;
init( DD_LONG_STORAGE_QUEUE_TIMESPAN, 60.0 ); if( isSimulated ) DD_LONG_STORAGE_QUEUE_TIMESPAN = deterministicRandom()->random01() * 10 + 1;

@@ -244,7 +244,7 @@ public:
// When the sampled read operations change more than this threshold, the
// shard metrics will update immediately
int64_t SHARD_READ_OPS_CHANGE_THRESHOLD;
bool ENABLE_WRITE_BASED_SHARD_SPLIT; // experimental
bool ENABLE_WRITE_BASED_SHARD_SPLIT; // Experimental. Enable to enforce shard split when write traffic is high
double SHARD_MAX_READ_DENSITY_RATIO;
int64_t SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS;
@@ -327,9 +327,12 @@ public:
// storage bytes used by a tenant group
int CP_FETCH_TENANTS_OVER_STORAGE_QUOTA_INTERVAL; // How often the commit proxies send requests to the data
// distributor to fetch the list of tenants over storage quota
bool ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION; // experimental!
int64_t DD_TARGET_STORAGE_QUEUE_SIZE;
bool ENABLE_REBALANCE_STORAGE_QUEUE; // experimental!
bool ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION; // Experimental! Enable to avoid moving data to a team which has a
// long storage queue
double DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE; // Fraction of teams that should have longer storage queues
// than the selected threshold (team queue size = max SS queue size on the team)
bool ENABLE_REBALANCE_STORAGE_QUEUE; // Experimental! Enable to trigger data moves to rebalance storage queues when
// a queue is significantly longer than others
int64_t REBALANCE_STORAGE_QUEUE_LONG_BYTES; // Lower bound of length indicating the storage queue is too long
int64_t REBALANCE_STORAGE_QUEUE_SHORT_BYTES; // Upper bound of length indicating the storage queue is back to short
double DD_LONG_STORAGE_QUEUE_TIMESPAN;
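For intuition, the two rebalance byte knobs form a hysteresis band around the storage queue length. A minimal sketch (not part of this commit; the helper name and the wasLong/queueBytes parameters are illustrative) of how a queue could be flagged long and later unflagged:

// Illustrative only: a queue is flagged "long" once it exceeds REBALANCE_STORAGE_QUEUE_LONG_BYTES
// and stays flagged until it drains back below REBALANCE_STORAGE_QUEUE_SHORT_BYTES.
bool updateLongQueueFlag(bool wasLong, int64_t queueBytes) {
    if (!wasLong && queueBytes > SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_LONG_BYTES) {
        return true; // queue grew too long; candidate for storage queue rebalance
    }
    if (wasLong && queueBytes < SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_SHORT_BYTES) {
        return false; // queue has drained back to a safe length
    }
    return wasLong; // inside the hysteresis band: keep the previous state
}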

@@ -195,13 +195,37 @@ public:
req.reply.send(std::make_pair(res, false));
}
// Returns a team storage queue size threshold such that at least a DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE
// fraction of teams have storage queues at least this long.
// A team's storage queue size is defined as the longest storage queue size among all SSes of the team.
static int64_t calculateTeamStorageQueueThreshold(const std::vector<Reference<TCTeamInfo>>& teams) {
std::vector<int64_t> queueLengthList;
for (const auto& team : teams) {
Optional<int64_t> storageQueueSize = team->getLongestStorageQueueSize();
if (!storageQueueSize.present()) {
// This team may have an unhealthy SS, so avoid selecting it
queueLengthList.push_back(std::numeric_limits<int64_t>::max());
} else {
queueLengthList.push_back(storageQueueSize.get());
}
}
double percentile = std::max(0.0, std::min(SERVER_KNOBS->DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE, 1.0));
int position = queueLengthList.size() * (1 - percentile);
std::nth_element(queueLengthList.begin(), queueLengthList.begin() + position, queueLengthList.end());
int64_t threshold = queueLengthList[position];
TraceEvent(SevInfo, "StorageQueueAwareGotThreshold").suppressFor(5.0).detail("Threshold", threshold);
return threshold;
}
// Returns the overall best team that matches the requirement from `req`. When preferWithinShardLimit is true, it
// also tries to select a team whose existing shard count is less than SERVER_KNOBS->DESIRED_MAX_SHARDS_PER_TEAM.
static Optional<Reference<IDataDistributionTeam>> getBestTeam(DDTeamCollection* self,
const GetTeamRequest& req,
bool preferWithinShardLimit,
int& numSkippedSSFailedGetQueueLength,
int& numSkippedSSQueueTooLong) {
int& numSkippedSSQueueTooLong,
Optional<int64_t> storageQueueThreshold) {
ASSERT(!req.storageQueueAware || storageQueueThreshold.present());
auto& startIndex = req.preferLowerDiskUtil ? self->lowestUtilizationTeam : self->highestUtilizationTeam;
if (startIndex >= self->teams.size()) {
startIndex = 0;
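For intuition about calculateTeamStorageQueueThreshold above, here is a standalone sketch (not part of this commit) of the same nth_element percentile selection run on made-up team queue sizes:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    // Hypothetical team queue sizes in bytes; a team with an unhealthy SS would be pushed as INT64_MAX.
    std::vector<int64_t> queueLengthList = { 100, 900, 250, 700, 400, 50 };
    double percentile = 0.5; // stands in for DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE
    // Index of the (1 - percentile) order statistic: after nth_element, the element at `position`
    // is a value that the `percentile` fraction of teams meet or exceed.
    int position = queueLengthList.size() * (1 - percentile); // 6 * 0.5 = 3
    std::nth_element(queueLengthList.begin(), queueLengthList.begin() + position, queueLengthList.end());
    std::cout << queueLengthList[position] << std::endl; // prints 400: half of the teams have queues >= 400
    return 0;
}

With storageQueueAware set, getBestTeam then skips any team whose longest SS storage queue exceeds this threshold, leaving roughly the shorter-queued (1 - percentile) fraction of teams eligible.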
@@ -224,10 +248,10 @@
Optional<int64_t> storageQueueSize = self->teams[currentIndex]->getLongestStorageQueueSize();
if (!storageQueueSize.present()) {
numSkippedSSFailedGetQueueLength++;
continue; // this SS may not healthy, skip
} else if (storageQueueSize.get() > SERVER_KNOBS->DD_TARGET_STORAGE_QUEUE_SIZE) {
continue; // this team may have an unhealthy SS, skip
} else if (storageQueueSize.get() > storageQueueThreshold.get()) {
numSkippedSSQueueTooLong++;
continue; // this SS storage queue is too long, skip
continue; // this team has an SS whose storage queue is too long, skip
}
}
@@ -263,6 +287,7 @@ public:
// Returns the best team from `candidates` that matches the requirement from `req`. When preferWithinShardLimit is
// true, it also tries to select a team whose existing shard count is less than SERVER_KNOBS->DESIRED_MAX_SHARDS_PER_TEAM.
// Do not check storage queue size since getTeam has checked the size when selecting the input candidates
static Optional<Reference<IDataDistributionTeam>> getBestTeamFromCandidates(
DDTeamCollection* self,
const GetTeamRequest& req,
@@ -289,17 +314,6 @@ public:
continue;
}
if (req.storageQueueAware) {
Optional<int64_t> storageQueueSize = candidates[i]->getLongestStorageQueueSize();
if (!storageQueueSize.present()) {
numSkippedSSFailedGetQueueLength++;
continue; // this SS may not healthy, skip
} else if (storageQueueSize.get() > SERVER_KNOBS->DD_TARGET_STORAGE_QUEUE_SIZE) {
numSkippedSSQueueTooLong++;
continue; // this SS storage queue is too long, skip
}
}
bestLoadBytes = loadBytes;
bestOption = candidates[i];
wigglingBestOption = candidates[i]->hasWigglePausedServer();
@@ -411,6 +425,10 @@
}
}
Optional<int64_t> storageQueueThreshold;
if (req.storageQueueAware) {
storageQueueThreshold = calculateTeamStorageQueueThreshold(self->teams);
}
if (req.teamSelect == TeamSelect::WANT_TRUE_BEST) {
ASSERT(!bestOption.present());
if (SERVER_KNOBS->ENFORCE_SHARD_COUNT_PER_TEAM && req.preferWithinShardLimit) {
@@ -418,7 +436,8 @@
req,
/*preferWithinShardLimit=*/true,
numSkippedSSFailedGetQueueLength,
numSkippedSSQueueTooLong);
numSkippedSSQueueTooLong,
storageQueueThreshold);
if (!bestOption.present()) {
// In this case, we may return a team whose shard count is more than DESIRED_MAX_SHARDS_PER_TEAM.
TraceEvent("GetBestTeamPreferWithinShardLimitFailed").log();
@@ -429,7 +448,8 @@
req,
/*preferWithinShardLimit=*/false,
numSkippedSSFailedGetQueueLength,
numSkippedSSQueueTooLong);
numSkippedSSQueueTooLong,
storageQueueThreshold);
}
} else {
ASSERT(!bestOption.present());
@@ -463,10 +483,10 @@
Optional<int64_t> storageQueueSize = dest->getLongestStorageQueueSize();
if (!storageQueueSize.present()) {
numSkippedSSFailedGetQueueLength++;
ok = false; // this SS may not healthy, skip
} else if (storageQueueSize.get() > SERVER_KNOBS->DD_TARGET_STORAGE_QUEUE_SIZE) {
ok = false; // this team may have an unhealthy SS, skip
} else if (storageQueueSize.get() > storageQueueThreshold.get()) {
numSkippedSSQueueTooLong++;
ok = false; // this SS storage queue is too long, skip
ok = false; // this team has an SS whose storage queue is too long, skip
}
}
@@ -533,6 +553,8 @@ public:
if (req.storageQueueAware && !bestOption.present()) {
req.storageQueueAware = false;
TraceEvent(SevWarn, "StorageQueueAwareGetTeamFailed", self->distributorId)
.detail("Reason", "bestOption not present");
wait(getTeam(self, req)); // re-run getTeam without storageQueueAware
} else {
req.reply.send(std::make_pair(bestOption, foundSrc));
@@ -1554,17 +1576,26 @@ public:
when(wait(server->ssVersionTooFarBehind.onChange())) {}
when(wait(self->disableFailingLaggingServers.onChange())) {}
when(wait(server->longStorageQueue.onChange())) {
TraceEvent(SevInfo, "TriggerStorageQueueRebalance", self->distributorId)
.detail("SSID", server->getId());
std::vector<ShardsAffectedByTeamFailure::Team> teams;
for (const auto& team : server->getTeams()) {
std::vector<UID> servers;
for (const auto& server : team->getServers()) {
servers.push_back(server->getId());
int64_t threshold = calculateTeamStorageQueueThreshold(self->teams);
// threshold is a queue length reached by at least a DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE fraction of teams;
// a team's queue length is defined as the max queue size among all SSes of the team
if (server->longStorageQueue.get() < threshold) {
TraceEvent(SevInfo, "TriggerStorageQueueRebalanceIgnored", self->distributorId)
.detail("SSID", server->getId());
} else {
TraceEvent(SevInfo, "TriggerStorageQueueRebalance", self->distributorId)
.detail("SSID", server->getId());
std::vector<ShardsAffectedByTeamFailure::Team> teams;
for (const auto& team : server->getTeams()) {
std::vector<UID> servers;
for (const auto& server : team->getServers()) {
servers.push_back(server->getId());
}
teams.push_back(ShardsAffectedByTeamFailure::Team(servers, self->primary));
}
teams.push_back(ShardsAffectedByTeamFailure::Team(servers, self->primary));
self->triggerStorageQueueRebalance.send(
ServerTeamInfo(server->getId(), teams, self->primary));
}
self->triggerStorageQueueRebalance.send(ServerTeamInfo(server->getId(), teams, self->primary));
}
}

@@ -311,8 +311,7 @@ void StorageServerMetrics::splitMetrics(SplitMetricsRequest req) const {
lastKey,
key,
hasUsed);
if (used.bytes < minSplitBytes && (!SERVER_KNOBS->ENABLE_WRITE_BASED_SHARD_SPLIT ||
remaining.bytesWrittenPerKSecond < minSplitWriteTraffic))
if (used.bytes < minSplitBytes)
key = std::max(
key, byteSample.splitEstimate(KeyRangeRef(lastKey, req.keys.end), minSplitBytes - used.bytes));
key = getSplitKey(remaining.iosPerKSecond,

@@ -103,7 +103,7 @@ public:
if (!server->lastTimeNotifyLongStorageQueue.present() ||
currentTime - server->lastTimeNotifyLongStorageQueue.get() >
SERVER_KNOBS->DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL) {
server->longStorageQueue.trigger(); // will trigger data move for rebalancing storage queue
server->longStorageQueue.set(queueSize); // will trigger data move for rebalancing storage queue
TraceEvent(SevInfo, "SSTrackerTriggerLongStorageQueue", server->getId())
.detail("CurrentQueueSize", queueSize);
server->lastTimeNotifyLongStorageQueue = currentTime;
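Because longStorageQueue now carries the queue size (set(queueSize) instead of trigger()), a waiter can read the size that caused the wake-up and compare it against the majority-team threshold. A minimal sketch of that consumer side, assuming Flow's actor compiler; the actor name and threshold parameter are illustrative (the commit itself recomputes the threshold from self->teams on each wake-up):

// Sketch only; assumes the usual Flow actor includes.
ACTOR Future<Void> watchLongStorageQueue(Reference<TCServerInfo> server, int64_t threshold) {
    loop {
        wait(server->longStorageQueue.onChange());
        int64_t queueSize = server->longStorageQueue.get(); // value published by set(queueSize)
        if (queueSize >= threshold) {
            // ... build the server's team list and send it to triggerStorageQueueRebalance ...
        }
        // else: this server is not among the longest queues; ignore the notification
    }
}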

@@ -80,7 +80,7 @@ public:
Promise<Void> updated;
AsyncVar<bool> wrongStoreTypeToRemove;
AsyncVar<bool> ssVersionTooFarBehind;
AsyncVar<Void> longStorageQueue; // set when the storage queue remains too long for a while
AsyncVar<int64_t> longStorageQueue; // set when the storage queue remains too long for a while
TCServerInfo(StorageServerInterface ssi,
DDTeamCollection* collection,