Revert "Revert "Initial RocksDB""
This commit is contained in:
parent
e4fb5449fa
commit
acbfe2e4c9
|
@ -575,6 +575,7 @@
|
|||
"ssd-1",
|
||||
"ssd-2",
|
||||
"ssd-redwood-experimental",
|
||||
"ssd-rocksdb-experimental",
|
||||
"memory"
|
||||
]},
|
||||
"coordinators_count":1,
|
||||
|
|
|
@ -268,6 +268,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
result["storage_engine"] = "ssd-2";
|
||||
} else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_REDWOOD_V1 ) {
|
||||
result["storage_engine"] = "ssd-redwood-experimental";
|
||||
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) {
|
||||
result["storage_engine"] = "ssd-rocksdb-experimental";
|
||||
} else if( tLogDataStoreType == KeyValueStoreType::MEMORY && storageServerStoreType == KeyValueStoreType::MEMORY ) {
|
||||
result["storage_engine"] = "memory-1";
|
||||
} else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE ) {
|
||||
|
@ -498,7 +500,7 @@ bool DatabaseConfiguration::isExcludedServer( NetworkAddressList a ) const {
|
|||
return get( encodeExcludedServersKey( AddressExclusion(a.address.ip, a.address.port) ) ).present() ||
|
||||
get( encodeExcludedServersKey( AddressExclusion(a.address.ip) ) ).present() ||
|
||||
get( encodeFailedServersKey( AddressExclusion(a.address.ip, a.address.port) ) ).present() ||
|
||||
get( encodeFailedServersKey( AddressExclusion(a.address.ip) ) ).present() ||
|
||||
get( encodeFailedServersKey( AddressExclusion(a.address.ip) ) ).present() ||
|
||||
( a.secondaryAddress.present() && (
|
||||
get( encodeExcludedServersKey( AddressExclusion(a.secondaryAddress.get().ip, a.secondaryAddress.get().port) ) ).present() ||
|
||||
get( encodeExcludedServersKey( AddressExclusion(a.secondaryAddress.get().ip) ) ).present() ||
|
||||
|
|
|
@ -670,6 +670,7 @@ struct KeyValueStoreType {
|
|||
SSD_BTREE_V2,
|
||||
SSD_REDWOOD_V1,
|
||||
MEMORY_RADIXTREE,
|
||||
SSD_ROCKSDB_V1,
|
||||
END
|
||||
};
|
||||
|
||||
|
@ -689,6 +690,7 @@ struct KeyValueStoreType {
|
|||
case SSD_BTREE_V1: return "ssd-1";
|
||||
case SSD_BTREE_V2: return "ssd-2";
|
||||
case SSD_REDWOOD_V1: return "ssd-redwood-experimental";
|
||||
case SSD_ROCKSDB_V1: return "ssd-rocksdb-experimental";
|
||||
case MEMORY: return "memory";
|
||||
case MEMORY_RADIXTREE: return "memory-radixtree-beta";
|
||||
default: return "unknown";
|
||||
|
|
|
@ -101,6 +101,9 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
|
|||
} else if (mode == "ssd-redwood-experimental") {
|
||||
logType = KeyValueStoreType::SSD_BTREE_V2;
|
||||
storeType = KeyValueStoreType::SSD_REDWOOD_V1;
|
||||
} else if (mode == "ssd-rocksdb-experimental") {
|
||||
logType = KeyValueStoreType::SSD_BTREE_V2;
|
||||
storeType = KeyValueStoreType::SSD_ROCKSDB_V1;
|
||||
} else if (mode == "memory" || mode == "memory-2") {
|
||||
logType = KeyValueStoreType::SSD_BTREE_V2;
|
||||
storeType= KeyValueStoreType::MEMORY;
|
||||
|
|
|
@ -150,7 +150,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"fractional_cost": 0.0,
|
||||
"estimated_cost":{
|
||||
"hz": 0.0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
|
@ -612,6 +612,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"ssd-1",
|
||||
"ssd-2",
|
||||
"ssd-redwood-experimental",
|
||||
"ssd-rocksdb-experimental",
|
||||
"memory",
|
||||
"memory-1",
|
||||
"memory-2",
|
||||
|
|
|
@ -29,6 +29,7 @@ set(FDBSERVER_SRCS
|
|||
IVersionedStore.h
|
||||
KeyValueStoreCompressTestData.actor.cpp
|
||||
KeyValueStoreMemory.actor.cpp
|
||||
KeyValueStoreRocksDB.actor.cpp
|
||||
KeyValueStoreSQLite.actor.cpp
|
||||
Knobs.cpp
|
||||
Knobs.h
|
||||
|
|
|
@ -87,6 +87,7 @@ protected:
|
|||
|
||||
extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false );
|
||||
extern IKeyValueStore* keyValueStoreRedwoodV1( std::string const& filename, UID logID);
|
||||
extern IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false);
|
||||
extern IKeyValueStore* keyValueStoreMemory(std::string const& basename, UID logID, int64_t memoryLimit,
|
||||
std::string ext = "fdq",
|
||||
KeyValueStoreType storeType = KeyValueStoreType::MEMORY);
|
||||
|
@ -102,8 +103,10 @@ inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string con
|
|||
return keyValueStoreMemory( filename, logID, memoryLimit );
|
||||
case KeyValueStoreType::SSD_REDWOOD_V1:
|
||||
return keyValueStoreRedwoodV1( filename, logID );
|
||||
case KeyValueStoreType::MEMORY_RADIXTREE:
|
||||
return keyValueStoreMemory(filename, logID, memoryLimit, "fdr", KeyValueStoreType::MEMORY_RADIXTREE); // for radixTree type, set file ext to "fdr"
|
||||
case KeyValueStoreType::SSD_ROCKSDB_V1:
|
||||
return keyValueStoreRocksDB(filename, logID, storeType);
|
||||
case KeyValueStoreType::MEMORY_RADIXTREE:
|
||||
return keyValueStoreMemory(filename, logID, memoryLimit, "fdr", KeyValueStoreType::MEMORY_RADIXTREE); // for radixTree type, set file ext to "fdr"
|
||||
default:
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,429 @@
|
|||
#ifdef SSD_ROCKSDB_EXPERIMENTAL
|
||||
|
||||
#include <rocksdb/env.h>
|
||||
#include <rocksdb/db.h>
|
||||
#include "flow/flow.h"
|
||||
#include "fdbrpc/AsyncFileCached.actor.h"
|
||||
#include "fdbserver/CoroFlow.h"
|
||||
|
||||
#endif // SSD_ROCKSDB_EXPERIMENTAL
|
||||
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
#ifdef SSD_ROCKSDB_EXPERIMENTAL
|
||||
|
||||
namespace {
|
||||
|
||||
class FlowLogger : public rocksdb::Logger, public FastAllocated<FlowLogger> {
|
||||
UID id;
|
||||
std::string loggerName;
|
||||
size_t logSize = 0;
|
||||
public:
|
||||
explicit FlowLogger(UID id, const std::string& loggerName, const rocksdb::InfoLogLevel log_level = rocksdb::InfoLogLevel::INFO_LEVEL)
|
||||
: rocksdb::Logger(log_level)
|
||||
, id(id)
|
||||
, loggerName(loggerName) {}
|
||||
|
||||
rocksdb::Status Close() override { return rocksdb::Status::OK(); }
|
||||
|
||||
void Logv(const char* fmtString, va_list ap) override {
|
||||
Logv(rocksdb::InfoLogLevel::INFO_LEVEL, fmtString, ap);
|
||||
}
|
||||
|
||||
void Logv(const rocksdb::InfoLogLevel log_level, const char* fmtString, va_list ap) override {
|
||||
Severity sev;
|
||||
switch (log_level) {
|
||||
case rocksdb::InfoLogLevel::DEBUG_LEVEL:
|
||||
sev = SevDebug;
|
||||
break;
|
||||
case rocksdb::InfoLogLevel::INFO_LEVEL:
|
||||
case rocksdb::InfoLogLevel::HEADER_LEVEL:
|
||||
case rocksdb::InfoLogLevel::NUM_INFO_LOG_LEVELS:
|
||||
sev = SevInfo;
|
||||
break;
|
||||
case rocksdb::InfoLogLevel::WARN_LEVEL:
|
||||
sev = SevWarn;
|
||||
break;
|
||||
case rocksdb::InfoLogLevel::ERROR_LEVEL:
|
||||
sev = SevWarnAlways;
|
||||
break;
|
||||
case rocksdb::InfoLogLevel::FATAL_LEVEL:
|
||||
sev = SevError;
|
||||
break;
|
||||
}
|
||||
std::string outStr;
|
||||
auto sz = vsformat(outStr, fmtString, ap);
|
||||
if (sz < 0) {
|
||||
TraceEvent(SevError, "RocksDBLogFormatError", id)
|
||||
.detail("Logger", loggerName)
|
||||
.detail("FormatString", fmtString);
|
||||
return;
|
||||
}
|
||||
logSize += sz;
|
||||
TraceEvent(sev, "RocksDBLogMessage", id)
|
||||
.detail("Msg", outStr);
|
||||
}
|
||||
|
||||
size_t GetLogFileSize() const override {
|
||||
return logSize;
|
||||
}
|
||||
};
|
||||
|
||||
rocksdb::Slice toSlice(StringRef s) {
|
||||
return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
|
||||
}
|
||||
|
||||
StringRef toStringRef(rocksdb::Slice s) {
|
||||
return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
|
||||
}
|
||||
|
||||
rocksdb::Options getOptions(const std::string& path) {
|
||||
rocksdb::Options options;
|
||||
bool exists = directoryExists(path);
|
||||
options.create_if_missing = !exists;
|
||||
return options;
|
||||
}
|
||||
|
||||
rocksdb::ColumnFamilyOptions getCFOptions() {
|
||||
return {};
|
||||
}
|
||||
|
||||
struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
using DB = rocksdb::DB*;
|
||||
using CF = rocksdb::ColumnFamilyHandle*;
|
||||
|
||||
struct Writer : IThreadPoolReceiver {
|
||||
DB& db;
|
||||
UID id;
|
||||
|
||||
explicit Writer(DB& db, UID id) : db(db), id(id) {}
|
||||
|
||||
~Writer() {
|
||||
if (db) {
|
||||
delete db;
|
||||
}
|
||||
}
|
||||
|
||||
void init() override {}
|
||||
|
||||
Error statusToError(const rocksdb::Status& s) {
|
||||
if (s == rocksdb::Status::IOError()) {
|
||||
return io_error();
|
||||
} else {
|
||||
return unknown_error();
|
||||
}
|
||||
}
|
||||
|
||||
struct OpenAction : TypedAction<Writer, OpenAction> {
|
||||
std::string path;
|
||||
ThreadReturnPromise<Void> done;
|
||||
|
||||
double getTimeEstimate() {
|
||||
return SERVER_KNOBS->COMMIT_TIME_ESTIMATE;
|
||||
}
|
||||
};
|
||||
void action(OpenAction& a) {
|
||||
std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
|
||||
"default", getCFOptions() } };
|
||||
std::vector<rocksdb::ColumnFamilyHandle*> handle;
|
||||
auto status = rocksdb::DB::Open(getOptions(a.path), a.path, defaultCF, &handle, &db);
|
||||
if (!status.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open");
|
||||
a.done.sendError(statusToError(status));
|
||||
} else {
|
||||
a.done.send(Void());
|
||||
}
|
||||
}
|
||||
|
||||
struct CommitAction : TypedAction<Writer, CommitAction> {
|
||||
std::unique_ptr<rocksdb::WriteBatch> batchToCommit;
|
||||
ThreadReturnPromise<Void> done;
|
||||
double getTimeEstimate() override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(CommitAction& a) {
|
||||
rocksdb::WriteOptions options;
|
||||
options.sync = true;
|
||||
auto s = db->Write(options, a.batchToCommit.get());
|
||||
if (!s.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Commit");
|
||||
a.done.sendError(statusToError(s));
|
||||
} else {
|
||||
a.done.send(Void());
|
||||
}
|
||||
}
|
||||
|
||||
struct CloseAction : TypedAction<Writer, CloseAction> {
|
||||
ThreadReturnPromise<Void> done;
|
||||
double getTimeEstimate() override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(CloseAction& a) {
|
||||
auto s = db->Close();
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close");
|
||||
a.done.send(Void());
|
||||
}
|
||||
};
|
||||
|
||||
struct Reader : IThreadPoolReceiver {
|
||||
DB& db;
|
||||
rocksdb::ReadOptions readOptions;
|
||||
std::unique_ptr<rocksdb::Iterator> cursor = nullptr;
|
||||
|
||||
explicit Reader(DB& db)
|
||||
: db(db)
|
||||
{
|
||||
readOptions.total_order_seek = true;
|
||||
}
|
||||
|
||||
void init() override {}
|
||||
|
||||
struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
|
||||
Key key;
|
||||
Optional<UID> debugID;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
ReadValueAction(KeyRef key, Optional<UID> debugID)
|
||||
: key(key), debugID(debugID)
|
||||
{}
|
||||
double getTimeEstimate() override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadValueAction& a) {
|
||||
Optional<TraceBatch> traceBatch;
|
||||
if (a.debugID.present()) {
|
||||
traceBatch = { TraceBatch{} };
|
||||
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
|
||||
}
|
||||
rocksdb::PinnableSlice value;
|
||||
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
if (a.debugID.present()) {
|
||||
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
|
||||
traceBatch.get().dump();
|
||||
}
|
||||
if (s.ok()) {
|
||||
a.result.send(Value(toStringRef(value)));
|
||||
} else {
|
||||
if (!s.IsNotFound()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValue");
|
||||
}
|
||||
a.result.send(Optional<Value>());
|
||||
}
|
||||
}
|
||||
|
||||
struct ReadValuePrefixAction : TypedAction<Reader, ReadValuePrefixAction> {
|
||||
Key key;
|
||||
int maxLength;
|
||||
Optional<UID> debugID;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
ReadValuePrefixAction(Key key, int maxLength, Optional<UID> debugID) : key(key), maxLength(maxLength), debugID(debugID) {};
|
||||
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadValuePrefixAction& a) {
|
||||
rocksdb::PinnableSlice value;
|
||||
Optional<TraceBatch> traceBatch;
|
||||
if (a.debugID.present()) {
|
||||
traceBatch = { TraceBatch{} };
|
||||
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
|
||||
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
}
|
||||
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
if (a.debugID.present()) {
|
||||
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
|
||||
"Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
traceBatch.get().dump();
|
||||
}
|
||||
if (s.ok()) {
|
||||
a.result.send(Value(StringRef(reinterpret_cast<const uint8_t*>(value.data()),
|
||||
std::min(value.size(), size_t(a.maxLength)))));
|
||||
} else {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValuePrefix");
|
||||
a.result.send(Optional<Value>());
|
||||
}
|
||||
}
|
||||
|
||||
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
|
||||
KeyRange keys;
|
||||
int rowLimit, byteLimit;
|
||||
ThreadReturnPromise<Standalone<RangeResultRef>> result;
|
||||
ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit) : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {}
|
||||
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadRangeAction& a) {
|
||||
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
|
||||
Standalone<RangeResultRef> result;
|
||||
int accumulatedBytes = 0;
|
||||
if (a.rowLimit >= 0) {
|
||||
cursor->Seek(toSlice(a.keys.begin));
|
||||
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit &&
|
||||
accumulatedBytes < a.byteLimit) {
|
||||
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
|
||||
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
|
||||
result.push_back_deep(result.arena(), kv);
|
||||
cursor->Next();
|
||||
}
|
||||
} else {
|
||||
cursor->Seek(toSlice(a.keys.end));
|
||||
if (!cursor->Valid()) {
|
||||
cursor->SeekToLast();
|
||||
} else {
|
||||
cursor->Prev();
|
||||
}
|
||||
|
||||
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit &&
|
||||
accumulatedBytes < a.byteLimit) {
|
||||
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
|
||||
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
|
||||
result.push_back_deep(result.arena(), kv);
|
||||
cursor->Prev();
|
||||
}
|
||||
}
|
||||
auto s = cursor->status();
|
||||
if (!s.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
|
||||
}
|
||||
result.more = (result.size() == a.rowLimit);
|
||||
if (result.more) {
|
||||
result.readThrough = result[result.size()-1].key;
|
||||
}
|
||||
a.result.send(result);
|
||||
}
|
||||
};
|
||||
|
||||
DB db = nullptr;
|
||||
std::string path;
|
||||
UID id;
|
||||
size_t diskBytesUsed = 0;
|
||||
Reference<IThreadPool> writeThread;
|
||||
Reference<IThreadPool> readThreads;
|
||||
unsigned nReaders = 2;
|
||||
Promise<Void> errorPromise;
|
||||
Promise<Void> closePromise;
|
||||
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
|
||||
|
||||
explicit RocksDBKeyValueStore(const std::string& path, UID id)
|
||||
: path(path)
|
||||
, id(id)
|
||||
{
|
||||
writeThread = createGenericThreadPool();
|
||||
readThreads = createGenericThreadPool();
|
||||
writeThread->addThread(new Writer(db, id));
|
||||
for (unsigned i = 0; i < nReaders; ++i) {
|
||||
readThreads->addThread(new Reader(db));
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> getError() override {
|
||||
return errorPromise.getFuture();
|
||||
}
|
||||
|
||||
ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) {
|
||||
wait(self->readThreads->stop());
|
||||
auto a = new Writer::CloseAction{};
|
||||
auto f = a->done.getFuture();
|
||||
self->writeThread->post(a);
|
||||
wait(f);
|
||||
wait(self->writeThread->stop());
|
||||
// TODO: delete data on close
|
||||
if (self->closePromise.canBeSet()) self->closePromise.send(Void());
|
||||
if (self->errorPromise.canBeSet()) self->errorPromise.send(Never());
|
||||
if (deleteOnClose) {
|
||||
std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
|
||||
"default", getCFOptions() } };
|
||||
rocksdb::DestroyDB(self->path, getOptions(self->path), defaultCF);
|
||||
}
|
||||
delete self;
|
||||
}
|
||||
|
||||
Future<Void> onClosed() override {
|
||||
return closePromise.getFuture();
|
||||
}
|
||||
|
||||
void dispose() override {
|
||||
doClose(this, true);
|
||||
}
|
||||
|
||||
void close() override {
|
||||
doClose(this, false);
|
||||
}
|
||||
|
||||
KeyValueStoreType getType() override {
|
||||
return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1);
|
||||
}
|
||||
|
||||
Future<Void> init() override {
|
||||
std::unique_ptr<Writer::OpenAction> a(new Writer::OpenAction());
|
||||
a->path = path;
|
||||
auto res = a->done.getFuture();
|
||||
writeThread->post(a.release());
|
||||
return res;
|
||||
}
|
||||
|
||||
void set(KeyValueRef kv, const Arena*) override {
|
||||
if (writeBatch == nullptr) {
|
||||
writeBatch.reset(new rocksdb::WriteBatch());
|
||||
}
|
||||
writeBatch->Put(toSlice(kv.key), toSlice(kv.value));
|
||||
}
|
||||
|
||||
void clear(KeyRangeRef keyRange, const Arena*) override {
|
||||
if (writeBatch == nullptr) {
|
||||
writeBatch.reset(new rocksdb::WriteBatch());
|
||||
}
|
||||
|
||||
writeBatch->DeleteRange(toSlice(keyRange.begin), toSlice(keyRange.end));
|
||||
}
|
||||
|
||||
Future<Void> commit(bool) override {
|
||||
// If there is nothing to write, don't write.
|
||||
if (writeBatch == nullptr) {
|
||||
return Void();
|
||||
}
|
||||
auto a = new Writer::CommitAction();
|
||||
a->batchToCommit = std::move(writeBatch);
|
||||
auto res = a->done.getFuture();
|
||||
writeThread->post(a);
|
||||
return res;
|
||||
}
|
||||
|
||||
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID) override {
|
||||
auto a = new Reader::ReadValueAction(key, debugID);
|
||||
auto res = a->result.getFuture();
|
||||
readThreads->post(a);
|
||||
return res;
|
||||
}
|
||||
|
||||
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) override {
|
||||
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
|
||||
auto res = a->result.getFuture();
|
||||
readThreads->post(a);
|
||||
return res;
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> readRange(KeyRangeRef keys, int rowLimit, int byteLimit) override {
|
||||
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
|
||||
auto res = a->result.getFuture();
|
||||
readThreads->post(a);
|
||||
return res;
|
||||
}
|
||||
|
||||
StorageBytes getStorageBytes() override {
|
||||
int64_t free;
|
||||
int64_t total;
|
||||
|
||||
g_network->getDiskBytes(path, free, total);
|
||||
|
||||
return StorageBytes(free, total, diskBytesUsed, free);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
#endif // SSD_ROCKSDB_EXPERIMENTAL
|
||||
|
||||
IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValueStoreType storeType, bool checkChecksums, bool checkIntegrity) {
|
||||
#ifdef SSD_ROCKSDB_EXPERIMENTAL
|
||||
return new RocksDBKeyValueStore(path, logID);
|
||||
#else
|
||||
TraceEvent(SevError, "RocksDBEngineInitFailure").detail("Reason", "Built without RocksDB");
|
||||
ASSERT(false);
|
||||
return nullptr;
|
||||
#endif // SSD_ROCKSDB_EXPERIMENTAL
|
||||
}
|
|
@ -254,6 +254,7 @@ std::pair<KeyValueStoreType, std::string> bTreeV2Suffix = std::make_pair(KeyValu
|
|||
std::pair<KeyValueStoreType, std::string> memorySuffix = std::make_pair( KeyValueStoreType::MEMORY, "-0.fdq" );
|
||||
std::pair<KeyValueStoreType, std::string> memoryRTSuffix = std::make_pair( KeyValueStoreType::MEMORY_RADIXTREE, "-0.fdr" );
|
||||
std::pair<KeyValueStoreType, std::string> redwoodSuffix = std::make_pair( KeyValueStoreType::SSD_REDWOOD_V1, ".redwood" );
|
||||
std::pair<KeyValueStoreType, std::string> rocksdbSuffix = std::make_pair( KeyValueStoreType::SSD_ROCKSDB_V1, ".rocksdb" );
|
||||
|
||||
std::string validationFilename = "_validate";
|
||||
|
||||
|
@ -266,6 +267,8 @@ std::string filenameFromSample( KeyValueStoreType storeType, std::string folder,
|
|||
return joinPath( folder, sample_filename.substr(0, sample_filename.size() - 5) );
|
||||
else if ( storeType == KeyValueStoreType::SSD_REDWOOD_V1 )
|
||||
return joinPath(folder, sample_filename);
|
||||
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
|
||||
return joinPath(folder, sample_filename);
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
||||
|
@ -278,6 +281,8 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std
|
|||
return joinPath( folder, prefix + id.toString() + "-" );
|
||||
else if (storeType == KeyValueStoreType::SSD_REDWOOD_V1)
|
||||
return joinPath(folder, prefix + id.toString() + ".redwood");
|
||||
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
|
||||
return joinPath(folder, prefix + id.toString() + ".rocksdb");
|
||||
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
@ -423,8 +428,10 @@ std::vector< DiskStore > getDiskStores( std::string folder ) {
|
|||
result.insert( result.end(), result2.begin(), result2.end() );
|
||||
auto result3 = getDiskStores( folder, redwoodSuffix.second, redwoodSuffix.first);
|
||||
result.insert( result.end(), result3.begin(), result3.end() );
|
||||
auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first );
|
||||
result.insert( result.end(), result4.begin(), result4.end() );
|
||||
auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first );
|
||||
result.insert( result.end(), result4.begin(), result4.end() );
|
||||
auto result5 = getDiskStores( folder, rocksdbSuffix.second, rocksdbSuffix.first);
|
||||
result.insert( result.end(), result3.begin(), result3.end() );
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -1078,7 +1085,7 @@ ACTOR Future<Void> workerServer(
|
|||
notUpdated = interf.updateServerDBInfo.getEndpoint();
|
||||
}
|
||||
else if(localInfo.infoGeneration > dbInfo->get().infoGeneration || dbInfo->get().clusterInterface != ccInterface->get().get()) {
|
||||
|
||||
|
||||
TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id())
|
||||
.detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
|
||||
.detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
|
||||
|
@ -1347,7 +1354,7 @@ ACTOR Future<Void> workerServer(
|
|||
DUMPTOKEN( recruited.getQueuingMetrics );
|
||||
DUMPTOKEN( recruited.confirmRunning );
|
||||
|
||||
errorForwarders.add( zombie(recruited, forwardError( errors, Role::LOG_ROUTER, recruited.id(),
|
||||
errorForwarders.add( zombie(recruited, forwardError( errors, Role::LOG_ROUTER, recruited.id(),
|
||||
logRouter( recruited, req, dbInfo ) ) ) );
|
||||
req.reply.send(recruited);
|
||||
}
|
||||
|
@ -1386,6 +1393,9 @@ ACTOR Future<Void> workerServer(
|
|||
}
|
||||
else if (d.storeType == KeyValueStoreType::SSD_REDWOOD_V1) {
|
||||
included = fileExists(d.filename + "0.pagerlog") && fileExists(d.filename + "1.pagerlog");
|
||||
}
|
||||
else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) {
|
||||
included = fileExists(joinPath(d.filename, "CURRENT")) && fileExists(joinPath(d.filename, "IDENTITY"));
|
||||
} else if (d.storeType == KeyValueStoreType::MEMORY) {
|
||||
included = fileExists(d.filename + "1.fdq");
|
||||
} else {
|
||||
|
|
|
@ -373,6 +373,8 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
|
|||
test.store = keyValueStoreSQLite(fn, id, KeyValueStoreType::SSD_REDWOOD_V1);
|
||||
else if (workload->storeType == "ssd-redwood-experimental")
|
||||
test.store = keyValueStoreRedwoodV1(fn, id);
|
||||
else if (workload->storeType == "ssd-rocksdb-experimental")
|
||||
test.store = keyValueStoreRocksDB(fn, id, KeyValueStoreType::SSD_ROCKSDB_V1);
|
||||
else if (workload->storeType == "memory")
|
||||
test.store = keyValueStoreMemory(fn, id, 500e6);
|
||||
else if (workload->storeType == "memory-radixtree-beta")
|
||||
|
@ -398,4 +400,4 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
|
|||
wait(c);
|
||||
if (err.code() != invalid_error_code) throw err;
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,6 +75,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES RedwoodPerfSet.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES RedwoodPerfPrefixCompression.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES RedwoodPerfSequentialInsert.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES RocksDBTest.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES SampleNoSimAttrition.txt IGNORE)
|
||||
if (NOT USE_UBSAN) # TODO re-enable in UBSAN after https://github.com/apple/foundationdb/issues/2410 is resolved
|
||||
add_fdb_test(TEST_FILES SimpleExternalTest.txt)
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
testTitle=Insert
|
||||
testName=KVStoreTest
|
||||
testDuration=0.0
|
||||
operationsPerSecond=28000
|
||||
commitFraction=0.001
|
||||
setFraction=0.01
|
||||
nodeCount=20000000
|
||||
keyBytes=16
|
||||
valueBytes=96
|
||||
filename=bttest
|
||||
storeType=ssd-rocksdb-experimental
|
||||
setup=true
|
||||
clear=false
|
||||
count=false
|
||||
useDB=false
|
||||
|
||||
testTitle=RandomWriteSaturation
|
||||
testName=KVStoreTest
|
||||
testDuration=20.0
|
||||
saturation=true
|
||||
operationsPerSecond=10000
|
||||
commitFraction=0.00005
|
||||
setFraction=1.0
|
||||
nodeCount=20000000
|
||||
keyBytes=16
|
||||
valueBytes=96
|
||||
filename=bttest
|
||||
storeType=ssd-rocksdb-experimental
|
||||
setup=false
|
||||
clear=false
|
||||
count=false
|
||||
useDB=false
|
||||
|
||||
testTitle=Scan
|
||||
testName=KVStoreTest
|
||||
testDuration=20.0
|
||||
operationsPerSecond=28000
|
||||
commitFraction=0.0001
|
||||
setFraction=0.01
|
||||
nodeCount=20000000
|
||||
keyBytes=16
|
||||
valueBytes=96
|
||||
filename=bttest
|
||||
storeType=ssd-rocksdb-experimental
|
||||
setup=false
|
||||
clear=false
|
||||
count=true
|
||||
useDB=false
|
Loading…
Reference in New Issue