foundationdb/fdbserver/KeyValueStoreRocksDB.actor.cpp

527 lines
18 KiB
C++

#ifdef SSD_ROCKSDB_EXPERIMENTAL
#include <rocksdb/cache.h>
#include <rocksdb/db.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/options.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include "flow/flow.h"
#include "flow/IThreadPool.h"
#endif // SSD_ROCKSDB_EXPERIMENTAL
#include "fdbserver/IKeyValueStore.h"
#include "flow/actorcompiler.h" // has to be last include
#ifdef SSD_ROCKSDB_EXPERIMENTAL
namespace {
rocksdb::Slice toSlice(StringRef s) {
return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
}
StringRef toStringRef(rocksdb::Slice s) {
return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
}
rocksdb::ColumnFamilyOptions getCFOptions() {
rocksdb::ColumnFamilyOptions options;
options.level_compaction_dynamic_level_bytes = true;
options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
if (SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS > 0) {
options.periodic_compaction_seconds = SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS;
}
// Compact sstables when there's too much deleted stuff.
options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
return options;
}
rocksdb::Options getOptions() {
rocksdb::Options options({}, getCFOptions());
options.avoid_unnecessary_blocking_io = true;
options.create_if_missing = true;
if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
}
rocksdb::BlockBasedTableOptions bbOpts;
// TODO: Add a knob for the block cache size. (Default is 8 MB)
if (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0) {
// Prefix blooms are used during Seek.
options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(SERVER_KNOBS->ROCKSDB_PREFIX_LEN));
// Also turn on bloom filters in the memtable.
// TODO: Make a knob for this as well.
options.memtable_prefix_bloom_size_ratio = 0.1;
// 5 -- Can be read by RocksDB's versions since 6.6.0. Full and partitioned
// filters use a generally faster and more accurate Bloom filter
// implementation, with a different schema.
// https://github.com/facebook/rocksdb/blob/b77569f18bfc77fb1d8a0b3218f6ecf571bc4988/include/rocksdb/table.h#L391
bbOpts.format_version = 5;
// Create and apply a bloom filter using the 10 bits
// which should yield a ~1% false positive rate:
// https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
bbOpts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10));
// The whole key blooms are only used for point lookups.
// https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
bbOpts.whole_key_filtering = false;
}
if (SERVER_KNOBS->ROCKSDB_BLOCK_CACHE_SIZE > 0) {
bbOpts.block_cache = rocksdb::NewLRUCache(SERVER_KNOBS->ROCKSDB_BLOCK_CACHE_SIZE);
}
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbOpts));
return options;
}
// Set some useful defaults desired for all reads.
rocksdb::ReadOptions getReadOptions() {
rocksdb::ReadOptions options;
options.background_purge_on_iterator_cleanup = true;
return options;
}
struct RocksDBKeyValueStore : IKeyValueStore {
using DB = rocksdb::DB*;
using CF = rocksdb::ColumnFamilyHandle*;
struct Writer : IThreadPoolReceiver {
DB& db;
UID id;
explicit Writer(DB& db, UID id) : db(db), id(id) {}
~Writer() override {
if (db) {
delete db;
}
}
void init() override {}
Error statusToError(const rocksdb::Status& s) {
if (s == rocksdb::Status::IOError()) {
return io_error();
} else {
return unknown_error();
}
}
struct OpenAction : TypedAction<Writer, OpenAction> {
std::string path;
ThreadReturnPromise<Void> done;
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
void action(OpenAction& a) {
// If the DB has already been initialized, this should be a no-op.
if (db != nullptr) {
TraceEvent(SevInfo, "RocksDB")
.detail("Path", a.path)
.detail("Method", "Open")
.detail("Skipping", "Already Open");
a.done.send(Void());
return;
}
std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
"default", getCFOptions() } };
std::vector<rocksdb::ColumnFamilyHandle*> handle;
auto status = rocksdb::DB::Open(getOptions(), a.path, defaultCF, &handle, &db);
if (!status.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open");
a.done.sendError(statusToError(status));
} else {
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Open");
a.done.send(Void());
}
}
struct DeleteVisitor : public rocksdb::WriteBatch::Handler {
VectorRef<KeyRangeRef>& deletes;
Arena& arena;
DeleteVisitor(VectorRef<KeyRangeRef>& deletes, Arena& arena) : deletes(deletes), arena(arena) {}
rocksdb::Status DeleteRangeCF(uint32_t /*column_family_id*/,
const rocksdb::Slice& begin,
const rocksdb::Slice& end) override {
KeyRangeRef kr(toStringRef(begin), toStringRef(end));
deletes.push_back_deep(arena, kr);
return rocksdb::Status::OK();
}
};
struct CommitAction : TypedAction<Writer, CommitAction> {
std::unique_ptr<rocksdb::WriteBatch> batchToCommit;
ThreadReturnPromise<Void> done;
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
void action(CommitAction& a) {
Standalone<VectorRef<KeyRangeRef>> deletes;
DeleteVisitor dv(deletes, deletes.arena());
ASSERT(a.batchToCommit->Iterate(&dv).ok());
// If there are any range deletes, we should have added them to be deleted.
ASSERT(!deletes.empty() || !a.batchToCommit->HasDeleteRange());
rocksdb::WriteOptions options;
options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
auto s = db->Write(options, a.batchToCommit.get());
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Commit");
a.done.sendError(statusToError(s));
} else {
a.done.send(Void());
for (const auto& keyRange : deletes) {
auto begin = toSlice(keyRange.begin);
auto end = toSlice(keyRange.end);
ASSERT(db->SuggestCompactRange(db->DefaultColumnFamily(), &begin, &end).ok());
}
}
}
struct CloseAction : TypedAction<Writer, CloseAction> {
ThreadReturnPromise<Void> done;
std::string path;
bool deleteOnClose;
CloseAction(std::string path, bool deleteOnClose) : path(path), deleteOnClose(deleteOnClose) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
void action(CloseAction& a) {
if (db == nullptr) {
a.done.send(Void());
return;
}
auto s = db->Close();
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close");
}
if (a.deleteOnClose) {
std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
"default", getCFOptions() } };
s = rocksdb::DestroyDB(a.path, getOptions(), defaultCF);
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Destroy");
} else {
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Destroy");
}
}
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Close");
a.done.send(Void());
}
};
struct Reader : IThreadPoolReceiver {
DB& db;
explicit Reader(DB& db) : db(db) {}
void init() override {}
struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
Key key;
Optional<UID> debugID;
ThreadReturnPromise<Optional<Value>> result;
ReadValueAction(KeyRef key, Optional<UID> debugID) : key(key), debugID(debugID) {}
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
};
void action(ReadValueAction& a) {
Optional<TraceBatch> traceBatch;
if (a.debugID.present()) {
traceBatch = { TraceBatch{} };
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
}
rocksdb::PinnableSlice value;
auto s = db->Get(getReadOptions(), db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
traceBatch.get().dump();
}
if (s.ok()) {
a.result.send(Value(toStringRef(value)));
} else {
if (!s.IsNotFound()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValue");
}
a.result.send(Optional<Value>());
}
}
struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
Key key;
int maxLength;
Optional<UID> debugID;
ThreadReturnPromise<Optional<Value>> result;
ReadValuePrefixAction(Key key, int maxLength, Optional<UID> debugID)
: key(key), maxLength(maxLength), debugID(debugID){};
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
};
void action(ReadValuePrefixAction& a) {
rocksdb::PinnableSlice value;
Optional<TraceBatch> traceBatch;
if (a.debugID.present()) {
traceBatch = { TraceBatch{} };
traceBatch.get().addEvent("GetValuePrefixDebug",
a.debugID.get().first(),
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
auto s = db->Get(getReadOptions(), db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValuePrefixDebug",
a.debugID.get().first(),
"Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
traceBatch.get().dump();
}
if (s.ok()) {
a.result.send(Value(StringRef(reinterpret_cast<const uint8_t*>(value.data()),
std::min(value.size(), size_t(a.maxLength)))));
} else {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValuePrefix");
a.result.send(Optional<Value>());
}
}
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
KeyRange keys;
int rowLimit, byteLimit;
ThreadReturnPromise<RangeResult> result;
ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit)
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {}
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
void action(ReadRangeAction& a) {
RangeResult result;
if (a.rowLimit == 0 || a.byteLimit == 0) {
a.result.send(result);
}
int accumulatedBytes = 0;
rocksdb::Status s;
auto options = getReadOptions();
// When using a prefix extractor, ensure that keys are returned in order even if they cross
// a prefix boundary.
options.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
if (a.rowLimit >= 0) {
auto endSlice = toSlice(a.keys.end);
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Next()` is potentially expensive, so short-circut here just in case.
if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Next();
}
s = cursor->status();
} else {
auto beginSlice = toSlice(a.keys.begin);
options.iterate_lower_bound = &beginSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev();
}
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Prev()` is potentially expensive, so short-circut here just in case.
if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Prev();
}
s = cursor->status();
}
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
}
result.more =
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
if (result.more) {
result.readThrough = result[result.size() - 1].key;
}
a.result.send(result);
}
};
DB db = nullptr;
std::string path;
UID id;
Reference<IThreadPool> writeThread;
Reference<IThreadPool> readThreads;
Promise<Void> errorPromise;
Promise<Void> closePromise;
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
explicit RocksDBKeyValueStore(const std::string& path, UID id) : path(path), id(id) {
writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool();
writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr");
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(db), "fdb-rocksdb-re");
}
}
Future<Void> getError() override { return errorPromise.getFuture(); }
ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) {
wait(self->readThreads->stop());
auto a = new Writer::CloseAction(self->path, deleteOnClose);
auto f = a->done.getFuture();
self->writeThread->post(a);
wait(f);
wait(self->writeThread->stop());
if (self->closePromise.canBeSet())
self->closePromise.send(Void());
if (self->errorPromise.canBeSet())
self->errorPromise.send(Never());
delete self;
}
Future<Void> onClosed() override { return closePromise.getFuture(); }
void dispose() override { doClose(this, true); }
void close() override { doClose(this, false); }
KeyValueStoreType getType() const override { return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); }
Future<Void> init() override {
std::unique_ptr<Writer::OpenAction> a(new Writer::OpenAction());
a->path = path;
auto res = a->done.getFuture();
writeThread->post(a.release());
return res;
}
void set(KeyValueRef kv, const Arena*) override {
if (writeBatch == nullptr) {
writeBatch.reset(new rocksdb::WriteBatch());
}
writeBatch->Put(toSlice(kv.key), toSlice(kv.value));
}
void clear(KeyRangeRef keyRange, const Arena*) override {
if (writeBatch == nullptr) {
writeBatch.reset(new rocksdb::WriteBatch());
}
writeBatch->DeleteRange(toSlice(keyRange.begin), toSlice(keyRange.end));
}
Future<Void> commit(bool) override {
// If there is nothing to write, don't write.
if (writeBatch == nullptr) {
return Void();
}
auto a = new Writer::CommitAction();
a->batchToCommit = std::move(writeBatch);
auto res = a->done.getFuture();
writeThread->post(a);
return res;
}
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID) override {
auto a = new Reader::ReadValueAction(key, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) override {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit) override {
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
StorageBytes getStorageBytes() const override {
uint64_t live = 0;
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kEstimateLiveDataSize, &live));
int64_t free;
int64_t total;
g_network->getDiskBytes(path, free, total);
return StorageBytes(free, total, live, free);
}
};
} // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL
IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
UID logID,
KeyValueStoreType storeType,
bool checkChecksums,
bool checkIntegrity) {
#ifdef SSD_ROCKSDB_EXPERIMENTAL
return new RocksDBKeyValueStore(path, logID);
#else
TraceEvent(SevError, "RocksDBEngineInitFailure").detail("Reason", "Built without RocksDB");
ASSERT(false);
return nullptr;
#endif // SSD_ROCKSDB_EXPERIMENTAL
}
#ifdef SSD_ROCKSDB_EXPERIMENTAL
#include "flow/UnitTest.h"
namespace {
TEST_CASE("fdbserver/KeyValueStoreRocksDB/Reopen") {
state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);
state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
kvStore->set({ LiteralStringRef("foo"), LiteralStringRef("bar") });
wait(kvStore->commit(false));
Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);
Future<Void> closed = kvStore->onClosed();
kvStore->close();
wait(closed);
kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
// Confirm that `init()` is idempotent.
wait(kvStore->init());
Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);
Future<Void> closed = kvStore->onClosed();
kvStore->close();
wait(closed);
platform::eraseDirectoryRecursive(rocksDBTestDir);
return Void();
}
} // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL