diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 715cf89c51..315cd77f94 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -156,6 +156,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSimulated isSimulated) {
 	init( PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM, 121 );
 	init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 122 );
 	init( PRIORITY_REBALANCE_READ_OVERUTIL_TEAM, 123 );
+	init( PRIORITY_REBALANCE_STORAGE_QUEUE, 124 );
 	init( PRIORITY_TEAM_HEALTHY, 140 );
 	init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 141 );
 	init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
@@ -345,7 +346,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSimulated isSimulated) {
 	init( ENFORCE_SHARD_COUNT_PER_TEAM, false ); if( randomize && BUGGIFY ) ENFORCE_SHARD_COUNT_PER_TEAM = true;
 	init( DESIRED_MAX_SHARDS_PER_TEAM, 1000 ); if( randomize && BUGGIFY ) DESIRED_MAX_SHARDS_PER_TEAM = 10;
 	init( ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION, false ); if( randomize && BUGGIFY ) ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION = true;
-	init( DD_TARGET_STORAGE_QUEUE_SIZE, TARGET_BYTES_PER_STORAGE_SERVER/3 ); if( randomize && BUGGIFY ) DD_TARGET_STORAGE_QUEUE_SIZE = TARGET_BYTES_PER_STORAGE_SERVER/10;
+	init( DD_TARGET_STORAGE_QUEUE_SIZE, TARGET_BYTES_PER_STORAGE_SERVER*0.35 ); if( randomize && BUGGIFY ) DD_TARGET_STORAGE_QUEUE_SIZE = TARGET_BYTES_PER_STORAGE_SERVER*0.035;
+	init( ENABLE_REBALANCE_STORAGE_QUEUE, false ); if( randomize && BUGGIFY ) ENABLE_REBALANCE_STORAGE_QUEUE = true;
+	init( REBALANCE_STORAGE_QUEUE_LONG_BYTES, TARGET_BYTES_PER_STORAGE_SERVER*0.15 ); if( randomize && BUGGIFY ) REBALANCE_STORAGE_QUEUE_LONG_BYTES = TARGET_BYTES_PER_STORAGE_SERVER*0.05;
+	init( REBALANCE_STORAGE_QUEUE_SHORT_BYTES, TARGET_BYTES_PER_STORAGE_SERVER*0.05 ); if( randomize && BUGGIFY ) REBALANCE_STORAGE_QUEUE_SHORT_BYTES = TARGET_BYTES_PER_STORAGE_SERVER*0.025;
+	init( DD_LONG_STORAGE_QUEUE_TIMESPAN, 60.0 ); if( isSimulated ) DD_LONG_STORAGE_QUEUE_TIMESPAN = deterministicRandom()->random01() * 10 + 1;
+	init( DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL, 30.0 ); if( isSimulated ) DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL = 5.0;
+	init( REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN, SHARD_MIN_BYTES_PER_KSEC );
+	init( DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD, true ); if( isSimulated ) DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD = deterministicRandom()->coinflip();

 	// Large teams are disabled when SHARD_ENCODE_LOCATION_METADATA is enabled
 	init( DD_MAX_SHARDS_ON_LARGE_TEAMS, 100 ); if( randomize && BUGGIFY ) DD_MAX_SHARDS_ON_LARGE_TEAMS = deterministicRandom()->randomInt(0, 3);
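Note on the new defaults: every byte threshold above is a fraction of TARGET_BYTES_PER_STORAGE_SERVER, and the long/short pair is deliberately asymmetric so the detector has hysteresis. A minimal standalone sketch of the resulting values, assuming the stock TARGET_BYTES_PER_STORAGE_SERVER default of 1000e6 bytes (that constant is an assumption for illustration, not part of this patch):

    // Illustrative only -- not FDB code. Shows how the new thresholds relate,
    // assuming TARGET_BYTES_PER_STORAGE_SERVER keeps its stock default.
    #include <cstdint>
    #include <cstdio>

    int main() {
        const int64_t target = 1000e6; // assumed TARGET_BYTES_PER_STORAGE_SERVER
        const int64_t ddTargetQueue = target * 0.35; // DD_TARGET_STORAGE_QUEUE_SIZE, ~350 MB
        const int64_t longBytes = target * 0.15;     // REBALANCE_STORAGE_QUEUE_LONG_BYTES, ~150 MB
        const int64_t shortBytes = target * 0.05;    // REBALANCE_STORAGE_QUEUE_SHORT_BYTES, ~50 MB
        // longBytes > shortBytes: a queue must drain well below the arming
        // threshold before the "too long" state clears, which prevents the
        // detector from flapping when the queue hovers near a single cut-off.
        std::printf("target=%lld long=%lld short=%lld\n",
                    (long long)ddTargetQueue, (long long)longBytes, (long long)shortBytes);
        return 0;
    }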
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 33b4d5d1cf..85b3dd140c 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -159,6 +159,8 @@ public:
 	int PRIORITY_REBALANCE_READ_OVERUTIL_TEAM; // A load-balance priority read mountain chopper
 	int PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM;
+	// A load-balance priority for when a storage queue is too long
+	int PRIORITY_REBALANCE_STORAGE_QUEUE;
 	// A team healthy priority for wiggle a storage server
 	int PRIORITY_PERPETUAL_STORAGE_WIGGLE;
 	// A team healthy priority when all servers in a team are healthy. When a team changes from any unhealthy states to
@@ -327,6 +329,14 @@ public:
 	// distributor to fetch the list of tenants over storage quota
 	bool ENABLE_STORAGE_QUEUE_AWARE_TEAM_SELECTION; // experimental!
 	int64_t DD_TARGET_STORAGE_QUEUE_SIZE;
+	bool ENABLE_REBALANCE_STORAGE_QUEUE; // experimental!
+	int64_t REBALANCE_STORAGE_QUEUE_LONG_BYTES; // Queue length above which the storage queue is considered too long
+	int64_t REBALANCE_STORAGE_QUEUE_SHORT_BYTES; // Queue length below which the storage queue is considered short again
+	double DD_LONG_STORAGE_QUEUE_TIMESPAN; // How long the queue must stay long before rebalancing is triggered
+	double DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL; // Minimal interval between two rebalance notifications for one server
+	int64_t REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN; // Minimal write traffic for a shard to be eligible to move
+	bool DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD; // Enable to allow the storage queue rebalancer to move
+	                                                               // light-traffic shards out of the overloaded server

 	// TeamRemover to remove redundant teams
 	bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor
diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h
index feb9eaebc5..f365ca2c68 100644
--- a/fdbclient/include/fdbclient/SystemData.h
+++ b/fdbclient/include/fdbclient/SystemData.h
@@ -65,9 +65,10 @@ enum class DataMovementReason : uint8_t {
 	TEAM_0_LEFT = 16,
 	SPLIT_SHARD = 17,
 	ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD = 18,
-	ASSIGN_EMPTY_RANGE = 19, // dummy reason, no corresponding data move priority
-	SEED_SHARD_SERVER = 20, // dummy reason, no corresponding data move priority
-	NUMBER_OF_REASONS = 21, // dummy reason, no corresponding data move priority
+	REBALANCE_STORAGE_QUEUE = 19,
+	ASSIGN_EMPTY_RANGE = 20, // dummy reason, no corresponding data move priority
+	SEED_SHARD_SERVER = 21, // dummy reason, no corresponding data move priority
+	NUMBER_OF_REASONS = 22, // dummy reason, no corresponding data move priority
 };

 // SystemKey is just a Key but with a special type so that instances of it can be found easily throughput the code base
diff --git a/fdbserver/DDRelocationQueue.actor.cpp b/fdbserver/DDRelocationQueue.actor.cpp
index de862d4a55..d5ede05fae 100644
--- a/fdbserver/DDRelocationQueue.actor.cpp
+++ b/fdbserver/DDRelocationQueue.actor.cpp
@@ -95,6 +95,7 @@ std::pair<DmReasonPriorityMapping, PriorityDmReasonMapping> buildPriorityMappings() {
 		{ DataMovementReason::SPLIT_SHARD, SERVER_KNOBS->PRIORITY_SPLIT_SHARD },
 		{ DataMovementReason::ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD,
 		  SERVER_KNOBS->PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD },
+		{ DataMovementReason::REBALANCE_STORAGE_QUEUE, SERVER_KNOBS->PRIORITY_REBALANCE_STORAGE_QUEUE },
 		{ DataMovementReason::ASSIGN_EMPTY_RANGE, -2 }, // dummy reason, no corresponding actual data move
 		{ DataMovementReason::SEED_SHARD_SERVER, -3 }, // dummy reason, no corresponding actual data move
 		{ DataMovementReason::NUMBER_OF_REASONS, -4 }, // dummy reason, no corresponding actual data move
@@ -147,7 +148,8 @@ RelocateData::RelocateData(RelocateShard const& rs)
     dataMoveId(rs.dataMoveId), workFactor(0),
     wantsNewServers(isDataMovementForMountainChopper(rs.moveReason) ||
                     isDataMovementForValleyFiller(rs.moveReason) || rs.moveReason == DataMovementReason::SPLIT_SHARD ||
-                    rs.moveReason == DataMovementReason::TEAM_REDUNDANT),
+                    rs.moveReason == DataMovementReason::TEAM_REDUNDANT ||
+                    rs.moveReason == DataMovementReason::REBALANCE_STORAGE_QUEUE),
     cancellable(true), interval("QueuedRelocation", randomId), dataMove(rs.dataMove) {
 	if (dataMove != nullptr) {
 		this->src.insert(this->src.end(), dataMove->meta.src.begin(), dataMove->meta.src.end());
diff --git a/fdbserver/DDShardTracker.actor.cpp b/fdbserver/DDShardTracker.actor.cpp
index 488515beac..1e0d0e91e2 100644
--- a/fdbserver/DDShardTracker.actor.cpp
+++ b/fdbserver/DDShardTracker.actor.cpp
@@ -1524,6 +1524,47 @@ struct DataDistributionTrackerImpl {
 			when(GetMetricsListRequest req = waitNext(self->getShardMetricsList)) {
 				self->actors.add(fetchShardMetricsList(self, req));
 			}
+			when(ServerTeamInfo req = waitNext(self->triggerStorageQueueRebalance)) {
+				TraceEvent e("TriggerDataMoveStorageQueueRebalance", self->distributorId);
+				e.detail("Server", req.serverId);
+				e.detail("Teams", req.teams.size());
+				int64_t maxShardWriteTraffic = 0;
+				KeyRange shardToMove;
+				ShardsAffectedByTeamFailure::Team selectedTeam;
+				for (const auto& team : req.teams) {
+					for (auto const& shard : self->shardsAffectedByTeamFailure->getShardsFor(team)) {
+						for (auto it : self->shards->intersectingRanges(shard)) {
+							if (it->value().stats->get().present()) {
+								int64_t shardWriteTraffic =
+								    it->value().stats->get().get().metrics.bytesWrittenPerKSecond;
+								if (shardWriteTraffic > maxShardWriteTraffic &&
+								    (SERVER_KNOBS->DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD ||
+								     shardWriteTraffic >
+								         SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN)) {
+									shardToMove = it->range();
+									selectedTeam = team;
+									maxShardWriteTraffic = shardWriteTraffic;
+								}
+							}
+						}
+					}
+				}
+				if (!shardToMove.empty()) {
+					e.detail("TeamSelected", selectedTeam.servers);
+					e.detail("ShardSelected", shardToMove);
+					e.detail("ShardWriteBytesPerKSec", maxShardWriteTraffic);
+					RelocateShard rs(shardToMove,
+					                 SERVER_KNOBS->PRIORITY_REBALANCE_STORAGE_QUEUE,
+					                 RelocateReason::REBALANCE_WRITE);
+					self->output.send(rs);
+					TraceEvent("SendRelocateToDDQueue", self->distributorId)
+					    .detail("ServerPrimary", req.primary)
+					    .detail("ServerTeam", selectedTeam.servers)
+					    .detail("KeyBegin", rs.keys.begin)
+					    .detail("KeyEnd", rs.keys.end)
+					    .detail("Priority", rs.priority);
+				}
+			}
 			when(wait(self->actors.getResult())) {}
 			when(TenantCacheTenantCreated newTenant = waitNext(tenantCreationSignal.getFuture())) {
 				self->actors.add(tenantCreationHandling(self, newTenant));
@@ -1544,11 +1584,13 @@ Future<Void> DataDistributionTracker::run(Reference<DataDistributionTracker> self,
                                           const FutureStream<GetMetricsRequest>& getShardMetrics,
                                           const FutureStream<GetTopKMetricsRequest>& getTopKMetrics,
                                           const FutureStream<GetMetricsListRequest>& getShardMetricsList,
-                                          const FutureStream<Promise<int64_t>>& getAverageShardBytes) {
+                                          const FutureStream<Promise<int64_t>>& getAverageShardBytes,
+                                          const FutureStream<ServerTeamInfo>& triggerStorageQueueRebalance) {
 	self->getShardMetrics = getShardMetrics;
 	self->getTopKMetrics = getTopKMetrics;
 	self->getShardMetricsList = getShardMetricsList;
 	self->averageShardBytes = getAverageShardBytes;
+	self->triggerStorageQueueRebalance = triggerStorageQueueRebalance;
 	self->userRangeConfig = initData->userRangeConfig;
 	return holdWhile(self, DataDistributionTrackerImpl::run(self.getPtr(), initData));
 }
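The new when-clause in the tracker reduces to a max-scan with an eligibility filter: walk every shard on the overloaded server's teams and keep the one with the highest write bandwidth, where DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD decides whether shards below REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN are eligible at all. A plain-C++ sketch of just that selection rule, with stand-in types instead of Flow/FDB ones (all names here are illustrative):

    #include <cstdint>
    #include <string>
    #include <vector>

    struct ShardStats {
        std::string range;              // stand-in for KeyRange
        int64_t bytesWrittenPerKSecond; // stand-in for StorageMetrics
    };

    // Returns the selected range, or an empty string when nothing is eligible
    // (the real code tests KeyRange::empty() the same way).
    std::string pickShardToMove(const std::vector<ShardStats>& shards,
                                bool allowLightWriteShard, // ..._WITH_LIGHT_WRITE_SHARD
                                int64_t minWritePerKSec) { // ..._SHARD_PER_KSEC_MIN
        int64_t maxTraffic = 0;
        std::string selected;
        for (const auto& s : shards) {
            if (s.bytesWrittenPerKSecond > maxTraffic &&
                (allowLightWriteShard || s.bytesWrittenPerKSecond > minWritePerKSec)) {
                selected = s.range;
                maxTraffic = s.bytesWrittenPerKSecond;
            }
        }
        return selected;
    }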
diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp
index 1e77f7bdd0..d930250fc2 100644
--- a/fdbserver/DDTeamCollection.actor.cpp
+++ b/fdbserver/DDTeamCollection.actor.cpp
@@ -1553,6 +1553,19 @@ public:
 				when(wait(storageMetadataTracker)) {}
 				when(wait(server->ssVersionTooFarBehind.onChange())) {}
 				when(wait(self->disableFailingLaggingServers.onChange())) {}
+				when(wait(server->longStorageQueue.onChange())) {
+					TraceEvent(SevInfo, "TriggerStorageQueueRebalance", self->distributorId)
+					    .detail("SSID", server->getId());
+					std::vector<ShardsAffectedByTeamFailure::Team> teams;
+					for (const auto& team : server->getTeams()) {
+						std::vector<UID> servers;
+						for (const auto& member : team->getServers()) {
+							servers.push_back(member->getId());
+						}
+						teams.push_back(ShardsAffectedByTeamFailure::Team(servers, self->primary));
+					}
+					self->triggerStorageQueueRebalance.send(ServerTeamInfo(server->getId(), teams, self->primary));
+				}
 			}

 			if (recordTeamCollectionInfo) {
@@ -4175,7 +4188,7 @@ DDTeamCollection::DDTeamCollection(DDTeamCollectionInitParams const& params)
     zeroHealthyTeams(params.zeroHealthyTeams), optimalTeamCount(0), zeroOptimalTeams(true), isTssRecruiting(false),
     includedDCs(params.includedDCs), otherTrackedDCs(params.otherTrackedDCs),
     processingUnhealthy(params.processingUnhealthy), getAverageShardBytes(params.getAverageShardBytes),
-    readyToStart(params.readyToStart),
+    triggerStorageQueueRebalance(params.triggerStorageQueueRebalance), readyToStart(params.readyToStart),
     checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)), badTeamRemover(Void()),
     checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()), clearHealthyZoneFuture(true),
     lowestUtilizationTeam(0), highestUtilizationTeam(0), getShardMetrics(params.getShardMetrics),
@@ -5951,7 +5964,8 @@ public:
 		                                          PromiseStream<GetMetricsRequest>(),
 		                                          Promise<UID>(),
 		                                          PromiseStream<Promise<int>>(),
-		                                          PromiseStream<Promise<int64_t>>() }));
+		                                          PromiseStream<Promise<int64_t>>(),
+		                                          PromiseStream<ServerTeamInfo>() }));

 		for (int id = 1; id <= processCount; ++id) {
 			UID uid(id, 0);
@@ -6004,7 +6018,8 @@ public:
 		                                          PromiseStream<GetMetricsRequest>(),
 		                                          Promise<UID>(),
 		                                          PromiseStream<Promise<int>>(),
-		                                          PromiseStream<Promise<int64_t>>() }));
+		                                          PromiseStream<Promise<int64_t>>(),
+		                                          PromiseStream<ServerTeamInfo>() }));

 		for (int id = 1; id <= processCount; id++) {
 			UID uid(id, 0);
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index f6daf65ed1..0f093f97c4 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -1033,6 +1033,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
 	ASSERT(self->configuration.storageTeamSize > 0);

 	state PromiseStream<Promise<int64_t>> getAverageShardBytes;
+	state PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;
 	state PromiseStream<Promise<int>> getUnhealthyRelocationCount;
 	state PromiseStream<GetMetricsRequest> getShardMetrics;
 	state PromiseStream<GetTopKMetricsRequest> getTopKShardMetrics;
@@ -1087,7 +1088,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
 		                                    getShardMetrics.getFuture(),
 		                                    getTopKShardMetrics.getFuture(),
 		                                    getShardMetricsList.getFuture(),
-		                                    getAverageShardBytes.getFuture()),
+		                                    getAverageShardBytes.getFuture(),
+		                                    triggerStorageQueueRebalance.getFuture()),
 		                "DDTracker",
 		                self->ddId,
 		                &normalDDQueueErrors()));
@@ -1150,7 +1152,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
 		                              getShardMetrics,
 		                              removeFailedServer,
 		                              getUnhealthyRelocationCount,
-		                              getAverageShardBytes });
+		                              getAverageShardBytes,
+		                              triggerStorageQueueRebalance });
 		teamCollectionsPtrs.push_back(self->context->primaryTeamCollection.getPtr());
 		Reference<IAsyncListener<RequestStream<RecruitStorageRequest>>> recruitStorage;
 		if (!isMocked) {
@@ -1175,7 +1178,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
 		                              getShardMetrics,
 		                              removeFailedServer,
 		                              getUnhealthyRelocationCount,
-		                              getAverageShardBytes });
+		                              getAverageShardBytes,
+		                              triggerStorageQueueRebalance });
 		teamCollectionsPtrs.push_back(self->context->remoteTeamCollection.getPtr());
 		self->context->remoteTeamCollection->teamCollections = teamCollectionsPtrs;
 		actors.push_back(reportErrorsExcept(DDTeamCollection::run(self->context->remoteTeamCollection,
diff --git a/fdbserver/TCInfo.actor.cpp b/fdbserver/TCInfo.actor.cpp
index 3479b4c413..061e5f740e 100644
--- a/fdbserver/TCInfo.actor.cpp
+++ b/fdbserver/TCInfo.actor.cpp
@@ -92,6 +92,25 @@ public:
 				server->collection->removeLaggingStorageServer(server->lastKnownInterface.locality.zoneId().get());
 			}
 		}
+
+		// Detect a storage server whose queue has stayed too long and notify the
+		// team tracker, at most once per DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL
+		if (SERVER_KNOBS->ENABLE_REBALANCE_STORAGE_QUEUE) {
+			int64_t queueSize = server->getStorageQueueSize();
+			bool storageQueueKeepTooLong = server->updateAndGetStorageQueueTooLong(queueSize);
+			double currentTime = now();
+			if (storageQueueKeepTooLong) {
+				if (!server->lastTimeNotifyLongStorageQueue.present() ||
+				    currentTime - server->lastTimeNotifyLongStorageQueue.get() >
+				        SERVER_KNOBS->DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL) {
+					server->longStorageQueue.trigger(); // triggers a data move to rebalance the storage queue
+					TraceEvent(SevInfo, "SSTrackerTriggerLongStorageQueue", server->getId())
+					    .detail("CurrentQueueSize", queueSize);
+					server->lastTimeNotifyLongStorageQueue = currentTime;
+				}
+			}
+		}
+
 		return Void();
 	}

@@ -174,6 +193,40 @@ Future<Void> TCServerInfo::serverMetricsPolling(Reference<IDDTxnProcessor> txnProcessor) {
 	return TCServerInfoImpl::serverMetricsPolling(this, txnProcessor);
 }

+// Returns true if this storage server's queue has remained too long for a while
+bool TCServerInfo::updateAndGetStorageQueueTooLong(int64_t currentBytes) {
+	double currentTime = now();
+	if (currentBytes > SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_LONG_BYTES) {
+		if (!storageQueueTooLongStartTime.present()) {
+			storageQueueTooLongStartTime = currentTime;
+			TraceEvent(SevWarn, "SSTrackerDetectStorageQueueBecomeLong", id)
+			    .detail("StorageQueueBytes", currentBytes)
+			    .detail("Duration", currentTime - storageQueueTooLongStartTime.get());
+		} else {
+			TraceEvent(SevDebug, "SSTrackerDetectStorageQueueRemainLong", id)
+			    .detail("StorageQueueBytes", currentBytes)
+			    .detail("Duration", currentTime - storageQueueTooLongStartTime.get());
+		}
+	} else if (currentBytes < SERVER_KNOBS->REBALANCE_STORAGE_QUEUE_SHORT_BYTES) {
+		if (storageQueueTooLongStartTime.present()) {
+			storageQueueTooLongStartTime.reset();
+			TraceEvent(SevInfo, "SSTrackerDetectStorageQueueBecomeShort", id).detail("StorageQueueBytes", currentBytes);
+		}
+	} else {
+		if (storageQueueTooLongStartTime.present()) {
+			TraceEvent(SevDebug, "SSTrackerDetectStorageQueueRemainLong", id)
+			    .detail("StorageQueueBytes", currentBytes)
+			    .detail("Duration", currentTime - storageQueueTooLongStartTime.get());
+		}
+	}
+	if (storageQueueTooLongStartTime.present() &&
+	    currentTime - storageQueueTooLongStartTime.get() > SERVER_KNOBS->DD_LONG_STORAGE_QUEUE_TIMESPAN) {
+		return true;
+	} else {
+		return false;
+	}
+}
+
 void TCServerInfo::updateInDesiredDC(std::vector<Optional<Key>> const& includedDCs) {
 	inDesiredDC =
 	    (includedDCs.empty() ||
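updateAndGetStorageQueueTooLong above is a two-threshold detector with a dwell time: the timer arms when the queue exceeds REBALANCE_STORAGE_QUEUE_LONG_BYTES, disarms only once the queue falls below REBALANCE_STORAGE_QUEUE_SHORT_BYTES, and the function reports true only after the timer has run for DD_LONG_STORAGE_QUEUE_TIMESPAN. A condensed, dependency-free model of that state machine (illustrative only; FDB's Optional and now() are replaced by std::optional and an explicit clock argument):

    #include <cstdint>
    #include <optional>

    struct LongQueueDetector {
        double longBytes;   // REBALANCE_STORAGE_QUEUE_LONG_BYTES
        double shortBytes;  // REBALANCE_STORAGE_QUEUE_SHORT_BYTES
        double timespan;    // DD_LONG_STORAGE_QUEUE_TIMESPAN
        std::optional<double> armedSince; // storageQueueTooLongStartTime

        bool update(int64_t queueBytes, double now) {
            if (queueBytes > longBytes) {
                if (!armedSince)
                    armedSince = now;   // queue became long: start timing
            } else if (queueBytes < shortBytes) {
                armedSince.reset();     // queue drained: disarm
            }
            // between the two thresholds the armed state is left unchanged
            return armedSince && now - *armedSince > timespan;
        }
    };

For example, with longBytes=150e6, shortBytes=50e6, and timespan=60, a queue that crosses 150e6 at t=0 and then hovers at 100e6 keeps reporting true after t=60, because the disarm threshold is never reached.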
diff --git a/fdbserver/include/fdbserver/DDShardTracker.h b/fdbserver/include/fdbserver/DDShardTracker.h
index 40daef2bf6..6f5bc3ae2e 100644
--- a/fdbserver/include/fdbserver/DDShardTracker.h
+++ b/fdbserver/include/fdbserver/DDShardTracker.h
@@ -30,6 +30,7 @@ public:
 	FutureStream<GetTopKMetricsRequest> getTopKMetrics;
 	FutureStream<GetMetricsListRequest> getShardMetricsList;
 	FutureStream<Promise<int64_t>> averageShardBytes;
+	FutureStream<ServerTeamInfo> triggerStorageQueueRebalance;

 	virtual double getAverageShardBytes() = 0;
 	virtual ~IDDShardTracker() = default;
@@ -121,7 +122,8 @@ public:
 	                        FutureStream<GetMetricsRequest> const& getShardMetrics,
 	                        FutureStream<GetTopKMetricsRequest> const& getTopKMetrics,
 	                        FutureStream<GetMetricsListRequest> const& getShardMetricsList,
-	                        FutureStream<Promise<int64_t>> const& getAverageShardBytes);
+	                        FutureStream<Promise<int64_t>> const& getAverageShardBytes,
+	                        FutureStream<ServerTeamInfo> const& triggerStorageQueueRebalance);

 	explicit DataDistributionTracker(DataDistributionTrackerInitParams const& params);
 };
diff --git a/fdbserver/include/fdbserver/DDTeamCollection.h b/fdbserver/include/fdbserver/DDTeamCollection.h
index d654032964..e774c9730c 100644
--- a/fdbserver/include/fdbserver/DDTeamCollection.h
+++ b/fdbserver/include/fdbserver/DDTeamCollection.h
@@ -199,6 +199,7 @@ struct DDTeamCollectionInitParams {
 	Promise<UID> removeFailedServer;
 	PromiseStream<Promise<int>> getUnhealthyRelocationCount;
 	PromiseStream<Promise<int64_t>> getAverageShardBytes;
+	PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;
 };

 class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
@@ -237,6 +238,7 @@ protected:
 	Reference<AsyncVar<bool>> processingWiggle; // track whether wiggling relocation is being processed
 	PromiseStream<StorageWiggleValue> nextWiggleInfo;
 	PromiseStream<Promise<int64_t>> getAverageShardBytes;
+	PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;

 	std::vector<Reference<TCTeamInfo>> badTeams;
 	std::vector<Reference<TCTeamInfo>> largeTeams;
diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h
index a27ae2a100..d6095474e7 100644
--- a/fdbserver/include/fdbserver/DataDistribution.actor.h
+++ b/fdbserver/include/fdbserver/DataDistribution.actor.h
@@ -54,6 +54,7 @@ public:
 		OTHER = 0,
 		REBALANCE_DISK,
 		REBALANCE_READ,
+		REBALANCE_WRITE,
 		MERGE_SHARD,
 		SIZE_SPLIT,
 		WRITE_SPLIT,
@@ -70,6 +71,8 @@ public:
 			return "RebalanceDisk";
 		case REBALANCE_READ:
 			return "RebalanceRead";
+		case REBALANCE_WRITE:
+			return "RebalanceWrite";
 		case MERGE_SHARD:
 			return "MergeShard";
 		case SIZE_SPLIT:
@@ -446,6 +449,16 @@ private:
 	double lastTransitionStartTime;
 };

+struct ServerTeamInfo {
+	UID serverId;
+	std::vector<ShardsAffectedByTeamFailure::Team> teams;
+	bool primary;
+
+	ServerTeamInfo() {}
+	ServerTeamInfo(UID serverId, const std::vector<ShardsAffectedByTeamFailure::Team>& teams, bool primary)
+	  : serverId(serverId), teams(teams), primary(primary) {}
+};
+
 // DDShardInfo is so named to avoid link-time name collision with ShardInfo within the StorageServer
 struct DDShardInfo {
 	Key key;
diff --git a/fdbserver/include/fdbserver/TCInfo.h b/fdbserver/include/fdbserver/TCInfo.h
index eabf5bc503..957a0490d3 100644
--- a/fdbserver/include/fdbserver/TCInfo.h
+++ b/fdbserver/include/fdbserver/TCInfo.h
@@ -56,6 +56,12 @@ class TCServerInfo : public ReferenceCounted<TCServerInfo> {
 	std::vector<Reference<TCTeamInfo>> teams{};
 	ErrorOr<GetStorageMetricsReply> metrics;
 	Optional<HealthMetrics::StorageStats> storageStats;
+	Optional<double> storageQueueTooLongStartTime; // When the storage queue became long

+	// Last time the server notified the teamTracker that its queue is long.
+	// We do not want to notify the teamTracker repeatedly while the queue stays
+	// long; lastTimeNotifyLongStorageQueue is used to support this.
+	Optional<double> lastTimeNotifyLongStorageQueue;

 	void setMetrics(GetStorageMetricsReply serverMetrics) { this->metrics = serverMetrics; }
 	void setStorageStats(HealthMetrics::StorageStats stats) { storageStats = stats; }
@@ -74,6 +80,7 @@ public:
 	Promise<Void> updated;
 	AsyncVar<bool> wrongStoreTypeToRemove;
 	AsyncVar<bool> ssVersionTooFarBehind;
+	AsyncVar<bool> longStorageQueue; // set when the storage queue remains too long for a while

 	TCServerInfo(StorageServerInterface ssi,
 	             DDTeamCollection* collection,
@@ -121,7 +128,7 @@ public:
 	Future<Void> updateServerMetrics();
 	static Future<Void> updateServerMetrics(Reference<TCServerInfo> server);
 	Future<Void> serverMetricsPolling(Reference<IDDTxnProcessor> txnProcessor);
-
+	bool updateAndGetStorageQueueTooLong(int64_t currentBytes);
 	~TCServerInfo();
 };
diff --git a/fdbserver/workloads/MockDDTrackerShardEvaluator.actor.cpp b/fdbserver/workloads/MockDDTrackerShardEvaluator.actor.cpp
index 94e1cd7a2a..90f3882917 100644
--- a/fdbserver/workloads/MockDDTrackerShardEvaluator.actor.cpp
+++ b/fdbserver/workloads/MockDDTrackerShardEvaluator.actor.cpp
@@ -32,6 +32,7 @@ public:
 	PromiseStream<GetTopKMetricsRequest> getTopKMetrics;
 	PromiseStream<GetMetricsListRequest> getShardMetricsList;
 	PromiseStream<Promise<int64_t>> getAverageShardBytes;
+	PromiseStream<ServerTeamInfo> triggerStorageQueueRebalance;

 	KeyRangeMap<ShardTrackedData> shards;

@@ -112,7 +113,8 @@ public:
 		                                       getShardMetrics.getFuture(),
 		                                       getTopKMetrics.getFuture(),
 		                                       getShardMetricsList.getFuture(),
-		                                       getAverageShardBytes.getFuture()));
+		                                       getAverageShardBytes.getFuture(),
+		                                       triggerStorageQueueRebalance.getFuture()));

 		actors.add(relocateShardReporter(this, output.getFuture()));
diff --git a/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp b/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp
index cf8e0e0114..a67fe6d2d8 100644
--- a/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp
+++ b/fdbserver/workloads/PerpetualWiggleStatsWorkload.actor.cpp
@@ -213,7 +213,8 @@ struct PerpetualWiggleStatsWorkload : public TestWorkload {
 		                                       PromiseStream<GetMetricsRequest>(),
 		                                       Promise<UID>(),
 		                                       PromiseStream<Promise<int>>(),
-		                                       PromiseStream<Promise<int64_t>>() });
+		                                       PromiseStream<Promise<int64_t>>(),
+		                                       PromiseStream<ServerTeamInfo>() });
 		tester.configuration.storageTeamSize = 3;
 		tester.configuration.perpetualStorageWiggleSpeed = 1;
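Taken together, the plumbing added across these files forms one signal path: serverMetricsPolling in TCInfo.actor.cpp detects a persistently long queue and fires longStorageQueue; the storage server tracker in DDTeamCollection.actor.cpp snapshots that server's teams into a ServerTeamInfo and sends it on triggerStorageQueueRebalance; DataDistributionTracker consumes it and emits a RelocateShard at PRIORITY_REBALANCE_STORAGE_QUEUE. A toy model of that hand-off, with a std::queue standing in for the Flow PromiseStream/FutureStream pair (types simplified for illustration):

    #include <queue>
    #include <string>
    #include <vector>

    struct ServerTeamInfoModel { // shape mirrors the new ServerTeamInfo struct
        std::string serverId;
        std::vector<std::vector<std::string>> teams; // each team = member server ids
        bool primary;
    };

    int main() {
        std::queue<ServerTeamInfoModel> channel; // PromiseStream<ServerTeamInfo> stand-in
        // Producer (team tracker side): runs when longStorageQueue changes.
        channel.push({ "ss-1", { { "ss-1", "ss-2", "ss-3" } }, /*primary=*/true });
        // Consumer (shard tracker side): drains the stream, picks the
        // hottest-write shard on req.teams, and requests a data move for it.
        while (!channel.empty()) {
            ServerTeamInfoModel req = channel.front();
            channel.pop();
            // ...selection logic as sketched earlier, then output.send(RelocateShard)...
        }
        return 0;
    }

Because the detector, the team snapshot, and the shard selection are decoupled by streams, a burst of onChange notifications cannot wedge the tracker; DD_REBALANCE_STORAGE_QUEUE_TIME_INTERVAL additionally rate-limits the producer side per server.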