Increased some knobs to throttle the spammy read-hot logging. Also added more details to the read-hot log events to make them more useful
This commit is contained in:
parent
13ddf0618f
commit
3ac6996844
|
@ -3862,7 +3862,7 @@ ACTOR Future< StorageMetrics > extractMetrics( Future<std::pair<Optional<Storage
|
|||
return x.first.get();
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, KeyRange keys) {
|
||||
ACTOR Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(Database cx, KeyRange keys) {
|
||||
loop {
|
||||
int64_t shardLimit = 100; // Shard limit here does not really matter since this function is currently only used
|
||||
// to find the read-hot sub ranges within a read-hot shard.
|
||||
|
@ -3889,7 +3889,7 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, K
|
|||
}
|
||||
|
||||
wait(waitForAll(fReplies));
|
||||
Standalone<VectorRef<KeyRangeRef>> results;
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> results;
|
||||
|
||||
for (int i = 0; i < nLocs; i++)
|
||||
results.append(results.arena(), fReplies[i].get().readHotRanges.begin(),
|
||||
|
@ -3906,7 +3906,7 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, K
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ACTOR Future< std::pair<Optional<StorageMetrics>, int> > waitStorageMetrics(
|
||||
Database cx,
|
||||
KeyRange keys,
|
||||
|
@ -3994,7 +3994,7 @@ ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsLis
|
|||
}
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> Transaction::getReadHotRanges(KeyRange const& keys) {
|
||||
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> Transaction::getReadHotRanges(KeyRange const& keys) {
|
||||
return ::getReadHotRanges(cx, keys);
|
||||
}
|
||||
|
||||
|
|
|
@ -261,7 +261,7 @@ public:
|
|||
// Pass a negative value for `shardLimit` to indicate no limit on the shard number.
|
||||
Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit );
|
||||
Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated );
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(KeyRange const& keys);
|
||||
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(KeyRange const& keys);
|
||||
|
||||
// If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
|
||||
void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true );
|
||||
|
|
|
@ -423,9 +423,30 @@ struct SplitMetricsRequest {
|
|||
}
|
||||
};
|
||||
|
||||
// Should always be used inside a `Standalone`.
|
||||
struct ReadHotRangeWithMetrics {
|
||||
KeyRangeRef keys;
|
||||
double density;
|
||||
double readBandwidth;
|
||||
|
||||
ReadHotRangeWithMetrics() {}
|
||||
ReadHotRangeWithMetrics(KeyRangeRef const& keys, double density, double readBandwidth)
|
||||
: keys(keys), density(density), readBandwidth(readBandwidth) {}
|
||||
|
||||
ReadHotRangeWithMetrics(Arena& arena, const ReadHotRangeWithMetrics& rhs)
|
||||
: keys(arena, rhs.keys), density(rhs.density), readBandwidth(rhs.readBandwidth) {}
|
||||
|
||||
int expectedSize() { return keys.expectedSize() + sizeof(density) + sizeof(readBandwidth); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, keys, density, readBandwidth);
|
||||
}
|
||||
};
|
||||
|
||||
struct ReadHotSubRangeReply {
|
||||
constexpr static FileIdentifier file_identifier = 10424537;
|
||||
Standalone<VectorRef<KeyRangeRef>> readHotRanges;
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> readHotRanges;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
|
|
@ -299,11 +299,14 @@ ACTOR Future<Void> readHotDetector(DataDistributionTracker* self) {
|
|||
state Transaction tr(self->cx);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> readHotRanges = wait(tr.getReadHotRanges(keys));
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> readHotRanges = wait(tr.getReadHotRanges(keys));
|
||||
for (auto& keyRange : readHotRanges) {
|
||||
TraceEvent("ReadHotRangeLog")
|
||||
.detail("KeyRangeBegin", keyRange.begin)
|
||||
.detail("KeyRangeEnd", keyRange.end);
|
||||
.detail("ReadDensity", keyRange.density)
|
||||
.detail("ReadBandwidth", keyRange.readBandwidth)
|
||||
.detail("ReadDensityThreshold", SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO)
|
||||
.detail("KeyRangeBegin", keyRange.keys.begin)
|
||||
.detail("KeyRangeEnd", keyRange.keys.end);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
|
|
@ -144,17 +144,17 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( SHARD_BYTES_PER_SQRT_BYTES, 45 ); if( buggifySmallShards ) SHARD_BYTES_PER_SQRT_BYTES = 0;//Approximately 10000 bytes per shard
|
||||
init( MAX_SHARD_BYTES, 500000000 );
|
||||
init( KEY_SERVER_SHARD_BYTES, 500000000 );
|
||||
init( SHARD_MAX_READ_DENSITY_RATIO, 2.0);
|
||||
init( SHARD_MAX_READ_DENSITY_RATIO, 8.0);
|
||||
/*
|
||||
The bytesRead/byteSize ratio. A shard will be declared read hot when its ratio is larger than this. The threshold is chosen high enough to avoid reporting a plain table scan as read hot.
|
||||
*/
|
||||
init ( SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS, 166667 * 1000);
|
||||
init ( SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS, 1666667 * 1000);
|
||||
/*
|
||||
The read bandwidth of a given shard needs to be larger than this value in order to be evaluated if it's read hot. The roughly 167KB per second is calculated as following:
|
||||
- Heuristic data suggests that each storage process can do max 50K read operations per second
|
||||
The read bandwidth of a given shard needs to be larger than this value in order to be evaluated if it's read hot. The roughly 1.67MB per second is calculated as following:
|
||||
- Heuristic data suggests that each storage process can do max 500K read operations per second
|
||||
- Each read has a minimum cost of EMPTY_READ_PENALTY, which is 20 bytes
|
||||
- Thus that gives a minimum 1MB per second
|
||||
- But to be conservative, set that number to be 1/6 of 1MB, which is roughly 166,667 bytes per second
|
||||
- Thus that gives a minimum 10MB per second
|
||||
- But to be conservative, set that number to be 1/6 of 10MB, which is roughly 1,666,667 bytes per second
|
||||
Shard with a read bandwidth smaller than this value will never be too busy to handle the reads.
|
||||
*/
|
||||
init( SHARD_MAX_BYTES_READ_PER_KSEC_JITTER, 0.1 );
|
||||
|
|
|
@ -416,9 +416,10 @@ struct StorageServerMetrics {
|
|||
// Given a read hot shard, this function will divide the shard into chunks and find those chunks whose
|
||||
// readBytes/sizeBytes exceeds the `readDensityRatio`. Please make sure to run unit tests
|
||||
// `StorageMetricsSampleTests.txt` after change made.
|
||||
std::vector<KeyRangeRef> getReadHotRanges(KeyRangeRef shard, double readDensityRatio, int64_t baseChunkSize,
|
||||
int64_t minShardReadBandwidthPerKSeconds) {
|
||||
std::vector<KeyRangeRef> toReturn;
|
||||
std::vector<ReadHotRangeWithMetrics> getReadHotRanges(KeyRangeRef shard, double readDensityRatio,
|
||||
int64_t baseChunkSize,
|
||||
int64_t minShardReadBandwidthPerKSeconds) {
|
||||
std::vector<ReadHotRangeWithMetrics> toReturn;
|
||||
double shardSize = (double)byteSample.getEstimate(shard);
|
||||
int64_t shardReadBandwidth = bytesReadSample.getEstimate(shard);
|
||||
if (shardReadBandwidth * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS <=
|
||||
|
@ -428,7 +429,9 @@ struct StorageServerMetrics {
|
|||
if (shardSize <= baseChunkSize) {
|
||||
// Shard is small, use it as is
|
||||
if (bytesReadSample.getEstimate(shard) > (readDensityRatio * shardSize)) {
|
||||
toReturn.push_back(shard);
|
||||
toReturn.emplace_back(shard, bytesReadSample.getEstimate(shard) / shardSize,
|
||||
bytesReadSample.getEstimate(shard) /
|
||||
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL);
|
||||
}
|
||||
return toReturn;
|
||||
}
|
||||
|
@ -450,14 +453,15 @@ struct StorageServerMetrics {
|
|||
if (bytesReadSample.getEstimate(KeyRangeRef(beginKey, *endKey)) >
|
||||
(readDensityRatio * std::max(baseChunkSize, byteSample.getEstimate(KeyRangeRef(beginKey, *endKey))))) {
|
||||
auto range = KeyRangeRef(beginKey, *endKey);
|
||||
if (!toReturn.empty() && toReturn.back().end == range.begin) {
|
||||
if (!toReturn.empty() && toReturn.back().keys.end == range.begin) {
|
||||
// in case two consecutive chunks both are over the ratio, merge them.
|
||||
auto updatedTail = KeyRangeRef(toReturn.back().begin, *endKey);
|
||||
range = KeyRangeRef(toReturn.back().keys.begin, *endKey);
|
||||
toReturn.pop_back();
|
||||
toReturn.push_back(updatedTail);
|
||||
} else {
|
||||
toReturn.push_back(range);
|
||||
}
|
||||
toReturn.emplace_back(
|
||||
range,
|
||||
(double)bytesReadSample.getEstimate(range) / std::max(baseChunkSize, byteSample.getEstimate(range)),
|
||||
bytesReadSample.getEstimate(range) / SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL);
|
||||
}
|
||||
beginKey = *endKey;
|
||||
endKey = byteSample.sample.index(byteSample.sample.sumTo(byteSample.sample.lower_bound(beginKey)) +
|
||||
|
@ -468,10 +472,10 @@ struct StorageServerMetrics {
|
|||
|
||||
void getReadHotRanges(ReadHotSubRangeRequest req) {
|
||||
ReadHotSubRangeReply reply;
|
||||
std::vector<KeyRangeRef> v = getReadHotRanges(req.keys, SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO,
|
||||
SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE,
|
||||
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
|
||||
reply.readHotRanges = VectorRef<KeyRangeRef>(v.data(), v.size());
|
||||
auto _ranges = getReadHotRanges(req.keys, SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO,
|
||||
SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE,
|
||||
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
|
||||
reply.readHotRanges = VectorRef(_ranges.data(), _ranges.size());
|
||||
req.reply.send(reply);
|
||||
}
|
||||
|
||||
|
@ -515,11 +519,11 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/simple") {
|
|||
ssm.byteSample.sample.insert(LiteralStringRef("But"), 100 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
|
||||
|
||||
vector<KeyRangeRef> t =
|
||||
std::vector<ReadHotRangeWithMetrics> t =
|
||||
ssm.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 2.0, 200 * sampleUnit, 0);
|
||||
|
||||
ASSERT(t.size() == 1 && (*t.begin()).begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).end == LiteralStringRef("Bob"));
|
||||
ASSERT(t.size() == 1 && (*t.begin()).keys.begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).keys.end == LiteralStringRef("Bob"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -546,12 +550,12 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/moreThanOneRange") {
|
|||
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Dah"), 300 * sampleUnit);
|
||||
|
||||
vector<KeyRangeRef> t =
|
||||
std::vector<ReadHotRangeWithMetrics> t =
|
||||
ssm.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 2.0, 200 * sampleUnit, 0);
|
||||
|
||||
ASSERT(t.size() == 2 && (*t.begin()).begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).end == LiteralStringRef("Bob"));
|
||||
ASSERT(t.at(1).begin == LiteralStringRef("Cat") && t.at(1).end == LiteralStringRef("Dah"));
|
||||
ASSERT(t.size() == 2 && (*t.begin()).keys.begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).keys.end == LiteralStringRef("Bob"));
|
||||
ASSERT(t.at(1).keys.begin == LiteralStringRef("Cat") && t.at(1).keys.end == LiteralStringRef("Dah"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -579,12 +583,12 @@ TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/consecutiveRanges") {
|
|||
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Dah"), 300 * sampleUnit);
|
||||
|
||||
vector<KeyRangeRef> t =
|
||||
std::vector<ReadHotRangeWithMetrics> t =
|
||||
ssm.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 2.0, 200 * sampleUnit, 0);
|
||||
|
||||
ASSERT(t.size() == 2 && (*t.begin()).begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).end == LiteralStringRef("But"));
|
||||
ASSERT(t.at(1).begin == LiteralStringRef("Cat") && t.at(1).end == LiteralStringRef("Dah"));
|
||||
ASSERT(t.size() == 2 && (*t.begin()).keys.begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).keys.end == LiteralStringRef("But"));
|
||||
ASSERT(t.at(1).keys.begin == LiteralStringRef("Cat") && t.at(1).keys.end == LiteralStringRef("Dah"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ struct ReadHotDetectionWorkload : TestWorkload {
|
|||
// TraceEvent("RHDCheckPhaseLog")
|
||||
// .detail("KeyRangeSize", sm.bytes)
|
||||
// .detail("KeyRangeReadBandwith", sm.bytesReadPerKSecond);
|
||||
Standalone<VectorRef<KeyRangeRef>> keyRanges = wait(tr.getReadHotRanges(self->wholeRange));
|
||||
Standalone<VectorRef<ReadHotRangeWithMetrics>> keyRanges = wait(tr.getReadHotRanges(self->wholeRange));
|
||||
// TraceEvent("RHDCheckPhaseLog")
|
||||
// .detail("KeyRangesSize", keyRanges.size())
|
||||
// .detail("ReadKey", self->readKey.printable().c_str())
|
||||
|
@ -107,7 +107,7 @@ struct ReadHotDetectionWorkload : TestWorkload {
|
|||
// .detail("KeyRangesBackEndKey", keyRanges.back().end);
|
||||
// Loose check.
|
||||
for (auto kr : keyRanges) {
|
||||
if (kr.contains(self->readKey)) {
|
||||
if (kr.keys.contains(self->readKey)) {
|
||||
self->passed = true;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue