Rebalance Storage Queue (Cherrypick from release-7.1) (#11172)

* cherry-pick-storage-queue-rebalance

* address comments

* address comments
Zhe Wang 2024-02-07 13:14:08 -08:00 committed by GitHub
parent 4d027a05e9
commit 09444c7657
14 changed files with 178 additions and 16 deletions


@@ -156,6 +156,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM, 121 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 122 );
init( PRIORITY_REBALANCE_READ_OVERUTIL_TEAM, 123 );
init( PRIORITY_REBALANCE_STORAGE_QUEUE, 124 );
init( PRIORITY_TEAM_HEALTHY, 140 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 141 );
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
@@ -345,7 +346,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ENFORCE_SHARD_COUNT_PER_TEAM, false ); if( randomize && BUGGIFY ) ENFORCE_SHARD_COUNT_PER_TEAM = true;
init( DESIRED_MAX_SHARDS_PER_TEAM, 1000 ); if( randomize && BUGGIFY ) DESIRED_MAX_SHARDS_PER_TEAM = 10;
init( ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION, false ); if( randomize && BUGGIFY ) ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION = true;
init( DD_TARGET_STORAGE_QUEUE_SIZE, TARGET_BYTES_PER_STORAGE_SERVER/3 ); if( randomize && BUGGIFY ) DD_TARGET_STORAGE_QUEUE_SIZE = TARGET_BYTES_PER_STORAGE_SERVER/10;
init( DD_TARGET_STORAGE_QUEUE_SIZE, TARGET_BYTES_PER_STORAGE_SERVER*0.35 ); if( randomize && BUGGIFY ) DD_TARGET_STORAGE_QUEUE_SIZE = TARGET_BYTES_PER_STORAGE_SERVER*0.035;
init( ENABLE_REBALANCE_STORAGE_QUEUE, false ); if( randomize && BUGGIFY ) ENABLE_REBALANCE_STORAGE_QUEUE = true;
init( REBALANCE_STORAGE_QUEUE_LONG_BYTES, TARGET_BYTES_PER_STORAGE_SERVER*0.15); if( randomize && BUGGIFY ) REBALANCE_STORAGE_QUEUE_LONG_BYTES = TARGET_BYTES_PER_STORAGE_SERVER*0.05;
init( REBALANCE_STORAGE_QUEUE_SHORT_BYTES, TARGET_BYTES_PER_STORAGE_SERVER*0.05); if( randomize && BUGGIFY ) REBALANCE_STORAGE_QUEUE_SHORT_BYTES = TARGET_BYTES_PER_STORAGE_SERVER*0.025;
init( DD_LONG_STORAGE_QUEUE_TIMESPAN, 60.0 ); if( isSimulated ) DD_LONG_STORAGE_QUEUE_TIMESPAN = deterministicRandom()->random01() * 10 + 1;
init( DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL, 30.0 ); if( isSimulated ) DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL = 5.0;
init( REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN, SHARD_MIN_BYTES_PER_KSEC);
init( DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD, true ); if ( isSimulated ) DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD = deterministicRandom()->coinflip();
// Large teams are disabled when SHARD_ENCODE_LOCATION_METADATA is enabled
init( DD_MAX_SHARDS_ON_LARGE_TEAMS, 100 ); if( randomize && BUGGIFY ) DD_MAX_SHARDS_ON_LARGE_TEAMS = deterministicRandom()->randomInt(0, 3);
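
The new knobs above form a two-threshold scheme on the storage queue: a server's queue counts as too long above REBALANCE_STORAGE_QUEUE_LONG_BYTES (0.15x TARGET_BYTES_PER_STORAGE_SERVER by default), counts as recovered only below REBALANCE_STORAGE_QUEUE_SHORT_BYTES (0.05x), must stay long for DD_LONG_STORAGE_QUEUE_TIMESPAN seconds before anything happens, and rebalance triggers are spaced at least DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL seconds apart. A minimal standalone sketch of how those defaults relate, assuming a hypothetical 1 GB TARGET_BYTES_PER_STORAGE_SERVER (that constant's default is not part of this diff):

// Illustrative only: how the new knob defaults relate to each other.
// targetBytesPerStorageServer is a stand-in for TARGET_BYTES_PER_STORAGE_SERVER (1 GB is an assumed example).
#include <cassert>
#include <cstdint>
#include <cstdio>

int main() {
    const int64_t targetBytesPerStorageServer = 1'000'000'000;
    const int64_t longBytes = targetBytesPerStorageServer * 0.15;  // REBALANCE_STORAGE_QUEUE_LONG_BYTES
    const int64_t shortBytes = targetBytesPerStorageServer * 0.05; // REBALANCE_STORAGE_QUEUE_SHORT_BYTES
    const double longTimespan = 60.0;       // DD_LONG_STORAGE_QUEUE_TIMESPAN (seconds the queue must stay long)
    const double rebalanceInterval = 30.0;  // DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL (seconds between triggers)
    assert(shortBytes < longBytes); // hysteresis band: "long" above longBytes, "short" again only below shortBytes
    std::printf("long=%lld short=%lld dwell=%.0fs interval=%.0fs\n",
                (long long)longBytes, (long long)shortBytes, longTimespan, rebalanceInterval);
    return 0;
}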


@@ -159,6 +159,8 @@ public:
int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM;
// A load-balance priority read mountain chopper
int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
// A load-balance priority for when a storage queue is too long
int PRIORITY_REBALANCE_STORAGE_QUEUE;
// A team healthy priority for wiggling a storage server
int PRIORITY_PERPETUAL_STORAGE_WIGGLE;
// A team healthy priority when all servers in a team are healthy. When a team changes from any unhealthy states to
@@ -327,6 +329,14 @@ public:
// distributor to fetch the list of tenants over storage quota
bool ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION; // experimental!
int64_t DD_TARGET_STORAGE_QUEUE_SIZE;
bool ENABLE_REBALANCE_STORAGE_QUEUE; // experimental!
int64_t REBALANCE_STORAGE_QUEUE_LONG_BYTES; // Lower bound of length indicating the storage queue is too long
int64_t REBALANCE_STORAGE_QUEUE_SHORT_BYTES; // Upper bound of length indicating the storage queue is back to short
double DD_LONG_STORAGE_QUEUE_TIMESPAN;
double DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL;
int64_t REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN;
bool DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD; // Allow the storage queue rebalancer to move
// light-traffic shards out of the overloaded server
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor


@@ -65,9 +65,10 @@ enum class DataMovementReason : uint8_t {
TEAM_0_LEFT = 16,
SPLIT_SHARD = 17,
ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD = 18,
ASSIGN_EMPTY_RANGE = 19, // dummy reason, no corresponding data move priority
SEED_SHARD_SERVER = 20, // dummy reason, no corresponding data move priority
NUMBER_OF_REASONS = 21, // dummy reason, no corresponding data move priority
REBALANCE_STORAGE_QUEUE = 19,
ASSIGN_EMPTY_RANGE = 20, // dummy reason, no corresponding data move priority
SEED_SHARD_SERVER = 21, // dummy reason, no corresponding data move priority
NUMBER_OF_REASONS = 22, // dummy reason, no corresponding data move priority
};
// SystemKey is just a Key but with a special type so that instances of it can be found easily throughout the code base


@@ -95,6 +95,7 @@ std::pair<const DmReasonPriorityMapping*, const PriorityDmReasonMapping*> buildP
{ DataMovementReason::SPLIT_SHARD, SERVER_KNOBS->PRIORITY_SPLIT_SHARD },
{ DataMovementReason::ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD,
SERVER_KNOBS->PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD },
{ DataMovementReason::REBALANCE_STORAGE_QUEUE, SERVER_KNOBS->PRIORITY_REBALANCE_STORAGE_QUEUE },
{ DataMovementReason::ASSIGN_EMPTY_RANGE, -2 }, // dummy reason, no corresponding actual data move
{ DataMovementReason::SEED_SHARD_SERVER, -3 }, // dummy reason, no corresponding actual data move
{ DataMovementReason::NUMBER_OF_REASONS, -4 }, // dummy reason, no corresponding actual data move
@@ -147,7 +148,8 @@ RelocateData::RelocateData(RelocateShard const& rs)
dataMoveId(rs.dataMoveId), workFactor(0),
wantsNewServers(isDataMovementForMountainChopper(rs.moveReason) || isDataMovementForValleyFiller(rs.moveReason) ||
rs.moveReason == DataMovementReason::SPLIT_SHARD ||
rs.moveReason == DataMovementReason::TEAM_REDUNDANT),
rs.moveReason == DataMovementReason::TEAM_REDUNDANT ||
rs.moveReason == DataMovementReason::REBALANCE_STORAGE_QUEUE),
cancellable(true), interval("QueuedRelocation", randomId), dataMove(rs.dataMove) {
if (dataMove != nullptr) {
this->src.insert(this->src.end(), dataMove->meta.src.begin(), dataMove->meta.src.end());
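
With the mapping above, a relocation created for DataMovementReason::REBALANCE_STORAGE_QUEUE carries SERVER_KNOBS->PRIORITY_REBALANCE_STORAGE_QUEUE (124 by default), and the RelocateData constructor now sets wantsNewServers for it, so the data move is steered onto a team other than the one hosting the overloaded server. That priority puts these moves above the disk/read rebalance priorities (120-123) but below health-driven work (140 and up); a trivial standalone check of that ordering, using the default values from the knob file earlier in this change:

// Illustrative standalone check of the priority ordering implied by the knob defaults.
#include <cassert>

int main() {
    const int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM = 123;
    const int PRIORITY_REBALANCE_STORAGE_QUEUE = 124; // new in this change
    const int PRIORITY_TEAM_HEALTHY = 140;
    // Storage-queue rebalancing outranks the other load-balance moves but never health-driven moves.
    assert(PRIORITY_REBALANCE_STORAGE_QUEUE > PRIORITY_REBALANCE_READ_OVERUTIL_TEAM);
    assert(PRIORITY_REBALANCE_STORAGE_QUEUE < PRIORITY_TEAM_HEALTHY);
    return 0;
}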


@@ -1524,6 +1524,46 @@ struct DataDistributionTrackerImpl {
when(GetMetricsListRequest req = waitNext(self->getShardMetricsList)) {
self->actors.add(fetchShardMetricsList(self, req));
}
when(ServerTeamInfo req = waitNext(self->triggerStorageQueueRebalance)) {
TraceEvent e("TriggerDataMoveStorageQueueRebalance", self->distributorId);
e.detail("Server", req.serverId);
e.detail("Teams", req.teams.size());
int64_t maxShardWriteTraffic = 0;
KeyRange shardToMove;
ShardsAffectedByTeamFailure::Team selectedTeam;
for (const auto& team : req.teams) {
for (auto const& shard : self->shardsAffectedByTeamFailure->getShardsFor(team)) {
for (auto it : self->shards->intersectingRanges(shard)) {
if (it->value().stats->get().present()) {
int64_t shardWriteTraffic =
it->value().stats->get().get().metrics.bytesWrittenPerKSecond;
if (shardWriteTraffic > maxShardWriteTraffic &&
(SERVER_KNOBS->DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD ||
shardWriteTraffic >
SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN)) {
shardToMove = it->range();
maxShardWriteTraffic = shardWriteTraffic;
}
}
}
}
}
if (!shardToMove.empty()) {
e.detail("TeamSelected", selectedTeam.servers);
e.detail("ShardSelected", shardToMove);
e.detail("ShardWriteBytesPerKSec", maxShardWriteTraffic);
RelocateShard rs(shardToMove,
SERVER_KNOBS->PRIORITY_REBALANCE_STORAGE_QUEUE,
RelocateReason::REBALANCE_WRITE);
self->output.send(rs);
TraceEvent("SendRelocateToDDQueue", self->distributorId)
.detail("ServerPrimary", req.primary)
.detail("ServerTeam", selectedTeam.servers)
.detail("KeyBegin", rs.keys.begin)
.detail("KeyEnd", rs.keys.end)
.detail("Priority", rs.priority);
}
}
when(wait(self->actors.getResult())) {}
when(TenantCacheTenantCreated newTenant = waitNext(tenantCreationSignal.getFuture())) {
self->actors.add(tenantCreationHandling(self, newTenant));
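
The new when-clause walks every shard hosted by any of the overloaded server's teams and remembers the one with the highest bytesWrittenPerKSecond; unless DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD is on, shards writing less than REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN are skipped, and if nothing qualifies no RelocateShard is sent. A standalone model of that selection rule (simplified stand-in types, not the FDB implementation):

// Standalone model of the shard-selection rule in the when-clause above (simplified, hypothetical types).
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

struct ShardStats {
    std::string range;               // stand-in for KeyRange
    int64_t bytesWrittenPerKSecond;  // write-traffic metric used above
};

// Returns the busiest-writing shard hosted by the overloaded server, or nothing if no shard qualifies.
std::optional<ShardStats> pickShardToMove(const std::vector<ShardStats>& shardsOnServer,
                                          bool allowLightWriteShard,  // DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD
                                          int64_t minWritePerKSec) {  // REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN
    int64_t maxShardWriteTraffic = 0;
    std::optional<ShardStats> selected;
    for (const auto& shard : shardsOnServer) {
        if (shard.bytesWrittenPerKSecond > maxShardWriteTraffic &&
            (allowLightWriteShard || shard.bytesWrittenPerKSecond > minWritePerKSec)) {
            selected = shard;
            maxShardWriteTraffic = shard.bytesWrittenPerKSecond;
        }
    }
    return selected;
}

int main() {
    std::vector<ShardStats> shards = { { "a-b", 100 }, { "b-c", 5000 }, { "c-d", 250 } };
    auto pick = pickShardToMove(shards, /*allowLightWriteShard=*/false, /*minWritePerKSec=*/200);
    return pick && pick->range == "b-c" ? 0 : 1; // the heaviest writer above the floor is chosen
}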
@@ -1544,11 +1584,13 @@ Future<Void> DataDistributionTracker::run(Reference<DataDistributionTracker> sel
const FutureStream<GetMetricsRequest>& getShardMetrics,
const FutureStream<GetTopKMetricsRequest>& getTopKMetrics,
const FutureStream<GetMetricsListRequest>& getShardMetricsList,
const FutureStream<Promise<int64_t>>& getAverageShardBytes) {
const FutureStream<Promise<int64_t>>& getAverageShardBytes,
const FutureStream<ServerTeamInfo>& triggerStorageQueueRebalance) {
self->getShardMetrics = getShardMetrics;
self->getTopKMetrics = getTopKMetrics;
self->getShardMetricsList = getShardMetricsList;
self->averageShardBytes = getAverageShardBytes;
self->triggerStorageQueueRebalance = triggerStorageQueueRebalance;
self->userRangeConfig = initData->userRangeConfig;
return holdWhile(self, DataDistributionTrackerImpl::run(self.getPtr(), initData));
}


@@ -1553,6 +1553,19 @@ public:
when(wait(storageMetadataTracker)) {}
when(wait(server->ssVersionTooFarBehind.onChange())) {}
when(wait(self->disableFailingLaggingServers.onChange())) {}
when(wait(server->longStorageQueue.onChange())) {
TraceEvent(SevInfo, "TriggerStorageQueueRebalance", self->distributorId)
.detail("SSID", server->getId());
std::vector<ShardsAffectedByTeamFailure::Team> teams;
for (const auto& team : server->getTeams()) {
std::vector<UID> servers;
for (const auto& server : team->getServers()) {
servers.push_back(server->getId());
}
teams.push_back(ShardsAffectedByTeamFailure::Team(servers, self->primary));
}
self->triggerStorageQueueRebalance.send(ServerTeamInfo(server->getId(), teams, self->primary));
}
}
if (recordTeamCollectionInfo) {
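
This handler is the producer side of the new path: when a server's longStorageQueue fires, its current team memberships are flattened into a ServerTeamInfo and sent on triggerStorageQueueRebalance, because the shard tracker (the when-clause shown earlier) resolves candidate shards per team via shardsAffectedByTeamFailure. A small standalone illustration of that server -> teams -> shards lookup chain, with made-up data:

// Illustrative model of why the teams are sent along: the tracker maps teams to shards,
// so the overloaded server's teams determine its candidate shards (hypothetical data).
#include <map>
#include <set>
#include <string>
#include <vector>

int main() {
    // team id -> shards currently assigned to that team (stand-in for shardsAffectedByTeamFailure)
    std::map<std::string, std::vector<std::string>> shardsForTeam = {
        { "teamA", { "a-b", "b-c" } },
        { "teamB", { "c-d" } },
    };
    std::vector<std::string> teamsOfOverloadedServer = { "teamA", "teamB" };
    std::set<std::string> candidateShards;
    for (const auto& team : teamsOfOverloadedServer)
        for (const auto& shard : shardsForTeam[team])
            candidateShards.insert(shard); // every shard hosted by the overloaded server is a candidate
    return candidateShards.size() == 3 ? 0 : 1;
}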
@@ -4175,7 +4188,7 @@ DDTeamCollection::DDTeamCollection(DDTeamCollectionInitParams const& params)
zeroHealthyTeams(params.zeroHealthyTeams), optimalTeamCount(0), zeroOptimalTeams(true), isTssRecruiting(false),
includedDCs(params.includedDCs), otherTrackedDCs(params.otherTrackedDCs),
processingUnhealthy(params.processingUnhealthy), getAverageShardBytes(params.getAverageShardBytes),
readyToStart(params.readyToStart),
triggerStorageQueueRebalance(params.triggerStorageQueueRebalance), readyToStart(params.readyToStart),
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)), badTeamRemover(Void()),
checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()), clearHealthyZoneFuture(true),
lowestUtilizationTeam(0), highestUtilizationTeam(0), getShardMetrics(params.getShardMetrics),
@@ -5951,7 +5964,8 @@ public:
PromiseStream<GetMetricsRequest>(),
Promise<UID>(),
PromiseStream<Promise<int>>(),
PromiseStream<Promise<int64_t>>() }));
PromiseStream<Promise<int64_t>>(),
PromiseStream<ServerTeamInfo>() }));
for (int id = 1; id <= processCount; ++id) {
UID uid(id, 0);
@@ -6004,7 +6018,8 @@ public:
PromiseStream<GetMetricsRequest>(),
Promise<UID>(),
PromiseStream<Promise<int>>(),
PromiseStream<Promise<int64_t>>() }));
PromiseStream<Promise<int64_t>>(),
PromiseStream<ServerTeamInfo>() }));
for (int id = 1; id <= processCount; id++) {
UID uid(id, 0);


@@ -1033,6 +1033,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
ASSERT(self->configuration.storageTeamSize > 0);
state PromiseStream<Promise<int64_t>> getAverageShardBytes;
state PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;
state PromiseStream<Promise<int>> getUnhealthyRelocationCount;
state PromiseStream<GetMetricsRequest> getShardMetrics;
state PromiseStream<GetTopKMetricsRequest> getTopKShardMetrics;
@@ -1087,7 +1088,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
getShardMetrics.getFuture(),
getTopKShardMetrics.getFuture(),
getShardMetricsList.getFuture(),
getAverageShardBytes.getFuture()),
getAverageShardBytes.getFuture(),
triggerStorageQueueRebalance.getFuture()),
"DDTracker",
self->ddId,
&normalDDQueueErrors()));
@@ -1150,7 +1152,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
getShardMetrics,
removeFailedServer,
getUnhealthyRelocationCount,
getAverageShardBytes });
getAverageShardBytes,
triggerStorageQueueRebalance });
teamCollectionsPtrs.push_back(self->context->primaryTeamCollection.getPtr());
Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage;
if (!isMocked) {
@@ -1175,7 +1178,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
getShardMetrics,
removeFailedServer,
getUnhealthyRelocationCount,
getAverageShardBytes });
getAverageShardBytes,
triggerStorageQueueRebalance });
teamCollectionsPtrs.push_back(self->context->remoteTeamCollection.getPtr());
self->context->remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back(reportErrorsExcept(DDTeamCollection::run(self->context->remoteTeamCollection,


@@ -92,6 +92,25 @@ public:
server->collection->removeLaggingStorageServer(server->lastKnownInterface.locality.zoneId().get());
}
}
// Detect whether this storage server's queue has stayed too long and notify the team tracker,
// at most once per notification interval
if (SERVER_KNOBS->ENABLE_REBALANCE_STORAGE_QUEUE) {
int64_t queueSize = server->getStorageQueueSize();
bool storageQueueKeepTooLong = server->updateAndGetStorageQueueTooLong(queueSize);
double currentTime = now();
if (storageQueueKeepTooLong) {
if (!server->lastTimeNotifyLongStorageQueue.present() ||
currentTime - server->lastTimeNotifyLongStorageQueue.get() >
SERVER_KNOBS->DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL) {
server->longStorageQueue.trigger(); // will trigger data move for rebalancing storage queue
TraceEvent(SevInfo, "SSTrackerTriggerLongStorageQueue", server->getId())
.detail("CurrentQueueSize", queueSize);
server->lastTimeNotifyLongStorageQueue = currentTime;
}
}
}
return Void();
}
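
The block above throttles how often a persistently long queue can fire longStorageQueue: lastTimeNotifyLongStorageQueue is only advanced when a trigger is actually sent, so at most one trigger goes out per DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL seconds even if the queue never recovers. A standalone model of that throttle (hypothetical names; nowSeconds stands in for FDB's now()):

// Standalone model of the notification throttle above: fire at most once per interval while the condition holds.
#include <optional>

struct LongQueueNotifier {
    double intervalSeconds;               // DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL
    std::optional<double> lastNotifyTime;  // lastTimeNotifyLongStorageQueue

    // Returns true when a new rebalance trigger should be sent.
    bool shouldNotify(bool queueKeepsTooLong, double nowSeconds) {
        if (!queueKeepsTooLong)
            return false;
        if (!lastNotifyTime.has_value() || nowSeconds - *lastNotifyTime > intervalSeconds) {
            lastNotifyTime = nowSeconds; // only advanced when a trigger actually fires
            return true;
        }
        return false;
    }
};

int main() {
    LongQueueNotifier n{ 30.0, std::nullopt };
    bool first = n.shouldNotify(true, 100.0);   // fires
    bool second = n.shouldNotify(true, 110.0);  // suppressed: within 30s of the last trigger
    bool third = n.shouldNotify(true, 140.5);   // fires again once the interval has elapsed
    return (first && !second && third) ? 0 : 1;
}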
@@ -174,6 +193,40 @@ Future<Void> TCServerInfo::serverMetricsPolling(Reference<IDDTxnProcessor> txnPr
return TCServerInfoImpl::serverMetricsPolling(this, txnProcessor);
}
// Returns true if this storage server's storage queue has remained too long for a while
bool TCServerInfo::updateAndGetStorageQueueTooLong(int64_t currentBytes) {
double currentTime = now();
if (currentBytes > SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_LONG_BYTES) {
if (!storageQueueTooLongStartTime.present()) {
storageQueueTooLongStartTime = currentTime;
TraceEvent(SevWarn, "SSTrackerDetectStorageQueueBecomeLong", id)
.detail("StorageQueueBytes", currentBytes)
.detail("Duration", currentTime - storageQueueTooLongStartTime.get());
} else {
TraceEvent(SevDebug, "SSTrackerDetectStorageQueueRemainLong", id)
.detail("StorageQueueBytes", currentBytes)
.detail("Duration", currentTime - storageQueueTooLongStartTime.get());
}
} else if (currentBytes < SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_SHORT_BYTES) {
if (storageQueueTooLongStartTime.present()) {
storageQueueTooLongStartTime.reset();
TraceEvent(SevInfo, "SSTrackerDetectStorageQueueBecomeShort", id).detail("StorageQueueBytes", currentBytes);
}
} else {
if (storageQueueTooLongStartTime.present()) {
TraceEvent(SevDebug, "SSTrackerDetectStorageQueueRemainLong", id)
.detail("StorageQueueBytes", currentBytes)
.detail("Duration", currentTime - storageQueueTooLongStartTime.get());
}
}
if (storageQueueTooLongStartTime.present() &&
currentTime - storageQueueTooLongStartTime.get() > SERVER_KNOBS->DD_LONG_STORAGE_QUEUE_TIMESPAN) {
return true;
} else {
return false;
}
}
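
updateAndGetStorageQueueTooLong is a hysteresis check with a dwell time: the timer starts when the queue rises above REBALANCE_STORAGE_QUEUE_LONG_BYTES, is cleared only when it falls below REBALANCE_STORAGE_QUEUE_SHORT_BYTES, is left running in the band between the two, and the function reports true only once the timer has run longer than DD_LONG_STORAGE_QUEUE_TIMESPAN. A standalone model of the same logic (simplified names, hypothetical thresholds):

// Standalone model of the hysteresis-with-dwell-time check above (simplified, hypothetical names).
#include <cstdint>
#include <optional>

struct QueueLengthDetector {
    int64_t longBytes;     // REBALANCE_STORAGE_QUEUE_LONG_BYTES: queue counts as "too long" above this
    int64_t shortBytes;    // REBALANCE_STORAGE_QUEUE_SHORT_BYTES: queue counts as recovered below this
    double dwellSeconds;   // DD_LONG_STORAGE_QUEUE_TIMESPAN: how long it must stay long before acting
    std::optional<double> tooLongSince; // storageQueueTooLongStartTime

    bool updateAndCheck(int64_t currentBytes, double nowSeconds) {
        if (currentBytes > longBytes) {
            if (!tooLongSince.has_value())
                tooLongSince = nowSeconds;  // queue just became long: start the timer
        } else if (currentBytes < shortBytes) {
            tooLongSince.reset();           // queue fully recovered: clear the timer
        }
        // Between the two thresholds the timer is left untouched (the hysteresis band).
        return tooLongSince.has_value() && nowSeconds - *tooLongSince > dwellSeconds;
    }
};

int main() {
    QueueLengthDetector d{ 150, 50, 60.0, std::nullopt };
    bool a = d.updateAndCheck(200, 0.0);  // long, but the dwell time has not elapsed yet
    bool b = d.updateAndCheck(100, 30.0); // in the band: timer keeps running
    bool c = d.updateAndCheck(200, 61.0); // still long after 61s: report too long
    bool e = d.updateAndCheck(40, 62.0);  // dropped below the short threshold: reset
    return (!a && !b && c && !e) ? 0 : 1;
}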
void TCServerInfo::updateInDesiredDC(std::vector<Optional<Key>> const& includedDCs) {
inDesiredDC =
(includedDCs.empty() ||


@@ -30,6 +30,7 @@ public:
FutureStream<GetTopKMetricsRequest> getTopKMetrics;
FutureStream<GetMetricsListRequest> getShardMetricsList;
FutureStream<Promise<int64_t>> averageShardBytes;
FutureStream<ServerTeamInfo> triggerStorageQueueRebalance;
virtual double getAverageShardBytes() = 0;
virtual ~IDDShardTracker() = default;
@@ -121,7 +122,8 @@ public:
FutureStream<GetMetricsRequest> const& getShardMetrics,
FutureStream<GetTopKMetricsRequest> const& getTopKMetrics,
FutureStream<GetMetricsListRequest> const& getShardMetricsList,
FutureStream<Promise<int64_t>> const& getAverageShardBytes);
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
FutureStream<ServerTeamInfo> const& triggerStorageQueueRebalance);
explicit DataDistributionTracker(DataDistributionTrackerInitParams const& params);
};


@@ -199,6 +199,7 @@ struct DDTeamCollectionInitParams {
Promise<UID> removeFailedServer;
PromiseStream<Promise<int>> getUnhealthyRelocationCount;
PromiseStream<Promise<int64_t>> getAverageShardBytes;
PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;
};
class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
@@ -237,6 +238,7 @@ protected:
Reference<AsyncVar<bool>> processingWiggle; // track whether wiggling relocation is being processed
PromiseStream<StorageWiggleValue> nextWiggleInfo;
PromiseStream<Promise<int64_t>> getAverageShardBytes;
PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;
std::vector<Reference<TCTeamInfo>> badTeams;
std::vector<Reference<TCTeamInfo>> largeTeams;


@@ -54,6 +54,7 @@ public:
OTHER = 0,
REBALANCE_DISK,
REBALANCE_READ,
REBALANCE_WRITE,
MERGE_SHARD,
SIZE_SPLIT,
WRITE_SPLIT,
@@ -70,6 +71,8 @@ public:
return "RebalanceDisk";
case REBALANCE_READ:
return "RebalanceRead";
case REBALANCE_WRITE:
return "RebalanceWrite";
case MERGE_SHARD:
return "MergeShard";
case SIZE_SPLIT:
@@ -446,6 +449,16 @@ private:
double lastTransitionStartTime;
};
struct ServerTeamInfo {
UID serverId;
std::vector<ShardsAffectedByTeamFailure::Team> teams;
bool primary;
ServerTeamInfo() {}
ServerTeamInfo(UID serverId, const std::vector<ShardsAffectedByTeamFailure::Team>& teams, bool primary)
: serverId(serverId), teams(teams), primary(primary) {}
};
// DDShardInfo is so named to avoid link-time name collision with ShardInfo within the StorageServer
struct DDShardInfo {
Key key;


@@ -56,6 +56,12 @@ class TCServerInfo : public ReferenceCounted<TCServerInfo> {
std::vector<Reference<TCTeamInfo>> teams{};
ErrorOr<GetStorageMetricsReply> metrics;
Optional<HealthMetrics::StorageStats> storageStats;
Optional<double> storageQueueTooLongStartTime; // Time when the storage queue first became too long
// Last time this server notified the teamTracker that its queue is long.
// We do not want to notify the teamTracker repeatedly while the queue stays long;
// lastTimeNotifyLongStorageQueue is used to enforce that.
Optional<double> lastTimeNotifyLongStorageQueue;
void setMetrics(GetStorageMetricsReply serverMetrics) { this->metrics = serverMetrics; }
void setStorageStats(HealthMetrics::StorageStats stats) { storageStats = stats; }
@@ -74,6 +80,7 @@ public:
Promise<Void> updated;
AsyncVar<bool> wrongStoreTypeToRemove;
AsyncVar<bool> ssVersionTooFarBehind;
AsyncVar<Void> longStorageQueue; // triggered when the storage queue has remained too long for a while
TCServerInfo(StorageServerInterface ssi,
DDTeamCollection* collection,
@@ -121,7 +128,7 @@ public:
Future<Void> updateServerMetrics();
static Future<Void> updateServerMetrics(Reference<TCServerInfo> server);
Future<Void> serverMetricsPolling(Reference<IDDTxnProcessor> txnProcessor);
bool updateAndGetStorageQueueTooLong(int64_t currentBytes);
~TCServerInfo();
};


@@ -32,6 +32,7 @@ public:
PromiseStream<GetTopKMetricsRequest> getTopKMetrics;
PromiseStream<GetMetricsListRequest> getShardMetricsList;
PromiseStream<Promise<int64_t>> getAverageShardBytes;
PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;
KeyRangeMap<ShardTrackedData> shards;
@@ -112,7 +113,8 @@ public:
getShardMetrics.getFuture(),
getTopKMetrics.getFuture(),
getShardMetricsList.getFuture(),
getAverageShardBytes.getFuture()));
getAverageShardBytes.getFuture(),
triggerStorageQueueRebalance.getFuture()));
actors.add(relocateShardReporter(this, output.getFuture()));
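
Most of the remaining hunks are plumbing: the same triggerStorageQueueRebalance stream is held as a PromiseStream<ServerTeamInfo> by the team collections (send side) and handed to the shard tracker as a FutureStream (receive side), which keeps the two components decoupled. A plain C++ stand-in for that producer/consumer shape (not FDB's flow primitives):

// Standalone model of the new plumbing: producers push ServerTeamInfo-like messages onto a
// shared channel; the tracker drains it. Plain C++ stand-in, not FDB's PromiseStream/FutureStream.
#include <deque>
#include <functional>
#include <string>

struct Message { std::string serverId; }; // stand-in for ServerTeamInfo

class Channel {
    std::deque<Message> q;
public:
    void send(Message m) { q.push_back(std::move(m)); }             // producer side (team collection)
    void drain(const std::function<void(const Message&)>& handle) { // consumer side (shard tracker)
        while (!q.empty()) { handle(q.front()); q.pop_front(); }
    }
};

int main() {
    Channel triggerStorageQueueRebalance;           // one channel shared by both sides
    triggerStorageQueueRebalance.send({ "ss-1" });  // team tracker: queue stayed long on ss-1
    int handled = 0;
    triggerStorageQueueRebalance.drain([&](const Message&) { ++handled; }); // tracker picks a shard to move
    return handled == 1 ? 0 : 1;
}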


@@ -213,7 +213,8 @@ struct PerpetualWiggleStatsWorkload : public TestWorkload {
PromiseStream<GetMetricsRequest>(),
Promise<UID>(),
PromiseStream<Promise<int>>(),
PromiseStream<Promise<int64_t>>() });
PromiseStream<Promise<int64_t>>(),
PromiseStream<ServerTeamInfo>() });
tester.configuration.storageTeamSize = 3;
tester.configuration.perpetualStorageWiggleSpeed = 1;