Track individual key range metrics in each physical shard

This commit is contained in:
Zhe Wu 2022-12-14 15:42:14 -08:00
parent d012a74fa8
commit 7f9203bbc0
2 changed files with 77 additions and 16 deletions

View File

@ -208,11 +208,12 @@ int64_t getMaxShardSize(double dbSizeEstimate) {
(int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
}
ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys,
const Reference<AsyncVar<Optional<ShardMetrics>>>& shardMetrics,
const BandwidthStatus& bandwidthStatus,
PromiseStream<KeyRange> readHotShard) {
std::pair<ShardSizeBounds, bool> calculateShardSizeBounds(
const KeyRange& keys,
const Reference<AsyncVar<Optional<ShardMetrics>>>& shardMetrics,
const BandwidthStatus& bandwidthStatus) {
ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack();
bool readHotShard = false;
if (shardMetrics->get().present()) {
auto bytes = shardMetrics->get().get().metrics.bytes;
auto readBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics);
@ -252,15 +253,13 @@ ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys,
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS *
(1.0 - SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC_JITTER);
bounds.permittedError.bytesReadPerKSecond = bounds.min.bytesReadPerKSecond / 4;
// TraceEvent("RHDTriggerReadHotLoggingForShard")
// .detail("ShardBegin", keys.begin.printable().c_str())
// .detail("ShardEnd", keys.end.printable().c_str());
readHotShard.send(keys);
readHotShard = true;
} else {
ASSERT(false);
}
}
return bounds;
return { bounds, readHotShard };
}
ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self,
@ -285,7 +284,15 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self,
try {
loop {
state ShardSizeBounds bounds;
bounds = calculateShardSizeBounds(keys, shardMetrics, bandwidthStatus, self()->readHotShard);
bool readHotShard;
std::tie(bounds, readHotShard) = calculateShardSizeBounds(keys, shardMetrics, bandwidthStatus);
if (readHotShard) {
// TraceEvent("RHDTriggerReadHotLoggingForShard")
// .detail("ShardBegin", keys.begin.printable().c_str())
// .detail("ShardEnd", keys.end.printable().c_str());
self()->readHotShard.send(keys);
}
loop {
// metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range
@ -1544,6 +1551,56 @@ FDB_DEFINE_BOOLEAN_PARAM(InOverSizePhysicalShard);
FDB_DEFINE_BOOLEAN_PARAM(PhysicalShardAvailable);
FDB_DEFINE_BOOLEAN_PARAM(MoveKeyRangeOutPhysicalShard);
ACTOR Future<Void> trackKeyRangeInPhysicalShardMetrics(Reference<IDDTxnProcessor> db,
KeyRange keys,
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics) {
state BandwidthStatus bandwidthStatus =
shardMetrics->get().present() ? getBandwidthStatus(shardMetrics->get().get().metrics) : BandwidthStatusNormal;
state double lastLowBandwidthStartTime =
shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now();
state int shardCount = shardMetrics->get().present() ? shardMetrics->get().get().shardCount : 1;
wait(delay(0, TaskPriority::DataDistribution));
/*TraceEvent("trackKeyRangeInPhysicalShardMetricsStarting")
.detail("Keys", keys)
.detail("TrackedBytesInitiallyPresent", shardMetrics->get().present())
.detail("StartingMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0)
.detail("StartingMerges", shardMetrics->get().present() ? shardMetrics->get().get().merges : 0);*/
loop {
state ShardSizeBounds bounds;
bool readHotShard;
std::tie(bounds, readHotShard) = calculateShardSizeBounds(keys, shardMetrics, bandwidthStatus);
loop {
// metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range
std::pair<Optional<StorageMetrics>, int> metrics =
wait(db->waitStorageMetrics(keys,
bounds.min,
bounds.max,
bounds.permittedError,
CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT,
shardCount));
if (metrics.first.present()) {
BandwidthStatus newBandwidthStatus = getBandwidthStatus(metrics.first.get());
if (newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) {
lastLowBandwidthStartTime = now();
}
bandwidthStatus = newBandwidthStatus;
shardMetrics->set(ShardMetrics(metrics.first.get(), lastLowBandwidthStartTime, shardCount));
break;
} else {
shardCount = metrics.second;
if (shardMetrics->get().present()) {
auto newShardMetrics = shardMetrics->get().get();
newShardMetrics.shardCount = shardCount;
shardMetrics->set(newShardMetrics);
}
}
}
}
}
void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange) {
if (g_network->isSimulated()) {
// Test that new range must not overlap with any existing range in this shard.
@ -1552,8 +1609,9 @@ void PhysicalShardCollection::PhysicalShard::addRange(const KeyRange& newRange)
}
}
// TODO(zhewu): add metrics tracking actor.
RangeData data;
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, newRange, data.stats);
rangeData.emplace(newRange, data);
}
@ -1569,8 +1627,9 @@ void PhysicalShardCollection::PhysicalShard::removeRange(const KeyRange& outRang
std::vector<KeyRangeRef> remainingRanges = range - outRange;
for (auto& r : remainingRanges) {
ASSERT(r != range);
// TODO(zhewu): add metrics tracking actor.
RangeData data;
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
data.trackMetrics = trackKeyRangeInPhysicalShardMetrics(txnProcessor, r, data.stats);
rangeData.emplace(r, data);
}
// Must erase last since `remainingRanges` uses data in `range`.
@ -1623,7 +1682,7 @@ void PhysicalShardCollection::insertPhysicalShardToCollection(uint64_t physicalS
ASSERT(physicalShardID != anonymousShardId.first() && physicalShardID != UID().first());
ASSERT(physicalShardInstances.count(physicalShardID) == 0);
physicalShardInstances.insert(
std::make_pair(physicalShardID, PhysicalShard(physicalShardID, metrics, teams, whenCreated)));
std::make_pair(physicalShardID, PhysicalShard(txnProcessor, physicalShardID, metrics, teams, whenCreated)));
return;
}

View File

@ -279,11 +279,12 @@ public:
struct PhysicalShard {
PhysicalShard() : id(UID().first()) {}
PhysicalShard(uint64_t id,
PhysicalShard(Reference<IDDTxnProcessor> txnProcessor,
uint64_t id,
StorageMetrics const& metrics,
std::vector<ShardsAffectedByTeamFailure::Team> teams,
PhysicalShardCreationTime whenCreated)
: id(id), metrics(metrics), teams(teams), whenCreated(whenCreated) {}
: txnProcessor(txnProcessor), id(id), metrics(metrics), teams(teams), whenCreated(whenCreated) {}
// Adds `newRange` to this physical shard and starts monitoring the shard.
void addRange(const KeyRange& newRange);
@ -293,13 +294,14 @@ public:
std::string toString() const { return fmt::format("{}", std::to_string(id)); }
Reference<IDDTxnProcessor> txnProcessor;
uint64_t id; // physical shard id (never changed)
StorageMetrics metrics; // current metrics, updated by shardTracker
std::vector<ShardsAffectedByTeamFailure::Team> teams; // which team owns this physical shard (never changed)
PhysicalShardCreationTime whenCreated; // when this physical shard is created (never changed)
struct RangeData {
Future<Void> trackMetrics; // TODO(zhewu): add shard tracking actor.
Future<Void> trackMetrics;
Reference<AsyncVar<Optional<ShardMetrics>>>
stats; // TODO(zhewu): aggregate all metrics to a single physical shard metrics.
};