Short circuit reads

This commit is contained in:
Daniel Smith 2020-08-11 18:49:40 +00:00
parent 9065fb3127
commit 8c20124612
1 changed files with 14 additions and 5 deletions

View File

@ -209,10 +209,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
void action(ReadRangeAction& a) {
Standalone<RangeResultRef> result;
if (a.rowLimit == 0 || a.byteLimit == 0) {
a.result.send(result);
}
auto now = timer_monotonic();
rocksdb::get_perf_context()->Reset();
rocksdb::get_iostats_context()->Reset();
Standalone<RangeResultRef> result;
int accumulatedBytes = 0;
rocksdb::Status s;
if (a.rowLimit >= 0) {
@ -221,11 +224,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Next()` is potentially expensive, so short-circut here just in case.
if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Next();
}
s = cursor->status();
@ -239,11 +245,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
cursor->Prev();
}
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Prev()` is potentially expensive, so short-circut here just in case.
if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Prev();
}
s = cursor->status();