From acbfe2e4c9b342cfaa39636808c0ea44c4009daa Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 15 Jun 2020 12:45:36 -0400 Subject: [PATCH] Revert "Revert "Initial RocksDB"" --- .../source/mr-status-json-schemas.rst.inc | 1 + fdbclient/DatabaseConfiguration.cpp | 4 +- fdbclient/FDBTypes.h | 2 + fdbclient/ManagementAPI.actor.cpp | 3 + fdbclient/Schemas.cpp | 3 +- fdbserver/CMakeLists.txt | 1 + fdbserver/IKeyValueStore.h | 7 +- fdbserver/KeyValueStoreRocksDB.actor.cpp | 429 ++++++++++++++++++ fdbserver/worker.actor.cpp | 18 +- fdbserver/workloads/KVStoreTest.actor.cpp | 4 +- tests/CMakeLists.txt | 1 + tests/RocksDBTest.txt | 48 ++ 12 files changed, 512 insertions(+), 9 deletions(-) create mode 100644 fdbserver/KeyValueStoreRocksDB.actor.cpp create mode 100644 tests/RocksDBTest.txt diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index 31ccb629fc..874a9b8f25 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -575,6 +575,7 @@ "ssd-1", "ssd-2", "ssd-redwood-experimental", + "ssd-rocksdb-experimental", "memory" ]}, "coordinators_count":1, diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index d7b8468f25..3edd327b0a 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -268,6 +268,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const { result["storage_engine"] = "ssd-2"; } else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_REDWOOD_V1 ) { result["storage_engine"] = "ssd-redwood-experimental"; + } else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_ROCKSDB_V1) { + result["storage_engine"] = "ssd-rocksdb-experimental"; } else if( tLogDataStoreType == KeyValueStoreType::MEMORY && 
storageServerStoreType == KeyValueStoreType::MEMORY ) { result["storage_engine"] = "memory-1"; } else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE ) { @@ -498,7 +500,7 @@ bool DatabaseConfiguration::isExcludedServer( NetworkAddressList a ) const { return get( encodeExcludedServersKey( AddressExclusion(a.address.ip, a.address.port) ) ).present() || get( encodeExcludedServersKey( AddressExclusion(a.address.ip) ) ).present() || get( encodeFailedServersKey( AddressExclusion(a.address.ip, a.address.port) ) ).present() || - get( encodeFailedServersKey( AddressExclusion(a.address.ip) ) ).present() || + get( encodeFailedServersKey( AddressExclusion(a.address.ip) ) ).present() || ( a.secondaryAddress.present() && ( get( encodeExcludedServersKey( AddressExclusion(a.secondaryAddress.get().ip, a.secondaryAddress.get().port) ) ).present() || get( encodeExcludedServersKey( AddressExclusion(a.secondaryAddress.get().ip) ) ).present() || diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index c6aff2a804..117414923c 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -670,6 +670,7 @@ struct KeyValueStoreType { SSD_BTREE_V2, SSD_REDWOOD_V1, MEMORY_RADIXTREE, + SSD_ROCKSDB_V1, END }; @@ -689,6 +690,7 @@ struct KeyValueStoreType { case SSD_BTREE_V1: return "ssd-1"; case SSD_BTREE_V2: return "ssd-2"; case SSD_REDWOOD_V1: return "ssd-redwood-experimental"; + case SSD_ROCKSDB_V1: return "ssd-rocksdb-experimental"; case MEMORY: return "memory"; case MEMORY_RADIXTREE: return "memory-radixtree-beta"; default: return "unknown"; diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index e10edf1121..e683a21d56 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -101,6 +101,9 @@ std::map configForToken( std::string const& mode ) { } else if (mode == "ssd-redwood-experimental") { logType = KeyValueStoreType::SSD_BTREE_V2; storeType = 
KeyValueStoreType::SSD_REDWOOD_V1; + } else if (mode == "ssd-rocksdb-experimental") { + logType = KeyValueStoreType::SSD_BTREE_V2; + storeType = KeyValueStoreType::SSD_ROCKSDB_V1; } else if (mode == "memory" || mode == "memory-2") { logType = KeyValueStoreType::SSD_BTREE_V2; storeType= KeyValueStoreType::MEMORY; diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 6111ed1114..ee372d3bb0 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -150,7 +150,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "fractional_cost": 0.0, "estimated_cost":{ "hz": 0.0 - } + } } } ], @@ -612,6 +612,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "ssd-1", "ssd-2", "ssd-redwood-experimental", + "ssd-rocksdb-experimental", "memory", "memory-1", "memory-2", diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 5d13a1e7f0..97efb6bac3 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -29,6 +29,7 @@ set(FDBSERVER_SRCS IVersionedStore.h KeyValueStoreCompressTestData.actor.cpp KeyValueStoreMemory.actor.cpp + KeyValueStoreRocksDB.actor.cpp KeyValueStoreSQLite.actor.cpp Knobs.cpp Knobs.h diff --git a/fdbserver/IKeyValueStore.h b/fdbserver/IKeyValueStore.h index d82b5e8cf9..3c82a0b7a8 100644 --- a/fdbserver/IKeyValueStore.h +++ b/fdbserver/IKeyValueStore.h @@ -87,6 +87,7 @@ protected: extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false ); extern IKeyValueStore* keyValueStoreRedwoodV1( std::string const& filename, UID logID); +extern IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false); extern IKeyValueStore* keyValueStoreMemory(std::string const& basename, UID logID, int64_t memoryLimit, std::string ext = "fdq", KeyValueStoreType storeType = 
KeyValueStoreType::MEMORY); @@ -102,8 +103,10 @@ inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string con return keyValueStoreMemory( filename, logID, memoryLimit ); case KeyValueStoreType::SSD_REDWOOD_V1: return keyValueStoreRedwoodV1( filename, logID ); - case KeyValueStoreType::MEMORY_RADIXTREE: - return keyValueStoreMemory(filename, logID, memoryLimit, "fdr", KeyValueStoreType::MEMORY_RADIXTREE); // for radixTree type, set file ext to "fdr" + case KeyValueStoreType::SSD_ROCKSDB_V1: + return keyValueStoreRocksDB(filename, logID, storeType); + case KeyValueStoreType::MEMORY_RADIXTREE: + return keyValueStoreMemory(filename, logID, memoryLimit, "fdr", KeyValueStoreType::MEMORY_RADIXTREE); // for radixTree type, set file ext to "fdr" default: UNREACHABLE(); } diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp new file mode 100644 index 0000000000..0daafa8628 --- /dev/null +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -0,0 +1,429 @@ +#ifdef SSD_ROCKSDB_EXPERIMENTAL + +#include +#include +#include "flow/flow.h" +#include "fdbrpc/AsyncFileCached.actor.h" +#include "fdbserver/CoroFlow.h" + +#endif // SSD_ROCKSDB_EXPERIMENTAL + +#include "fdbserver/IKeyValueStore.h" +#include "flow/actorcompiler.h" // has to be last include + +#ifdef SSD_ROCKSDB_EXPERIMENTAL + +namespace { + +class FlowLogger : public rocksdb::Logger, public FastAllocated { + UID id; + std::string loggerName; + size_t logSize = 0; +public: + explicit FlowLogger(UID id, const std::string& loggerName, const rocksdb::InfoLogLevel log_level = rocksdb::InfoLogLevel::INFO_LEVEL) + : rocksdb::Logger(log_level) + , id(id) + , loggerName(loggerName) {} + + rocksdb::Status Close() override { return rocksdb::Status::OK(); } + + void Logv(const char* fmtString, va_list ap) override { + Logv(rocksdb::InfoLogLevel::INFO_LEVEL, fmtString, ap); + } + + void Logv(const rocksdb::InfoLogLevel log_level, const char* fmtString, va_list ap) override { + 
Severity sev = SevInfo; /* default covers any log level the switch below does not list */ + switch (log_level) { + case rocksdb::InfoLogLevel::DEBUG_LEVEL: + sev = SevDebug; + break; + case rocksdb::InfoLogLevel::INFO_LEVEL: + case rocksdb::InfoLogLevel::HEADER_LEVEL: + case rocksdb::InfoLogLevel::NUM_INFO_LOG_LEVELS: + sev = SevInfo; + break; + case rocksdb::InfoLogLevel::WARN_LEVEL: + sev = SevWarn; + break; + case rocksdb::InfoLogLevel::ERROR_LEVEL: + sev = SevWarnAlways; + break; + case rocksdb::InfoLogLevel::FATAL_LEVEL: + sev = SevError; + break; + } + std::string outStr; + auto sz = vsformat(outStr, fmtString, ap); + if (sz < 0) { + TraceEvent(SevError, "RocksDBLogFormatError", id) + .detail("Logger", loggerName) + .detail("FormatString", fmtString); + return; + } + logSize += sz; + TraceEvent(sev, "RocksDBLogMessage", id) + .detail("Msg", outStr); + } + + size_t GetLogFileSize() const override { + return logSize; + } +}; + +rocksdb::Slice toSlice(StringRef s) { + return rocksdb::Slice(reinterpret_cast(s.begin()), s.size()); +} + +StringRef toStringRef(rocksdb::Slice s) { + return StringRef(reinterpret_cast(s.data()), s.size()); +} + +rocksdb::Options getOptions(const std::string& path) { + rocksdb::Options options; + bool exists = directoryExists(path); + options.create_if_missing = !exists; + return options; +} + +rocksdb::ColumnFamilyOptions getCFOptions() { + return {}; +} + +struct RocksDBKeyValueStore : IKeyValueStore { + using DB = rocksdb::DB*; + using CF = rocksdb::ColumnFamilyHandle*; + + struct Writer : IThreadPoolReceiver { + DB& db; + UID id; + + explicit Writer(DB& db, UID id) : db(db), id(id) {} + + ~Writer() { + if (db) { + delete db; + } + } + + void init() override {} + + Error statusToError(const rocksdb::Status& s) { + if (s.IsIOError()) { /* I/O failures map to io_error(); everything else is unknown_error() */ + return io_error(); + } else { + return unknown_error(); + } + } + + struct OpenAction : TypedAction { + std::string path; + ThreadReturnPromise done; + + double getTimeEstimate() override { + return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; + } + }; + void 
action(OpenAction& a) { + std::vector defaultCF = { rocksdb::ColumnFamilyDescriptor{ + "default", getCFOptions() } }; + std::vector handle; + auto status = rocksdb::DB::Open(getOptions(a.path), a.path, defaultCF, &handle, &db); + if (!status.ok()) { + TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open"); + a.done.sendError(statusToError(status)); + } else { + a.done.send(Void()); + } + } + + struct CommitAction : TypedAction { + std::unique_ptr batchToCommit; + ThreadReturnPromise done; + double getTimeEstimate() override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } + }; + void action(CommitAction& a) { + rocksdb::WriteOptions options; + options.sync = true; + auto s = db->Write(options, a.batchToCommit.get()); + if (!s.ok()) { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Commit"); + a.done.sendError(statusToError(s)); + } else { + a.done.send(Void()); + } + } + + struct CloseAction : TypedAction { + ThreadReturnPromise done; + double getTimeEstimate() override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } + }; + void action(CloseAction& a) { + auto s = db ? db->Close() : rocksdb::Status::OK(); /* db may still be null, e.g. when Open never succeeded */ + if (!s.ok()) TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close"); /* a clean close is not an error */ + a.done.send(Void()); + } + }; + + struct Reader : IThreadPoolReceiver { + DB& db; + rocksdb::ReadOptions readOptions; + std::unique_ptr cursor = nullptr; + + explicit Reader(DB& db) + : db(db) + { + readOptions.total_order_seek = true; + } + + void init() override {} + + struct ReadValueAction : TypedAction { + Key key; + Optional debugID; + ThreadReturnPromise> result; + ReadValueAction(KeyRef key, Optional debugID) + : key(key), debugID(debugID) + {} + double getTimeEstimate() override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; } + }; + void action(ReadValueAction& a) { + Optional traceBatch; + if (a.debugID.present()) { + traceBatch = { TraceBatch{} }; + traceBatch.get().addEvent("GetValueDebug", 
a.debugID.get().first(), "Reader.Before"); + } + rocksdb::PinnableSlice value; + auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value); + if (a.debugID.present()) { + traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After"); + traceBatch.get().dump(); + } + if (s.ok()) { + a.result.send(Value(toStringRef(value))); + } else { + if (!s.IsNotFound()) { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValue"); + } + a.result.send(Optional()); + } + } + + struct ReadValuePrefixAction : TypedAction { + Key key; + int maxLength; + Optional debugID; + ThreadReturnPromise> result; + ReadValuePrefixAction(Key key, int maxLength, Optional debugID) : key(key), maxLength(maxLength), debugID(debugID) {} + double getTimeEstimate() override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; } + }; + void action(ReadValuePrefixAction& a) { + rocksdb::PinnableSlice value; + Optional traceBatch; + if (a.debugID.present()) { + traceBatch = { TraceBatch{} }; + traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(), + "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask()); + } + auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value); + if (a.debugID.present()) { + traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(), + "Reader.After"); //.detail("TaskID", g_network->getCurrentTask()); + traceBatch.get().dump(); + } + if (s.ok()) { + a.result.send(Value(StringRef(reinterpret_cast(value.data()), + std::min(value.size(), size_t(a.maxLength))))); + } else { + if (!s.IsNotFound()) TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValuePrefix"); /* a missing key is a normal outcome, same as the ReadValue action */ + a.result.send(Optional()); + } + } + + struct ReadRangeAction : TypedAction, FastAllocated { + KeyRange keys; + int rowLimit, byteLimit; + ThreadReturnPromise> result; + ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit) : keys(keys), rowLimit(rowLimit), 
byteLimit(byteLimit) {} + double getTimeEstimate() override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; } + }; + void action(ReadRangeAction& a) { + auto cursor = std::unique_ptr(db->NewIterator(readOptions)); + Standalone result; + int accumulatedBytes = 0; + if (a.rowLimit >= 0) { + cursor->Seek(toSlice(a.keys.begin)); + while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit && + accumulatedBytes < a.byteLimit) { + KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value())); + accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize(); + result.push_back_deep(result.arena(), kv); + cursor->Next(); + } + } else { + cursor->Seek(toSlice(a.keys.end)); + if (!cursor->Valid()) { + cursor->SeekToLast(); + } else { + cursor->Prev(); + } + + while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit && + accumulatedBytes < a.byteLimit) { + KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value())); + accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize(); + result.push_back_deep(result.arena(), kv); + cursor->Prev(); + } + } + auto s = cursor->status(); + if (!s.ok()) { + TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange"); + } + result.more = result.size() > 0 && (result.size() == a.rowLimit || result.size() == -a.rowLimit || accumulatedBytes >= a.byteLimit); /* a limit was hit in either scan direction; an empty result has no last key to read through */ + if (result.more) { + result.readThrough = result[result.size()-1].key; + } + a.result.send(result); + } + }; + + DB db = nullptr; + std::string path; + UID id; + size_t diskBytesUsed = 0; + Reference writeThread; + Reference readThreads; + unsigned nReaders = 2; + Promise errorPromise; + Promise closePromise; + std::unique_ptr writeBatch; + + explicit RocksDBKeyValueStore(const std::string& path, UID id) + : path(path) + , id(id) + { + writeThread = createGenericThreadPool(); + readThreads = createGenericThreadPool(); + writeThread->addThread(new Writer(db, id)); + for (unsigned i = 0; i < nReaders; ++i) { + readThreads->addThread(new 
Reader(db)); + } + } + + Future getError() override { + return errorPromise.getFuture(); + } + + ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) { + wait(self->readThreads->stop()); + auto a = new Writer::CloseAction{}; + auto f = a->done.getFuture(); + self->writeThread->post(a); + wait(f); + wait(self->writeThread->stop()); + // Remove the on-disk data before fulfilling closePromise, so onClosed() fires only after dispose() has deleted it. + if (deleteOnClose) { + std::vector defaultCF = { rocksdb::ColumnFamilyDescriptor{ + "default", getCFOptions() } }; + rocksdb::DestroyDB(self->path, getOptions(self->path), defaultCF); + } + if (self->closePromise.canBeSet()) self->closePromise.send(Void()); + if (self->errorPromise.canBeSet()) self->errorPromise.send(Never()); + delete self; + } + + Future onClosed() override { + return closePromise.getFuture(); + } + + void dispose() override { + doClose(this, true); + } + + void close() override { + doClose(this, false); + } + + KeyValueStoreType getType() override { + return KeyValueStoreType(KeyValueStoreType::SSD_ROCKSDB_V1); + } + + Future init() override { + std::unique_ptr a(new Writer::OpenAction()); + a->path = path; + auto res = a->done.getFuture(); + writeThread->post(a.release()); + return res; + } + + void set(KeyValueRef kv, const Arena*) override { + if (writeBatch == nullptr) { + writeBatch.reset(new rocksdb::WriteBatch()); + } + writeBatch->Put(toSlice(kv.key), toSlice(kv.value)); + } + + void clear(KeyRangeRef keyRange, const Arena*) override { + if (writeBatch == nullptr) { + writeBatch.reset(new rocksdb::WriteBatch()); + } + + writeBatch->DeleteRange(toSlice(keyRange.begin), toSlice(keyRange.end)); + } + + Future commit(bool) override { + // If there is nothing to write, don't write. 
+ if (writeBatch == nullptr) { + return Void(); + } + auto a = new Writer::CommitAction(); + a->batchToCommit = std::move(writeBatch); + auto res = a->done.getFuture(); + writeThread->post(a); + return res; + } + + Future> readValue(KeyRef key, Optional debugID) override { + auto a = new Reader::ReadValueAction(key, debugID); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + Future> readValuePrefix(KeyRef key, int maxLength, Optional debugID) override { + auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + Future> readRange(KeyRangeRef keys, int rowLimit, int byteLimit) override { + auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit); + auto res = a->result.getFuture(); + readThreads->post(a); + return res; + } + + StorageBytes getStorageBytes() override { + int64_t free; + int64_t total; + + g_network->getDiskBytes(path, free, total); + + return StorageBytes(free, total, diskBytesUsed, free); + } +}; + +} // namespace + +#endif // SSD_ROCKSDB_EXPERIMENTAL + +IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValueStoreType storeType, bool checkChecksums, bool checkIntegrity) { +#ifdef SSD_ROCKSDB_EXPERIMENTAL + return new RocksDBKeyValueStore(path, logID); +#else + TraceEvent(SevError, "RocksDBEngineInitFailure").detail("Reason", "Built without RocksDB"); + ASSERT(false); + return nullptr; +#endif // SSD_ROCKSDB_EXPERIMENTAL +} diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index a5fe4f36dd..64d281b9c8 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -254,6 +254,7 @@ std::pair bTreeV2Suffix = std::make_pair(KeyValu std::pair memorySuffix = std::make_pair( KeyValueStoreType::MEMORY, "-0.fdq" ); std::pair memoryRTSuffix = std::make_pair( KeyValueStoreType::MEMORY_RADIXTREE, "-0.fdr" ); std::pair redwoodSuffix = std::make_pair( 
KeyValueStoreType::SSD_REDWOOD_V1, ".redwood" ); +std::pair rocksdbSuffix = std::make_pair( KeyValueStoreType::SSD_ROCKSDB_V1, ".rocksdb" ); std::string validationFilename = "_validate"; @@ -266,6 +267,8 @@ std::string filenameFromSample( KeyValueStoreType storeType, std::string folder, return joinPath( folder, sample_filename.substr(0, sample_filename.size() - 5) ); else if ( storeType == KeyValueStoreType::SSD_REDWOOD_V1 ) return joinPath(folder, sample_filename); + else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1) + return joinPath(folder, sample_filename); UNREACHABLE(); } @@ -278,6 +281,8 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std return joinPath( folder, prefix + id.toString() + "-" ); else if (storeType == KeyValueStoreType::SSD_REDWOOD_V1) return joinPath(folder, prefix + id.toString() + ".redwood"); + else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1) + return joinPath(folder, prefix + id.toString() + ".rocksdb"); UNREACHABLE(); } @@ -423,8 +428,10 @@ std::vector< DiskStore > getDiskStores( std::string folder ) { result.insert( result.end(), result2.begin(), result2.end() ); auto result3 = getDiskStores( folder, redwoodSuffix.second, redwoodSuffix.first); result.insert( result.end(), result3.begin(), result3.end() ); - auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first ); - result.insert( result.end(), result4.begin(), result4.end() ); + auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first ); + result.insert( result.end(), result4.begin(), result4.end() ); + auto result5 = getDiskStores( folder, rocksdbSuffix.second, rocksdbSuffix.first); + result.insert( result.end(), result5.begin(), result5.end() ); /* append the RocksDB stores found above, not the Redwood ones a second time */ return result; } @@ -1078,7 +1085,7 @@ ACTOR Future workerServer( notUpdated = interf.updateServerDBInfo.getEndpoint(); } else if(localInfo.infoGeneration > dbInfo->get().infoGeneration || dbInfo->get().clusterInterface != 
ccInterface->get().get()) { - + TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id()) .detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID()) .detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID()); @@ -1347,7 +1354,7 @@ ACTOR Future workerServer( DUMPTOKEN( recruited.getQueuingMetrics ); DUMPTOKEN( recruited.confirmRunning ); - errorForwarders.add( zombie(recruited, forwardError( errors, Role::LOG_ROUTER, recruited.id(), + errorForwarders.add( zombie(recruited, forwardError( errors, Role::LOG_ROUTER, recruited.id(), logRouter( recruited, req, dbInfo ) ) ) ); req.reply.send(recruited); } @@ -1386,6 +1393,9 @@ ACTOR Future workerServer( } else if (d.storeType == KeyValueStoreType::SSD_REDWOOD_V1) { included = fileExists(d.filename + "0.pagerlog") && fileExists(d.filename + "1.pagerlog"); + } + else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) { + included = fileExists(joinPath(d.filename, "CURRENT")) && fileExists(joinPath(d.filename, "IDENTITY")); } else if (d.storeType == KeyValueStoreType::MEMORY) { included = fileExists(d.filename + "1.fdq"); } else { diff --git a/fdbserver/workloads/KVStoreTest.actor.cpp b/fdbserver/workloads/KVStoreTest.actor.cpp index 080f0237b2..f0007f033e 100755 --- a/fdbserver/workloads/KVStoreTest.actor.cpp +++ b/fdbserver/workloads/KVStoreTest.actor.cpp @@ -373,6 +373,8 @@ ACTOR Future testKVStore(KVStoreTestWorkload* workload) { test.store = keyValueStoreSQLite(fn, id, KeyValueStoreType::SSD_REDWOOD_V1); else if (workload->storeType == "ssd-redwood-experimental") test.store = keyValueStoreRedwoodV1(fn, id); + else if (workload->storeType == "ssd-rocksdb-experimental") + test.store = keyValueStoreRocksDB(fn, id, KeyValueStoreType::SSD_ROCKSDB_V1); else if (workload->storeType == "memory") test.store = keyValueStoreMemory(fn, id, 500e6); else if (workload->storeType == 
"memory-radixtree-beta") @@ -398,4 +400,4 @@ ACTOR Future testKVStore(KVStoreTestWorkload* workload) { wait(c); if (err.code() != invalid_error_code) throw err; return Void(); -} \ No newline at end of file +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f24e73b4c6..4b095cd78d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -75,6 +75,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES RedwoodPerfSet.txt IGNORE) add_fdb_test(TEST_FILES RedwoodPerfPrefixCompression.txt IGNORE) add_fdb_test(TEST_FILES RedwoodPerfSequentialInsert.txt IGNORE) + add_fdb_test(TEST_FILES RocksDBTest.txt IGNORE) add_fdb_test(TEST_FILES SampleNoSimAttrition.txt IGNORE) if (NOT USE_UBSAN) # TODO re-enable in UBSAN after https://github.com/apple/foundationdb/issues/2410 is resolved add_fdb_test(TEST_FILES SimpleExternalTest.txt) diff --git a/tests/RocksDBTest.txt b/tests/RocksDBTest.txt new file mode 100644 index 0000000000..a1aeb2d32b --- /dev/null +++ b/tests/RocksDBTest.txt @@ -0,0 +1,48 @@ +testTitle=Insert +testName=KVStoreTest +testDuration=0.0 +operationsPerSecond=28000 +commitFraction=0.001 +setFraction=0.01 +nodeCount=20000000 +keyBytes=16 +valueBytes=96 +filename=bttest +storeType=ssd-rocksdb-experimental +setup=true +clear=false +count=false +useDB=false + +testTitle=RandomWriteSaturation +testName=KVStoreTest +testDuration=20.0 +saturation=true +operationsPerSecond=10000 +commitFraction=0.00005 +setFraction=1.0 +nodeCount=20000000 +keyBytes=16 +valueBytes=96 +filename=bttest +storeType=ssd-rocksdb-experimental +setup=false +clear=false +count=false +useDB=false + +testTitle=Scan +testName=KVStoreTest +testDuration=20.0 +operationsPerSecond=28000 +commitFraction=0.0001 +setFraction=0.01 +nodeCount=20000000 +keyBytes=16 +valueBytes=96 +filename=bttest +storeType=ssd-rocksdb-experimental +setup=false +clear=false +count=true +useDB=false