Utilize read types to do selective throttling

This commit is contained in:
Daniel Smith 2021-10-15 16:31:01 -04:00
parent 470896bdc4
commit 66520eb1c1
3 changed files with 134 additions and 19 deletions

View File

@ -349,6 +349,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_READ_VALUE_TIMEOUT, 5.0 );
init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, 5.0 );
init( ROCKSDB_READ_RANGE_TIMEOUT, 5.0 );
// Seconds a throttled read may wait for a semaphore slot before being
// rejected with server_overloaded. Use 1.0 (not `1.`) to match the literal
// style of the surrounding double-valued knobs.
init( ROCKSDB_READ_QUEUE_WAIT, 1.0 );
init( ROCKSDB_READ_QUEUE_HARD_MAX, 128 );
init( ROCKSDB_READ_QUEUE_SOFT_MAX, 64 );
init( ROCKSDB_FETCH_QUEUE_HARD_MAX, 16 );
init( ROCKSDB_FETCH_QUEUE_SOFT_MAX, 8 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

View File

@ -281,6 +281,11 @@ public:
double ROCKSDB_READ_VALUE_TIMEOUT;
double ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
double ROCKSDB_READ_RANGE_TIMEOUT;
double ROCKSDB_READ_QUEUE_WAIT;
int ROCKSDB_READ_QUEUE_SOFT_MAX;
int ROCKSDB_READ_QUEUE_HARD_MAX;
int ROCKSDB_FETCH_QUEUE_SOFT_MAX;
int ROCKSDB_FETCH_QUEUE_HARD_MAX;
// Leader election
int MAX_NOTIFICATIONS;

View File

@ -109,6 +109,19 @@ rocksdb::ReadOptions getReadOptions() {
return options;
}
// Periodically emits a "RocksDBFlowLock" trace event describing the state of
// the read and fetch throttling locks (available permits, permits in use, and
// queued waiters). Runs forever; cancel by dropping the returned future.
// The pointers are non-owning — the locks live on the RocksDBKeyValueStore.
ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
loop {
// Same cadence as the RocksDB statistics logger.
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e(SevInfo, "RocksDBFlowLock");
e.detail("ReadAvailable", readLock->available());
e.detail("ReadActivePermits", readLock->activePermits());
e.detail("ReadWaiters", readLock->waiters());
e.detail("FetchAvailable", fetchLock->available());
e.detail("FetchActivePermits", fetchLock->activePermits());
e.detail("FetchWaiters", fetchLock->waiters());
}
}
ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics, rocksdb::DB* db) {
state std::vector<std::tuple<const char*, uint32_t, uint64_t>> tickerStats = {
{ "StallMicros", rocksdb::STALL_MICROS, 0 },
@ -231,7 +244,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
std::string path;
ThreadReturnPromise<Void> done;
Optional<Future<Void>>& metrics;
// Non-owning pointers to the store's throttling locks, carried to the writer
// thread so the main-thread metrics actor can observe read/fetch queue
// pressure (see flowLockLogger). Lifetime is tied to the owning store.
const FlowLock* readLock;
const FlowLock* fetchLock;
// NOTE(review): the stale pre-change two-argument constructor was dropped;
// OpenAction is always built with both locks now (see init()).
OpenAction(std::string path,
Optional<Future<Void>>& metrics,
const FlowLock* readLock,
const FlowLock* fetchLock)
: path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
@ -251,7 +270,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// metric logger in simulation.
if (!g_network->isSimulated()) {
onMainThread([&] {
// Run both loggers for the life of the store; `&&` joins the two
// never-ending actors into a single Future kept alive via a.metrics.
// (The stale pre-change assignment without flowLockLogger was removed.)
a.metrics =
rocksDBMetricLogger(options.statistics, db) && flowLockLogger(a.readLock, a.fetchLock);
return Future<bool>(true);
}).blockUntilReady();
}
@ -558,8 +578,16 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Future<Void> openFuture;
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
Optional<Future<Void>> metrics;
FlowLock readSemaphore;
int readWaiters;
FlowLock fetchSemaphore;
int fetchWaiters;
explicit RocksDBKeyValueStore(const std::string& path, UID id) : path(path), id(id) {
explicit RocksDBKeyValueStore(const std::string& path, UID id)
: path(path), id(id), readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
readWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
fetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) {
// In simulation, run the reader/writer threads as Coro threads (i.e. in the network thread). The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
@ -614,7 +642,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (openFuture.isValid()) {
return openFuture;
}
// Hand the writer thread non-owning pointers to both semaphores so the
// flow-lock logger started in OpenAction can observe them. (The stale
// pre-change two-argument construction, which would redeclare `a`, was
// removed.)
auto a = std::make_unique<Writer::OpenAction>(path, metrics, &readSemaphore, &fetchSemaphore);
openFuture = a->done.getFuture();
writeThread->post(a.release());
return openFuture;
@ -647,28 +675,105 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return res;
}
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) override {
auto a = new Reader::ReadValueAction(key, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
void checkWaiters(const FlowLock& semaphore, int maxWaiters) {
if (semaphore.waiters() > maxWaiters) {
throw server_overloaded();
}
}
// We don't throttle eager reads and reads to the FF keyspace because FDB struggles when those reads fail.
// Thus far, they have been low enough volume to not cause an issue.
static bool shouldThrottle(IKeyValueStore::ReadType type, KeyRef key) {
if (type == IKeyValueStore::ReadType::EAGER) {
return false;
}
const bool systemKeySpace = key.size() > 0 && key[0] == 0xFF;
return !systemKeySpace;
}
// Throttled point-read path shared by readValue and readValuePrefix.
// Waits up to ROCKSDB_READ_QUEUE_WAIT seconds for a semaphore slot; on
// timeout throws server_overloaded to shed load. Once admitted, the action is
// posted to the reader thread pool and the slot is held (via the Releaser
// state variable) until this actor finishes, i.e. until the result arrives.
ACTOR template <class Action>
static Future<Optional<Value>> read(Action* action, FlowLock* semaphore, IThreadPool* pool) {
state std::unique_ptr<Action> a(action); // takes ownership; frees the action if we throw before posting
state Optional<Void> slot = wait(timeout(semaphore->take(), SERVER_KNOBS->ROCKSDB_READ_QUEUE_WAIT));
if (!slot.present()) {
// Timed out waiting for a slot — the store is overloaded.
throw server_overloaded();
}
state FlowLock::Releaser release(*semaphore); // returns the slot when the actor completes
auto fut = a->result.getFuture();
pool->post(a.release()); // pool deletes the action after running it
Optional<Value> result = wait(fut);
return result;
}
// Point read of the value at `key`. Eager/system-keyspace reads bypass
// throttling entirely; FETCH reads are accounted against the fetch
// semaphore, all other reads against the read semaphore. An overloaded
// queue surfaces as a server_overloaded error.
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType type, Optional<UID> debugID) override {
if (shouldThrottle(type, key)) {
const bool isFetch = (type == IKeyValueStore::ReadType::FETCH);
auto& semaphore = isFetch ? fetchSemaphore : readSemaphore;
checkWaiters(semaphore, isFetch ? fetchWaiters : readWaiters);
return read(new Reader::ReadValueAction(key, debugID), &semaphore, readThreads.getPtr());
}
// Unthrottled path: post straight to the reader pool.
auto* action = new Reader::ReadValueAction(key, debugID);
auto result = action->result.getFuture();
readThreads->post(action);
return result;
}
// Point read of up to maxLength bytes of the value at `key`, throttled the
// same way as readValue: eager/system-keyspace reads bypass the semaphores,
// FETCH reads use the fetch semaphore, everything else the read semaphore.
// Overload surfaces as server_overloaded. (The diff view had merged the old
// and new versions — a duplicated ReadType parameter line and the whole old
// body; this is the clean post-change implementation.)
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
IKeyValueStore::ReadType type,
Optional<UID> debugID) override {
if (!shouldThrottle(type, key)) {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
auto& semaphore = (type == IKeyValueStore::ReadType::FETCH) ? fetchSemaphore : readSemaphore;
int maxWaiters = (type == IKeyValueStore::ReadType::FETCH) ? fetchWaiters : readWaiters;
checkWaiters(semaphore, maxWaiters);
auto a = std::make_unique<Reader::ReadValuePrefixAction>(key, maxLength, debugID);
return read(a.release(), &semaphore, readThreads.getPtr());
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
ACTOR static Future<Standalone<RangeResultRef>> read(Reader::ReadRangeAction* action,
FlowLock* semaphore,
IThreadPool* pool) {
state std::unique_ptr<Reader::ReadRangeAction> a(action);
state Optional<Void> slot = wait(timeout(semaphore->take(), SERVER_KNOBS->ROCKSDB_READ_QUEUE_WAIT));
if (!slot.present()) {
throw server_overloaded();
}
state FlowLock::Releaser release(*semaphore);
auto fut = a->result.getFuture();
pool->post(a.release());
Standalone<RangeResultRef> result = wait(fut);
return result;
}
// Range read over `keys`, limited by rowLimit/byteLimit. Throttling mirrors
// readValue: eager/system-keyspace reads go straight to the pool, FETCH
// reads use the fetch semaphore, other reads the read semaphore, and an
// overflowing queue is rejected with server_overloaded.
Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
IKeyValueStore::ReadType type) override {
if (shouldThrottle(type, keys.begin)) {
const bool isFetch = (type == IKeyValueStore::ReadType::FETCH);
auto& semaphore = isFetch ? fetchSemaphore : readSemaphore;
checkWaiters(semaphore, isFetch ? fetchWaiters : readWaiters);
return read(new Reader::ReadRangeAction(keys, rowLimit, byteLimit), &semaphore, readThreads.getPtr());
}
// Unthrottled path: post straight to the reader pool.
auto* action = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto result = action->result.getFuture();
readThreads->post(action);
return result;
}
StorageBytes getStorageBytes() const override {