Range deletion memory usage improvements (#10048)

This commit is contained in:
Yao Xiao 2023-05-03 11:27:55 -07:00 committed by Yao Xiao
parent fa101e1e11
commit 2d1b5d02e2
6 changed files with 266 additions and 20 deletions

View File

@ -517,6 +517,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_WAL_RECOVERY_MODE, 2 ); // kPointInTimeRecovery, RocksDB default.
init( ROCKSDB_TARGET_FILE_SIZE_BASE, 16777216 ); // 16MB, RocksDB default.
init( ROCKSDB_MAX_OPEN_FILES, 50000 ); // Should be smaller than OS's fd limit.
init( ROCKSDB_USE_POINT_DELETE_FOR_SYSTEM_KEYS, false ); if (isSimulated) ROCKSDB_USE_POINT_DELETE_FOR_SYSTEM_KEYS = deterministicRandom()->coinflip();
init( ROCKSDB_CF_RANGE_DELETION_LIMIT, 1000 );
init (ROCKSDB_WAIT_ON_CF_FLUSH, true ); if (isSimulated) ROCKSDB_WAIT_ON_CF_FLUSH = deterministicRandom()->coinflip();
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

View File

@ -460,6 +460,9 @@ public:
int ROCKSDB_WAL_RECOVERY_MODE;
int ROCKSDB_TARGET_FILE_SIZE_BASE;
int ROCKSDB_MAX_OPEN_FILES;
bool ROCKSDB_USE_POINT_DELETE_FOR_SYSTEM_KEYS;
int ROCKSDB_CF_RANGE_DELETION_LIMIT;
bool ROCKSDB_WAIT_ON_CF_FLUSH;
// Leader election
int MAX_NOTIFICATIONS;

View File

@ -2732,6 +2732,33 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreKeyValues") {
return Void();
}
TEST_CASE("noSim/RocksDB/RangeClear") {
state const std::string rocksDBTestDir = "rocksdb-perf-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);
state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
state KeyRef shardPrefix = "\xffprefix/"_sr;
state int i = 0;
for (; i < 50000; ++i) {
state std::string key1 = format("\xffprefix/%d", i);
state std::string key2 = format("\xffprefix/%d", i + 1);
kvStore->set({ key2, std::to_string(i) });
RangeResult result = wait(kvStore->readRange(KeyRangeRef(shardPrefix, key1), 10000, 10000));
kvStore->clear({ KeyRangeRef(shardPrefix, key1) });
wait(kvStore->commit(false));
}
// TODO: flush memtable. The process is expected to OOM.
Future<Void> closed = kvStore->onClosed();
kvStore->dispose();
wait(closed);
return Void();
}
} // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL

View File

@ -33,6 +33,7 @@
#include "flow/IThreadPool.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/Histogram.h"
#include "flow/UnitTest.h"
#include <memory>
#include <tuple>
@ -720,6 +721,11 @@ struct PhysicalShard {
return res;
}
bool shouldFlush() {
return SERVER_KNOBS->ROCKSDB_CF_RANGE_DELETION_LIMIT > 0 &&
numRangeDeletions > SERVER_KNOBS->ROCKSDB_CF_RANGE_DELETION_LIMIT;
}
std::string toString() {
std::string ret = "[ID]: " + this->id + ", [CF]: ";
if (initialized()) {
@ -765,6 +771,7 @@ struct PhysicalShard {
std::shared_ptr<ReadIteratorPool> readIterPool;
bool deletePending = false;
std::atomic<bool> isInitialized;
uint64_t numRangeDeletions;
double deleteTimeSec;
};
@ -824,15 +831,27 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
return accumulatedBytes;
}
struct Counters {
CounterCollection cc;
Counter immediateThrottle;
Counter failedToAcquire;
Counter convertedRangeDeletions;
Counters()
: cc("RocksDBThrottle"), immediateThrottle("ImmediateThrottle", cc), failedToAcquire("failedToAcquire", cc),
convertedRangeDeletions("ConvertedRangeDeletions", cc) {}
};
// Manages physical shards and maintains logical shard mapping.
class ShardManager {
public:
ShardManager(std::string path,
UID logId,
const rocksdb::Options& options,
std::shared_ptr<RocksDBErrorListener> errorListener)
: path(path), logId(logId), dbOptions(options), cfOptions(getCFOptions()),
dataShardMap(nullptr, specialKeys.end) {
std::shared_ptr<RocksDBErrorListener> errorListener,
Counters* cc)
: path(path), logId(logId), dbOptions(options), cfOptions(getCFOptions()), dataShardMap(nullptr, specialKeys.end),
counters(cc) {
dbOptions.listeners.push_back(errorListener);
}
@ -1319,7 +1338,7 @@ public:
dirtyShards->insert(it.value()->physicalShard);
}
void clearRange(KeyRangeRef range) {
void clearRange(KeyRangeRef range, std::set<Key>* keysSet) {
auto rangeIterator = dataShardMap.intersectingRanges(range);
for (auto it = rangeIterator.begin(); it != rangeIterator.end(); ++it) {
@ -1328,8 +1347,41 @@ public:
continue;
}
auto physicalShard = it.value()->physicalShard;
// TODO: Disable this once RocksDB is upgraded to a version with range delete improvement.
if (SERVER_KNOBS->ROCKSDB_USE_POINT_DELETE_FOR_SYSTEM_KEYS && systemKeys.contains(range)) {
auto scanRange = it.range() & range;
auto beginSlice = toSlice(scanRange.begin);
auto endSlice = toSlice(scanRange.end);
rocksdb::ReadOptions options = getReadOptions();
options.iterate_lower_bound = &beginSlice;
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options, physicalShard->cf));
cursor->Seek(beginSlice);
while (cursor->Valid() && toStringRef(cursor->key()) < toStringRef(endSlice)) {
writeBatch->Delete(physicalShard->cf, cursor->key());
cursor->Next();
}
if (!cursor->status().ok()) {
// if readrange iteration fails, then do a deleteRange.
writeBatch->DeleteRange(physicalShard->cf, beginSlice, endSlice);
} else {
auto key = keysSet->lower_bound(scanRange.begin);
while (key != keysSet->end() && *key < scanRange.end) {
writeBatch->Delete(physicalShard->cf, toSlice(*key));
++key;
}
}
++counters->convertedRangeDeletions;
} else {
writeBatch->DeleteRange(physicalShard->cf, toSlice(range.begin), toSlice(range.end));
++physicalShard->numRangeDeletions;
}
// TODO: Skip clear range and compaction when entire CF is cleared.
writeBatch->DeleteRange(it.value()->physicalShard->cf, toSlice(range.begin), toSlice(range.end));
dirtyShards->insert(it.value()->physicalShard);
}
}
@ -1411,6 +1463,18 @@ public:
return existingShards;
}
void flushShard(std::string shardId) {
auto it = physicalShards.find(shardId);
if (it == physicalShards.end()) {
return;
}
rocksdb::FlushOptions fOptions;
fOptions.wait = SERVER_KNOBS->ROCKSDB_WAIT_ON_CF_FLUSH;
fOptions.allow_write_stall = true;
db->Flush(fOptions, it->second->cf);
}
void closeAllShards() {
if (dbOptions.rate_limiter != nullptr) {
dbOptions.rate_limiter->SetBytesPerSecond((int64_t)5 << 30);
@ -1534,6 +1598,7 @@ private:
std::unique_ptr<std::set<PhysicalShard*>> dirtyShards;
KeyRangeMap<DataShard*> dataShardMap;
std::deque<std::string> pendingDeletionShards;
Counters* counters;
};
class RocksDBMetrics {
@ -2334,11 +2399,12 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return;
}
for (auto shard : *(a.dirtyShards)) {
shard->readIterPool->update();
if (SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS) {
for (auto shard : *(a.dirtyShards)) {
shard->readIterPool->update();
}
}
a.done.send(Void());
if (SERVER_KNOBS->ROCKSDB_SUGGEST_COMPACT_CLEAR_RANGE) {
for (const auto& [id, range] : deletes) {
auto cf = columnFamilyMap->find(id);
@ -2350,6 +2416,24 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
}
// Check for number of range deletes in shards.
// TODO: Disable this once RocksDB is upgraded to a version with range delete improvement.
if (SERVER_KNOBS->ROCKSDB_CF_RANGE_DELETION_LIMIT > 0) {
rocksdb::FlushOptions fOptions;
fOptions.wait = SERVER_KNOBS->ROCKSDB_WAIT_ON_CF_FLUSH;
fOptions.allow_write_stall = true;
for (auto shard : (*a.dirtyShards)) {
if (shard->shouldFlush()) {
TraceEvent("FlushCF")
.detail("PhysicalShardId", shard->id)
.detail("NumRangeDeletions", shard->numRangeDeletions);
a.db->Flush(fOptions, shard->cf);
shard->numRangeDeletions = 0;
}
}
}
if (a.getHistograms) {
double currTime = timer_monotonic();
rocksDBMetrics->getCommitActionHistogram()->sampleSeconds(currTime - commitBeginTime);
@ -2357,6 +2441,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
sample();
a.done.send(Void());
}
struct CloseAction : TypedAction<Writer, CloseAction> {
@ -3067,15 +3153,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
};
struct Counters {
CounterCollection cc;
Counter immediateThrottle;
Counter failedToAcquire;
Counters()
: cc("RocksDBThrottle"), immediateThrottle("ImmediateThrottle", cc), failedToAcquire("failedToAcquire", cc) {}
};
// Persist shard mappinng key range should not be in shardMap.
explicit ShardedRocksDBKeyValueStore(const std::string& path, UID id)
: rState(std::make_shared<ShardedRocksDBState>()), path(path), id(id),
@ -3084,7 +3161,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
errorListener(std::make_shared<RocksDBErrorListener>()), errorFuture(errorListener->getFuture()),
dbOptions(getOptions()), shardManager(path, id, dbOptions, errorListener),
dbOptions(getOptions()), shardManager(path, id, dbOptions, errorListener, &counters),
rocksDBMetrics(std::make_shared<RocksDBMetrics>(id, dbOptions.statistics)) {
// In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage
// engine is still multi-threaded as background compaction threads are still present. Reads/writes to disk
@ -3172,13 +3249,19 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return res;
}
void set(KeyValueRef kv, const Arena*) override { shardManager.put(kv.key, kv.value); }
void set(KeyValueRef kv, const Arena*) override {
shardManager.put(kv.key, kv.value);
if (SERVER_KNOBS->ROCKSDB_USE_POINT_DELETE_FOR_SYSTEM_KEYS && systemKeys.contains(kv.key)) {
keysSet.insert(kv.key);
}
}
void clear(KeyRangeRef range, const Arena*) override {
if (range.singleKeyRange()) {
shardManager.clear(range.begin);
keysSet.erase(range.begin);
} else {
shardManager.clearRange(range);
shardManager.clearRange(range, &keysSet);
}
}
@ -3204,11 +3287,14 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
shardManager.getWriteBatch(),
shardManager.getDirtyShards(),
shardManager.getColumnFamilyMap());
keysSet.clear();
auto res = a->done.getFuture();
writeThread->post(a);
return res;
}
void flushShard(std::string shardId) { return shardManager.flushShard(shardId); }
void checkWaiters(const FlowLock& semaphore, int maxWaiters) {
if (semaphore.waiters() > maxWaiters) {
++counters.immediateThrottle;
@ -3429,6 +3515,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
std::string path;
UID id;
std::set<Key> keysSet;
Reference<IThreadPool> writeThread;
Reference<IThreadPool> readThreads;
Future<Void> errorFuture;
@ -4218,6 +4305,121 @@ TEST_CASE("noSim/ShardedRocksDB/RocksDBSstFileWriter") {
return Void();
}
TEST_CASE("perf/ShardedRocksDB/RangeClearSysKey") {
state int deleteCount = params.getInt("deleteCount").orDefault(20000);
std::cout << "delete count: " << deleteCount << "\n";
state std::string rocksDBTestDir = "sharded-rocksdb-perf-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);
state IKeyValueStore* kvStore =
new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
state KeyRef shardPrefix = "\xffprefix/"_sr;
wait(kvStore->addRange(prefixRange(shardPrefix), "shard-1"));
kvStore->persistRangeMapping(prefixRange(shardPrefix), true);
state int i = 0;
for (; i < deleteCount; ++i) {
state std::string key1 = format("\xffprefix/%d", i);
state std::string key2 = format("\xffprefix/%d", i + 1);
kvStore->set({ key2, std::to_string(i) });
kvStore->clear({ KeyRangeRef(shardPrefix, key1) });
wait(kvStore->commit(false));
}
std::cout << "start flush\n";
auto rocksdb = (ShardedRocksDBKeyValueStore*)kvStore;
rocksdb->flushShard("shard-1");
std::cout << "flush complete\n";
{
Future<Void> closed = kvStore->onClosed();
kvStore->close();
wait(closed);
}
kvStore = new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
std::cout << "Restarted.\n";
i = 0;
for (; i < deleteCount; ++i) {
key1 = format("\xffprefix/%d", i);
key2 = format("\xffprefix/%d", i + 1);
kvStore->set({ key2, std::to_string(i) });
RangeResult result = wait(kvStore->readRange(KeyRangeRef(shardPrefix, key1), 10000, 10000));
kvStore->clear({ KeyRangeRef(shardPrefix, key1) });
wait(kvStore->commit(false));
if (i % 100 == 0) {
std::cout << "Commit: " << i << "\n";
}
}
Future<Void> closed = kvStore->onClosed();
kvStore->dispose();
wait(closed);
return Void();
}
TEST_CASE("perf/ShardedRocksDB/RangeClearUserKey") {
state int deleteCount = params.getInt("deleteCount").orDefault(20000);
std::cout << "delete count: " << deleteCount << "\n";
state std::string rocksDBTestDir = "sharded-rocksdb-perf-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);
state IKeyValueStore* kvStore =
new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
state KeyRef shardPrefix = "prefix/"_sr;
wait(kvStore->addRange(prefixRange(shardPrefix), "shard-1"));
kvStore->persistRangeMapping(prefixRange(shardPrefix), true);
state int i = 0;
for (; i < deleteCount; ++i) {
state std::string key1 = format("prefix/%d", i);
state std::string key2 = format("prefix/%d", i + 1);
kvStore->set({ key2, std::to_string(i) });
kvStore->clear({ KeyRangeRef(shardPrefix, key1) });
wait(kvStore->commit(false));
}
std::cout << "start flush\n";
auto rocksdb = (ShardedRocksDBKeyValueStore*)kvStore;
rocksdb->flushShard("shard-1");
std::cout << "flush complete\n";
{
Future<Void> closed = kvStore->onClosed();
kvStore->close();
wait(closed);
}
kvStore = new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
std::cout << "Restarted.\n";
i = 0;
for (; i < deleteCount; ++i) {
key1 = format("prefix/%d", i);
key2 = format("prefix/%d", i + 1);
kvStore->set({ key2, std::to_string(i) });
RangeResult result = wait(kvStore->readRange(KeyRangeRef(shardPrefix, key1), 10000, 10000));
kvStore->clear({ KeyRangeRef(shardPrefix, key1) });
wait(kvStore->commit(false));
if (i % 100 == 0) {
std::cout << "Commit: " << i << "\n";
}
}
Future<Void> closed = kvStore->onClosed();
kvStore->dispose();
wait(closed);
return Void();
}
} // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL

View File

@ -234,12 +234,14 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/ValidateStorage.toml)
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES noSim/PerfShardedRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml IGNORE)
else()
add_fdb_test(TEST_FILES fast/ValidateStorage.toml IGNORE)
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml IGNORE)
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml IGNORE)
add_fdb_test(TEST_FILES noSim/PerfShardedRocksDBTest.toml IGNORE)
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml IGNORE)
add_fdb_test(TEST_FILES fast/StorageServerCheckpointRestore.toml IGNORE)
endif()

View File

@ -0,0 +1,9 @@
[[test]]
testTitle = 'UnitTests'
useDB = false
startDelay = 0
[[test.workload]]
testName = 'UnitTests'
maxTestCases = 10
testsMatching = 'perf/ShardedRocksDB/'