Merge pull request #6149 from neethuhaneesha/rocksdbHistograms
KeyValueStoreRocksDB histograms to track latencies
This commit is contained in:
commit
3086941c12
|
@ -354,6 +354,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( ROCKSDB_READ_QUEUE_SOFT_MAX, 500 );
|
||||
init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 100 );
|
||||
init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 50 );
|
||||
init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
|
||||
|
||||
// Leader election
|
||||
bool longLeaderElection = randomize && BUGGIFY;
|
||||
|
|
|
@ -286,6 +286,9 @@ public:
|
|||
int ROCKSDB_READ_QUEUE_HARD_MAX;
|
||||
int ROCKSDB_FETCH_QUEUE_SOFT_MAX;
|
||||
int ROCKSDB_FETCH_QUEUE_HARD_MAX;
|
||||
// These histograms are in read and write path which can cause performance overhead.
|
||||
// Set to 0 to disable histograms.
|
||||
double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
|
||||
|
||||
// Leader election
|
||||
int MAX_NOTIFICATIONS;
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "flow/flow.h"
|
||||
#include "flow/IThreadPool.h"
|
||||
#include "flow/ThreadHelper.actor.h"
|
||||
#include "flow/Histogram.h"
|
||||
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
|
@ -35,6 +36,25 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 22) ? ROCKSDB_PATCH >= 1 :
|
|||
|
||||
namespace {
|
||||
|
||||
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage");
|
||||
const StringRef ROCKSDB_COMMIT_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBCommitLatency");
|
||||
const StringRef ROCKSDB_COMMIT_ACTION_HISTOGRAM = LiteralStringRef("RocksDBCommitAction");
|
||||
const StringRef ROCKSDB_COMMIT_QUEUEWAIT_HISTOGRAM = LiteralStringRef("RocksDBCommitQueueWait");
|
||||
const StringRef ROCKSDB_WRITE_HISTOGRAM = LiteralStringRef("RocksDBWrite");
|
||||
const StringRef ROCKSDB_DELETE_COMPACTRANGE_HISTOGRAM = LiteralStringRef("RocksDBDeleteCompactRange");
|
||||
const StringRef ROCKSDB_READRANGE_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBReadRangeLatency");
|
||||
const StringRef ROCKSDB_READVALUE_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBReadValueLatency");
|
||||
const StringRef ROCKSDB_READPREFIX_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixLatency");
|
||||
const StringRef ROCKSDB_READRANGE_ACTION_HISTOGRAM = LiteralStringRef("RocksDBReadRangeAction");
|
||||
const StringRef ROCKSDB_READVALUE_ACTION_HISTOGRAM = LiteralStringRef("RocksDBReadValueAction");
|
||||
const StringRef ROCKSDB_READPREFIX_ACTION_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixAction");
|
||||
const StringRef ROCKSDB_READRANGE_QUEUEWAIT_HISTOGRAM = LiteralStringRef("RocksDBReadRangeQueueWait");
|
||||
const StringRef ROCKSDB_READVALUE_QUEUEWAIT_HISTOGRAM = LiteralStringRef("RocksDBReadValueQueueWait");
|
||||
const StringRef ROCKSDB_READPREFIX_QUEUEWAIT_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixQueueWait");
|
||||
const StringRef ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM = LiteralStringRef("RocksDBReadRangeNewIterator");
|
||||
const StringRef ROCKSDB_READVALUE_GET_HISTOGRAM = LiteralStringRef("RocksDBReadValueGet");
|
||||
const StringRef ROCKSDB_READPREFIX_GET_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixGet");
|
||||
|
||||
rocksdb::Slice toSlice(StringRef s) {
|
||||
return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
|
||||
}
|
||||
|
@ -231,8 +251,28 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
struct Writer : IThreadPoolReceiver {
|
||||
DB& db;
|
||||
UID id;
|
||||
Reference<Histogram> commitLatencyHistogram;
|
||||
Reference<Histogram> commitActionHistogram;
|
||||
Reference<Histogram> commitQueueWaitHistogram;
|
||||
Reference<Histogram> writeHistogram;
|
||||
Reference<Histogram> deleteCompactRangeHistogram;
|
||||
|
||||
explicit Writer(DB& db, UID id) : db(db), id(id) {}
|
||||
explicit Writer(DB& db, UID id)
|
||||
: db(db), id(id), commitLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_COMMIT_LATENCY_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
commitActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_COMMIT_ACTION_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
commitQueueWaitHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_COMMIT_QUEUEWAIT_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
writeHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_WRITE_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
deleteCompactRangeHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_DELETE_COMPACTRANGE_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)) {}
|
||||
|
||||
~Writer() override {
|
||||
if (db) {
|
||||
|
@ -301,9 +341,24 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
struct CommitAction : TypedAction<Writer, CommitAction> {
|
||||
std::unique_ptr<rocksdb::WriteBatch> batchToCommit;
|
||||
ThreadReturnPromise<Void> done;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
|
||||
CommitAction() {
|
||||
if (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) {
|
||||
getHistograms = true;
|
||||
startTime = timer_monotonic();
|
||||
} else {
|
||||
getHistograms = false;
|
||||
}
|
||||
}
|
||||
};
|
||||
void action(CommitAction& a) {
|
||||
double commitBeginTime;
|
||||
if (a.getHistograms) {
|
||||
commitBeginTime = timer_monotonic();
|
||||
commitQueueWaitHistogram->sampleSeconds(commitBeginTime - a.startTime);
|
||||
}
|
||||
Standalone<VectorRef<KeyRangeRef>> deletes;
|
||||
DeleteVisitor dv(deletes, deletes.arena());
|
||||
ASSERT(a.batchToCommit->Iterate(&dv).ok());
|
||||
|
@ -311,17 +366,33 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
ASSERT(!deletes.empty() || !a.batchToCommit->HasDeleteRange());
|
||||
rocksdb::WriteOptions options;
|
||||
options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
|
||||
|
||||
double writeBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||
auto s = db->Write(options, a.batchToCommit.get());
|
||||
if (a.getHistograms) {
|
||||
writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime);
|
||||
}
|
||||
|
||||
if (!s.ok()) {
|
||||
logRocksDBError(s, "Commit");
|
||||
a.done.sendError(statusToError(s));
|
||||
} else {
|
||||
a.done.send(Void());
|
||||
|
||||
double compactRangeBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||
for (const auto& keyRange : deletes) {
|
||||
auto begin = toSlice(keyRange.begin);
|
||||
auto end = toSlice(keyRange.end);
|
||||
ASSERT(db->SuggestCompactRange(db->DefaultColumnFamily(), &begin, &end).ok());
|
||||
}
|
||||
if (a.getHistograms) {
|
||||
deleteCompactRangeHistogram->sampleSeconds(timer_monotonic() - compactRangeBeginTime);
|
||||
}
|
||||
}
|
||||
if (a.getHistograms) {
|
||||
double currTime = timer_monotonic();
|
||||
commitActionHistogram->sampleSeconds(currTime - commitBeginTime);
|
||||
commitLatencyHistogram->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -361,8 +432,56 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
double readValueTimeout;
|
||||
double readValuePrefixTimeout;
|
||||
double readRangeTimeout;
|
||||
Reference<Histogram> readRangeLatencyHistogram;
|
||||
Reference<Histogram> readValueLatencyHistogram;
|
||||
Reference<Histogram> readPrefixLatencyHistogram;
|
||||
Reference<Histogram> readRangeActionHistogram;
|
||||
Reference<Histogram> readValueActionHistogram;
|
||||
Reference<Histogram> readPrefixActionHistogram;
|
||||
Reference<Histogram> readRangeQueueWaitHistogram;
|
||||
Reference<Histogram> readValueQueueWaitHistogram;
|
||||
Reference<Histogram> readPrefixQueueWaitHistogram;
|
||||
Reference<Histogram> readRangeNewIteratorHistogram;
|
||||
Reference<Histogram> readValueGetHistogram;
|
||||
Reference<Histogram> readPrefixGetHistogram;
|
||||
|
||||
explicit Reader(DB& db) : db(db) {
|
||||
explicit Reader(DB& db)
|
||||
: db(db), readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READRANGE_LATENCY_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readValueLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READVALUE_LATENCY_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readPrefixLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READPREFIX_LATENCY_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readRangeActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READRANGE_ACTION_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readValueActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READVALUE_ACTION_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readPrefixActionHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READPREFIX_ACTION_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readRangeQueueWaitHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READRANGE_QUEUEWAIT_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readValueQueueWaitHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READVALUE_QUEUEWAIT_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readPrefixQueueWaitHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READPREFIX_QUEUEWAIT_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readRangeNewIteratorHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readValueGetHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READVALUE_GET_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
readPrefixGetHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_READPREFIX_GET_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)) {
|
||||
if (g_network->isSimulated()) {
|
||||
// In simulation, increasing the read operation timeouts to 5 minutes, as some of the tests have
|
||||
// very high load and single read thread cannot process all the load within the timeouts.
|
||||
|
@ -382,18 +501,26 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
Key key;
|
||||
Optional<UID> debugID;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
ReadValueAction(KeyRef key, Optional<UID> debugID)
|
||||
: key(key), debugID(debugID), startTime(timer_monotonic()) {}
|
||||
: key(key), debugID(debugID), startTime(timer_monotonic()),
|
||||
getHistograms(
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
|
||||
}
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadValueAction& a) {
|
||||
double readBeginTime = timer_monotonic();
|
||||
if (a.getHistograms) {
|
||||
readValueQueueWaitHistogram->sampleSeconds(readBeginTime - a.startTime);
|
||||
}
|
||||
Optional<TraceBatch> traceBatch;
|
||||
if (a.debugID.present()) {
|
||||
traceBatch = { TraceBatch{} };
|
||||
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
|
||||
}
|
||||
if (timer_monotonic() - a.startTime > readValueTimeout) {
|
||||
if (readBeginTime - a.startTime > readValueTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read value request timedout")
|
||||
.detail("Method", "ReadValueAction")
|
||||
|
@ -401,13 +528,20 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
a.result.sendError(transaction_too_old());
|
||||
return;
|
||||
}
|
||||
|
||||
rocksdb::PinnableSlice value;
|
||||
auto options = getReadOptions();
|
||||
uint64_t deadlineMircos =
|
||||
db->GetEnv()->NowMicros() + (readValueTimeout - (timer_monotonic() - a.startTime)) * 1000000;
|
||||
db->GetEnv()->NowMicros() + (readValueTimeout - (readBeginTime - a.startTime)) * 1000000;
|
||||
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
|
||||
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
|
||||
|
||||
double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||
auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
if (a.getHistograms) {
|
||||
readValueGetHistogram->sampleSeconds(timer_monotonic() - dbGetBeginTime);
|
||||
}
|
||||
|
||||
if (a.debugID.present()) {
|
||||
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
|
||||
traceBatch.get().dump();
|
||||
|
@ -420,6 +554,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
logRocksDBError(s, "ReadValue");
|
||||
a.result.sendError(statusToError(s));
|
||||
}
|
||||
|
||||
if (a.getHistograms) {
|
||||
double currTime = timer_monotonic();
|
||||
readValueActionHistogram->sampleSeconds(currTime - readBeginTime);
|
||||
readValueLatencyHistogram->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
}
|
||||
|
||||
struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
|
||||
|
@ -427,12 +567,20 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
int maxLength;
|
||||
Optional<UID> debugID;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
ReadValuePrefixAction(Key key, int maxLength, Optional<UID> debugID)
|
||||
: key(key), maxLength(maxLength), debugID(debugID), startTime(timer_monotonic()){};
|
||||
: key(key), maxLength(maxLength), debugID(debugID), startTime(timer_monotonic()),
|
||||
getHistograms(
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
|
||||
}
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadValuePrefixAction& a) {
|
||||
double readBeginTime = timer_monotonic();
|
||||
if (a.getHistograms) {
|
||||
readPrefixQueueWaitHistogram->sampleSeconds(readBeginTime - a.startTime);
|
||||
}
|
||||
Optional<TraceBatch> traceBatch;
|
||||
if (a.debugID.present()) {
|
||||
traceBatch = { TraceBatch{} };
|
||||
|
@ -440,7 +588,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
a.debugID.get().first(),
|
||||
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
}
|
||||
if (timer_monotonic() - a.startTime > readValuePrefixTimeout) {
|
||||
if (readBeginTime - a.startTime > readValuePrefixTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read value prefix request timedout")
|
||||
.detail("Method", "ReadValuePrefixAction")
|
||||
|
@ -448,13 +596,20 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
a.result.sendError(transaction_too_old());
|
||||
return;
|
||||
}
|
||||
|
||||
rocksdb::PinnableSlice value;
|
||||
auto options = getReadOptions();
|
||||
uint64_t deadlineMircos =
|
||||
db->GetEnv()->NowMicros() + (readValuePrefixTimeout - (timer_monotonic() - a.startTime)) * 1000000;
|
||||
db->GetEnv()->NowMicros() + (readValuePrefixTimeout - (readBeginTime - a.startTime)) * 1000000;
|
||||
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
|
||||
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
|
||||
|
||||
double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||
auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
if (a.getHistograms) {
|
||||
readPrefixGetHistogram->sampleSeconds(timer_monotonic() - dbGetBeginTime);
|
||||
}
|
||||
|
||||
if (a.debugID.present()) {
|
||||
traceBatch.get().addEvent("GetValuePrefixDebug",
|
||||
a.debugID.get().first(),
|
||||
|
@ -470,19 +625,32 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
logRocksDBError(s, "ReadValuePrefix");
|
||||
a.result.sendError(statusToError(s));
|
||||
}
|
||||
if (a.getHistograms) {
|
||||
double currTime = timer_monotonic();
|
||||
readPrefixActionHistogram->sampleSeconds(currTime - readBeginTime);
|
||||
readPrefixLatencyHistogram->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
}
|
||||
|
||||
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
|
||||
KeyRange keys;
|
||||
int rowLimit, byteLimit;
|
||||
double startTime;
|
||||
bool getHistograms;
|
||||
ThreadReturnPromise<RangeResult> result;
|
||||
ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit)
|
||||
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()) {}
|
||||
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()),
|
||||
getHistograms(
|
||||
(deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) ? true : false) {
|
||||
}
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadRangeAction& a) {
|
||||
if (timer_monotonic() - a.startTime > readRangeTimeout) {
|
||||
double readBeginTime = timer_monotonic();
|
||||
if (a.getHistograms) {
|
||||
readRangeQueueWaitHistogram->sampleSeconds(readBeginTime - a.startTime);
|
||||
}
|
||||
if (readBeginTime - a.startTime > readRangeTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read range request timedout")
|
||||
.detail("Method", "ReadRangeAction")
|
||||
|
@ -499,7 +667,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
rocksdb::Status s;
|
||||
auto options = getReadOptions();
|
||||
uint64_t deadlineMircos =
|
||||
db->GetEnv()->NowMicros() + (readRangeTimeout - (timer_monotonic() - a.startTime)) * 1000000;
|
||||
db->GetEnv()->NowMicros() + (readRangeTimeout - (readBeginTime - a.startTime)) * 1000000;
|
||||
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
|
||||
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
|
||||
// When using a prefix extractor, ensure that keys are returned in order even if they cross
|
||||
|
@ -508,7 +676,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
if (a.rowLimit >= 0) {
|
||||
auto endSlice = toSlice(a.keys.end);
|
||||
options.iterate_upper_bound = &endSlice;
|
||||
|
||||
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
|
||||
if (a.getHistograms) {
|
||||
readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
|
||||
}
|
||||
|
||||
cursor->Seek(toSlice(a.keys.begin));
|
||||
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
|
||||
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
|
||||
|
@ -532,7 +706,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
} else {
|
||||
auto beginSlice = toSlice(a.keys.begin);
|
||||
options.iterate_lower_bound = &beginSlice;
|
||||
|
||||
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
|
||||
if (a.getHistograms) {
|
||||
readRangeNewIteratorHistogram->sampleSeconds(timer_monotonic() - iterCreationBeginTime);
|
||||
}
|
||||
|
||||
cursor->SeekForPrev(toSlice(a.keys.end));
|
||||
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
|
||||
cursor->Prev();
|
||||
|
@ -569,6 +749,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
result.readThrough = result[result.size() - 1].key;
|
||||
}
|
||||
a.result.send(result);
|
||||
if (a.getHistograms) {
|
||||
double currTime = timer_monotonic();
|
||||
readRangeActionHistogram->sampleSeconds(currTime - readBeginTime);
|
||||
readRangeLatencyHistogram->sampleSeconds(currTime - a.startTime);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue