Merge pull request #5433 from Daniel-B-Smith/rocksdb-range-error

Fix error handling of reads.
This commit is contained in:
Daniel Smith 2021-08-23 13:21:46 -04:00 committed by GitHub
commit 452400eb83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 18 deletions

View File

@ -181,6 +181,16 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
}
}
Error statusToError(const rocksdb::Status& s) {
if (s.IsIOError()) {
return io_error();
} else if (s.IsTimedOut()) {
return transaction_too_old();
} else {
return unknown_error();
}
}
struct RocksDBKeyValueStore : IKeyValueStore {
using DB = rocksdb::DB*;
using CF = rocksdb::ColumnFamilyHandle*;
@ -199,14 +209,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
void init() override {}
Error statusToError(const rocksdb::Status& s) {
if (s == rocksdb::Status::IOError()) {
return io_error();
} else {
return unknown_error();
}
}
struct OpenAction : TypedAction<Writer, OpenAction> {
std::string path;
ThreadReturnPromise<Void> done;
@ -365,11 +367,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
if (s.ok()) {
a.result.send(Value(toStringRef(value)));
} else {
if (!s.IsNotFound()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValue");
}
} else if (s.IsNotFound()) {
a.result.send(Optional<Value>());
} else {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValue");
a.result.sendError(statusToError(s));
}
}
@ -415,13 +417,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (s.ok()) {
a.result.send(Value(StringRef(reinterpret_cast<const uint8_t*>(value.data()),
std::min(value.size(), size_t(a.maxLength)))));
} else {
if (!s.IsNotFound()) {
TraceEvent(SevError, "RocksDBError")
.detail("Error", s.ToString())
.detail("Method", "ReadValuePrefix");
}
} else if (s.IsNotFound()) {
a.result.send(Optional<Value>());
} else {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValuePrefix");
a.result.sendError(statusToError(s));
}
}
@ -513,6 +513,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
a.result.sendError(statusToError(s));
return;
}
result.more =
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);