Merge pull request #5788 from Daniel-B-Smith/rocks-throttle

Throttle the number of concurrent reads to RocksDB
This commit is contained in:
Daniel Smith 2021-11-10 15:20:30 -05:00 committed by GitHub
commit 481cf9bb55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 152 additions and 20 deletions

View File

@ -349,6 +349,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_READ_VALUE_TIMEOUT, 5.0 );
init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, 5.0 );
init( ROCKSDB_READ_RANGE_TIMEOUT, 5.0 );
init( ROCKSDB_READ_QUEUE_WAIT, 1.0 );
init( ROCKSDB_READ_QUEUE_HARD_MAX, 1000 );
init( ROCKSDB_READ_QUEUE_SOFT_MAX, 500 );
init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 100 );
init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 50 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

View File

@ -281,6 +281,11 @@ public:
double ROCKSDB_READ_VALUE_TIMEOUT;
double ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
double ROCKSDB_READ_RANGE_TIMEOUT;
double ROCKSDB_READ_QUEUE_WAIT;
int ROCKSDB_READ_QUEUE_SOFT_MAX;
int ROCKSDB_READ_QUEUE_HARD_MAX;
int ROCKSDB_FETCH_QUEUE_SOFT_MAX;
int ROCKSDB_FETCH_QUEUE_HARD_MAX;
// Leader election
int MAX_NOTIFICATIONS;

View File

@ -9,6 +9,7 @@
#include <rocksdb/table.h>
#include <rocksdb/version.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include "fdbclient/SystemData.h"
#include "fdbserver/CoroFlow.h"
#include "flow/flow.h"
#include "flow/IThreadPool.h"
@ -109,6 +110,19 @@ rocksdb::ReadOptions getReadOptions() {
return options;
}
ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e(SevInfo, "RocksDBFlowLock");
e.detail("ReadAvailable", readLock->available());
e.detail("ReadActivePermits", readLock->activePermits());
e.detail("ReadWaiters", readLock->waiters());
e.detail("FetchAvailable", fetchLock->available());
e.detail("FetchActivePermits", fetchLock->activePermits());
e.detail("FetchWaiters", fetchLock->waiters());
}
}
ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics, rocksdb::DB* db) {
state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = {
{ "StallMicros", rocksdb::STALL_MICROS, 0 },
@ -231,7 +245,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
std::string path;
ThreadReturnPromise<Void> done;
Optional<Future<Void>>& metrics;
OpenAction(std::string path, Optional<Future<Void>>& metrics) : path(std::move(path)), metrics(metrics) {}
const FlowLock* readLock;
const FlowLock* fetchLock;
OpenAction(std::string path,
Optional<Future<Void>>& metrics,
const FlowLock* readLock,
const FlowLock* fetchLock)
: path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
@ -251,7 +271,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// metric logger in simulation.
if (!g_network->isSimulated()) {
onMainThread([&] {
a.metrics = rocksDBMetricLogger(options.statistics, db);
a.metrics =
rocksDBMetricLogger(options.statistics, db) && flowLockLogger(a.readLock, a.fetchLock);
return Future<bool>(true);
}).blockUntilReady();
}
@ -558,8 +579,27 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Future<Void> openFuture;
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
Optional<Future<Void>> metrics;
FlowLock readSemaphore;
int numReadWaiters;
FlowLock fetchSemaphore;
int numFetchWaiters;
explicit RocksDBKeyValueStore(const std::string& path, UID id) : path(path), id(id) {
struct Counters {
CounterCollection cc;
Counter immediateThrottle;
Counter failedToAcquire;
Counters()
: cc("RocksDBThrottle"), immediateThrottle("ImmediateThrottle", cc), failedToAcquire("failedToAcquire", cc) {}
};
Counters counters;
explicit RocksDBKeyValueStore(const std::string& path, UID id)
: path(path), id(id), readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) {
// In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
@ -614,7 +654,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (openFuture.isValid()) {
return openFuture;
}
auto a = std::make_unique<Writer::OpenAction>(path, metrics);
auto a = std::make_unique<Writer::OpenAction>(path, metrics, &readSemaphore, &fetchSemaphore);
openFuture = a->done.getFuture();
writeThread->post(a.release());
return openFuture;
@ -647,28 +687,109 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return res;
}
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) override {
auto a = new Reader::ReadValueAction(key, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
void checkWaiters(const FlowLock& semaphore, int maxWaiters) {
if (semaphore.waiters() > maxWaiters) {
++counters.immediateThrottle;
throw server_overloaded();
}
}
// We don't throttle eager reads and reads to the FF keyspace because FDB struggles when those reads fail.
// Thus far, they have been low enough volume to not cause an issue.
static bool shouldThrottle(IKeyValueStore::ReadType type, KeyRef key) {
return type != IKeyValueStore::ReadType::EAGER && !(key.startsWith(systemKeys.begin));
}
ACTOR template <class Action>
static Future<Optional<Value>> read(Action* action, FlowLock* semaphore, IThreadPool* pool, Counter* counter) {
state std::unique_ptr<Action> a(action);
state Optional<Void> slot = wait(timeout(semaphore->take(), SERVER_KNOBS->ROCKSDB_READ_QUEUE_WAIT));
if (!slot.present()) {
++(*counter);
throw server_overloaded();
}
state FlowLock::Releaser release(*semaphore);
auto fut = a->result.getFuture();
pool->post(a.release());
Optional<Value> result = wait(fut);
return result;
}
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType type, Optional<UID> debugID) override {
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValueAction(key, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValueAction>(key, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
IKeyValueStore::ReadType,
IKeyValueStore::ReadType type,
Optional<UID> debugID) override {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, debugID);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
ACTOR static Future<Standalone<RangeResultRef>> read(Reader::ReadRangeAction* action,
FlowLock* semaphore,
IThreadPool* pool,
Counter* counter) {
state std::unique_ptr<Reader::ReadRangeAction> a(action);
state Optional<Void> slot = wait(timeout(semaphore->take(), SERVER_KNOBS->ROCKSDB_READ_QUEUE_WAIT));
if (!slot.present()) {
++(*counter);
throw server_overloaded();
}
state FlowLock::Releaser release(*semaphore);
auto fut = a->result.getFuture();
pool->post(a.release());
Standalone<RangeResultRef> result = wait(fut);
return result;
}
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
IKeyValueStore::ReadType type) override {
if (!shouldThrottle(type, keys.begin)) {
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? numFetchWaiters : numReadWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadRangeAction>(keys, rowLimit, byteLimit);
return read(a.release(), &semaphore, readThreads.getPtr(), &counters.failedToAcquire);
}
StorageBytes getStorageBytes() const override {

View File

@ -84,6 +84,7 @@ bool canReplyWith(Error e) {
case error_code_process_behind:
case error_code_watch_cancelled:
case error_code_unknown_change_feed:
case error_code_server_overloaded:
// case error_code_all_alternatives_failed:
return true;
default:
@ -4093,7 +4094,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
} catch (Error& e) {
if (e.code() != error_code_end_of_stream && e.code() != error_code_connection_failed &&
e.code() != error_code_transaction_too_old && e.code() != error_code_future_version &&
e.code() != error_code_process_behind) {
e.code() != error_code_process_behind && e.code() != error_code_server_overloaded) {
throw;
}
if (nfk == keys.begin) {