Added the function used to determin sub-ranges within a read hot shard that has a readSize-to-byteSize ratio higher than the knob value. Alos added unit tests for that function.
This commit is contained in:
parent
1d6cd1007b
commit
7708034cc9
|
@ -131,7 +131,10 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
|||
init( MAX_SHARD_BYTES, 500000000 );
|
||||
init( KEY_SERVER_SHARD_BYTES, 500000000 );
|
||||
bool buggifySmallReadBandwidth = randomize && BUGGIFY;
|
||||
init( SHARD_MAX_READ_DENSITY_RATIO, 2.0); // The bytesRead/byteSize radio. Will be declared as read hot when larger than this. 2.0 was chosen to avoid reporting table scan as read hot.
|
||||
init( SHARD_MAX_READ_DENSITY_RATIO, 2.0); if (buggifySmallReadBandwidth ) SHARD_MAX_READ_DENSITY_RATIO = 0.1;
|
||||
/*
|
||||
The bytesRead/byteSize radio. Will be declared as read hot when larger than this. 2.0 was chosen to avoid reporting table scan as read hot.
|
||||
*/
|
||||
init( SHARD_MAX_BYTES_READ_PER_KSEC_JITTER, 0.1 );
|
||||
bool buggifySmallBandwidthSplit = randomize && BUGGIFY;
|
||||
init( SHARD_MAX_BYTES_PER_KSEC, 1LL*1000000*1000 ); if( buggifySmallBandwidthSplit ) SHARD_MAX_BYTES_PER_KSEC = 10LL*1000*1000;
|
||||
|
|
|
@ -410,6 +410,58 @@ struct StorageServerMetrics {
|
|||
|
||||
Future<Void> waitMetrics(WaitMetricsRequest req, Future<Void> delay);
|
||||
|
||||
static vector<KeyRangeRef> getReadHotRanges(KeyRangeRef shard, StorageMetricSample& readSample,
|
||||
StorageMetricSample& byteSample, double readDensityRatio) {
|
||||
int64_t baseChunkSize = 200; // a fixed 10MB chunk. TODO: knob-ify it?
|
||||
int64_t chunkSize = baseChunkSize;
|
||||
vector<KeyRangeRef> toReturn;
|
||||
if (byteSample.getEstimate(shard) <= chunkSize) {
|
||||
// Shard is small, use it as is
|
||||
toReturn.push_back(shard);
|
||||
return toReturn;
|
||||
}
|
||||
KeyRef beginKey = shard.begin;
|
||||
IndexedSet<Key, int64_t>::iterator endKey =
|
||||
byteSample.sample.index(byteSample.sample.sumTo(byteSample.sample.lower_bound(beginKey)) + chunkSize);
|
||||
// printf("++++++++++++ Processing shard (%s, %s)\n", shard.begin.printable().c_str(),
|
||||
// shard.end.printable().c_str());
|
||||
while (endKey != byteSample.sample.end()) {
|
||||
// printf("+++++++++++++ Looking at range (%s, %s)\n",beginKey.printable().c_str(),
|
||||
// (*endKey).printable().c_str());
|
||||
if (*endKey > shard.end) endKey = byteSample.sample.lower_bound(shard.end);
|
||||
if (*endKey == beginKey) {
|
||||
chunkSize += baseChunkSize;
|
||||
endKey = byteSample.sample.index(byteSample.sample.sumTo(byteSample.sample.lower_bound(beginKey)) +
|
||||
chunkSize);
|
||||
continue;
|
||||
}
|
||||
// printf("+++++++++++++ This range has size %d and read sample size %d\n",
|
||||
// byteSample.getEstimate(KeyRangeRef(beginKey, *endKey)), readSample.getEstimate(KeyRangeRef(beginKey,
|
||||
// *endKey)));
|
||||
if (readSample.getEstimate(KeyRangeRef(beginKey, *endKey)) / chunkSize > readDensityRatio) {
|
||||
auto range = KeyRangeRef(beginKey, *endKey);
|
||||
if (!toReturn.empty() && toReturn.back().end == range.begin) {
|
||||
auto updatedTail =
|
||||
KeyRangeRef(toReturn.back().begin,
|
||||
*endKey); // in case two consecutive chunks both are over the ratio, merge them.
|
||||
toReturn.pop_back();
|
||||
toReturn.push_back(updatedTail);
|
||||
} else {
|
||||
toReturn.push_back(range);
|
||||
}
|
||||
}
|
||||
beginKey = *endKey;
|
||||
chunkSize = baseChunkSize;
|
||||
endKey =
|
||||
byteSample.sample.index(byteSample.sample.sumTo(byteSample.sample.lower_bound(beginKey)) + chunkSize);
|
||||
}
|
||||
// printf("Hot range vector contains [%d] ranges.\n", toReturn.size());
|
||||
// for (auto i = toReturn.begin(); i < toReturn.end(); i++) {
|
||||
// printf("Range [%s, %s) is a read dense range.\n", i->begin.printable().c_str(), i->end.printable().c_str());
|
||||
// }
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
private:
|
||||
static void collapse( KeyRangeMap<int>& map, KeyRef const& key ) {
|
||||
auto range = map.rangeContaining(key);
|
||||
|
@ -430,6 +482,97 @@ private:
|
|||
}
|
||||
};
|
||||
|
||||
TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/simple") {
|
||||
|
||||
TransientStorageMetricSample readSample(1000);
|
||||
readSample.sample.insert(LiteralStringRef("Apple"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Banana"), 2000);
|
||||
readSample.sample.insert(LiteralStringRef("Cat"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Cathode"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Dog"), 1000);
|
||||
|
||||
StorageMetricSample byteSample(10);
|
||||
byteSample.sample.insert(LiteralStringRef("A"), 20);
|
||||
byteSample.sample.insert(LiteralStringRef("Absolute"), 80);
|
||||
byteSample.sample.insert(LiteralStringRef("Apple"), 1000);
|
||||
byteSample.sample.insert(LiteralStringRef("Bah"), 20);
|
||||
byteSample.sample.insert(LiteralStringRef("Banana"), 80);
|
||||
byteSample.sample.insert(LiteralStringRef("Bob"), 200);
|
||||
byteSample.sample.insert(LiteralStringRef("But"), 100);
|
||||
byteSample.sample.insert(LiteralStringRef("Cat"), 300);
|
||||
|
||||
vector<KeyRangeRef> t = StorageServerMetrics::getReadHotRanges(
|
||||
KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), readSample, byteSample, 2.0);
|
||||
|
||||
ASSERT(t.size() == 1 && (*t.begin()).begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).end == LiteralStringRef("Bob"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/moreThanOneRange") {
|
||||
|
||||
TransientStorageMetricSample readSample(1000);
|
||||
readSample.sample.insert(LiteralStringRef("Apple"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Banana"), 2000);
|
||||
readSample.sample.insert(LiteralStringRef("Cat"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Cathode"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Dog"), 5000);
|
||||
readSample.sample.insert(LiteralStringRef("Final"), 2000);
|
||||
|
||||
StorageMetricSample byteSample(10);
|
||||
byteSample.sample.insert(LiteralStringRef("A"), 20);
|
||||
byteSample.sample.insert(LiteralStringRef("Absolute"), 80);
|
||||
byteSample.sample.insert(LiteralStringRef("Apple"), 1000);
|
||||
byteSample.sample.insert(LiteralStringRef("Bah"), 20);
|
||||
byteSample.sample.insert(LiteralStringRef("Banana"), 80);
|
||||
byteSample.sample.insert(LiteralStringRef("Bob"), 200);
|
||||
byteSample.sample.insert(LiteralStringRef("But"), 100);
|
||||
byteSample.sample.insert(LiteralStringRef("Cat"), 300);
|
||||
byteSample.sample.insert(LiteralStringRef("Dah"), 300);
|
||||
|
||||
vector<KeyRangeRef> t = StorageServerMetrics::getReadHotRanges(
|
||||
KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), readSample, byteSample, 2.0);
|
||||
|
||||
ASSERT(t.size() == 2 && (*t.begin()).begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).end == LiteralStringRef("Bob"));
|
||||
ASSERT(t.at(1).begin == LiteralStringRef("Cat") && t.at(1).end == LiteralStringRef("Dah"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/consecutiveRanges") {
|
||||
|
||||
TransientStorageMetricSample readSample(1000);
|
||||
readSample.sample.insert(LiteralStringRef("Apple"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Banana"), 2000);
|
||||
readSample.sample.insert(LiteralStringRef("Bucket"), 2000);
|
||||
readSample.sample.insert(LiteralStringRef("Cat"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Cathode"), 1000);
|
||||
readSample.sample.insert(LiteralStringRef("Dog"), 5000);
|
||||
readSample.sample.insert(LiteralStringRef("Final"), 2000);
|
||||
|
||||
StorageMetricSample byteSample(10);
|
||||
byteSample.sample.insert(LiteralStringRef("A"), 20);
|
||||
byteSample.sample.insert(LiteralStringRef("Absolute"), 80);
|
||||
byteSample.sample.insert(LiteralStringRef("Apple"), 1000);
|
||||
byteSample.sample.insert(LiteralStringRef("Bah"), 20);
|
||||
byteSample.sample.insert(LiteralStringRef("Banana"), 80);
|
||||
byteSample.sample.insert(LiteralStringRef("Bob"), 200);
|
||||
byteSample.sample.insert(LiteralStringRef("But"), 100);
|
||||
byteSample.sample.insert(LiteralStringRef("Cat"), 300);
|
||||
byteSample.sample.insert(LiteralStringRef("Dah"), 300);
|
||||
|
||||
vector<KeyRangeRef> t = StorageServerMetrics::getReadHotRanges(
|
||||
KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), readSample, byteSample, 2.0);
|
||||
|
||||
ASSERT(t.size() == 2 && (*t.begin()).begin == LiteralStringRef("Bah") &&
|
||||
(*t.begin()).end == LiteralStringRef("But"));
|
||||
ASSERT(t.at(1).begin == LiteralStringRef("Cat") && t.at(1).end == LiteralStringRef("Dah"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
//Contains information about whether or not a key-value pair should be included in a byte sample
|
||||
//Also contains size information about the byte sample
|
||||
struct ByteSampleInfo {
|
||||
|
|
|
@ -76,6 +76,7 @@ else()
|
|||
endif()
|
||||
add_fdb_test(TEST_FILES SlowTask.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES SpecificUnitTest.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES StorageMetricsSampleTests.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES StreamingWrite.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES ThreadSafety.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES Throttling.txt IGNORE)
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
testTitle=UnitTests
|
||||
testName=UnitTests
|
||||
startDelay=0
|
||||
useDB=false
|
||||
maxTestCases=0
|
||||
testsMatching=/fdbserver/StorageMetricSample/readHotDetect/
|
Loading…
Reference in New Issue