Disable compaction for newly added shard. (#11238)

* Disable compaction for newly added shard.
Yao Xiao 2024-03-07 14:41:53 -08:00 committed by GitHub
parent 6d9276535c
commit 19e3f3e2dd
5 changed files with 82 additions and 11 deletions

View File

@@ -591,9 +591,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( SHARDED_ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 32 << 20 );
init( SHARDED_ROCKSDB_BACKGROUND_PARALLELISM, 2 );
init( SHARDED_ROCKSDB_MAX_SUBCOMPACTIONS, 0 );
init( SHARDED_ROCKSDB_LEVEL0_FILENUM_COMPACTION_TRIGGER, 4 );
init( SHARDED_ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20 ); // RocksDB default.
init( SHARDED_ROCKSDB_LEVEL0_STOP_WRITES_TRIGGER, 36 ); // RocksDB default.
init( SHARDED_ROCKSDB_LEVEL0_FILENUM_COMPACTION_TRIGGER, 4 );
init( SHARDED_ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20 ); // RocksDB default.
init( SHARDED_ROCKSDB_LEVEL0_STOP_WRITES_TRIGGER, 36 ); // RocksDB default.
init( SHARDED_ROCKSDB_DELAY_COMPACTION_FOR_DATA_MOVE, true);
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

View File

@@ -83,7 +83,7 @@ public:
// Shard management APIs.
// Adds key range to a physical shard.
virtual Future<Void> addRange(KeyRangeRef range, std::string id) { return Void(); }
virtual Future<Void> addRange(KeyRangeRef range, std::string id, bool active = true) { return Void(); }
// Removes a key range from KVS and returns a list of empty physical shards after the removal.
virtual std::vector<std::string> removeRange(KeyRangeRef range) { return std::vector<std::string>(); }
@@ -93,6 +93,9 @@ public:
return replaceRange_impl(this, range, data);
}
// Marks a key range as active and prepares it for future read.
virtual void markRangeAsActive(KeyRangeRef range) {}
// Persists key range and physical shard mapping.
virtual void persistRangeMapping(KeyRangeRef range, bool isAdd) {}
@@ -178,4 +181,4 @@ ACTOR static Future<Void> replaceRange_impl(IKeyValueStore* self,
}
#include "flow/unactorcompiler.h"
#endif
#endif
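
Taken together, the interface change supports a two-phase flow: a shard is added with active = false while a data move fills it, then switched back to normal compaction behavior once the fetch completes. Below is a minimal usage sketch against the extended IKeyValueStore interface; it is not part of this commit, the kvs, range, and shardId names and the surrounding actor are assumptions, and error handling is omitted.

// Sketch only: how a caller might drive the new API during a data move.
ACTOR Future<Void> moveInRangeExample(IKeyValueStore* kvs, KeyRange range, std::string shardId) {
	// Create the destination shard with compaction pressure disabled while loading.
	wait(kvs->addRange(range, shardId, /*active=*/false));
	kvs->persistRangeMapping(range, /*isAdd=*/true);

	// ... fetch and write the moved key-value pairs into the range ...

	// Restore normal compaction triggers once the data is in place.
	kvs->markRangeAsActive(range);
	return Void();
}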

View File

@@ -557,6 +557,7 @@ public:
int SHARDED_ROCKSDB_LEVEL0_FILENUM_COMPACTION_TRIGGER;
int SHARDED_ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER;
int SHARDED_ROCKSDB_LEVEL0_STOP_WRITES_TRIGGER;
bool SHARDED_ROCKSDB_DELAY_COMPACTION_FOR_DATA_MOVE;
// Leader election
int MAX_NOTIFICATIONS;

View File

@@ -576,6 +576,30 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
return options;
}
rocksdb::ColumnFamilyOptions getCFOptionsForInactiveShard() {
auto options = getCFOptions();
// never slowdown ingest.
options.level0_file_num_compaction_trigger = (1 << 30);
options.level0_slowdown_writes_trigger = (1 << 30);
options.level0_stop_writes_trigger = (1 << 30);
options.soft_pending_compaction_bytes_limit = 0;
options.hard_pending_compaction_bytes_limit = 0;
// no auto compactions please. The application should issue a
// manual compaction after all data is loaded into L0.
options.disable_auto_compactions = true;
// A manual compaction run should pick all files in L0 in
// a single compaction run.
options.max_compaction_bytes = (static_cast<uint64_t>(1) << 60);
// It is better to have only 2 levels, otherwise a manual
// compaction would compact at every possible level, thereby
// increasing the total time needed for compactions.
options.num_levels = 2;
return options;
}
rocksdb::DBOptions getOptions() {
rocksdb::DBOptions options;
options.avoid_unnecessary_blocking_io = true;
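
The comment in getCFOptionsForInactiveShard above says the application should issue a manual compaction after all data is loaded into L0. As a hedged sketch of that follow-up step (not part of this patch; the db and cf handles are assumed), the RocksDB call could look like:

// Fold the accumulated L0 files into the single lower level configured
// above (num_levels = 2) in one manual compaction pass.
rocksdb::CompactRangeOptions compactOptions;
compactOptions.exclusive_manual_compaction = false;
compactOptions.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;
// A nullptr begin/end compacts the whole column family for this shard.
rocksdb::Status status = db->CompactRange(compactOptions, cf, /*begin=*/nullptr, /*end=*/nullptr);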
@@ -1334,7 +1358,7 @@ public:
return result;
}
PhysicalShard* addRange(KeyRange range, std::string id) {
PhysicalShard* addRange(KeyRange range, std::string id, bool active) {
TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId).detail("Range", range).detail("ShardId", id);
// Newly added range should not overlap with any existing range.
@@ -1361,6 +1385,7 @@ public:
}
}
auto cfOptions = active ? getCFOptions() : getCFOptionsForInactiveShard();
auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, cfOptions));
std::shared_ptr<PhysicalShard>& shard = it->second;
@@ -1372,7 +1397,10 @@ public:
validate();
TraceEvent(SevInfo, "ShardedRocksDBRangeAdded", this->logId).detail("Range", range).detail("ShardId", id);
TraceEvent(SevInfo, "ShardedRocksDBRangeAdded", this->logId)
.detail("Range", range)
.detail("ShardId", id)
.detail("Active", active);
return shard.get();
}
@@ -1466,6 +1494,37 @@ public:
return shardIds;
}
void markRangeAsActive(KeyRangeRef range) {
auto ranges = dataShardMap.intersectingRanges(range);
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
if (!it.value()) {
continue;
}
auto beginSlice = toSlice(range.begin);
auto endSlice = toSlice(range.end);
db->SuggestCompactRange(it.value()->physicalShard->cf, &beginSlice, &endSlice);
std::unordered_map<std::string, std::string> options = {
{ "level0_file_num_compaction_trigger",
std::to_string(SERVER_KNOBS->SHARDED_ROCKSDB_LEVEL0_FILENUM_COMPACTION_TRIGGER) },
{ "level0_slowdown_writes_trigger",
std::to_string(SERVER_KNOBS->SHARDED_ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER) },
{ "level0_stop_writes_trigger",
std::to_string(SERVER_KNOBS->SHARDED_ROCKSDB_LEVEL0_STOP_WRITES_TRIGGER) },
{ "soft_pending_compaction_bytes_limit",
std::to_string(SERVER_KNOBS->SHARD_SOFT_PENDING_COMPACT_BYTES_LIMIT) },
{ "hard_pending_compaction_bytes_limit",
std::to_string(SERVER_KNOBS->SHARD_HARD_PENDING_COMPACT_BYTES_LIMIT) },
{ "disable_auto_compactions", "false" },
{ "num_levels", "-1" }
};
db->SetOptions(it.value()->physicalShard->cf, options);
TraceEvent("ShardedRocksDBRangeActive", logId).detail("ShardId", it.value()->physicalShard->id);
}
}
std::vector<std::shared_ptr<PhysicalShard>> getPendingDeletionShards(double cleanUpDelay) {
std::vector<std::shared_ptr<PhysicalShard>> emptyShards;
double currentTime = now();
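
For readers less familiar with RocksDB, the two calls in markRangeAsActive above are existing RocksDB primitives: SuggestCompactRange hints that files overlapping a range are good compaction candidates, and SetOptions changes mutable column family options at runtime from string key/value pairs. A standalone sketch with assumed db and cf handles and illustrative values:

rocksdb::Slice begin("shard-begin-key");
rocksdb::Slice end("shard-end-key");
// Hint that files overlapping [begin, end) should be picked up by compaction.
db->SuggestCompactRange(cf, &begin, &end);
// Re-enable automatic compactions and restore a normal L0 trigger.
rocksdb::Status status = db->SetOptions(cf, {
	{ "disable_auto_compactions", "false" },
	{ "level0_file_num_compaction_trigger", "4" }
});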
@@ -2426,7 +2485,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
}
for (const KeyRange& range : ranges) {
self->shardManager.addRange(range, shardId);
self->shardManager.addRange(range, shardId, true);
}
const Severity sevDm = static_cast<Severity>(SERVER_KNOBS->PHYSICAL_SHARD_MOVE_LOG_SEVERITY);
TraceEvent(sevDm, "ShardedRocksRestoreAddRange", self->id)
@@ -3613,8 +3672,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
}
Future<Void> addRange(KeyRangeRef range, std::string id) override {
auto shard = shardManager.addRange(range, id);
Future<Void> addRange(KeyRangeRef range, std::string id, bool active) override {
auto shard = shardManager.addRange(range, id, active);
if (shard->initialized()) {
return Void();
}
@@ -3624,6 +3683,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return res;
}
void markRangeAsActive(KeyRangeRef range) override { shardManager.markRangeAsActive(range); }
void set(KeyValueRef kv, const Arena*) override {
shardManager.put(kv.key, kv.value);
if (SERVER_KNOBS->ROCKSDB_USE_POINT_DELETE_FOR_SYSTEM_KEYS && systemKeys.contains(kv.key)) {

View File

@@ -557,10 +557,14 @@ struct StorageServerDisk {
void writeKeyValue(KeyValueRef kv);
void clearRange(KeyRangeRef keys);
Future<Void> addRange(KeyRangeRef range, std::string id) { return storage->addRange(range, id); }
Future<Void> addRange(KeyRangeRef range, std::string id) {
return storage->addRange(range, id, !SERVER_KNOBS->SHARDED_ROCKSDB_DELAY_COMPACTION_FOR_DATA_MOVE);
}
std::vector<std::string> removeRange(KeyRangeRef range) { return storage->removeRange(range); }
void markRangeAsActive(KeyRangeRef range) { storage->markRangeAsActive(range); }
Future<Void> replaceRange(KeyRange range, Standalone<VectorRef<KeyValueRef>> data) {
return storage->replaceRange(range, data);
}
@@ -8726,6 +8730,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// We have completed the fetch and write of the data, now we wait for MVCC window to pass.
// As we have finished this work, we will allow more work to start...
shard->fetchComplete.send(Void());
data->storage.markRangeAsActive(keys);
const double duration = now() - startTime;
TraceEvent(SevInfo, "FetchKeysStats", data->thisServerID)
.detail("TotalBytes", totalBytes)