Rocksdb knobs for compaction, storageserver canCommit() waiting if rocksdb overloaded.
This commit is contained in:
parent
49bd0c34cc
commit
8796a763a5
|
@ -367,6 +367,14 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( ROCKSDB_PERFCONTEXT_ENABLE, false ); if( randomize && BUGGIFY ) ROCKSDB_PERFCONTEXT_ENABLE = deterministicRandom()->coinflip() ? false : true;
|
||||
init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
|
||||
init( ROCKSDB_MAX_SUBCOMPACTIONS, 2 );
|
||||
init( ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT, 64000000000 ); // 64GB, Rocksdb option, Writes will slow down.
|
||||
init( ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT, 100000000000 ); // 100GB, Rocksdb option, Writes will stall.
|
||||
init( ROCKSDB_CAN_COMMIT_COMPACT_BYTES_LIMIT, 50000000000 ); // 50GB, Commit waits.
|
||||
// Can commit will delay ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD seconds for
|
||||
// ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD times, if rocksdb overloaded.
|
||||
// Set ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD to 0, to disable
|
||||
init( ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD, 1 );
|
||||
init( ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD, 5 );
|
||||
|
||||
// Leader election
|
||||
bool longLeaderElection = randomize && BUGGIFY;
|
||||
|
|
|
@ -298,6 +298,11 @@ public:
|
|||
bool ROCKSDB_PERFCONTEXT_ENABLE; // Enable rocks perf context metrics. May cause performance overhead
|
||||
double ROCKSDB_PERFCONTEXT_SAMPLE_RATE;
|
||||
int ROCKSDB_MAX_SUBCOMPACTIONS;
|
||||
int64_t ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT;
|
||||
int64_t ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT;
|
||||
int64_t ROCKSDB_CAN_COMMIT_COMPACT_BYTES_LIMIT;
|
||||
int ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD;
|
||||
int ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD;
|
||||
|
||||
// Leader election
|
||||
int MAX_NOTIFICATIONS;
|
||||
|
|
|
@ -46,6 +46,7 @@ public:
|
|||
virtual KeyValueStoreType getType() const = 0;
|
||||
virtual void set(KeyValueRef keyValue, const Arena* arena = nullptr) = 0;
|
||||
virtual void clear(KeyRangeRef range, const Arena* arena = nullptr) = 0;
|
||||
virtual Future<Void> canCommit() { return Void(); }
|
||||
virtual Future<Void> commit(
|
||||
bool sequential = false) = 0; // returns when prior sets and clears are (atomically) durable
|
||||
|
||||
|
|
|
@ -149,6 +149,12 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
|
|||
if (SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS > 0) {
|
||||
options.periodic_compaction_seconds = SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS;
|
||||
}
|
||||
if (SERVER_KNOBS->ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT > 0) {
|
||||
options.soft_pending_compaction_bytes_limit = SERVER_KNOBS->ROCKSDB_SOFT_PENDING_COMPACT_BYTES_LIMIT;
|
||||
}
|
||||
if (SERVER_KNOBS->ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT > 0) {
|
||||
options.hard_pending_compaction_bytes_limit = SERVER_KNOBS->ROCKSDB_HARD_PENDING_COMPACT_BYTES_LIMIT;
|
||||
}
|
||||
// Compact sstables when there's too much deleted stuff.
|
||||
options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
|
||||
|
||||
|
@ -1438,6 +1444,22 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
}
|
||||
}
|
||||
|
||||
// Checks and waits for few seconds if rocskdb is overloaded.
|
||||
ACTOR Future<Void> checkRocksdbState(RocksDBKeyValueStore* self) {
|
||||
state uint64_t estPendCompactBytes;
|
||||
state int count = SERVER_KNOBS->ROCKSDB_CAN_COMMIT_DELAY_TIMES_ON_OVERLOAD;
|
||||
self->db->GetIntProperty(rocksdb::DB::Properties::kEstimatePendingCompactionBytes, &estPendCompactBytes);
|
||||
while (count && estPendCompactBytes > SERVER_KNOBS->ROCKSDB_CAN_COMMIT_COMPACT_BYTES_LIMIT) {
|
||||
wait(delay(SERVER_KNOBS->ROCKSDB_CAN_COMMIT_DELAY_ON_OVERLOAD));
|
||||
count--;
|
||||
self->db->GetIntProperty(rocksdb::DB::Properties::kEstimatePendingCompactionBytes, &estPendCompactBytes);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> canCommit() override { return checkRocksdbState(this); }
|
||||
|
||||
Future<Void> commit(bool) override {
|
||||
// If there is nothing to write, don't write.
|
||||
if (writeBatch == nullptr) {
|
||||
|
|
|
@ -204,6 +204,7 @@ struct StorageServerDisk {
|
|||
|
||||
Future<Void> getError() { return storage->getError(); }
|
||||
Future<Void> init() { return storage->init(); }
|
||||
Future<Void> canCommit() { return storage->canCommit(); }
|
||||
Future<Void> commit() { return storage->commit(); }
|
||||
|
||||
// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
|
||||
|
@ -5588,6 +5589,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
|
||||
debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
|
||||
state double beforeStorageCommit = now();
|
||||
wait(data->storage.canCommit());
|
||||
state Future<Void> durable = data->storage.commit();
|
||||
++data->counters.kvCommits;
|
||||
state Future<Void> durableDelay = Void();
|
||||
|
|
Loading…
Reference in New Issue