Add error handling in RocksDB KVS. (#6277)
This commit is contained in:
parent
bc2c3df61f
commit
c605226a56
|
@ -3,6 +3,7 @@
|
|||
#include <rocksdb/cache.h>
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/filter_policy.h>
|
||||
#include <rocksdb/listener.h>
|
||||
#include <rocksdb/options.h>
|
||||
#include <rocksdb/slice_transform.h>
|
||||
#include <rocksdb/statistics.h>
|
||||
|
@ -36,6 +37,75 @@ static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 22) ? ROCKSDB_PATCH >= 1 :
|
|||
"Unsupported rocksdb version. Update the rocksdb to 6.22.1 version");
|
||||
|
||||
namespace {
|
||||
using rocksdb::BackgroundErrorReason;
|
||||
|
||||
// Returns string representation of RocksDB background error reason.
|
||||
// Error reason code:
|
||||
// https://github.com/facebook/rocksdb/blob/12d798ac06bcce36be703b057d5f5f4dab3b270c/include/rocksdb/listener.h#L125
|
||||
// This function needs to be updated when error code changes.
|
||||
std::string getErrorReason(BackgroundErrorReason reason) {
	// format() is printf-style; a scoped enum is not implicitly converted to
	// int, so cast explicitly for the %d conversion instead of relying on the
	// enum's underlying type surviving the varargs hand-off.
	const int code = static_cast<int>(reason);
	switch (reason) {
	case BackgroundErrorReason::kFlush:
		return format("%d Flush", code);
	case BackgroundErrorReason::kCompaction:
		return format("%d Compaction", code);
	case BackgroundErrorReason::kWriteCallback:
		return format("%d WriteCallback", code);
	case BackgroundErrorReason::kMemTable:
		return format("%d MemTable", code);
	case BackgroundErrorReason::kManifestWrite:
		return format("%d ManifestWrite", code);
	case BackgroundErrorReason::kFlushNoWAL:
		return format("%d FlushNoWAL", code);
	case BackgroundErrorReason::kManifestWriteNoWAL:
		return format("%d ManifestWriteNoWAL", code);
	default:
		// New reason codes added to RocksDB land here until this switch is
		// updated (see the note above this function).
		return format("%d Unknown", code);
	}
}
|
||||
// Background error handling is tested with Chaos test.
|
||||
// TODO: Test background error in simulation. RocksDB doesn't use flow IO in simulation, which limits our ability to
|
||||
// inject IO errors. We could implement rocksdb::FileSystem using flow IO to unblock simulation. Also, trace event is
|
||||
// not available on background threads because trace event requires setting up special thread locals. Using trace event
|
||||
// could potentially cause segmentation fault.
|
||||
class RocksDBErrorListener : public rocksdb::EventListener {
public:
	RocksDBErrorListener(){};
	// Invoked by RocksDB on its own background threads when a background
	// operation (flush, compaction, manifest write, ...) fails. Logs the error
	// and forwards it at most once through errorPromise.
	// NOTE: runs off the flow network thread, hence the mutex and the
	// thread-safe ThreadReturnPromise instead of a plain Promise.
	void OnBackgroundError(rocksdb::BackgroundErrorReason reason, rocksdb::Status* bg_error) override {
		TraceEvent(SevError, "RocksDBBGError")
		    .detail("Reason", getErrorReason(reason))
		    .detail("RocksDBSeverity", bg_error->severity())
		    .detail("Status", bg_error->ToString());
		std::unique_lock<std::mutex> lock(mutex);
		// The promise becomes invalid after its single send/sendError; later
		// background errors are only traced, not re-delivered.
		if (!errorPromise.isValid())
			return;
		// RocksDB generates two types of background errors, IO Error and Corruption
		// Error type and severity map could be found at
		// https://github.com/facebook/rocksdb/blob/2e09a54c4fb82e88bcaa3e7cfa8ccbbbbf3635d5/db/error_handler.cc#L138.
		// All background errors will be treated as storage engine failure. Send the error to storage server.
		if (bg_error->IsIOError()) {
			errorPromise.sendError(io_error());
		} else if (bg_error->IsCorruption()) {
			errorPromise.sendError(file_corrupt());
		} else {
			errorPromise.sendError(unknown_error());
		}
	}
	// Future the storage engine waits on to observe the first background error.
	Future<Void> getFuture() {
		std::unique_lock<std::mutex> lock(mutex);
		return errorPromise.getFuture();
	}
	~RocksDBErrorListener() {
		std::unique_lock<std::mutex> lock(mutex);
		if (!errorPromise.isValid())
			return;
		// No background error was ever reported: send Never() so outstanding
		// waiters are not hit with a broken promise when the listener dies.
		errorPromise.send(Never());
	}

private:
	ThreadReturnPromise<Void> errorPromise;
	// Guards errorPromise: OnBackgroundError runs on RocksDB background
	// threads while getFuture()/the destructor run on other threads.
	std::mutex mutex;
};
|
||||
using DB = rocksdb::DB*;
|
||||
|
||||
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage");
|
||||
|
@ -432,11 +502,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
Optional<Future<Void>>& metrics;
|
||||
const FlowLock* readLock;
|
||||
const FlowLock* fetchLock;
|
||||
std::shared_ptr<RocksDBErrorListener> errorListener;
|
||||
OpenAction(std::string path,
|
||||
Optional<Future<Void>>& metrics,
|
||||
const FlowLock* readLock,
|
||||
const FlowLock* fetchLock)
|
||||
: path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock) {}
|
||||
const FlowLock* fetchLock,
|
||||
std::shared_ptr<RocksDBErrorListener> errorListener)
|
||||
: path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
|
||||
errorListener(errorListener) {}
|
||||
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
|
||||
};
|
||||
|
@ -445,6 +518,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
"default", getCFOptions() } };
|
||||
std::vector<rocksdb::ColumnFamilyHandle*> handle;
|
||||
auto options = getOptions();
|
||||
options.listeners.push_back(a.errorListener);
|
||||
if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) {
|
||||
options.rate_limiter = rateLimiter;
|
||||
}
|
||||
|
@ -912,7 +986,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
UID id;
|
||||
Reference<IThreadPool> writeThread;
|
||||
Reference<IThreadPool> readThreads;
|
||||
Promise<Void> errorPromise;
|
||||
std::shared_ptr<RocksDBErrorListener> errorListener;
|
||||
Future<Void> errorFuture;
|
||||
Promise<Void> closePromise;
|
||||
Future<Void> openFuture;
|
||||
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
|
||||
|
@ -939,7 +1014,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
|
||||
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
|
||||
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
|
||||
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX) {
|
||||
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
|
||||
errorListener(std::make_shared<RocksDBErrorListener>()), errorFuture(errorListener->getFuture()) {
|
||||
// In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine
|
||||
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
|
||||
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
|
||||
|
@ -964,7 +1040,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
}
|
||||
}
|
||||
|
||||
Future<Void> getError() const override { return errorPromise.getFuture(); }
|
||||
Future<Void> getError() const override { return errorFuture; }
|
||||
|
||||
ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) {
|
||||
// The metrics future retains a reference to the DB, so stop it before we delete it.
|
||||
|
@ -979,8 +1055,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
wait(self->writeThread->stop());
|
||||
if (self->closePromise.canBeSet())
|
||||
self->closePromise.send(Void());
|
||||
if (self->errorPromise.canBeSet())
|
||||
self->errorPromise.send(Never());
|
||||
delete self;
|
||||
}
|
||||
|
||||
|
@ -996,7 +1070,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
if (openFuture.isValid()) {
|
||||
return openFuture;
|
||||
}
|
||||
auto a = std::make_unique<Writer::OpenAction>(path, metrics, &readSemaphore, &fetchSemaphore);
|
||||
auto a = std::make_unique<Writer::OpenAction>(path, metrics, &readSemaphore, &fetchSemaphore, errorListener);
|
||||
openFuture = a->done.getFuture();
|
||||
writeThread->post(a.release());
|
||||
return openFuture;
|
||||
|
|
|
@ -92,20 +92,22 @@ public:
|
|||
return promise.getFuture();
|
||||
}
|
||||
|
||||
void send(T const& t) { // Can be called safely from another thread. Call send or sendError at most once.
|
||||
template <class U>
|
||||
void send(U&& t) { // Can be called safely from another thread. Call send or sendError at most once.
|
||||
Promise<Void> signal;
|
||||
tagAndForward(&promise, t, signal.getFuture());
|
||||
g_network->onMainThread(std::move(signal),
|
||||
g_network->isOnMainThread() ? incrementPriorityIfEven(g_network->getCurrentTask())
|
||||
: TaskPriority::DefaultOnMainThread);
|
||||
}
|
||||
void sendError(Error const& e) { // Can be called safely from another thread. Call send or sendError at most once.
|
||||
	// Can be called safely from another thread. Call send or sendError at most once.
	// NOTE(review): takes Error by value (the diff changed it from const&) —
	// presumably so the main-thread hand-off owns its own copy; confirm against
	// tagAndForwardError's parameter.
	void sendError(Error e) {
		Promise<Void> signal;
		tagAndForwardError(&promise, e, signal.getFuture());
		// Deliver on the main thread, same scheme as send() above.
		g_network->onMainThread(std::move(signal),
		                        g_network->isOnMainThread() ? incrementPriorityIfEven(g_network->getCurrentTask())
		                                                    : TaskPriority::DefaultOnMainThread);
	}
|
||||
bool isValid() { return promise.isValid(); }
|
||||
|
||||
private:
|
||||
Promise<T> promise;
|
||||
|
|
|
@ -31,7 +31,9 @@ struct ThreadNameReceiver final : IThreadPoolReceiver {
|
|||
return;
|
||||
}
|
||||
std::string s = name;
|
||||
ASSERT(a.name.isValid());
|
||||
a.name.send(std::move(s));
|
||||
ASSERT(!a.name.isValid());
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1202,17 +1202,17 @@ Future<T> brokenPromiseToMaybeDelivered(Future<T> in) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR template <class T>
|
||||
void tagAndForward(Promise<T>* pOutputPromise, T value, Future<Void> signal) {
|
||||
// Takes ownership of *pOutputPromise immediately, waits for `signal`, then
// fulfills the promise with `value`. U is deduced separately from T so callers
// can pass any type Promise<T>::send accepts (e.g. Never for Promise<Void>).
ACTOR template <class T, class U>
void tagAndForward(Promise<T>* pOutputPromise, U value, Future<Void> signal) {
	state Promise<T> out(std::move(*pOutputPromise));
	wait(signal);
	// Move, not copy: `value` is this actor's own by-value parameter.
	out.send(std::move(value));
}
|
||||
|
||||
// Waits for `signal`, then pushes `value` into the stream. The stream pointer
// is dereferenced only after the wait, so *pOutput must outlive the signal.
ACTOR template <class T>
void tagAndForward(PromiseStream<T>* pOutput, T value, Future<Void> signal) {
	wait(signal);
	pOutput->send(std::move(value));
}
|
||||
|
||||
ACTOR template <class T>
|
||||
|
|
Loading…
Reference in New Issue