Rocksdb write rate limiter.
This commit is contained in:
parent
034b934ecd
commit
162bce7a58
|
@ -357,6 +357,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( ROCKSDB_HISTOGRAMS_SAMPLE_RATE, 0.001 ); if( randomize && BUGGIFY ) ROCKSDB_HISTOGRAMS_SAMPLE_RATE = 0;
|
||||
init( ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME, 30.0 ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME = 0.1;
|
||||
init( ROCKSDB_READ_RANGE_REUSE_ITERATORS, true ); if( randomize && BUGGIFY ) ROCKSDB_READ_RANGE_REUSE_ITERATORS = deterministicRandom()->coinflip() ? true : false;
|
||||
// Set to 0 to disable rocksdb write rate limiting. Rate limiter unit: bytes per second.
|
||||
init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 );
|
||||
// If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES according to the recent demand of background IO.
|
||||
init( ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE, true );
|
||||
|
||||
|
||||
// Leader election
|
||||
bool longLeaderElection = randomize && BUGGIFY;
|
||||
|
|
|
@ -291,6 +291,8 @@ public:
|
|||
double ROCKSDB_HISTOGRAMS_SAMPLE_RATE;
|
||||
double ROCKSDB_READ_RANGE_ITERATOR_REFRESH_TIME;
|
||||
bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
|
||||
int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC;
|
||||
bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE;
|
||||
|
||||
// Leader election
|
||||
int MAX_NOTIFICATIONS;
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include <rocksdb/table.h>
|
||||
#include <rocksdb/version.h>
|
||||
#include <rocksdb/utilities/table_properties_collectors.h>
|
||||
#include <rocksdb/rate_limiter.h>
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/CoroFlow.h"
|
||||
#include "flow/flow.h"
|
||||
|
@ -383,6 +384,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
struct Writer : IThreadPoolReceiver {
|
||||
DB& db;
|
||||
UID id;
|
||||
std::shared_ptr<rocksdb::RateLimiter> rateLimiter;
|
||||
Reference<Histogram> commitLatencyHistogram;
|
||||
Reference<Histogram> commitActionHistogram;
|
||||
Reference<Histogram> commitQueueWaitHistogram;
|
||||
|
@ -392,6 +394,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
|
||||
explicit Writer(DB& db, UID id, std::shared_ptr<ReadIteratorPool> readIterPool)
|
||||
: db(db), id(id), readIterPool(readIterPool),
|
||||
rateLimiter(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0
|
||||
? rocksdb::NewGenericRateLimiter(
|
||||
SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, // rate_bytes_per_sec
|
||||
100 * 1000, // refill_period_us
|
||||
10, // fairness
|
||||
rocksdb::RateLimiter::Mode::kWritesOnly,
|
||||
SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE)
|
||||
: nullptr),
|
||||
commitLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
|
||||
ROCKSDB_COMMIT_LATENCY_HISTOGRAM,
|
||||
Histogram::Unit::microseconds)),
|
||||
|
@ -435,12 +445,20 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
"default", getCFOptions() } };
|
||||
std::vector<rocksdb::ColumnFamilyHandle*> handle;
|
||||
auto options = getOptions();
|
||||
if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) {
|
||||
options.rate_limiter = rateLimiter;
|
||||
}
|
||||
auto status = rocksdb::DB::Open(options, a.path, defaultCF, &handle, &db);
|
||||
if (!status.ok()) {
|
||||
logRocksDBError(status, "Open");
|
||||
a.done.sendError(statusToError(status));
|
||||
} else {
|
||||
TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Open");
|
||||
TraceEvent(SevInfo, "RocksDB")
|
||||
.detail("Path", a.path)
|
||||
.detail("Method", "Open")
|
||||
.detail("KnobRocksDBWriteRateLimiterBytesPerSec",
|
||||
SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC)
|
||||
.detail("KnobRocksDBWriteRateLimiterAutoTune", SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE);
|
||||
if (g_network->isSimulated()) {
|
||||
// The current thread and main thread are same when the code runs in simulation.
|
||||
// blockUntilReady() is getting the thread into deadlock state, so directly calling
|
||||
|
@ -503,6 +521,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
|
||||
|
||||
double writeBeginTime = a.getHistograms ? timer_monotonic() : 0;
|
||||
if (rateLimiter) {
|
||||
// Controls the total write rate of compaction and flush in bytes per second.
|
||||
// Request for batchToCommit bytes. If this request cannot be satisfied, the call is blocked.
|
||||
rateLimiter->Request(a.batchToCommit->GetDataSize() /* bytes */, rocksdb::Env::IO_HIGH);
|
||||
}
|
||||
auto s = db->Write(options, a.batchToCommit.get());
|
||||
readIterPool->update();
|
||||
if (a.getHistograms) {
|
||||
|
|
Loading…
Reference in New Issue