Adding boundaries to rocksdb read iterator pool. (#8584)
This commit is contained in:
parent
15625da4ce
commit
1daa346cb4
|
@ -407,6 +407,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
||||||
init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
|
init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
|
||||||
init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
|
init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
|
||||||
init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false;
|
init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false;
|
||||||
|
init( ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS, false ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS = deterministicRandom()->coinflip() ? true : false;
|
||||||
|
init( ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT, 200 );
|
||||||
// Set to 0 to disable rocksdb write rate limiting. Rate limiter unit: bytes per second.
|
// Set to 0 to disable rocksdb write rate limiting. Rate limiter unit: bytes per second.
|
||||||
init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 );
|
init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 );
|
||||||
// If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC according to the recent demand of background IO.
|
// If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC according to the recent demand of background IO.
|
||||||
|
|
|
@ -334,6 +334,8 @@ public:
|
||||||
double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
|
double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
|
||||||
double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME;
|
double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME;
|
||||||
bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
|
bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
|
||||||
|
bool ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS;
|
||||||
|
int ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT;
|
||||||
int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC;
|
int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC;
|
||||||
bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE;
|
bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE;
|
||||||
std::string DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY;
|
std::string DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY;
|
||||||
|
|
|
@ -397,13 +397,23 @@ struct Counters {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ReadIterator {
|
struct ReadIterator {
|
||||||
CF& cf;
|
|
||||||
uint64_t index; // incrementing counter to uniquely identify read iterator.
|
uint64_t index; // incrementing counter to uniquely identify read iterator.
|
||||||
bool inUse;
|
bool inUse;
|
||||||
std::shared_ptr<rocksdb::Iterator> iter;
|
std::shared_ptr<rocksdb::Iterator> iter;
|
||||||
double creationTime;
|
double creationTime;
|
||||||
|
KeyRange keyRange;
|
||||||
|
std::shared_ptr<rocksdb::Slice> beginSlice, endSlice;
|
||||||
ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions& options)
|
ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions& options)
|
||||||
: cf(cf), index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
|
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
|
||||||
|
ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions options, KeyRange keyRange)
|
||||||
|
: index(index), inUse(true), creationTime(now()), keyRange(keyRange) {
|
||||||
|
beginSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.begin)));
|
||||||
|
options.iterate_lower_bound = beginSlice.get();
|
||||||
|
endSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.end)));
|
||||||
|
options.iterate_upper_bound = endSlice.get();
|
||||||
|
|
||||||
|
iter = std::shared_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -426,42 +436,84 @@ public:
|
||||||
readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
|
readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
|
||||||
TraceEvent("ReadIteratorPool", id)
|
TraceEvent("ReadIteratorPool", id)
|
||||||
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
|
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
|
||||||
|
.detail("KnobRocksDBReadRangeReuseBoundedIterators",
|
||||||
|
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS)
|
||||||
|
.detail("KnobRocksDBReadRangeBoundedIteratorsMaxLimit",
|
||||||
|
SERVER_KNOBS->ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT)
|
||||||
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
|
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
|
||||||
|
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS &&
|
||||||
|
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
|
||||||
|
TraceEvent(SevWarn, "ReadIteratorKnobsMismatch");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called on every db commit.
|
// Called on every db commit.
|
||||||
void update() {
|
void update() {
|
||||||
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
|
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS ||
|
||||||
|
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
|
||||||
std::lock_guard<std::mutex> lock(mutex);
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
iteratorsMap.clear();
|
iteratorsMap.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called on every read operation.
|
// Called on every read operation.
|
||||||
ReadIterator getIterator() {
|
ReadIterator getIterator(KeyRange keyRange) {
|
||||||
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
|
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
|
||||||
std::lock_guard<std::mutex> lock(mutex);
|
mutex.lock();
|
||||||
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
|
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
|
||||||
if (!it->second.inUse) {
|
if (!it->second.inUse) {
|
||||||
it->second.inUse = true;
|
it->second.inUse = true;
|
||||||
iteratorsReuseCount++;
|
iteratorsReuseCount++;
|
||||||
return it->second;
|
ReadIterator iter = it->second;
|
||||||
|
mutex.unlock();
|
||||||
|
return iter;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
index++;
|
index++;
|
||||||
ReadIterator iter(cf, index, db, readRangeOptions);
|
uint64_t readIteratorIndex = index;
|
||||||
iteratorsMap.insert({ index, iter });
|
mutex.unlock();
|
||||||
|
|
||||||
|
ReadIterator iter(cf, readIteratorIndex, db, readRangeOptions);
|
||||||
|
mutex.lock();
|
||||||
|
iteratorsMap.insert({ readIteratorIndex, iter });
|
||||||
|
mutex.unlock();
|
||||||
|
return iter;
|
||||||
|
} else if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
|
||||||
|
// TODO: Based on the data size in the key range, decide whether to store the iterator for reuse.
|
||||||
|
mutex.lock();
|
||||||
|
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
|
||||||
|
if (!it->second.inUse && it->second.keyRange.contains(keyRange)) {
|
||||||
|
it->second.inUse = true;
|
||||||
|
iteratorsReuseCount++;
|
||||||
|
ReadIterator iter = it->second;
|
||||||
|
mutex.unlock();
|
||||||
|
return iter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
index++;
|
||||||
|
uint64_t readIteratorIndex = index;
|
||||||
|
mutex.unlock();
|
||||||
|
|
||||||
|
ReadIterator iter(cf, readIteratorIndex, db, readRangeOptions, keyRange);
|
||||||
|
if (iteratorsMap.size() < SERVER_KNOBS->ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT) {
|
||||||
|
// Store at most ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT iterators
|
||||||
|
// to avoid out-of-memory issues.
|
||||||
|
mutex.lock();
|
||||||
|
iteratorsMap.insert({ readIteratorIndex, iter });
|
||||||
|
mutex.unlock();
|
||||||
|
}
|
||||||
return iter;
|
return iter;
|
||||||
} else {
|
} else {
|
||||||
index++;
|
index++;
|
||||||
ReadIterator iter(cf, index, db, readRangeOptions);
|
ReadIterator iter(cf, index, db, readRangeOptions, keyRange);
|
||||||
return iter;
|
return iter;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called on every read operation, after the keys are collected.
|
// Called on every read operation, after the keys are collected.
|
||||||
void returnIterator(ReadIterator& iter) {
|
void returnIterator(ReadIterator& iter) {
|
||||||
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
|
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS ||
|
||||||
|
SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
|
||||||
std::lock_guard<std::mutex> lock(mutex);
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
it = iteratorsMap.find(iter.index);
|
it = iteratorsMap.find(iter.index);
|
||||||
// iterator found: put the iterator back to the pool(inUse=false).
|
// iterator found: put the iterator back to the pool(inUse=false).
|
||||||
|
@ -768,7 +820,7 @@ uint64_t PerfContextMetrics::getRocksdbPerfcontextMetric(int metric) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> readIterPool) {
|
ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> readIterPool) {
|
||||||
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
|
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS || SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS) {
|
||||||
loop {
|
loop {
|
||||||
wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
|
wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
|
||||||
readIterPool->refreshIterators();
|
readIterPool->refreshIterators();
|
||||||
|
@ -1559,7 +1611,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||||
rocksdb::Status s;
|
rocksdb::Status s;
|
||||||
if (a.rowLimit >= 0) {
|
if (a.rowLimit >= 0) {
|
||||||
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||||
ReadIterator readIter = readIterPool->getIterator();
|
ReadIterator readIter = readIterPool->getIterator(a.keys);
|
||||||
if (a.getHistograms) {
|
if (a.getHistograms) {
|
||||||
metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(),
|
metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(),
|
||||||
timer_monotonic() - iterCreationBeginTime));
|
timer_monotonic() - iterCreationBeginTime));
|
||||||
|
@ -1588,7 +1640,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||||
readIterPool->returnIterator(readIter);
|
readIterPool->returnIterator(readIter);
|
||||||
} else {
|
} else {
|
||||||
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
double iterCreationBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||||
ReadIterator readIter = readIterPool->getIterator();
|
ReadIterator readIter = readIterPool->getIterator(a.keys);
|
||||||
if (a.getHistograms) {
|
if (a.getHistograms) {
|
||||||
metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(),
|
metricPromiseStream->send(std::make_pair(ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM.toString(),
|
||||||
timer_monotonic() - iterCreationBeginTime));
|
timer_monotonic() - iterCreationBeginTime));
|
||||||
|
|
Loading…
Reference in New Issue