Fix metrics and add knobs
This commit is contained in:
parent
2c9e1dc754
commit
fcfe6a8c29
|
@ -401,6 +401,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
|
||||
init( ROCKSDB_PERFCONTEXT_ENABLE, false ); if( randomize && BUGGIFY ) ROCKSDB_PERFCONTEXT_ENABLE = deterministicRandom()->coinflip() ? false : true;
|
||||
init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
|
||||
init( ROCKSDB_METRICS_SAMPLE_INTERVAL, 0.0);
|
||||
init( ROCKSDB_MAX_SUBCOMPACTIONS, 2 );
|
||||
init( ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT, 64000000000 ); // 64GB, Rocksdb option, Writes will slow down.
|
||||
init( ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT, 100000000000 ); // 100GB, Rocksdb option, Writes will stall.
|
||||
|
@ -413,6 +414,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( ROCKSDB_COMPACTION_READAHEAD_SIZE, 32768 ); // 32 KB, performs bigger reads when doing compaction.
|
||||
init( ROCKSDB_BLOCK_SIZE, 32768 ); // 32 KB, size of the block in rocksdb cache.
|
||||
init( ENABLE_SHARDED_ROCKSDB, false );
|
||||
init( ROCKSDB_WRITE_BUFFER_SIZE, 1 << 30 ); // 1G
|
||||
|
||||
// Leader election
|
||||
bool longLeaderElection = randomize && BUGGIFY;
|
||||
|
|
|
@ -330,6 +330,7 @@ public:
|
|||
std::string DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY;
|
||||
bool ROCKSDB_PERFCONTEXT_ENABLE; // Enable rocks perf context metrics. May cause performance overhead
|
||||
double ROCKSDB_PERFCONTEXT_SAMPLE_RATE;
|
||||
double ROCKSDB_METRICS_SAMPLE_INTERVAL;
|
||||
int ROCKSDB_MAX_SUBCOMPACTIONS;
|
||||
int64_t ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT;
|
||||
int64_t ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT;
|
||||
|
@ -339,6 +340,7 @@ public:
|
|||
int64_t ROCKSDB_COMPACTION_READAHEAD_SIZE;
|
||||
int64_t ROCKSDB_BLOCK_SIZE;
|
||||
bool ENABLE_SHARDED_ROCKSDB;
|
||||
int64_t ROCKSDB_WRITE_BUFFER_SIZE;
|
||||
|
||||
// Leader election
|
||||
int MAX_NOTIFICATIONS;
|
||||
|
|
|
@ -219,7 +219,7 @@ const char* ShardOpToString(ShardOp op) {
|
|||
}
|
||||
}
|
||||
void logShardEvent(StringRef name, ShardOp op, Severity severity = SevInfo, const std::string& message = "") {
|
||||
TraceEvent e(severity, "ShardedRocksKVSShardEvent");
|
||||
TraceEvent e(severity, "ShardedRocksDBKVSShardEvent");
|
||||
e.detail("Name", name).detail("Action", ShardOpToString(op));
|
||||
if (!message.empty()) {
|
||||
e.detail("Message", message);
|
||||
|
@ -230,7 +230,7 @@ void logShardEvent(StringRef name,
|
|||
ShardOp op,
|
||||
Severity severity = SevInfo,
|
||||
const std::string& message = "") {
|
||||
TraceEvent e(severity, "ShardedRocksKVSShardEvent");
|
||||
TraceEvent e(severity, "ShardedRocksDBKVSShardEvent");
|
||||
e.detail("Name", name).detail("Action", ShardOpToString(op)).detail("Begin", range.begin).detail("End", range.end);
|
||||
if (message != "") {
|
||||
e.detail("Message", message);
|
||||
|
@ -284,7 +284,7 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
|
|||
}
|
||||
|
||||
if (rocksdb_block_cache == nullptr) {
|
||||
rocksdb_block_cache = rocksdb::NewLRUCache(128);
|
||||
rocksdb_block_cache = rocksdb::NewLRUCache(SERVER_KNOBS->ROCKSDB_BLOCK_CACHE_SIZE);
|
||||
}
|
||||
bbOpts.block_cache = rocksdb_block_cache;
|
||||
|
||||
|
@ -301,6 +301,7 @@ rocksdb::Options getOptions() {
|
|||
options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
|
||||
}
|
||||
|
||||
options.db_write_buffer_size = SERVER_KNOBS->ROCKSDB_WRITE_BUFFER_SIZE;
|
||||
options.statistics = rocksdb::CreateDBStatistics();
|
||||
options.statistics->set_stats_level(rocksdb::kExceptHistogramOrTimers);
|
||||
options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
|
||||
|
@ -673,7 +674,7 @@ public:
|
|||
std::shared_ptr<PhysicalShard> defaultShard = std::make_shared<PhysicalShard>(db, "default", handles[0]);
|
||||
columnFamilyMap[defaultShard->cf->GetID()] = defaultShard->cf;
|
||||
std::unique_ptr<DataShard> dataShard = std::make_unique<DataShard>(specialKeys, defaultShard.get());
|
||||
dataShardMap.insert(dataShard->range, dataShard.get());
|
||||
dataShardMap.insert(specialKeys, dataShard.get());
|
||||
defaultShard->dataShards[specialKeys.begin.toString()] = std::move(dataShard);
|
||||
physicalShards[defaultShard->id] = defaultShard;
|
||||
|
||||
|
@ -1045,7 +1046,7 @@ public:
|
|||
void logStats(rocksdb::DB* db);
|
||||
// PerfContext
|
||||
void resetPerfContext();
|
||||
void setPerfContext(int index);
|
||||
void collectPerfContext(int index);
|
||||
void logPerfContext(bool ignoreZeroMetric);
|
||||
// For Readers
|
||||
Reference<Histogram> getReadRangeLatencyHistogram(int index);
|
||||
|
@ -1349,9 +1350,9 @@ void RocksDBMetrics::logStats(rocksdb::DB* db) {
|
|||
e.detail(name, stat - cumulation);
|
||||
cumulation = stat;
|
||||
}
|
||||
for (auto& [name, property] : propertyStats) { // Zhe: TODO aggregation
|
||||
for (auto& [name, property] : propertyStats) {
|
||||
stat = 0;
|
||||
ASSERT(db->GetIntProperty(property, &stat));
|
||||
ASSERT(db->GetAggregatedIntProperty(property, &stat));
|
||||
e.detail(name, stat);
|
||||
}
|
||||
}
|
||||
|
@ -1360,22 +1361,22 @@ void RocksDBMetrics::logMemUsage(rocksdb::DB* db) {
|
|||
TraceEvent e(SevInfo, "ShardedRocksDBMemMetrics", debugID);
|
||||
uint64_t stat;
|
||||
ASSERT(db != nullptr);
|
||||
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kBlockCacheUsage, &stat));
|
||||
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kBlockCacheUsage, &stat));
|
||||
e.detail("BlockCacheUsage", stat);
|
||||
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kEstimateTableReadersMem, &stat));
|
||||
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kEstimateTableReadersMem, &stat));
|
||||
e.detail("EstimateSstReaderBytes", stat);
|
||||
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kCurSizeAllMemTables, &stat));
|
||||
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kCurSizeAllMemTables, &stat));
|
||||
e.detail("AllMemtablesBytes", stat);
|
||||
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kBlockCachePinnedUsage, &stat));
|
||||
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kBlockCachePinnedUsage, &stat));
|
||||
e.detail("BlockCachePinnedUsage", stat);
|
||||
}
|
||||
|
||||
void RocksDBMetrics::resetPerfContext() {
|
||||
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
|
||||
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableCount);
|
||||
rocksdb::get_perf_context()->Reset();
|
||||
}
|
||||
|
||||
void RocksDBMetrics::setPerfContext(int index) {
|
||||
void RocksDBMetrics::collectPerfContext(int index) {
|
||||
for (auto& [name, metric, vals] : perfContextMetrics) {
|
||||
vals[index] = getRocksdbPerfcontextMetric(metric);
|
||||
}
|
||||
|
@ -1391,12 +1392,7 @@ void RocksDBMetrics::logPerfContext(bool ignoreZeroMetric) {
|
|||
}
|
||||
if (ignoreZeroMetric && s == 0)
|
||||
continue;
|
||||
for (int i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; i++) {
|
||||
if (vals[i] != 0)
|
||||
e.detail("RD" + std::to_string(i) + name, vals[i]);
|
||||
}
|
||||
if (vals[SERVER_KNOBS->ROCKSDB_READ_PARALLELISM] != 0)
|
||||
e.detail("WR" + (std::string)name, vals[SERVER_KNOBS->ROCKSDB_READ_PARALLELISM]);
|
||||
e.detail(name, s);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1612,6 +1608,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap;
|
||||
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
|
||||
std::shared_ptr<rocksdb::RateLimiter> rateLimiter;
|
||||
double sampleStartTime;
|
||||
|
||||
explicit Writer(UID logId,
|
||||
int threadIndex,
|
||||
|
@ -1625,12 +1622,22 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
10, // fairness
|
||||
rocksdb::RateLimiter::Mode::kWritesOnly,
|
||||
SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE)
|
||||
: nullptr) {}
|
||||
: nullptr),
|
||||
sampleStartTime(now()) {}
|
||||
|
||||
~Writer() override {}
|
||||
|
||||
void init() override {}
|
||||
|
||||
void sample() {
|
||||
if (SERVER_KNOBS->ROCKSDB_METRICS_SAMPLE_INTERVAL > 0 &&
|
||||
now() - sampleStartTime >= SERVER_KNOBS->ROCKSDB_METRICS_SAMPLE_INTERVAL) {
|
||||
rocksDBMetrics->collectPerfContext(threadIndex);
|
||||
rocksDBMetrics->resetPerfContext();
|
||||
sampleStartTime = now();
|
||||
}
|
||||
}
|
||||
|
||||
struct OpenAction : TypedAction<Writer, OpenAction> {
|
||||
ShardManager* shardManager;
|
||||
rocksdb::Options dbOptions;
|
||||
|
@ -1709,7 +1716,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
ThreadReturnPromise<Void> done;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
bool getPerfContext;
|
||||
bool logShardMemUsage;
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
|
||||
CommitAction(rocksdb::DB* db,
|
||||
|
@ -1724,12 +1730,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
} else {
|
||||
getHistograms = false;
|
||||
}
|
||||
if ((SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) &&
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE)) {
|
||||
getPerfContext = true;
|
||||
} else {
|
||||
getPerfContext = false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -1798,9 +1798,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
}
|
||||
|
||||
void action(CommitAction& a) {
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->resetPerfContext();
|
||||
}
|
||||
double commitBeginTime;
|
||||
if (a.getHistograms) {
|
||||
commitBeginTime = timer_monotonic();
|
||||
|
@ -1832,9 +1829,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
rocksDBMetrics->getCommitLatencyHistogram()->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->setPerfContext(threadIndex);
|
||||
}
|
||||
sample();
|
||||
}
|
||||
|
||||
struct CloseAction : TypedAction<Writer, CloseAction> {
|
||||
|
@ -1864,9 +1859,10 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
double readRangeTimeout;
|
||||
int threadIndex;
|
||||
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
|
||||
double sampleStartTime;
|
||||
|
||||
explicit Reader(UID logId, int threadIndex, std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
|
||||
: logId(logId), threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics) {
|
||||
: logId(logId), threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics), sampleStartTime(now()) {
|
||||
if (g_network->isSimulated()) {
|
||||
// In simulation, increasing the read operation timeouts to 5 minutes, as some of the tests have
|
||||
// very high load and single read thread cannot process all the load within the timeouts.
|
||||
|
@ -1882,33 +1878,33 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
|
||||
void init() override {}
|
||||
|
||||
void sample() {
|
||||
if (SERVER_KNOBS->ROCKSDB_METRICS_SAMPLE_INTERVAL > 0 &&
|
||||
now() - sampleStartTime >= SERVER_KNOBS->ROCKSDB_METRICS_SAMPLE_INTERVAL) {
|
||||
rocksDBMetrics->collectPerfContext(threadIndex);
|
||||
rocksDBMetrics->resetPerfContext();
|
||||
sampleStartTime = now();
|
||||
}
|
||||
}
|
||||
struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
|
||||
Key key;
|
||||
PhysicalShard* shard;
|
||||
Optional<UID> debugID;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
bool getPerfContext;
|
||||
bool logShardMemUsage;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
|
||||
ReadValueAction(KeyRef key, PhysicalShard* shard, Optional<UID> debugID)
|
||||
: key(key), shard(shard), debugID(debugID), startTime(timer_monotonic()),
|
||||
getHistograms(
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
|
||||
getPerfContext(
|
||||
(SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) &&
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE)
|
||||
? true
|
||||
: false) {}
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
|
||||
}
|
||||
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
|
||||
void action(ReadValueAction& a) {
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->resetPerfContext();
|
||||
}
|
||||
double readBeginTime = timer_monotonic();
|
||||
if (a.getHistograms) {
|
||||
rocksDBMetrics->getReadValueQueueWaitHistogram(threadIndex)->sampleSeconds(readBeginTime - a.startTime);
|
||||
|
@ -1961,9 +1957,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
rocksDBMetrics->getReadValueActionHistogram(threadIndex)->sampleSeconds(currTime - readBeginTime);
|
||||
rocksDBMetrics->getReadValueLatencyHistogram(threadIndex)->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->setPerfContext(threadIndex);
|
||||
}
|
||||
|
||||
sample();
|
||||
}
|
||||
|
||||
struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
|
||||
|
@ -1973,26 +1968,18 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
Optional<UID> debugID;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
bool getPerfContext;
|
||||
bool logShardMemUsage;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
|
||||
ReadValuePrefixAction(Key key, int maxLength, PhysicalShard* shard, Optional<UID> debugID)
|
||||
: key(key), maxLength(maxLength), shard(shard), debugID(debugID), startTime(timer_monotonic()),
|
||||
getHistograms(
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
|
||||
getPerfContext(
|
||||
(SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) &&
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE)
|
||||
? true
|
||||
: false){};
|
||||
getHistograms((deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE)
|
||||
? true
|
||||
: false){};
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
|
||||
void action(ReadValuePrefixAction& a) {
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->resetPerfContext();
|
||||
}
|
||||
double readBeginTime = timer_monotonic();
|
||||
if (a.getHistograms) {
|
||||
rocksDBMetrics->getReadPrefixQueueWaitHistogram(threadIndex)
|
||||
|
@ -2050,9 +2037,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
rocksDBMetrics->getReadPrefixActionHistogram(threadIndex)->sampleSeconds(currTime - readBeginTime);
|
||||
rocksDBMetrics->getReadPrefixLatencyHistogram(threadIndex)->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->setPerfContext(threadIndex);
|
||||
}
|
||||
|
||||
sample();
|
||||
}
|
||||
|
||||
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
|
||||
|
@ -2061,18 +2047,12 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
int rowLimit, byteLimit;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
bool getPerfContext;
|
||||
bool logShardMemUsage;
|
||||
ThreadReturnPromise<RangeResult> result;
|
||||
ReadRangeAction(KeyRange keys, std::vector<DataShard*> shards, int rowLimit, int byteLimit)
|
||||
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
|
||||
getHistograms(
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false),
|
||||
getPerfContext(
|
||||
(SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) &&
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE)
|
||||
? true
|
||||
: false) {
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
|
||||
for (const DataShard* shard : shards) {
|
||||
if (shard != nullptr) {
|
||||
shardRanges.emplace_back(shard->physicalShard, keys & shard->range);
|
||||
|
@ -2083,9 +2063,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
};
|
||||
|
||||
void action(ReadRangeAction& a) {
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->resetPerfContext();
|
||||
}
|
||||
double readBeginTime = timer_monotonic();
|
||||
if (a.getHistograms) {
|
||||
rocksDBMetrics->getReadRangeQueueWaitHistogram(threadIndex)->sampleSeconds(readBeginTime - a.startTime);
|
||||
|
@ -2149,9 +2126,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
|
|||
rocksDBMetrics->getReadRangeActionHistogram(threadIndex)->sampleSeconds(currTime - readBeginTime);
|
||||
rocksDBMetrics->getReadRangeLatencyHistogram(threadIndex)->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
if (a.getPerfContext) {
|
||||
rocksDBMetrics->setPerfContext(threadIndex);
|
||||
}
|
||||
|
||||
sample();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue