addRocksDBPerfContextMetrics

This commit is contained in:
Zhe Wang 2022-02-17 17:04:01 -05:00
parent 344a14b010
commit f14e08a991
3 changed files with 349 additions and 11 deletions

View File

@ -364,7 +364,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 );
// If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES according to the recent demand of background IO.
init( ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE, true );
init( ROCKSDB_PERFCONTEXT_ENABLE, false ); if( randomize && BUGGIFY ) ROCKSDB_PERFCONTEXT_ENABLE = deterministicRandom()->coinflip() ? false : true;
init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

View File

@ -295,6 +295,8 @@ public:
bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC;
bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE;
bool ROCKSDB_PERFCONTEXT_ENABLE; // Enable rocks perf context metrics. May cause performance overhead
double ROCKSDB_PERFCONTEXT_SAMPLE_RATE;
// Leader election
int MAX_NOTIFICATIONS;

View File

@ -11,6 +11,8 @@
#include <rocksdb/version.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include <rocksdb/rate_limiter.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/c.h>
#if defined __has_include
#if __has_include(<liburing.h>)
#include <liburing.h>
@ -312,6 +314,271 @@ private:
uint64_t iteratorsReuseCount;
};
class PerfContextMetrics {
public:
PerfContextMetrics();
void reset();
void set(int index);
void log(bool ignoreZeroMetric);
private:
std::vector<std::tuple<const char*, int, std::vector<uint64_t>>> metrics;
uint64_t getRocksdbPerfcontextMetric(int metric);
};
PerfContextMetrics::PerfContextMetrics() {
metrics = {
{ "UserKeyComparisonCount", rocksdb_user_key_comparison_count, {} },
{ "BlockCacheHitCount", rocksdb_block_cache_hit_count, {} },
{ "BlockReadCount", rocksdb_block_read_count, {} },
{ "BlockReadByte", rocksdb_block_read_byte, {} },
{ "BlockReadTime", rocksdb_block_read_time, {} },
{ "BlockChecksumTime", rocksdb_block_checksum_time, {} },
{ "BlockDecompressTime", rocksdb_block_decompress_time, {} },
{ "GetReadBytes", rocksdb_get_read_bytes, {} },
{ "MultigetReadBytes", rocksdb_multiget_read_bytes, {} },
{ "IterReadBytes", rocksdb_iter_read_bytes, {} },
{ "InternalKeySkippedCount", rocksdb_internal_key_skipped_count, {} },
{ "InternalDeleteSkippedCount", rocksdb_internal_delete_skipped_count, {} },
{ "InternalRecentSkippedCount", rocksdb_internal_recent_skipped_count, {} },
{ "InternalMergeCount", rocksdb_internal_merge_count, {} },
{ "GetSnapshotTime", rocksdb_get_snapshot_time, {} },
{ "GetFromMemtableTime", rocksdb_get_from_memtable_time, {} },
{ "GetFromMemtableCount", rocksdb_get_from_memtable_count, {} },
{ "GetPostProcessTime", rocksdb_get_post_process_time, {} },
{ "GetFromOutputFilesTime", rocksdb_get_from_output_files_time, {} },
{ "SeekOnMemtableTime", rocksdb_seek_on_memtable_time, {} },
{ "SeekOnMemtableCount", rocksdb_seek_on_memtable_count, {} },
{ "NextOnMemtableCount", rocksdb_next_on_memtable_count, {} },
{ "PrevOnMemtableCount", rocksdb_prev_on_memtable_count, {} },
{ "SeekChildSeekTime", rocksdb_seek_child_seek_time, {} },
{ "SeekChildSeekCount", rocksdb_seek_child_seek_count, {} },
{ "SeekMinHeapTime", rocksdb_seek_min_heap_time, {} },
{ "SeekMaxHeapTime", rocksdb_seek_max_heap_time, {} },
{ "SeekInternalSeekTime", rocksdb_seek_internal_seek_time, {} },
{ "FindNextUserEntryTime", rocksdb_find_next_user_entry_time, {} },
{ "WriteWalTime", rocksdb_write_wal_time, {} },
{ "WriteMemtableTime", rocksdb_write_memtable_time, {} },
{ "WriteDelayTime", rocksdb_write_delay_time, {} },
{ "WritePreAndPostProcessTime", rocksdb_write_pre_and_post_process_time, {} },
{ "DbMutexLockNanos", rocksdb_db_mutex_lock_nanos, {} },
{ "DbConditionWaitNanos", rocksdb_db_condition_wait_nanos, {} },
{ "MergeOperatorTimeNanos", rocksdb_merge_operator_time_nanos, {} },
{ "ReadIndexBlockNanos", rocksdb_read_index_block_nanos, {} },
{ "ReadFilterBlockNanos", rocksdb_read_filter_block_nanos, {} },
{ "NewTableBlockIterNanos", rocksdb_new_table_block_iter_nanos, {} },
{ "NewTableIteratorNanos", rocksdb_new_table_iterator_nanos, {} },
{ "BlockSeekNanos", rocksdb_block_seek_nanos, {} },
{ "FindTableNanos", rocksdb_find_table_nanos, {} },
{ "BloomMemtableHitCount", rocksdb_bloom_memtable_hit_count, {} },
{ "BloomMemtableMissCount", rocksdb_bloom_memtable_miss_count, {} },
{ "BloomSstHitCount", rocksdb_bloom_sst_hit_count, {} },
{ "BloomSstMissCount", rocksdb_bloom_sst_miss_count, {} },
{ "KeyLockWaitTime", rocksdb_key_lock_wait_time, {} },
{ "KeyLockWaitCount", rocksdb_key_lock_wait_count, {} },
{ "EnvNewSequentialFileNanos", rocksdb_env_new_sequential_file_nanos, {} },
{ "EnvNewRandomAccessFileNanos", rocksdb_env_new_random_access_file_nanos, {} },
{ "EnvNewWritableFileNanos", rocksdb_env_new_writable_file_nanos, {} },
{ "EnvReuseWritableFileNanos", rocksdb_env_reuse_writable_file_nanos, {} },
{ "EnvNewRandomRwFileNanos", rocksdb_env_new_random_rw_file_nanos, {} },
{ "EnvNewDirectoryNanos", rocksdb_env_new_directory_nanos, {} },
{ "EnvFileExistsNanos", rocksdb_env_file_exists_nanos, {} },
{ "EnvGetChildrenNanos", rocksdb_env_get_children_nanos, {} },
{ "EnvGetChildrenFileAttributesNanos", rocksdb_env_get_children_file_attributes_nanos, {} },
{ "EnvDeleteFileNanos", rocksdb_env_delete_file_nanos, {} },
{ "EnvCreateDirNanos", rocksdb_env_create_dir_nanos, {} },
{ "EnvCreateDirIfMissingNanos", rocksdb_env_create_dir_if_missing_nanos, {} },
{ "EnvDeleteDirNanos", rocksdb_env_delete_dir_nanos, {} },
{ "EnvGetFileSizeNanos", rocksdb_env_get_file_size_nanos, {} },
{ "EnvGetFileModificationTimeNanos", rocksdb_env_get_file_modification_time_nanos, {} },
{ "EnvRenameFileNanos", rocksdb_env_rename_file_nanos, {} },
{ "EnvLinkFileNanos", rocksdb_env_link_file_nanos, {} },
{ "EnvLockFileNanos", rocksdb_env_lock_file_nanos, {} },
{ "EnvUnlockFileNanos", rocksdb_env_unlock_file_nanos, {} },
{ "EnvNewLoggerNanos", rocksdb_env_new_logger_nanos, {} },
};
for (auto& [name, metric, vals] : metrics) { // readers, then writer
for (int i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; i++) {
vals.push_back(0); // add reader
}
vals.push_back(0); // add writer
}
}
void PerfContextMetrics::reset() {
rocksdb::get_perf_context()->Reset();
}
void PerfContextMetrics::set(int index) {
for (auto& [name, metric, vals] : metrics) {
vals[index] = getRocksdbPerfcontextMetric(metric);
}
}
void PerfContextMetrics::log(bool ignoreZeroMetric) {
TraceEvent e("RocksDBPerfContextMetrics");
e.setMaxEventLength(20000);
for (auto& [name, metric, vals] : metrics) {
uint64_t s = 0;
for (auto& v : vals) {
s = s + v;
}
if (ignoreZeroMetric && s == 0)
continue;
e.detail("Sum" + (std::string)name, s);
for (int i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; i++) {
if (vals[i] != 0)
e.detail("RD" + std::to_string(i) + name, vals[i]);
}
if (vals[SERVER_KNOBS->ROCKSDB_READ_PARALLELISM] != 0)
e.detail("WR" + (std::string)name, vals[SERVER_KNOBS->ROCKSDB_READ_PARALLELISM]);
}
}
uint64_t PerfContextMetrics::getRocksdbPerfcontextMetric(int metric) {
switch (metric) {
case rocksdb_user_key_comparison_count:
return rocksdb::get_perf_context()->user_key_comparison_count;
case rocksdb_block_cache_hit_count:
return rocksdb::get_perf_context()->block_cache_hit_count;
case rocksdb_block_read_count:
return rocksdb::get_perf_context()->block_read_count;
case rocksdb_block_read_byte:
return rocksdb::get_perf_context()->block_read_byte;
case rocksdb_block_read_time:
return rocksdb::get_perf_context()->block_read_time;
case rocksdb_block_checksum_time:
return rocksdb::get_perf_context()->block_checksum_time;
case rocksdb_block_decompress_time:
return rocksdb::get_perf_context()->block_decompress_time;
case rocksdb_get_read_bytes:
return rocksdb::get_perf_context()->get_read_bytes;
case rocksdb_multiget_read_bytes:
return rocksdb::get_perf_context()->multiget_read_bytes;
case rocksdb_iter_read_bytes:
return rocksdb::get_perf_context()->iter_read_bytes;
case rocksdb_internal_key_skipped_count:
return rocksdb::get_perf_context()->internal_key_skipped_count;
case rocksdb_internal_delete_skipped_count:
return rocksdb::get_perf_context()->internal_delete_skipped_count;
case rocksdb_internal_recent_skipped_count:
return rocksdb::get_perf_context()->internal_recent_skipped_count;
case rocksdb_internal_merge_count:
return rocksdb::get_perf_context()->internal_merge_count;
case rocksdb_get_snapshot_time:
return rocksdb::get_perf_context()->get_snapshot_time;
case rocksdb_get_from_memtable_time:
return rocksdb::get_perf_context()->get_from_memtable_time;
case rocksdb_get_from_memtable_count:
return rocksdb::get_perf_context()->get_from_memtable_count;
case rocksdb_get_post_process_time:
return rocksdb::get_perf_context()->get_post_process_time;
case rocksdb_get_from_output_files_time:
return rocksdb::get_perf_context()->get_from_output_files_time;
case rocksdb_seek_on_memtable_time:
return rocksdb::get_perf_context()->seek_on_memtable_time;
case rocksdb_seek_on_memtable_count:
return rocksdb::get_perf_context()->seek_on_memtable_count;
case rocksdb_next_on_memtable_count:
return rocksdb::get_perf_context()->next_on_memtable_count;
case rocksdb_prev_on_memtable_count:
return rocksdb::get_perf_context()->prev_on_memtable_count;
case rocksdb_seek_child_seek_time:
return rocksdb::get_perf_context()->seek_child_seek_time;
case rocksdb_seek_child_seek_count:
return rocksdb::get_perf_context()->seek_child_seek_count;
case rocksdb_seek_min_heap_time:
return rocksdb::get_perf_context()->seek_min_heap_time;
case rocksdb_seek_max_heap_time:
return rocksdb::get_perf_context()->seek_max_heap_time;
case rocksdb_seek_internal_seek_time:
return rocksdb::get_perf_context()->seek_internal_seek_time;
case rocksdb_find_next_user_entry_time:
return rocksdb::get_perf_context()->find_next_user_entry_time;
case rocksdb_write_wal_time:
return rocksdb::get_perf_context()->write_wal_time;
case rocksdb_write_memtable_time:
return rocksdb::get_perf_context()->write_memtable_time;
case rocksdb_write_delay_time:
return rocksdb::get_perf_context()->write_delay_time;
case rocksdb_write_pre_and_post_process_time:
return rocksdb::get_perf_context()->write_pre_and_post_process_time;
case rocksdb_db_mutex_lock_nanos:
return rocksdb::get_perf_context()->db_mutex_lock_nanos;
case rocksdb_db_condition_wait_nanos:
return rocksdb::get_perf_context()->db_condition_wait_nanos;
case rocksdb_merge_operator_time_nanos:
return rocksdb::get_perf_context()->merge_operator_time_nanos;
case rocksdb_read_index_block_nanos:
return rocksdb::get_perf_context()->read_index_block_nanos;
case rocksdb_read_filter_block_nanos:
return rocksdb::get_perf_context()->read_filter_block_nanos;
case rocksdb_new_table_block_iter_nanos:
return rocksdb::get_perf_context()->new_table_block_iter_nanos;
case rocksdb_new_table_iterator_nanos:
return rocksdb::get_perf_context()->new_table_iterator_nanos;
case rocksdb_block_seek_nanos:
return rocksdb::get_perf_context()->block_seek_nanos;
case rocksdb_find_table_nanos:
return rocksdb::get_perf_context()->find_table_nanos;
case rocksdb_bloom_memtable_hit_count:
return rocksdb::get_perf_context()->bloom_memtable_hit_count;
case rocksdb_bloom_memtable_miss_count:
return rocksdb::get_perf_context()->bloom_memtable_miss_count;
case rocksdb_bloom_sst_hit_count:
return rocksdb::get_perf_context()->bloom_sst_hit_count;
case rocksdb_bloom_sst_miss_count:
return rocksdb::get_perf_context()->bloom_sst_miss_count;
case rocksdb_key_lock_wait_time:
return rocksdb::get_perf_context()->key_lock_wait_time;
case rocksdb_key_lock_wait_count:
return rocksdb::get_perf_context()->key_lock_wait_count;
case rocksdb_env_new_sequential_file_nanos:
return rocksdb::get_perf_context()->env_new_sequential_file_nanos;
case rocksdb_env_new_random_access_file_nanos:
return rocksdb::get_perf_context()->env_new_random_access_file_nanos;
case rocksdb_env_new_writable_file_nanos:
return rocksdb::get_perf_context()->env_new_writable_file_nanos;
case rocksdb_env_reuse_writable_file_nanos:
return rocksdb::get_perf_context()->env_reuse_writable_file_nanos;
case rocksdb_env_new_random_rw_file_nanos:
return rocksdb::get_perf_context()->env_new_random_rw_file_nanos;
case rocksdb_env_new_directory_nanos:
return rocksdb::get_perf_context()->env_new_directory_nanos;
case rocksdb_env_file_exists_nanos:
return rocksdb::get_perf_context()->env_file_exists_nanos;
case rocksdb_env_get_children_nanos:
return rocksdb::get_perf_context()->env_get_children_nanos;
case rocksdb_env_get_children_file_attributes_nanos:
return rocksdb::get_perf_context()->env_get_children_file_attributes_nanos;
case rocksdb_env_delete_file_nanos:
return rocksdb::get_perf_context()->env_delete_file_nanos;
case rocksdb_env_create_dir_nanos:
return rocksdb::get_perf_context()->env_create_dir_nanos;
case rocksdb_env_create_dir_if_missing_nanos:
return rocksdb::get_perf_context()->env_create_dir_if_missing_nanos;
case rocksdb_env_delete_dir_nanos:
return rocksdb::get_perf_context()->env_delete_dir_nanos;
case rocksdb_env_get_file_size_nanos:
return rocksdb::get_perf_context()->env_get_file_size_nanos;
case rocksdb_env_get_file_modification_time_nanos:
return rocksdb::get_perf_context()->env_get_file_modification_time_nanos;
case rocksdb_env_rename_file_nanos:
return rocksdb::get_perf_context()->env_rename_file_nanos;
case rocksdb_env_link_file_nanos:
return rocksdb::get_perf_context()->env_link_file_nanos;
case rocksdb_env_lock_file_nanos:
return rocksdb::get_perf_context()->env_lock_file_nanos;
case rocksdb_env_unlock_file_nanos:
return rocksdb::get_perf_context()->env_unlock_file_nanos;
case rocksdb_env_new_logger_nanos:
return rocksdb::get_perf_context()->env_new_logger_nanos;
default:
break;
}
return 0;
}
ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> readIterPool) {
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
loop {
@ -336,6 +603,7 @@ ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetc
}
ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics,
std::shared_ptr<PerfContextMetrics> perfContextMetrics,
rocksdb::DB* db,
std::shared_ptr<ReadIteratorPool> readIterPool) {
state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = {
@ -431,6 +699,10 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
stat = readIterPool->numTimesReadIteratorsReused();
e.detail("NumTimesReadIteratorsReused", stat - readIteratorPoolStats["NumTimesReadIteratorsReused"]);
readIteratorPoolStats["NumTimesReadIteratorsReused"] = stat;
if (SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE) {
perfContextMetrics->log(true);
}
}
}
@ -458,6 +730,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
struct Writer : IThreadPoolReceiver {
DB& db;
UID id;
std::shared_ptr<rocksdb::RateLimiter> rateLimiter;
Reference<Histogram> commitLatencyHistogram;
@ -466,9 +739,16 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<Histogram> writeHistogram;
Reference<Histogram> deleteCompactRangeHistogram;
std::shared_ptr<ReadIteratorPool> readIterPool;
std::shared_ptr<PerfContextMetrics> perfContextMetrics;
int threadIndex;
explicit Writer(DB& db, UID id, std::shared_ptr<ReadIteratorPool> readIterPool)
: db(db), id(id), readIterPool(readIterPool),
explicit Writer(DB& db,
UID id,
std::shared_ptr<ReadIteratorPool> readIterPool,
std::shared_ptr<PerfContextMetrics> perfContextMetrics,
int threadIndex)
: db(db), id(id), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics),
threadIndex(threadIndex),
rateLimiter(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0
? rocksdb::NewGenericRateLimiter(
SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, // rate_bytes_per_sec
@ -491,7 +771,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Histogram::Unit::microseconds)),
deleteCompactRangeHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_DELETE_COMPACTRANGE_HISTOGRAM,
Histogram::Unit::microseconds)) {}
Histogram::Unit::microseconds)) {
if (SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE) {
// Enable perf context on the same thread with the db thread
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
perfContextMetrics->reset();
}
}
~Writer() override {
if (db) {
@ -542,11 +828,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// The current thread and main thread are same when the code runs in simulation.
// blockUntilReady() is getting the thread into deadlock state, so directly calling
// the metricsLogger.
a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
a.metrics = rocksDBMetricLogger(options.statistics, perfContextMetrics, db, readIterPool) &&
flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
} else {
onMainThread([&] {
a.metrics = rocksDBMetricLogger(options.statistics, db, readIterPool) &&
a.metrics = rocksDBMetricLogger(options.statistics, perfContextMetrics, db, readIterPool) &&
flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
return Future<bool>(true);
}).blockUntilReady();
@ -586,6 +872,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
};
void action(CommitAction& a) {
bool doPerfContextMetrics =
SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE &&
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE);
if (doPerfContextMetrics) {
perfContextMetrics->reset();
}
double commitBeginTime;
if (a.getHistograms) {
commitBeginTime = timer_monotonic();
@ -632,6 +924,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
commitActionHistogram->sampleSeconds(currTime - commitBeginTime);
commitLatencyHistogram->sampleSeconds(currTime - a.startTime);
}
if (doPerfContextMetrics) {
perfContextMetrics->set(threadIndex);
}
}
struct CloseAction : TypedAction<Writer, CloseAction> {
@ -684,9 +979,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<Histogram> readValueGetHistogram;
Reference<Histogram> readPrefixGetHistogram;
std::shared_ptr<ReadIteratorPool> readIterPool;
std::shared_ptr<PerfContextMetrics> perfContextMetrics;
int threadIndex;
explicit Reader(DB& db, std::shared_ptr<ReadIteratorPool> readIterPool)
: db(db), readIterPool(readIterPool),
explicit Reader(DB& db,
std::shared_ptr<ReadIteratorPool> readIterPool,
std::shared_ptr<PerfContextMetrics> perfContextMetrics,
int threadIndex)
: db(db), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics), threadIndex(threadIndex),
readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
ROCKSDB_READRANGE_LATENCY_HISTOGRAM,
Histogram::Unit::microseconds)),
@ -734,6 +1034,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
readValuePrefixTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
readRangeTimeout = SERVER_KNOBS->ROCKSDB_READ_RANGE_TIMEOUT;
}
if (SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE) {
// Enable perf context on the same thread with the db thread
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
perfContextMetrics->reset();
}
}
void init() override {}
@ -752,6 +1057,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
};
void action(ReadValueAction& a) {
bool doPerfContextMetrics =
SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE &&
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE);
if (doPerfContextMetrics) {
perfContextMetrics->reset();
}
double readBeginTime = timer_monotonic();
if (a.getHistograms) {
readValueQueueWaitHistogram->sampleSeconds(readBeginTime - a.startTime);
@ -801,6 +1112,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
readValueActionHistogram->sampleSeconds(currTime - readBeginTime);
readValueLatencyHistogram->sampleSeconds(currTime - a.startTime);
}
if (doPerfContextMetrics) {
perfContextMetrics->set(threadIndex);
}
}
struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
@ -818,6 +1132,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
};
void action(ReadValuePrefixAction& a) {
bool doPerfContextMetrics =
SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE &&
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE);
if (doPerfContextMetrics) {
perfContextMetrics->reset();
}
double readBeginTime = timer_monotonic();
if (a.getHistograms) {
readPrefixQueueWaitHistogram->sampleSeconds(readBeginTime - a.startTime);
@ -871,6 +1191,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
readPrefixActionHistogram->sampleSeconds(currTime - readBeginTime);
readPrefixLatencyHistogram->sampleSeconds(currTime - a.startTime);
}
if (doPerfContextMetrics) {
perfContextMetrics->set(threadIndex);
}
}
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
@ -887,6 +1210,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
void action(ReadRangeAction& a) {
bool doPerfContextMetrics =
SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE &&
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE);
if (doPerfContextMetrics) {
perfContextMetrics->reset();
}
double readBeginTime = timer_monotonic();
if (a.getHistograms) {
readRangeQueueWaitHistogram->sampleSeconds(readBeginTime - a.startTime);
@ -983,10 +1312,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
readRangeActionHistogram->sampleSeconds(currTime - readBeginTime);
readRangeLatencyHistogram->sampleSeconds(currTime - a.startTime);
}
if (doPerfContextMetrics) {
perfContextMetrics->set(threadIndex);
}
}
};
DB db = nullptr;
std::shared_ptr<PerfContextMetrics> perfContextMetrics;
std::string path;
UID id;
Reference<IThreadPool> writeThread;
@ -1015,7 +1348,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Counters counters;
explicit RocksDBKeyValueStore(const std::string& path, UID id)
: path(path), id(id), readIterPool(new ReadIteratorPool(db, path)),
: path(path), id(id), perfContextMetrics(new PerfContextMetrics()), readIterPool(new ReadIteratorPool(db, path)),
readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
@ -1038,10 +1371,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool();
}
writeThread->addThread(new Writer(db, id, readIterPool), "fdb-rocksdb-wr");
writeThread->addThread(
new Writer(db, id, readIterPool, perfContextMetrics, SERVER_KNOBS->ROCKSDB_READ_PARALLELISM),
"fdb-rocksdb-wr");
TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(db, readIterPool), "fdb-rocksdb-re");
readThreads->addThread(new Reader(db, readIterPool, perfContextMetrics, i), "fdb-rocksdb-re");
}
}