Support periodic compaction for sharded rocksdb. (#10815)

Yao Xiao 2023-08-25 15:38:01 -07:00 committed by GitHub
parent adcedb9301
commit b20dcf23a9
5 changed files with 173 additions and 11 deletions
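Summary: this change adds periodic compaction support to the sharded RocksDB storage engine. A dedicated compaction thread pool runs a new CompactionWorker, and a compactShards actor periodically scans physical shards for candidates whose last compaction is older than SHARDED_ROCKSDB_COMPACTION_PERIOD and whose column family has accumulated many SST files smaller on average than SHARDED_ROCKSDB_AVERAGE_FILE_SIZE. Candidates are compacted down to the bottommost level, and each shard's last compaction time is persisted under the \xff\xff/CompactionTimestamp/ prefix in the metadata shard so it survives restarts; the record is deleted when the shard itself is removed.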

View File

@@ -424,6 +424,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_READ_RANGE_ROW_LIMIT, 65535 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ROW_LIMIT = deterministicRandom()->randomInt(2, 10);
init( ROCKSDB_READER_THREAD_PRIORITY, 0 );
init( ROCKSDB_WRITER_THREAD_PRIORITY, 0 );
init( ROCKSDB_COMPACTION_THREAD_PRIORITY, 0 );
init( ROCKSDB_BACKGROUND_PARALLELISM, 2 );
init( ROCKSDB_READ_PARALLELISM, isSimulated? 2: 4 );
init( ROCKSDB_CHECKPOINT_READER_PARALLELISM, 4 );
@@ -541,6 +542,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init (SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB
init (ROCKSDB_MAX_MANIFEST_FILE_SIZE, 100 << 20 ); if (isSimulated) ROCKSDB_MAX_MANIFEST_FILE_SIZE = 500 << 20; // 500MB in simulation
init (ROCKSDB_MAX_WRITE_BUFFER_NUMBER, 6 ); // RocksDB default.
init (SHARDED_ROCKSDB_AVERAGE_FILE_SIZE, 8 << 20 ); // 8MB
init (SHARDED_ROCKSDB_COMPACTION_PERIOD, isSimulated? 3600 : 2592000 ); // 30d
init (SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY, 3600 ); // 1h
init (SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT, 1 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
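With the defaults above, the compaction actor wakes up hourly (SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY), considers shards whose last compaction is more than 30 days old (SHARDED_ROCKSDB_COMPACTION_PERIOD; one hour in simulation), and schedules at most SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT of them per pass. SHARDED_ROCKSDB_AVERAGE_FILE_SIZE is the small-file threshold used during candidate selection in compactShards further down.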

View File

@@ -393,6 +393,7 @@ public:
int ROCKSDB_READ_RANGE_ROW_LIMIT;
int ROCKSDB_READER_THREAD_PRIORITY;
int ROCKSDB_WRITER_THREAD_PRIORITY;
int ROCKSDB_COMPACTION_THREAD_PRIORITY;
int ROCKSDB_BACKGROUND_PARALLELISM;
int ROCKSDB_READ_PARALLELISM;
int ROCKSDB_CHECKPOINT_READER_PARALLELISM;
@@ -489,6 +490,10 @@ public:
int SHARD_METADATA_SCAN_BYTES_LIMIT;
int ROCKSDB_MAX_MANIFEST_FILE_SIZE;
int ROCKSDB_MAX_WRITE_BUFFER_NUMBER;
int SHARDED_ROCKSDB_AVERAGE_FILE_SIZE;
double SHARDED_ROCKSDB_COMPACTION_PERIOD;
double SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY;
int SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT;
// Leader election
int MAX_NOTIFICATIONS;

View File

@@ -58,6 +58,7 @@ const std::string METADATA_SHARD_ID = "kvs-metadata";
const std::string DEFAULT_CF_NAME = "default"; // `specialKeys` is stored in this column family.
const std::string manifestFilePrefix = "MANIFEST-";
const KeyRef shardMappingPrefix("\xff\xff/ShardMapping/"_sr);
const KeyRef compactionTimestampPrefix("\xff\xff/CompactionTimestamp/"_sr);
// TODO: move constants to a header file.
const KeyRef persistVersion = "\xff\xffVersion"_sr;
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = "RocksDBStorage"_sr;
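As an illustration (the shard id here is hypothetical), a record under the new prefix pairs a shard id with a stringified timestamp: key "\xff\xff/CompactionTimestamp/shard-42", value produced by std::to_string(now()). These records live in the metadata shard's column family and are written and read by CompactShardsAction below.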
@@ -1112,7 +1113,8 @@ struct PhysicalShard {
bool deletePending = false;
std::atomic<bool> isInitialized;
uint64_t numRangeDeletions = 0;
double deleteTimeSec;
double deleteTimeSec = 0.0;
double lastCompactionTime = 0.0;
};
int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, int byteLimit, RangeResult* result) {
@@ -1217,10 +1219,8 @@ public:
if (rState->closing) {
break;
}
TraceEvent(SevInfo, "ShardedRocksKVSPhysialShardMetrics")
.detail("NumActiveShards", shardManager->numActiveShards())
.detail("TotalPhysicalShards", shardManager->numPhysicalShards());
uint64_t numSstFiles = 0;
for (auto& [id, shard] : *physicalShards) {
if (!shard->initialized()) {
continue;
@@ -1228,15 +1228,15 @@ public:
uint64_t liveDataSize = 0;
ASSERT(shard->db->GetIntProperty(
shard->cf, rocksdb::DB::Properties::kEstimateLiveDataSize, &liveDataSize));
TraceEvent(SevInfo, "PhysicalShardCFSize")
.detail("ShardId", id)
.detail("LiveDataSize", liveDataSize);
TraceEvent e(SevInfo, "PhysicalShardStats");
e.detail("ShardId", id).detail("LiveDataSize", liveDataSize);
// Get compression ratio for each level.
rocksdb::ColumnFamilyMetaData cfMetadata;
shard->db->GetColumnFamilyMetaData(shard->cf, &cfMetadata);
TraceEvent e(SevInfo, "PhysicalShardLevelStats");
e.detail("ShardId", id).detail("NumFiles", cfMetadata.file_count);
e.detail("NumFiles", cfMetadata.file_count);
numSstFiles += cfMetadata.file_count;
std::string levelProp;
int numLevels = 0;
for (auto it = cfMetadata.levels.begin(); it != cfMetadata.levels.end(); ++it) {
@@ -1253,6 +1253,10 @@ public:
}
e.detail("NumLevels", numLevels);
}
TraceEvent(SevInfo, "KVSPhysialShardMetrics")
.detail("NumActiveShards", shardManager->numActiveShards())
.detail("TotalPhysicalShards", shardManager->numPhysicalShards())
.detail("NumSstFiles", numSstFiles);
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
@@ -2608,6 +2612,73 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return Void();
}
struct CompactionWorker : IThreadPoolReceiver {
const UID logId;
explicit CompactionWorker(UID logId) : logId(logId) {}
void init() override {}
~CompactionWorker() override {}
struct CompactShardsAction : TypedAction<CompactionWorker, CompactShardsAction> {
std::vector<std::shared_ptr<PhysicalShard>> shards;
PhysicalShard* metadataShard;
ThreadReturnPromise<Void> done;
CompactShardsAction(std::vector<std::shared_ptr<PhysicalShard>> shards, PhysicalShard* metadataShard)
: shards(shards), metadataShard(metadataShard) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
void action(CompactShardsAction& a) {
auto start = now();
ASSERT(a.metadataShard);
auto db = a.metadataShard->db;
int skipped = 0;
for (auto& shard : a.shards) {
if (shard->deletePending) {
++skipped;
continue;
}
std::string value;
// TODO: Consider loading the last compaction time during shard init once RocksDB's startup time is reduced.
if (shard->lastCompactionTime <= 0.0) {
auto s = db->Get(rocksdb::ReadOptions(),
a.metadataShard->cf,
compactionTimestampPrefix.toString() + shard->id,
&value);
if (s.ok()) {
auto lastCompaction = std::stod(value);
if (start - lastCompaction < SERVER_KNOBS->SHARDED_ROCKSDB_COMPACTION_PERIOD) {
shard->lastCompactionTime = lastCompaction;
++skipped;
continue;
}
} else if (!s.IsNotFound()) {
TraceEvent(SevError, "ShardedRocksDBReadValueError", logId).detail("Description", s.ToString());
a.done.sendError(internal_error());
return;
}
}
rocksdb::CompactRangeOptions compactOptions;
// Force RocksDB to rewrite files to the last level.
compactOptions.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForceOptimized;
shard->db->CompactRange(compactOptions, shard->cf, /*begin=*/nullptr, /*end=*/nullptr);
shard->db->Put(rocksdb::WriteOptions(),
a.metadataShard->cf,
compactionTimestampPrefix.toString() + shard->id,
std::to_string(start));
shard->lastCompactionTime = start;
TraceEvent("ManualCompaction", logId).detail("ShardId", shard->id);
}
TraceEvent("CompactionCompleted", logId)
.detail("NumShards", a.shards.size())
.detail("Skipped", skipped)
.detail("Duration", now() - start);
a.done.send(Void());
}
};
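For context, this is roughly how the worker is driven (a minimal sketch mirroring the compactShards actor later in this diff; compactionThread, shards, and metadataShard stand in for the surrounding state):

// Hand the selected shards to the dedicated compaction thread and wait for
// the worker to report completion through the action's ThreadReturnPromise.
auto a = new CompactionWorker::CompactShardsAction(shards, metadataShard);
Future<Void> done = a->done.getFuture();
compactionThread->post(a); // the thread pool takes ownership of the action
wait(done);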
struct Writer : IThreadPoolReceiver {
const UID logId;
int threadIndex;
@@ -2686,9 +2757,11 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
struct RemoveShardAction : TypedAction<Writer, RemoveShardAction> {
std::vector<std::shared_ptr<PhysicalShard>> shards;
PhysicalShard* metadataShard;
ThreadReturnPromise<Void> done;
RemoveShardAction(std::vector<std::shared_ptr<PhysicalShard>>& shards) : shards(shards) {}
RemoveShardAction(std::vector<std::shared_ptr<PhysicalShard>>& shards, PhysicalShard* metadataShard)
: shards(shards), metadataShard(metadataShard) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
@@ -2696,6 +2769,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
for (auto& shard : a.shards) {
shard->deletePending = true;
columnFamilyMap->erase(shard->cf->GetID());
a.metadataShard->db->Delete(
rocksdb::WriteOptions(), a.metadataShard->cf, compactionTimestampPrefix.toString() + shard->id);
}
a.shards.clear();
a.done.send(Void());
@@ -3598,12 +3673,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
if (g_network->isSimulated()) {
TraceEvent(SevDebug, "ShardedRocksDB").detail("Info", "Use Coro threads in simulation.");
writeThread = CoroThreadPool::createThreadPool();
compactionThread = CoroThreadPool::createThreadPool();
readThreads = CoroThreadPool::createThreadPool();
} else {
writeThread = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_WRITER_THREAD_PRIORITY);
compactionThread = createGenericThreadPool(0, SERVER_KNOBS->ROCKSDB_COMPACTION_THREAD_PRIORITY);
readThreads = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_READER_THREAD_PRIORITY);
}
writeThread->addThread(new Writer(id, 0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
compactionThread->addThread(new CompactionWorker(id), "fdb-rocksdb-cw");
TraceEvent("ShardedRocksDBReadThreads", id)
.detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
@@ -3640,6 +3718,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
try {
wait(self->writeThread->stop());
wait(self->compactionThread->stop());
} catch (Error& e) {
TraceEvent(SevError, "ShardedRocksCloseWriteThreadError").errorUnsuppressed(e);
}
@@ -3677,6 +3756,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
this->metrics =
ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) &&
rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager, this->path);
this->compactionJob = compactShards(this->rState, openFuture, &shardManager, compactionThread);
this->refreshHolder = refreshReadIteratorPools(this->rState, openFuture, shardManager.getAllShards());
this->refreshRocksDBBackgroundWorkHolder = refreshRocksDBBackgroundEventCounter(this->eventListener);
this->cleanUpJob = emptyShardCleaner(this->rState, openFuture, &shardManager, writeThread);
@@ -3887,6 +3967,71 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
ACTOR static Future<Void> compactShards(std::shared_ptr<ShardedRocksDBState> rState,
Future<Void> openFuture,
ShardManager* shardManager,
Reference<IThreadPool> thread) {
try {
wait(openFuture);
state std::unordered_map<std::string, std::shared_ptr<PhysicalShard>>* physicalShards =
shardManager->getAllShards();
loop {
if (rState->closing) {
break;
}
wait(delay(SERVER_KNOBS->SHARDED_ROCKSDB_COMPACTION_ACTOR_DELAY));
int count = 0;
double start = now();
std::vector<std::shared_ptr<PhysicalShard>> shards;
for (auto& [id, shard] : *physicalShards) {
if (count >= SERVER_KNOBS->SHARDED_ROCKSDB_COMPACTION_SHARD_LIMIT) {
break;
}
if (!shard->initialized() || shard->deletePending) {
continue;
}
if (shard->lastCompactionTime > 0.0 &&
start - shard->lastCompactionTime < SERVER_KNOBS->SHARDED_ROCKSDB_COMPACTION_PERIOD) {
continue;
}
uint64_t liveDataSize = 0;
ASSERT(shard->db->GetIntProperty(
shard->cf, rocksdb::DB::Properties::kEstimateLiveDataSize, &liveDataSize));
rocksdb::ColumnFamilyMetaData cfMetadata;
shard->db->GetColumnFamilyMetaData(shard->cf, &cfMetadata);
if (cfMetadata.file_count <= 5) {
continue;
}
if (liveDataSize / cfMetadata.file_count >= SERVER_KNOBS->SHARDED_ROCKSDB_AVERAGE_FILE_SIZE) {
continue;
}
shards.push_back(shard);
TraceEvent("CompactionScheduled")
.detail("ShardId", id)
.detail("NumFiles", cfMetadata.file_count)
.detail("ShardSize", liveDataSize);
++count;
}
if (shards.size() > 0) {
auto a = new CompactionWorker::CompactShardsAction(shards, shardManager->getMetaDataShard());
auto res = a->done.getFuture();
thread->post(a);
wait(res);
} else {
TraceEvent("CompactionSkipped").detail("Reason", "NoCandidate");
}
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent(SevError, "ShardedRocksDBCompactionActorError").errorUnsuppressed(e);
}
}
return Void();
}
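To make the candidate test concrete (illustrative numbers): a shard holding 40 MB of live data across 20 SST files averages 2 MB per file, below the 8 MB SHARDED_ROCKSDB_AVERAGE_FILE_SIZE default, so it is scheduled. The same 40 MB in 4 files is skipped by the file_count <= 5 check, and a 200 MB shard in 20 files (10 MB average) is considered healthy and skipped as well.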
ACTOR static Future<Void> emptyShardCleaner(std::shared_ptr<ShardedRocksDBState> rState,
Future<Void> openFuture,
ShardManager* shardManager,
@@ -3902,7 +4047,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
auto shards = shardManager->getPendingDeletionShards(cleanUpDelay);
if (shards.size() > 0) {
auto a = new Writer::RemoveShardAction(shards);
auto a = new Writer::RemoveShardAction(shards, shardManager->getMetaDataShard());
Future<Void> f = a->done.getFuture();
writeThread->post(a);
TraceEvent(SevInfo, "ShardedRocksDB").detail("DeleteEmptyShards", shards.size());
@@ -3970,6 +4115,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
UID id;
std::set<Key> keysSet;
Reference<IThreadPool> writeThread;
Reference<IThreadPool> compactionThread;
Reference<IThreadPool> readThreads;
Future<Void> errorFuture;
Promise<Void> closePromise;
@@ -3980,6 +4126,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
FlowLock fetchSemaphore;
int numFetchWaiters;
Counters counters;
Future<Void> compactionJob;
Future<Void> refreshHolder;
Future<Void> refreshRocksDBBackgroundWorkHolder;
Future<Void> cleanUpJob;

View File

@@ -1,3 +1,7 @@
[[knobs]]
rocksdb_cf_range_deletion_limit=1000
rocksdb_use_point_delete_for_system_keys=true
[[test]]
testTitle = 'UnitTests'
useDB = false

View File

@@ -1,6 +1,7 @@
[[knobs]]
rocksdb_disable_auto_compactions = true
rocksdb_suggest_compact_clear_range = false
rocksdb_empty_range_check = false
[[test]]
testTitle = 'UnitTests'