Add RocksDB work

This commit is contained in:
Daniel Smith 2020-08-14 17:16:20 +00:00
parent ce05bb2a26
commit 94571786bf
3 changed files with 54 additions and 19 deletions

View File

@ -2,6 +2,7 @@
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include "flow/flow.h"
#include "flow/IThreadPool.h"
@ -22,14 +23,23 @@ StringRef toStringRef(rocksdb::Slice s) {
return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
}
rocksdb::Options getOptions() {
rocksdb::Options options;
options.create_if_missing = true;
rocksdb::ColumnFamilyOptions getCFOptions() {
rocksdb::ColumnFamilyOptions options;
options.level_compaction_dynamic_level_bytes = true;
options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
// Compact sstables when there's too much deleted stuff.
options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
return options;
}
rocksdb::ColumnFamilyOptions getCFOptions() {
return {};
rocksdb::Options getOptions() {
rocksdb::Options options({}, getCFOptions());
options.avoid_unnecessary_blocking_io = true;
options.create_if_missing = true;
if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
}
return options;
}
struct RocksDBKeyValueStore : IKeyValueStore {
@ -119,7 +129,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
struct Reader : IThreadPoolReceiver {
DB& db;
rocksdb::ReadOptions readOptions;
explicit Reader(DB& db) : db(db) {}
@ -141,7 +150,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
}
rocksdb::PinnableSlice value;
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
traceBatch.get().dump();
@ -172,7 +181,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
auto s = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(a.key), &value);
auto s = db->Get({}, db->DefaultColumnFamily(), toSlice(a.key), &value);
if (a.debugID.present()) {
traceBatch.get().addEvent("GetValuePrefixDebug", a.debugID.get().first(),
"Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
@ -195,33 +204,51 @@ struct RocksDBKeyValueStore : IKeyValueStore {
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
void action(ReadRangeAction& a) {
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
Standalone<RangeResultRef> result;
if (a.rowLimit == 0 || a.byteLimit == 0) {
a.result.send(result);
}
int accumulatedBytes = 0;
rocksdb::Status s;
if (a.rowLimit >= 0) {
rocksdb::ReadOptions options;
auto endSlice = toSlice(a.keys.end);
options.iterate_upper_bound = &endSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->Seek(toSlice(a.keys.begin));
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end && result.size() < a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) < a.keys.end) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Next()` is potentially expensive, so short-circut here just in case.
if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Next();
}
s = cursor->status();
} else {
rocksdb::ReadOptions options;
auto beginSlice = toSlice(a.keys.begin);
options.iterate_lower_bound = &beginSlice;
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(options));
cursor->SeekForPrev(toSlice(a.keys.end));
if (cursor->Valid() && toStringRef(cursor->key()) == a.keys.end) {
cursor->Prev();
}
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin && result.size() < -a.rowLimit &&
accumulatedBytes < a.byteLimit) {
while (cursor->Valid() && toStringRef(cursor->key()) >= a.keys.begin) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv);
// Calling `cursor->Prev()` is potentially expensive, so short-circut here just in case.
if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
break;
}
cursor->Prev();
}
s = cursor->status();
}
auto s = cursor->status();
if (!s.ok()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
}

View File

@ -93,7 +93,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PEEK_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PEEK_RESET_INTERVAL = 20.0;
init( PEEK_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0;
init( PEEK_COUNT_SMALL_MESSAGES, false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true;
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_SLOW_AMOUNT, 0 );
init( PEEK_STATS_SLOW_RATIO, 0.5 );
@ -236,7 +236,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( DD_VALIDATE_LOCALITY, true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
init( DD_CHECK_INVALID_LOCALITY_DELAY, 60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
init( DD_ENABLE_VERBOSE_TRACING, false ); if( randomize && BUGGIFY ) DD_ENABLE_VERBOSE_TRACING = true;
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
init( DD_SS_ALLOWED_VERSIONLAG, 200000000 ); if( randomize && BUGGIFY ) { DD_SS_FAILURE_VERSIONLAG = deterministicRandom()->randomInt(15000000, 500000000); DD_SS_ALLOWED_VERSIONLAG = 0.75 * DD_SS_FAILURE_VERSIONLAG; }
init( DD_SS_STUCK_TIME_LIMIT, 300.0 ); if( randomize && BUGGIFY ) { DD_SS_STUCK_TIME_LIMIT = 200.0 + deterministicRandom()->random01() * 100.0; }
@ -308,6 +308,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// KeyValueStoreMemory
init( REPLACE_CONTENTS_BYTES, 1e5 );
// KeyValueStoreRocksDB
init( ROCKSDB_BACKGROUND_PARALLELISM, 0 );
init( ROCKSDB_MEMTABLE_BYTES, 512 * 1024 * 1024 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
init( MAX_NOTIFICATIONS, 100000 );
@ -573,7 +577,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS, 10 );
init( TRACE_LOG_PING_TIMEOUT_SECONDS, 5.0 );
init( MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 10.0 );
init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 30.0 );
init( MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS, 30.0 );
init( DBINFO_FAILED_DELAY, 1.0 );
// Test harness
@ -650,7 +654,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( REDWOOD_REMAP_CLEANUP_WINDOW, 50 );
init( REDWOOD_REMAP_CLEANUP_LAG, 0.1 );
init( REDWOOD_LOGGING_INTERVAL, 5.0 );
// Server request latency measurement
init( LATENCY_SAMPLE_SIZE, 100000 );
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );

View File

@ -243,6 +243,10 @@ public:
// KeyValueStoreMemory
int64_t REPLACE_CONTENTS_BYTES;
// KeyValueStoreRocksDB
int ROCKSDB_BACKGROUND_PARALLELISM;
int64_t ROCKSDB_MEMTABLE_BYTES;
// Leader election
int MAX_NOTIFICATIONS;
int MIN_NOTIFICATIONS;