RocksDB fixes
This commit is contained in:
parent
440d46bccc
commit
fd973e5055
|
@ -22,10 +22,9 @@ StringRef toStringRef(rocksdb::Slice s) {
|
|||
return StringRef(reinterpret_cast<const uint8_t*>(s.data()), s.size());
|
||||
}
|
||||
|
||||
rocksdb::Options getOptions(const std::string& path) {
|
||||
rocksdb::Options getOptions() {
|
||||
rocksdb::Options options;
|
||||
bool exists = directoryExists(path);
|
||||
options.create_if_missing = !exists;
|
||||
options.create_if_missing = true;
|
||||
return options;
|
||||
}
|
||||
|
||||
|
@ -71,7 +70,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
|
||||
"default", getCFOptions() } };
|
||||
std::vector<rocksdb::ColumnFamilyHandle*> handle;
|
||||
auto status = rocksdb::DB::Open(getOptions(a.path), a.path, defaultCF, &handle, &db);
|
||||
auto status = rocksdb::DB::Open(getOptions(), a.path, defaultCF, &handle, &db);
|
||||
if (!status.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open");
|
||||
a.done.sendError(statusToError(status));
|
||||
|
@ -112,7 +111,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
if (a.deleteOnClose) {
|
||||
std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
|
||||
"default", getCFOptions() } };
|
||||
rocksdb::DestroyDB(a.path, getOptions(a.path), defaultCF);
|
||||
rocksdb::DestroyDB(a.path, getOptions(), defaultCF);
|
||||
}
|
||||
a.done.send(Void());
|
||||
}
|
||||
|
@ -121,7 +120,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
struct Reader : IThreadPoolReceiver {
|
||||
DB& db;
|
||||
rocksdb::ReadOptions readOptions;
|
||||
std::unique_ptr<rocksdb::Iterator> cursor = nullptr;
|
||||
|
||||
explicit Reader(DB& db) : db(db) {}
|
||||
|
||||
|
@ -197,11 +195,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadRangeAction& a) {
|
||||
if (cursor == nullptr) {
|
||||
cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
|
||||
} else {
|
||||
cursor->Refresh();
|
||||
}
|
||||
auto cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions));
|
||||
Standalone<RangeResultRef> result;
|
||||
int accumulatedBytes = 0;
|
||||
if (a.rowLimit >= 0) {
|
||||
|
@ -231,7 +225,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
if (!s.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadRange");
|
||||
}
|
||||
result.more = (result.size() == a.rowLimit);
|
||||
result.more =
|
||||
(result.size() == a.rowLimit) || (result.size() == -a.rowLimit) || (accumulatedBytes >= a.byteLimit);
|
||||
if (result.more) {
|
||||
result.readThrough = result[result.size()-1].key;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue