new version of getReadHotRanges

Xiaoxi Wang 2023-03-01 15:55:29 -08:00
parent 26237a291d
commit 179f0ba71c
4 changed files with 72 additions and 20 deletions


@@ -796,23 +796,30 @@ struct ReadHotRangeWithMetrics {
// The density for key range [A,C) is 30 * 100 / 200 = 15
double density;
// How many bytes of data was sent in a period of time because of read requests.
- double readBandwidthKSec;
+ double readBandwidthSec;
int64_t bytes; // storage bytes
- double readOpsKSec; // an interpolated value of 1000 second
+ double readOpsSec; // an interpolated value over sampling interval
ReadHotRangeWithMetrics() = default;
ReadHotRangeWithMetrics(KeyRangeRef const& keys, double density, double readBandwidth)
- : keys(keys), density(density), readBandwidthKSec(readBandwidth) {}
+ : keys(keys), density(density), readBandwidthSec(readBandwidth) {}
+ ReadHotRangeWithMetrics(KeyRangeRef const& keys, int64_t bytes, double readBandwidth, double readOpsKSec)
+   : keys(keys), density(readBandwidth / std::max((int64_t)1, bytes)), readBandwidthSec(readBandwidth), bytes(bytes),
+     readOpsSec(readOpsKSec) {}
ReadHotRangeWithMetrics(Arena& arena, const ReadHotRangeWithMetrics& rhs)
- : keys(arena, rhs.keys), density(rhs.density), readBandwidthKSec(rhs.readBandwidthKSec) {}
+ : keys(arena, rhs.keys), density(rhs.density), readBandwidthSec(rhs.readBandwidthSec), bytes(rhs.bytes),
+   readOpsSec(rhs.readOpsSec) {}
- int expectedSize() const { return keys.expectedSize() + sizeof(density) + sizeof(readBandwidthKSec); }
+ int expectedSize() const {
+     return keys.expectedSize() + sizeof(density) + sizeof(readBandwidthSec) + sizeof(bytes) + sizeof(readOpsSec);
+ }
template <class Ar>
void serialize(Ar& ar) {
- serializer(ar, keys, density, readBandwidthKSec, bytes, readOpsKSec);
+ serializer(ar, keys, density, readBandwidthSec, bytes, readOpsSec);
}
};
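The renamed per-second fields make the density arithmetic easy to sanity-check on its own. Below is a small standalone sketch, not part of this commit, with an illustrative struct name and made-up numbers; it mirrors how the four-argument constructor above derives density from read bandwidth and storage bytes, matching the 30 * 100 / 200 = 15 example in the comment.

#include <algorithm>
#include <cassert>
#include <cstdint>

// Stand-in for the struct above, minus the KeyRangeRef plumbing (illustrative only).
struct ReadHotMetricsSketch {
    double density;
    double readBandwidthSec;
    int64_t bytes;
    double readOpsSec;

    ReadHotMetricsSketch(int64_t bytes, double readBandwidth, double readOps)
      : density(readBandwidth / std::max((int64_t)1, bytes)), readBandwidthSec(readBandwidth), bytes(bytes),
        readOpsSec(readOps) {}
};

int main() {
    // 30 reads of 100 bytes each against a 200-byte range: density = 3000 / 200 = 15.
    ReadHotMetricsSketch m(/*bytes=*/200, /*readBandwidth=*/30 * 100.0, /*readOps=*/30.0);
    assert(m.density == 15.0);
    return 0;
}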


@@ -382,7 +382,7 @@ ACTOR Future<Void> readHotDetector(DataDistributionTracker* self) {
for (const auto& keyRange : readHotRanges) {
TraceEvent("ReadHotRangeLog")
.detail("ReadDensity", keyRange.density)
- .detail("ReadBandwidth", keyRange.readBandwidthKSec)
+ .detail("ReadBandwidth", keyRange.readBandwidthSec)
.detail("ReadDensityThreshold", SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO)
.detail("KeyRangeBegin", keyRange.keys.begin)
.detail("KeyRangeEnd", keyRange.keys.end);
@@ -893,7 +893,7 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
splitMetrics.bytesWrittenPerKSecond =
keys.begin >= keyServersKeys.begin ? splitMetrics.infinity : SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
splitMetrics.iosPerKSecond = splitMetrics.infinity;
- splitMetrics.bytesReadPerKSecond = splitMetrics.infinity; // Don't split by readBandwidthKSec
+ splitMetrics.bytesReadPerKSecond = splitMetrics.infinity; // Don't split by readBandwidthSec
state Standalone<VectorRef<KeyRef>> splitKeys =
wait(self->db->splitStorageMetrics(keys, splitMetrics, metrics, SERVER_KNOBS->MIN_SHARD_BYTES));


@@ -358,10 +358,55 @@ void StorageServerMetrics::getStorageMetrics(GetStorageMetricsRequest req,
req.reply.send(rep);
}
+ std::vector<ReadHotRangeWithMetrics> StorageServerMetrics::getReadHotRanges(KeyRangeRef parentRange,
+                                                                             int splitCount,
+                                                                             uint8_t splitType) const {
+     const StorageMetricSample* sampler;
+     switch (splitType) {
+     case ReadHotSubRangeRequest::SplitType::BYTES:
+         sampler = &byteSample;
+         break;
+     case ReadHotSubRangeRequest::SplitType::READ_BYTES:
+         sampler = &bytesReadSample;
+         break;
+     case ReadHotSubRangeRequest::SplitType::READ_OPS:
+         sampler = &opsReadSample;
+         break;
+     default:
+         ASSERT(false);
+     }
+     std::vector<ReadHotRangeWithMetrics> toReturn;
+     double total = sampler->getEstimate(parentRange);
+     double splitChunk = total / splitCount;
+     KeyRef beginKey = parentRange.begin;
+     while (true) {
+         auto endKey = sampler->sample.index(sampler->sample.sumTo(sampler->sample.lower_bound(beginKey)) + splitChunk);
+         // Empty chunk
+         if (beginKey >= *endKey)
+             break;
+         // Don't round up the larger range for now.
+         KeyRangeRef range(beginKey, *endKey);
+         toReturn.emplace_back(
+             range,
+             byteSample.getEstimate(range),
+             (double)bytesReadSample.getEstimate(range) / SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL,
+             (double)opsReadSample.getEstimate(range) / SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL);
+         if (*endKey >= parentRange.end)
+             break;
+         beginKey = *endKey;
+     }
+     return toReturn;
+ }
// Given a read hot shard, this function will divide the shard into chunks and find those chunks whose
// readBytes/sizeBytes exceeds the `readDensityRatio`. Please make sure to run unit tests
// `StorageMetricsSampleTests.txt` after change made.
- std::vector<ReadHotRangeWithMetrics> StorageServerMetrics::getReadHotRanges(
+ std::vector<ReadHotRangeWithMetrics> StorageServerMetrics::_getReadHotRanges(
KeyRangeRef shard,
double readDensityRatio,
int64_t baseChunkSize,
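The new getReadHotRanges overload above walks the chosen sample's cumulative sum: it estimates the total metric over the parent range, divides by splitCount, and repeatedly indexes the sample at the current prefix sum plus one chunk to find the next range boundary. The standalone sketch below mirrors that loop over an ordinary std::map; it is illustrative only and does not use the real StorageMetricSample API.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Sample = std::map<std::string, int64_t>; // key -> sampled weight (bytes, read bytes, or read ops)

// Split the keys in [begin, end) into up to splitCount contiguous ranges of roughly
// equal cumulative weight, advancing through the sample much like the loop above.
std::vector<std::pair<std::string, std::string>> splitByWeight(const Sample& sample,
                                                               const std::string& begin,
                                                               const std::string& end,
                                                               int splitCount) {
    int64_t total = 0;
    for (auto it = sample.lower_bound(begin); it != sample.end() && it->first < end; ++it)
        total += it->second;

    std::vector<std::pair<std::string, std::string>> result;
    if (total <= 0 || splitCount <= 0)
        return result;

    const int64_t chunk = std::max<int64_t>(1, total / splitCount);
    std::string rangeBegin = begin;
    auto it = sample.lower_bound(begin);
    while (it != sample.end() && it->first < end) {
        // Accumulate weight until this chunk is "full", then cut the range there.
        int64_t acc = 0;
        while (it != sample.end() && it->first < end && acc < chunk)
            acc += (it++)->second;
        std::string rangeEnd = (it == sample.end() || it->first >= end) ? end : it->first;
        result.emplace_back(rangeBegin, rangeEnd);
        rangeBegin = rangeEnd;
    }
    return result;
}

int main() {
    Sample readOps{ { "a", 10 }, { "b", 10 }, { "c", 10 }, { "d", 10 } };
    for (const auto& [b, e] : splitByWeight(readOps, "a", "e", 2))
        std::cout << "[" << b << ", " << e << ")\n"; // prints [a, c) then [c, e)
}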
@@ -420,10 +465,7 @@ std::vector<ReadHotRangeWithMetrics> StorageServerMetrics::getReadHotRanges(
void StorageServerMetrics::getReadHotRanges(ReadHotSubRangeRequest req) const {
ReadHotSubRangeReply reply;
- auto _ranges = getReadHotRanges(req.keys,
-                                 SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO,
-                                 SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE,
-                                 SERVER_KNOBS->SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS);
+ auto _ranges = getReadHotRanges(req.keys, req.splitCount, req.type);
reply.readHotRanges = VectorRef(_ranges.data(), _ranges.size());
req.reply.send(reply);
}
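A caller drives the handler above by filling in the request fields used in this diff (keys, splitCount, type) and waiting on the reply. The sketch below is not from the commit; the getReadHotRanges request stream on StorageServerInterface and the request's default constructibility are assumptions, so treat it as showing the shape of the call rather than exact code.

// Hypothetical caller sketch; only the request/reply fields visible in this diff are relied on.
ACTOR Future<Void> logReadHotRanges(StorageServerInterface ssi, KeyRange keys) {
    ReadHotSubRangeRequest req;
    req.keys = keys; // exact field type on the request is assumed here
    req.splitCount = 10; // ask for up to 10 chunks
    req.type = ReadHotSubRangeRequest::SplitType::READ_OPS; // split by sampled read ops
    ReadHotSubRangeReply rep = wait(ssi.getReadHotRanges.getReply(req)); // endpoint name assumed
    for (const auto& r : rep.readHotRanges) {
        TraceEvent("ReadHotSubRangeSample")
            .detail("Begin", r.keys.begin)
            .detail("End", r.keys.end)
            .detail("ReadOpsSec", r.readOpsSec)
            .detail("ReadBandwidthSec", r.readBandwidthSec);
    }
    return Void();
}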
@@ -690,7 +732,7 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/simple") {
ssm.byteSample.sample.insert("Cat"_sr, 300 * sampleUnit);
std::vector<ReadHotRangeWithMetrics> t =
- ssm.getReadHotRanges(KeyRangeRef("A"_sr, "C"_sr), 2.0, 200 * sampleUnit, 0);
+ ssm._getReadHotRanges(KeyRangeRef("A"_sr, "C"_sr), 2.0, 200 * sampleUnit, 0);
ASSERT(t.size() == 1 && (*t.begin()).keys.begin == "Bah"_sr && (*t.begin()).keys.end == "Bob"_sr);
@@ -720,7 +762,7 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/moreThanOneRange") {
ssm.byteSample.sample.insert("Dah"_sr, 300 * sampleUnit);
std::vector<ReadHotRangeWithMetrics> t =
- ssm.getReadHotRanges(KeyRangeRef("A"_sr, "D"_sr), 2.0, 200 * sampleUnit, 0);
+ ssm._getReadHotRanges(KeyRangeRef("A"_sr, "D"_sr), 2.0, 200 * sampleUnit, 0);
ASSERT(t.size() == 2 && (*t.begin()).keys.begin == "Bah"_sr && (*t.begin()).keys.end == "Bob"_sr);
ASSERT(t.at(1).keys.begin == "Cat"_sr && t.at(1).keys.end == "Dah"_sr);
@@ -752,7 +794,7 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/consecutiveRanges") {
ssm.byteSample.sample.insert("Dah"_sr, 300 * sampleUnit);
std::vector<ReadHotRangeWithMetrics> t =
- ssm.getReadHotRanges(KeyRangeRef("A"_sr, "D"_sr), 2.0, 200 * sampleUnit, 0);
+ ssm._getReadHotRanges(KeyRangeRef("A"_sr, "D"_sr), 2.0, 200 * sampleUnit, 0);
ASSERT(t.size() == 2 && (*t.begin()).keys.begin == "Bah"_sr && (*t.begin()).keys.end == "But"_sr);
ASSERT(t.at(1).keys.begin == "Cat"_sr && t.at(1).keys.end == "Dah"_sr);
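The rename to _getReadHotRanges keeps these density-based tests working unchanged; the commit does not add a test for the new split-count overload. A rough sketch of what such a test might look like follows, with made-up sample values, an arbitrary sample unit, and deliberately loose assertions, mainly to show the new calling convention.

TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/splitByReadOps") {
    // Hypothetical sketch, not part of this commit.
    int64_t sampleUnit = 1000; // arbitrary unit for this sketch
    StorageServerMetrics ssm;

    ssm.byteSample.sample.insert("Bah"_sr, 100 * sampleUnit);
    ssm.byteSample.sample.insert("Cat"_sr, 100 * sampleUnit);
    ssm.opsReadSample.sample.insert("Bah"_sr, 10 * sampleUnit);
    ssm.opsReadSample.sample.insert("Cat"_sr, 10 * sampleUnit);

    std::vector<ReadHotRangeWithMetrics> t =
        ssm.getReadHotRanges(KeyRangeRef("A"_sr, "D"_sr), 2, ReadHotSubRangeRequest::SplitType::READ_OPS);

    ASSERT(!t.empty());
    ASSERT(t.front().keys.begin == "A"_sr); // the first chunk starts at the queried range's begin key
    return Void();
}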


@@ -131,10 +131,7 @@ struct StorageServerMetrics {
Future<Void> waitMetrics(WaitMetricsRequest req, Future<Void> delay);
- std::vector<ReadHotRangeWithMetrics> getReadHotRanges(KeyRangeRef shard,
-                                                       double readDensityRatio,
-                                                       int64_t baseChunkSize,
-                                                       int64_t minShardReadBandwidthPerKSeconds) const;
+ std::vector<ReadHotRangeWithMetrics> getReadHotRanges(KeyRangeRef shard, int splitCount, uint8_t splitType) const;
void getReadHotRanges(ReadHotSubRangeRequest req) const;
@@ -142,6 +139,12 @@ struct StorageServerMetrics {
void getSplitPoints(SplitRangeRequest req, Optional<KeyRef> prefix) const;
+ [[maybe_unused]] std::vector<ReadHotRangeWithMetrics> _getReadHotRanges(
+     KeyRangeRef shard,
+     double readDensityRatio,
+     int64_t baseChunkSize,
+     int64_t minShardReadBandwidthPerKSeconds) const;
private:
static void collapse(KeyRangeMap<int>& map, KeyRef const& key);
static void add(KeyRangeMap<int>& map, KeyRangeRef const& keys, int delta);