Enable per-level stats and enforce iterator upper/lower bounds

This commit is contained in:
Daniel Smith 2020-08-10 19:47:26 +00:00
parent 7c2e45d5a6
commit 1245143ec1
1 changed files with 17 additions and 6 deletions

View File

@ -128,11 +128,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
struct Reader : IThreadPoolReceiver {
DB& db;
rocksdb::ReadOptions readOptions;
explicit Reader(DB& db) : db(db) {}
void init() override { rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex); }
void init() override {
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
rocksdb::get_perf_context()->EnablePerLevelPerfContext();
}
struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
Key key;
@ -150,7 +152,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
}
rocksdb::PinnableSlice value;
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
traceBatch.get().dump();
@ -181,7 +183,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
@ -207,10 +209,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
auto now = timer_monotonic();
rocksdb::get_perf_context()->Reset();
rocksdb::get_iostats_context()->Reset();
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
Standalone<RangeResultRef> result;
int accumulatedBytes = 0;
rocksdb::Status s;
if (a.rowLimit >= 0) {
rocksdb::ReadOptions options;
auto endSlice = toSlice(a.keys.end);
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit &&
accumulatedBytes < a.byteLimit) {
@ -219,7 +225,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
result.push_back_deep(result.arena(), kv);
cursor->Next();
}
s = cursor->status();
} else {
rocksdb::ReadOptions options;
auto beginSlice = toSlice(a.keys.begin);
options.iterate_lower_bound = &beginSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev();
@ -232,8 +243,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
result.push_back_deep(result.arena(), kv);
cursor->Prev();
}
s = cursor->status();
}
auto s = cursor->status();
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
}