Use a single iterator pool for all physical shards. (#11699)

* Rewrite iterator pool.

* simulation fix
Yao Xiao 2024-10-15 17:28:54 -07:00 committed by GitHub
parent c146ee0869
commit 7290369aac
2 changed files with 154 additions and 166 deletions

fdbclient/ServerKnobs.cpp

@ -499,7 +499,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
init( ROCKSDB_PROBABILITY_REUSE_ITERATOR_SIM, 0.01 );
init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip();
init( SHARDED_ROCKSDB_REUSE_ITERATORS, false );
init( SHARDED_ROCKSDB_REUSE_ITERATORS, false ); if (isSimulated) SHARDED_ROCKSDB_REUSE_ITERATORS = deterministicRandom()->coinflip();
init( ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS, false ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_BOUNDED_ITERATORS = deterministicRandom()->coinflip();
init( ROCKSDB_READ_RANGE_BOUNDED_ITERATORS_MAX_LIMIT, 200 );
// Set to 0 to disable rocksdb write rate limiting. Rate limiter unit: bytes per second.

fdbserver/KeyValueStoreShardedRocksDB.actor.cpp

@ -574,7 +574,6 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
}
rocksdb::BlockBasedTableOptions bbOpts;
// TODO: Add a knob for the block cache size. (Default is 8 MB)
if (SERVER_KNOBS->SHARDED_ROCKSDB_PREFIX_LEN > 0) {
// Prefix blooms are used during Seek.
options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(SERVER_KNOBS->SHARDED_ROCKSDB_PREFIX_LEN));
@ -715,121 +714,104 @@ rocksdb::ReadOptions getReadOptions() {
}
struct ReadIterator {
uint64_t index; // incrementing counter to uniquely identify read iterator.
bool inUse;
std::shared_ptr<rocksdb::Iterator> iter;
std::unique_ptr<rocksdb::Iterator> iter;
double creationTime;
KeyRange keyRange;
std::shared_ptr<rocksdb::Slice> beginSlice, endSlice;
std::unique_ptr<rocksdb::Slice> beginSlice, endSlice;
ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db)
: index(index), inUse(true), creationTime(now()), iter(db->NewIterator(getReadOptions(), cf)) {}
ReadIterator(rocksdb::ColumnFamilyHandle* cf, uint64_t index, rocksdb::DB* db, const KeyRange& range)
: index(index), inUse(true), creationTime(now()), keyRange(range) {
ReadIterator(rocksdb::ColumnFamilyHandle* cf, rocksdb::DB* db)
: creationTime(now()), iter(db->NewIterator(getReadOptions(), cf)) {}
ReadIterator(rocksdb::ColumnFamilyHandle* cf, rocksdb::DB* db, const KeyRange& range)
: creationTime(now()), keyRange(range) {
auto options = getReadOptions();
beginSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.begin)));
beginSlice = std::unique_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.begin)));
options.iterate_lower_bound = beginSlice.get();
endSlice = std::shared_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.end)));
endSlice = std::unique_ptr<rocksdb::Slice>(new rocksdb::Slice(toSlice(keyRange.end)));
options.iterate_upper_bound = endSlice.get();
iter = std::shared_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
iter = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options, cf));
}
};
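The bound slices are kept as members deliberately: rocksdb::ReadOptions stores only raw pointers to its iterate bounds, so the slices must outlive the iterator. A minimal sketch of that lifetime requirement (illustrative only, not part of the patch; db and cf are assumed to be a valid rocksdb::DB* and column family handle):
// Sketch: ReadOptions retains pointers to the bounds, it does not copy them.
rocksdb::ReadOptions options = getReadOptions();
rocksdb::Slice lower("a");
rocksdb::Slice upper("z");
options.iterate_lower_bound = &lower; // pointer is retained, not copied
options.iterate_upper_bound = &upper;
std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options, cf));
// `lower` and `upper` must stay alive until `it` is destroyed,
// which is why ReadIterator owns beginSlice/endSlice.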
/*
ReadIteratorPool: Collection of iterators. Reuses iterators across non-concurrent read operations
instead of creating and deleting one for every read.
Read: the pool provides an unused iterator if one exists, or creates and returns a new one.
The iterator is returned to the pool after the read is done.
Write: iterators in the pool are deleted, forcing new iterator creation on subsequent reads. Reads
that currently hold an iterator may keep using it, since it is a shared_ptr; once the read
completes, the shared_ptr goes out of scope and the iterator object is deleted when its
reference count reaches 0.
*/
class ReadIteratorPool {
// Stores iterators for all shards for future reuse. One iterator is stored per shard.
class IteratorPool {
public:
ReadIteratorPool(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, const std::string& path)
: db(db), cf(cf), index(0), iteratorsReuseCount(0) {
ASSERT(db);
ASSERT(cf);
TraceEvent(SevVerbose, "ShardedRocksReadIteratorPool")
.detail("Path", path)
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS)
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->SHARDED_ROCKSDB_PREFIX_LEN);
}
IteratorPool() {}
// Called on every db commit.
void update() {
if (SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
iteratorsMap.clear();
}
}
// Called on every read operation.
ReadIterator getIterator(const KeyRange& range) {
// Shared iterators are not bounded.
if (SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
for (it = iteratorsMap.begin(); it != iteratorsMap.end(); it++) {
if (!it->second.inUse) {
it->second.inUse = true;
iteratorsReuseCount++;
return it->second;
}
}
index++;
ReadIterator iter(cf, index, db);
iteratorsMap.insert({ index, iter });
return iter;
std::shared_ptr<ReadIterator> getIterator(const std::string& id) {
std::unique_lock<std::mutex> lock(mu);
auto it = pool.find(id);
if (it == pool.end()) {
++numNewIterators;
return nullptr;
} else {
index++;
ReadIterator iter(cf, index, db, range);
return iter;
auto ret = it->second;
pool.erase(it);
++numReusedIters;
return ret;
}
}
// Called on every read operation, after the keys are collected.
void returnIterator(ReadIterator& iter) {
if (SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS) {
std::lock_guard<std::mutex> lock(mutex);
it = iteratorsMap.find(iter.index);
// iterator found: put the iterator back to the pool(inUse=false).
// iterator not found: update would have removed the iterator from pool, so nothing to do.
if (it != iteratorsMap.end()) {
ASSERT(it->second.inUse);
it->second.inUse = false;
}
void returnIterator(const std::string& id, std::shared_ptr<ReadIterator> iterator) {
ASSERT(iterator != nullptr);
std::unique_lock<std::mutex> lock(mu);
auto it = pool.find(id);
if (it != pool.end()) {
// An iterator already exists in the pool; replace it anyway.
++numReplacedIters;
}
pool[id] = iterator;
}
// Called every ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME seconds in a loop.
void refreshIterators() {
std::lock_guard<std::mutex> lock(mutex);
it = iteratorsMap.begin();
while (it != iteratorsMap.end()) {
if (now() - it->second.creationTime > SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME) {
it = iteratorsMap.erase(it);
void refresh() {
std::unique_lock<std::mutex> lock(mu);
auto poolSize = pool.size();
auto it = pool.begin();
auto currTime = now();
int refreshedIterCount = 0;
while (it != pool.end()) {
if (currTime - it->second->creationTime > SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME) {
it = pool.erase(it);
++refreshedIterCount;
} else {
it++;
++it;
}
}
TraceEvent("RefreshIterators")
.detail("NumReplacedIterators", numReplacedIters)
.detail("NumReusedIterators", numReusedIters)
.detail("NumNewIterators", numNewIterators)
.detail("PoolSize", poolSize)
.detail("RefreshedIterators", refreshedIterCount);
numReplacedIters = 0;
numReusedIters = 0;
numNewIterators = 0;
}
void clear() {
std::unique_lock<std::mutex> lock(mu);
pool.clear();
}
void update(const std::string& id) {
std::unique_lock<std::mutex> lock(mu);
auto it = pool.find(id);
if (it != pool.end()) {
it->second->iter->Refresh();
}
}
uint64_t numReadIteratorsCreated() { return index; }
uint64_t numTimesReadIteratorsReused() { return iteratorsReuseCount; }
void erase(const std::string& id) {
std::unique_lock<std::mutex> lock(mu);
pool.erase(id);
}
private:
std::unordered_map<int, ReadIterator> iteratorsMap;
std::unordered_map<int, ReadIterator>::iterator it;
rocksdb::DB* db;
rocksdb::ColumnFamilyHandle* cf;
std::mutex mutex;
// incrementing counter for every new iterator creation, to uniquely identify the iterator in returnIterator().
uint64_t index;
uint64_t iteratorsReuseCount;
std::mutex mu;
std::unordered_map<std::string, std::shared_ptr<ReadIterator>> pool;
uint64_t numReplacedIters = 0;
uint64_t numReusedIters = 0;
uint64_t numNewIterators = 0;
};
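A minimal sketch of the pool's check-out/check-in cycle, as readRangeInDb below uses it (illustrative only; db, cf, and the shard id are assumptions):
auto pool = std::make_shared<IteratorPool>();
std::shared_ptr<ReadIterator> iter = pool->getIterator("shard-1");
if (iter == nullptr) {
    // Pool miss: create a fresh unbounded iterator for this shard.
    iter = std::make_shared<ReadIterator>(cf, db);
}
// ... seek/scan with iter->iter ...
pool->returnIterator("shard-1", iter); // cache it for the next read
Because at most one iterator is cached per shard id, a concurrent reader that misses simply builds its own iterator and the last one returned wins, which is what the numReplacedIters counter tracks.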
ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
@ -863,7 +845,6 @@ struct PhysicalShard {
PhysicalShard(rocksdb::DB* db, std::string id, rocksdb::ColumnFamilyHandle* handle)
: db(db), id(id), cf(handle), isInitialized(true) {
ASSERT(cf);
readIterPool = std::make_shared<ReadIteratorPool>(db, cf, id);
}
rocksdb::Status init() {
@ -876,7 +857,6 @@ struct PhysicalShard {
return status;
}
logShardEvent(id, ShardOp::OPEN);
readIterPool = std::make_shared<ReadIteratorPool>(db, cf, id);
this->isInitialized.store(true);
return status;
}
@ -941,10 +921,7 @@ struct PhysicalShard {
.detail("Checkpoint", checkpoint.toString());
if (status.ok()) {
if (!this->isInitialized) {
readIterPool = std::make_shared<ReadIteratorPool>(db, cf, id);
this->isInitialized.store(true);
} else if (SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS) {
this->readIterPool->update();
}
}
@ -953,11 +930,6 @@ struct PhysicalShard {
bool initialized() { return this->isInitialized.load(); }
void refreshReadIteratorPool() {
ASSERT(this->readIterPool != nullptr);
this->readIterPool->refreshIterators();
}
std::vector<KeyRange> getAllRanges() const {
std::vector<KeyRange> res;
for (const auto& [key, shard] : dataShards) {
@ -986,7 +958,6 @@ struct PhysicalShard {
~PhysicalShard() {
logShardEvent(id, ShardOp::CLOSE);
isInitialized.store(false);
readIterPool.reset();
// Deleting default column family is not allowed.
if (deletePending && id != DEFAULT_CF_NAME) {
@ -1011,7 +982,6 @@ struct PhysicalShard {
rocksdb::ColumnFamilyOptions cfOptions;
rocksdb::ColumnFamilyHandle* cf = nullptr;
std::unordered_map<std::string, std::unique_ptr<DataShard>> dataShards;
std::shared_ptr<ReadIteratorPool> readIterPool;
bool deletePending = false;
std::atomic<bool> isInitialized;
uint64_t numRangeDeletions = 0;
@ -1019,19 +989,40 @@ struct PhysicalShard {
double lastCompactionTime = 0.0;
};
int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, int byteLimit, RangeResult* result) {
int readRangeInDb(PhysicalShard* shard,
const KeyRangeRef range,
int rowLimit,
int byteLimit,
RangeResult* result,
std::shared_ptr<IteratorPool> iteratorPool) {
if (rowLimit == 0 || byteLimit == 0) {
return 0;
}
int accumulatedBytes = 0;
rocksdb::Status s;
std::shared_ptr<ReadIterator> readIter = nullptr;
bool reuseIterator = SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS && iteratorPool != nullptr;
if (g_network->isSimulated() &&
deterministicRandom()->random01() > SERVER_KNOBS->ROCKSDB_PROBABILITY_REUSE_ITERATOR_SIM) {
// Reduce probability of reusing iterators in simulation.
reuseIterator = false;
}
if (reuseIterator) {
readIter = iteratorPool->getIterator(shard->id);
if (readIter == nullptr) {
readIter = std::make_shared<ReadIterator>(shard->cf, shard->db);
}
} else {
readIter = std::make_shared<ReadIterator>(shard->cf, shard->db, range);
}
// When using a prefix extractor, ensure that keys are returned in order even if they cross
// a prefix boundary.
if (rowLimit >= 0) {
ReadIterator readIter = shard->readIterPool->getIterator(range);
auto cursor = readIter.iter;
auto* cursor = readIter->iter.get();
cursor->Seek(toSlice(range.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < range.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
@ -1044,10 +1035,8 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
cursor->Next();
}
s = cursor->status();
shard->readIterPool->returnIterator(readIter);
} else {
ReadIterator readIter = shard->readIterPool->getIterator(range);
auto cursor = readIter.iter;
auto* cursor = readIter->iter.get();
cursor->SeekForPrev(toSlice(range.end));
if (cursor->Valid() && toStringRef(cursor->key()) == range.end) {
cursor->Prev();
@ -1063,7 +1052,6 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
cursor->Prev();
}
s = cursor->status();
shard->readIterPool->returnIterator(readIter);
}
if (!s.ok()) {
@ -1072,6 +1060,9 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
// should never be returned to user.
return -1;
}
if (reuseIterator) {
iteratorPool->returnIterator(shard->id, readIter);
}
return accumulatedBytes;
}
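A hedged sketch of a forward-scan call (the shard and iteratorPool variables are assumed to be in scope; the limits are arbitrary):
RangeResult result;
int bytes = readRangeInDb(shard,
                          KeyRangeRef("a"_sr, "b"_sr),
                          /*rowLimit=*/100,
                          /*byteLimit=*/1 << 20,
                          &result,
                          iteratorPool);
if (bytes < 0) {
    // RocksDB reported an error; the partial result must not be returned to the user.
}
Note that reused iterators are unbounded, so the scan loop enforces range.end itself instead of relying on iterate_upper_bound.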
@ -1094,9 +1085,10 @@ public:
const rocksdb::DBOptions& options,
std::shared_ptr<RocksDBErrorListener> errorListener,
std::shared_ptr<RocksDBEventListener> eventListener,
Counters* cc)
Counters* cc,
std::shared_ptr<IteratorPool> iteratorPool)
: path(path), logId(logId), dbOptions(options), cfOptions(getCFOptions()), dataShardMap(nullptr, specialKeys.end),
counters(cc) {
counters(cc), iteratorPool(iteratorPool) {
if (!g_network->isSimulated()) {
// Generating trace events in a non-FDB thread will cause errors. The event listener is tested with a local FDB
// cluster.
@ -1250,7 +1242,8 @@ public:
keyRange,
std::max(2, SERVER_KNOBS->ROCKSDB_READ_RANGE_ROW_LIMIT),
SERVER_KNOBS->SHARD_METADATA_SCAN_BYTES_LIMIT,
&metadata);
&metadata,
iteratorPool);
if (bytes <= 0) {
break;
}
@ -1344,7 +1337,6 @@ public:
if (!status.ok()) {
return status;
}
metadataShard->readIterPool->update();
TraceEvent(SevInfo, "ShardedRocksInitializeMetaDataShard", this->logId)
.detail("MetadataShardCF", metadataShard->cf->GetID());
}
@ -1355,6 +1347,7 @@ public:
SERVER_KNOBS->ROCKSDB_WRITEBATCH_PROTECTION_BYTES_PER_KEY, // protection_bytes_per_key
0 /* default_cf_ts_sz default:0 */);
dirtyShards = std::make_unique<std::set<PhysicalShard*>>();
iteratorPool->update(getMetaDataShard()->id);
TraceEvent(SevInfo, "ShardedRocksDBInitEnd", this->logId)
.detail("DataPath", path)
@ -1493,7 +1486,7 @@ public:
if (SERVER_KNOBS->ROCKSDB_EMPTY_RANGE_CHECK && existingShard->initialized()) {
// Enable consistency validation.
RangeResult rangeResult;
auto bytesRead = readRangeInDb(existingShard, range, 1, UINT16_MAX, &rangeResult);
auto bytesRead = readRangeInDb(existingShard, range, 1, UINT16_MAX, &rangeResult, iteratorPool);
if (bytesRead > 0) {
TraceEvent(SevError, "ShardedRocksDBRangeNotEmpty")
.detail("ShardId", existingShard->toString())
@ -1913,6 +1906,7 @@ private:
KeyRangeMap<DataShard*> dataShardMap;
std::deque<std::string> pendingDeletionShards;
Counters* counters;
std::shared_ptr<IteratorPool> iteratorPool;
};
class RocksDBMetrics {
@ -1942,8 +1936,6 @@ public:
Reference<Histogram> getCommitQueueWaitHistogram();
Reference<Histogram> getWriteHistogram();
Reference<Histogram> getDeleteCompactRangeHistogram();
// Stat for Memory Usage
void logMemUsage(rocksdb::DB* db);
std::vector<std::pair<std::string, int64_t>> getManifestBytes(std::string manifestDirectory);
private:
@ -2085,6 +2077,8 @@ RocksDBMetrics::RocksDBMetrics(UID debugID, std::shared_ptr<rocksdb::Statistics>
{ "RowCacheHit", rocksdb::ROW_CACHE_HIT, 0 },
{ "RowCacheMiss", rocksdb::ROW_CACHE_MISS, 0 },
{ "CountIterSkippedKeys", rocksdb::NUMBER_ITER_SKIP, 0 },
{ "NoIteratorCreated", rocksdb::NO_ITERATOR_CREATED, 0 },
{ "NoIteratorDeleted", rocksdb::NO_ITERATOR_DELETED, 0 },
};
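The two new entries surface RocksDB's iterator tickers. A minimal sketch of reading the same counters directly, assuming a rocksdb::Statistics instance named stats:
uint64_t created = stats->getTickerCount(rocksdb::NO_ITERATOR_CREATED);
uint64_t deleted = stats->getTickerCount(rocksdb::NO_ITERATOR_DELETED);
// created - deleted approximates the number of currently live iterators,
// a useful cross-check that the pool is not leaking them.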
@ -2263,20 +2257,6 @@ void RocksDBMetrics::logStats(rocksdb::DB* db, std::string manifestDirectory) {
}
}
void RocksDBMetrics::logMemUsage(rocksdb::DB* db) {
TraceEvent e(SevInfo, "ShardedRocksDBMemMetrics", debugID);
uint64_t stat;
ASSERT(db != nullptr);
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kBlockCacheUsage, &stat));
e.detail("BlockCacheUsage", stat);
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kEstimateTableReadersMem, &stat));
e.detail("EstimateSstReaderBytes", stat);
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kCurSizeAllMemTables, &stat));
e.detail("AllMemtablesBytes", stat);
ASSERT(db->GetAggregatedIntProperty(rocksdb::DB::Properties::kBlockCachePinnedUsage, &stat));
e.detail("BlockCachePinnedUsage", stat);
}
void RocksDBMetrics::resetPerfContext() {
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableCount);
rocksdb::get_perf_context()->Reset();
@ -2460,7 +2440,6 @@ ACTOR Future<Void> rocksDBAggregatedMetricsLogger(std::shared_ptr<ShardedRocksDB
break;
}
rocksDBMetrics->logStats(db, manifestDirectory);
rocksDBMetrics->logMemUsage(db);
if (SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE != 0) {
rocksDBMetrics->logPerfContext(true);
}
@ -2476,33 +2455,30 @@ ACTOR Future<Void> rocksDBAggregatedMetricsLogger(std::shared_ptr<ShardedRocksDB
struct ShardedRocksDBKeyValueStore : IKeyValueStore {
using CF = rocksdb::ColumnFamilyHandle*;
ACTOR static Future<Void> refreshReadIteratorPools(
std::shared_ptr<ShardedRocksDBState> rState,
Future<Void> readyToStart,
std::unordered_map<std::string, std::shared_ptr<PhysicalShard>>* physicalShards) {
ACTOR static Future<Void> refreshIteratorPool(std::shared_ptr<ShardedRocksDBState> rState,
std::shared_ptr<IteratorPool> iteratorPool,
Future<Void> readyToStart) {
if (!SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS) {
return Void();
}
state Reference<Histogram> histogram = Histogram::getHistogram(
ROCKSDBSTORAGE_HISTOGRAM_GROUP, "TimeSpentRefreshIterators"_sr, Histogram::Unit::milliseconds);
if (SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS) {
try {
wait(readyToStart);
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
if (rState->closing) {
break;
}
double startTime = timer_monotonic();
for (auto& [_, shard] : *physicalShards) {
if (shard->initialized()) {
shard->refreshReadIteratorPool();
}
}
histogram->sample(timer_monotonic() - startTime);
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent(SevError, "RefreshReadIteratorPoolError").errorUnsuppressed(e);
try {
wait(readyToStart);
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME));
if (rState->closing) {
break;
}
double startTime = timer_monotonic();
iteratorPool->refresh();
histogram->sample(timer_monotonic() - startTime);
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent(SevError, "RefreshReadIteratorPoolError").errorUnsuppressed(e);
}
}
@ -2648,14 +2624,16 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
int threadIndex;
std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap;
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
std::shared_ptr<IteratorPool> iteratorPool;
double sampleStartTime;
explicit Writer(UID logId,
int threadIndex,
std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap,
std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
std::shared_ptr<RocksDBMetrics> rocksDBMetrics,
std::shared_ptr<IteratorPool> iteratorPool)
: logId(logId), threadIndex(threadIndex), columnFamilyMap(columnFamilyMap), rocksDBMetrics(rocksDBMetrics),
sampleStartTime(now()) {}
iteratorPool(iteratorPool), sampleStartTime(now()) {}
~Writer() override {}
@ -2736,6 +2714,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
columnFamilyMap->erase(shard->cf->GetID());
a.metadataShard->db->Delete(
rocksdb::WriteOptions(), a.metadataShard->cf, compactionTimestampPrefix.toString() + shard->id);
iteratorPool->erase(shard->id);
}
TraceEvent("RemoveShardTime").detail("Duration", now() - start).detail("Size", a.shards.size());
a.shards.clear();
@ -2849,7 +2828,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
if (SERVER_KNOBS->SHARDED_ROCKSDB_REUSE_ITERATORS) {
for (auto shard : *(a.dirtyShards)) {
shard->readIterPool->update();
iteratorPool->update(shard->id);
}
}
@ -3309,10 +3288,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
double readRangeTimeout;
int threadIndex;
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
std::shared_ptr<IteratorPool> iteratorPool;
double sampleStartTime;
explicit Reader(UID logId, int threadIndex, std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
: logId(logId), threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics), sampleStartTime(now()) {
explicit Reader(UID logId,
int threadIndex,
std::shared_ptr<RocksDBMetrics> rocksDBMetrics,
std::shared_ptr<IteratorPool> iteratorPool)
: logId(logId), threadIndex(threadIndex), rocksDBMetrics(rocksDBMetrics), iteratorPool(iteratorPool),
sampleStartTime(now()) {
readValueTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_TIMEOUT;
readValuePrefixTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
readRangeTimeout = SERVER_KNOBS->ROCKSDB_READ_RANGE_TIMEOUT;
@ -3574,7 +3558,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
.detail("Reason", shard == nullptr ? "Not Exist" : "Not Initialized");
continue;
}
auto bytesRead = readRangeInDb(shard, range, rowLimit, byteLimit, &result);
auto bytesRead = readRangeInDb(shard, range, rowLimit, byteLimit, &result, iteratorPool);
if (bytesRead < 0) {
// Error reading an instance.
a.result.sendError(internal_error());
@ -3629,7 +3613,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
errorListener(std::make_shared<RocksDBErrorListener>()),
eventListener(std::make_shared<RocksDBEventListener>(id)),
errorFuture(forwardError(errorListener->getFuture())), dbOptions(getOptions()),
shardManager(path, id, dbOptions, errorListener, eventListener, &counters),
iteratorPool(std::make_shared<IteratorPool>()),
shardManager(path, id, dbOptions, errorListener, eventListener, &counters, iteratorPool),
rocksDBMetrics(std::make_shared<RocksDBMetrics>(id, dbOptions.statistics)) {
// In simulation, run the reader/writer threads as Coro threads (i.e., in the network thread). The storage
// engine is still multi-threaded as background compaction threads are still present. Reads/writes to disk
@ -3652,12 +3637,13 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
compactionThread = createGenericThreadPool(0, SERVER_KNOBS->ROCKSDB_COMPACTION_THREAD_PRIORITY);
readThreads = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_READER_THREAD_PRIORITY);
}
writeThread->addThread(new Writer(id, 0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
writeThread->addThread(new Writer(id, 0, shardManager.getColumnFamilyMap(), rocksDBMetrics, iteratorPool),
"fdb-rocksdb-wr");
compactionThread->addThread(new CompactionWorker(id), "fdb-rocksdb-cw");
TraceEvent("ShardedRocksDBReadThreads", id)
.detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(id, i, rocksDBMetrics), "fdb-rocksdb-re");
readThreads->addThread(new Reader(id, i, rocksDBMetrics, iteratorPool), "fdb-rocksdb-re");
}
}
@ -3679,6 +3665,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
TraceEvent("CloseKeyValueStore").detail("DeleteKVS", deleteOnClose);
self->iteratorPool->clear();
auto a = new Writer::CloseAction(&self->shardManager, deleteOnClose);
auto f = a->done.getFuture();
self->writeThread->post(a);
@ -3729,7 +3716,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) &&
rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager, this->path);
this->compactionJob = compactShards(this->rState, openFuture, &shardManager, compactionThread);
this->refreshHolder = refreshReadIteratorPools(this->rState, openFuture, shardManager.getAllShards());
this->refreshHolder = refreshIteratorPool(this->rState, iteratorPool, openFuture);
this->refreshRocksDBBackgroundWorkHolder =
refreshRocksDBBackgroundEventCounter(this->id, this->eventListener);
this->cleanUpJob = emptyShardCleaner(this->rState, openFuture, &shardManager, writeThread);
@ -4084,6 +4071,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
rocksdb::DBOptions dbOptions;
std::shared_ptr<RocksDBErrorListener> errorListener;
std::shared_ptr<RocksDBEventListener> eventListener;
std::shared_ptr<IteratorPool> iteratorPool;
ShardManager shardManager;
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
std::string path;