From 7f9203bbc0e07af1621a70a8cc3b9f75d773e4d3 Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Wed, 14 Dec 2022 15:42:14 -0800 Subject: [PATCH] Track individual key range metrics in each physical shard --- fdbserver/DDShardTracker.actor.cpp | 85 ++++++++++++++++--- .../fdbserver/DataDistribution.actor.h | 8 +- 2 files changed, 77 insertions(+), 16 deletions(-) diff --git a/fdbserver/DDShardTracker.actor.cpp b/fdbserver/DDShardTracker.actor.cpp index 2e92dd551a..d384ad7187 100644 --- a/fdbserver/DDShardTracker.actor.cpp +++ b/fdbserver/DDShardTracker.actor.cpp @@ -208,11 +208,12 @@ int64_t getMaxShardSize(double dbSizeEstimate) { (int64_t)SERVER_KNOBS->MAX_SHARD_BYTES); } -ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys, - const Reference>>& shardMetrics, - const BandwidthStatus& bandwidthStatus, - PromiseStream readHotShard) { +std::pair calculateShardSizeBounds( + const KeyRange& keys, + const Reference>>& shardMetrics, + const BandwidthStatus& bandwidthStatus) { ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack(); + bool readHotShard = false; if (shardMetrics->get().present()) { auto bytes = shardMetrics->get().get().metrics.bytes; auto readBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics); @@ -252,15 +253,13 @@ ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS * (1.0 - SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC_JITTER); bounds.permittedError.bytesReadPerKSecond = bounds.min.bytesReadPerKSecond / 4; - // TraceEvent("RHDTriggerReadHotLoggingForShard") - // .detail("ShardBegin", keys.begin.printable().c_str()) - // .detail("ShardEnd", keys.end.printable().c_str()); - readHotShard.send(keys); + + readHotShard = true; } else { ASSERT(false); } } - return bounds; + return { bounds, readHotShard }; } ACTOR Future trackShardMetrics(DataDistributionTracker::SafeAccessor self, @@ -285,7 +284,15 @@ ACTOR Future trackShardMetrics(DataDistributionTracker::SafeAccessor self, try { loop { state ShardSizeBounds bounds; - bounds = calculateShardSizeBounds(keys, shardMetrics, bandwidthStatus, self()->readHotShard); + bool readHotShard; + std::tie(bounds, readHotShard) = calculateShardSizeBounds(keys, shardMetrics, bandwidthStatus); + + if (readHotShard) { + // TraceEvent("RHDTriggerReadHotLoggingForShard") + // .detail("ShardBegin", keys.begin.printable().c_str()) + // .detail("ShardEnd", keys.end.printable().c_str()); + self()->readHotShard.send(keys); + } loop { // metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range @@ -1544,6 +1551,56 @@ FDB_DEFINE_BOOLEAN_PARAM(InOverSizePhysicalShard); FDB_DEFINE_BOOLEAN_PARAM(PhysicalShardAvailable); FDB_DEFINE_BOOLEAN_PARAM(MoveKeyRangeOutPhysicalShard); +ACTOR Future trackKeyRangeInPhysicalShardMetrics(Reference db, + KeyRange keys, + Reference>> shardMetrics) { + state BandwidthStatus bandwidthStatus = + shardMetrics->get().present() ? getBandwidthStatus(shardMetrics->get().get().metrics) : BandwidthStatusNormal; + state double lastLowBandwidthStartTime = + shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now(); + state int shardCount = shardMetrics->get().present() ? shardMetrics->get().get().shardCount : 1; + wait(delay(0, TaskPriority::DataDistribution)); + + /*TraceEvent("trackKeyRangeInPhysicalShardMetricsStarting") + .detail("Keys", keys) + .detail("TrackedBytesInitiallyPresent", shardMetrics->get().present()) + .detail("StartingMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0) + .detail("StartingMerges", shardMetrics->get().present() ? shardMetrics->get().get().merges : 0);*/ + + loop { + state ShardSizeBounds bounds; + bool readHotShard; + std::tie(bounds, readHotShard) = calculateShardSizeBounds(keys, shardMetrics, bandwidthStatus); + + loop { + // metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range + std::pair, int> metrics = + wait(db->waitStorageMetrics(keys, + bounds.min, + bounds.max, + bounds.permittedError, + CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, + shardCount)); + if (metrics.first.present()) { + BandwidthStatus newBandwidthStatus = getBandwidthStatus(metrics.first.get()); + if (newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) { + lastLowBandwidthStartTime = now(); + } + bandwidthStatus = newBandwidthStatus; + shardMetrics->set(ShardMetrics(metrics.first.get(), lastLowBandwidthStartTime, shardCount)); + break; + } else { + shardCount = metrics.second; + if (shardMetrics->get().present()) { + auto newShardMetrics = shardMetrics->get().get(); + newShardMetrics.shardCount = shardCount; + shardMetrics->set(newShardMetrics); + } + } + } + } +} + void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange) { if (g_network->isSimulated()) { // Test that new range must not overlap with any existing range in this shard. @@ -1552,8 +1609,9 @@ void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange) } } - // TODO(zhewu): add metrics tracking actor. RangeData data; + data.stats = makeReference>>(); + data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, newRange, data.stats); rangeData.emplace(newRange, data); } @@ -1569,8 +1627,9 @@ void PhysicalShardCollection::PhysicalShard::removeRange(const KeyRange& outRang std::vector remainingRanges = range - outRange; for (auto& r : remainingRanges) { ASSERT(r != range); - // TODO(zhewu): add metrics tracking actor. RangeData data; + data.stats = makeReference>>(); + data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, r, data.stats); rangeData.emplace(r, data); } // Must erase last since `remainingRanges` uses data in `range`. @@ -1623,7 +1682,7 @@ void PhysicalShardCollection::insertPhysicalShardToCollection(uint64_t physicalS ASSERT(physicalShardID != anonymousShardId.first() && physicalShardID != UID().first()); ASSERT(physicalShardInstances.count(physicalShardID) == 0); physicalShardInstances.insert( - std::make_pair(physicalShardID, PhysicalShard(physicalShardID, metrics, teams, whenCreated))); + std::make_pair(physicalShardID, PhysicalShard(txnProcessor, physicalShardID, metrics, teams, whenCreated))); return; } diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h index 47b244c8fd..8679b6d9b5 100644 --- a/fdbserver/include/fdbserver/DataDistribution.actor.h +++ b/fdbserver/include/fdbserver/DataDistribution.actor.h @@ -279,11 +279,12 @@ public: struct PhysicalShard { PhysicalShard() : id(UID().first()) {} - PhysicalShard(uint64_t id, + PhysicalShard(Reference txnProcessor, + uint64_t id, StorageMetrics const& metrics, std::vector teams, PhysicalShardCreationTime whenCreated) - : id(id), metrics(metrics), teams(teams), whenCreated(whenCreated) {} + : txnProcessor(txnProcessor), id(id), metrics(metrics), teams(teams), whenCreated(whenCreated) {} // Adds `newRange` to this physical shard and starts monitoring the shard. void addRange(const KeyRange& newRange); @@ -293,13 +294,14 @@ public: std::string toString() const { return fmt::format("{}", std::to_string(id)); } + Reference txnProcessor; uint64_t id; // physical shard id (never changed) StorageMetrics metrics; // current metrics, updated by shardTracker std::vector teams; // which team owns this physical shard (never changed) PhysicalShardCreationTime whenCreated; // when this physical shard is created (never changed) struct RangeData { - Future trackMetrics; // TODO(zhewu): add shard tracking actor. + Future trackMetrics; Reference>> stats; // TODO(zhewu): aggregate all metrics to a single physical shard metrics. };