diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 6ee837b8a1..0356cde2e8 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -349,6 +349,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ROCKSDB_READ_VALUE_TIMEOUT, 5.0 ); init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, 5.0 ); init( ROCKSDB_READ_RANGE_TIMEOUT, 5.0 ); + init( ROCKSDB_READ_QUEUE_WAIT, 1. ); + init( ROCKSDB_READ_QUEUE_HARD_MAX, 128 ); + init( ROCKSDB_READ_QUEUE_SOFT_MAX, 64 ); + init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 16 ); + init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 8 ); // Leader election bool longLeaderElection = randomize && BUGGIFY; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index b8357535c8..33d528adba 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -281,6 +281,11 @@ public: double ROCKSDB_READ_VALUE_TIMEOUT; double ROCKSDB_READ_VALUE_PREFIX_TIMEOUT; double ROCKSDB_READ_RANGE_TIMEOUT; + double ROCKSDB_READ_QUEUE_WAIT; + int ROCKSDB_READ_QUEUE_SOFT_MAX; + int ROCKSDB_READ_QUEUE_HARD_MAX; + int ROCKSDB_FETCH_QUEUE_SOFT_MAX; + int ROCKSDB_FETCH_QUEUE_HARD_MAX; // Leader election int MAX_NOTIFICATIONS; diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 4f5f15f026..5764262f68 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -109,6 +109,19 @@ rocksdb::ReadOptions getReadOptions() { return options; } +ACTOR Future flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) { + loop { + wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY)); + TraceEvent e(SevInfo, "RocksDBFlowLock"); + e.detail("ReadAvailable", readLock->available()); + e.detail("ReadActivePermits", readLock->activePermits()); + e.detail("ReadWaiters", readLock->waiters()); + e.detail("FetchAvailable", fetchLock->available()); + e.detail("FetchActivePermits", fetchLock->activePermits()); + e.detail("FetchWaiters", fetchLock->waiters()); + } +} + ACTOR Future rocksDBMetricLogger(std::shared_ptr statistics, rocksdb::DB* db) { state std::vector> tickerStats = { { "StallMicros", rocksdb::STALL_MICROS, 0 }, @@ -231,7 +244,13 @@ struct RocksDBKeyValueStore : IKeyValueStore { std::string path; ThreadReturnPromise done; Optional>& metrics; - OpenAction(std::string path, Optional>& metrics) : path(std::move(path)), metrics(metrics) {} + const FlowLock* readLock; + const FlowLock* fetchLock; + OpenAction(std::string path, + Optional>& metrics, + const FlowLock* readLock, + const FlowLock* fetchLock) + : path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock) {} double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } }; @@ -251,7 +270,8 @@ struct RocksDBKeyValueStore : IKeyValueStore { // metric logger in simulation. if (!g_network->isSimulated()) { onMainThread([&] { - a.metrics = rocksDBMetricLogger(options.statistics, db); + a.metrics = + rocksDBMetricLogger(options.statistics, db) && flowLockLogger(a.readLock, a.fetchLock); return Future(true); }).blockUntilReady(); } @@ -558,8 +578,16 @@ struct RocksDBKeyValueStore : IKeyValueStore { Future openFuture; std::unique_ptr writeBatch; Optional> metrics; + FlowLock readSemaphore; + int readWaiters; + FlowLock fetchSemaphore; + int fetchWaiters; - explicit RocksDBKeyValueStore(const std::string& path, UID id) : path(path), id(id) { + explicit RocksDBKeyValueStore(const std::string& path, UID id) + : path(path), id(id), readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), + fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX), + readWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), + fetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) { // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine // is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also // block the network thread in a way that would be unacceptable in production but is a necessary evil here. When @@ -614,7 +642,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { if (openFuture.isValid()) { return openFuture; } - auto a = std::make_unique(path, metrics); + auto a = std::make_unique(path, metrics, &readSemaphore, &fetchSemaphore); openFuture = a->done.getFuture(); writeThread->post(a.release()); return openFuture; @@ -647,28 +675,105 @@ struct RocksDBKeyValueStore : IKeyValueStore { return res; } - Future> readValue(KeyRef key, IKeyValueStore::ReadType, Optional debugID) override { - auto a = new Reader::ReadValueAction(key, debugID); - auto res = a->result.getFuture(); - readThreads->post(a); - return res; + void checkWaiters(const FlowLock& semaphore, int maxWaiters) { + if (semaphore.waiters() > maxWaiters) { + throw server_overloaded(); + } + } + + // We don't throttle eager reads and reads to the FF keyspace because FDB struggles when those reads fail. + // Thus far, they have been low enough volume to not cause an issue. + static bool shouldThrottle(IKeyValueStore::ReadType type, KeyRef key) { + return type != IKeyValueStore::ReadType::EAGER && !(key.size() && key[0] == 0xFF); + } + + ACTOR template + static Future> read(Action* action, FlowLock* semaphore, IThreadPool* pool) { + state std::unique_ptr a(action); + state Optional slot = wait(timeout(semaphore->take(), SERVER_KNOBS->ROCKSDB_READ_QUEUE_WAIT)); + if (!slot.present()) { + throw server_overloaded(); + } + + state FlowLock::Releaser release(*semaphore); + + auto fut = a->result.getFuture(); + pool->post(a.release()); + Optional result = wait(fut); + + return result; + } + + Future> readValue(KeyRef key, IKeyValueStore::ReadType type, Optional debugID) override { + if (!shouldThrottle(type, key)) { + auto a = new Reader::ReadValueAction(key, debugID); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore; + int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? fetchWaiters : readWaiters; + + checkWaiters(semaphore, maxWaiters); + auto a = std::make_unique(key, debugID); + return read(a.release(), &semaphore, readThreads.getPtr()); } Future> readValuePrefix(KeyRef key, int maxLength, - IKeyValueStore::ReadType, + IKeyValueStore::ReadType type, Optional debugID) override { - auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID); - auto res = a->result.getFuture(); - readThreads->post(a); - return res; + if (!shouldThrottle(type, key)) { + auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore; + int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? fetchWaiters : readWaiters; + + checkWaiters(semaphore, maxWaiters); + auto a = std::make_unique(key, maxLength, debugID); + return read(a.release(), &semaphore, readThreads.getPtr()); } - Future readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override { - auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit); - auto res = a->result.getFuture(); - readThreads->post(a); - return res; + ACTOR static Future> read(Reader::ReadRangeAction* action, + FlowLock* semaphore, + IThreadPool* pool) { + state std::unique_ptr a(action); + state Optional slot = wait(timeout(semaphore->take(), SERVER_KNOBS->ROCKSDB_READ_QUEUE_WAIT)); + if (!slot.present()) { + throw server_overloaded(); + } + + state FlowLock::Releaser release(*semaphore); + + auto fut = a->result.getFuture(); + pool->post(a.release()); + Standalone result = wait(fut); + + return result; + } + + Future readRange(KeyRangeRef keys, + int rowLimit, + int byteLimit, + IKeyValueStore::ReadType type) override { + if (!shouldThrottle(type, keys.begin)) { + auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore; + int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? fetchWaiters : readWaiters; + + checkWaiters(semaphore, maxWaiters); + auto a = std::make_unique(keys, rowLimit, byteLimit); + return read(a.release(), &semaphore, readThreads.getPtr()); } StorageBytes getStorageBytes() const override {