Fetch checkpoint as key-value pairs (#9003)

* Allow multiple keyranges in CheckpointRequest.
Include DataMove ID in CheckpointMetaData.

* Use UID dataMoveId instead of Optional<UID>.

* Implemented ShardedRocks::checkpoint().

* Implementing createCheckpoint().

* Attempted to change getCheckpointMetaData*() for a single keyrange.

* Added getCheckpointMetaDataForRange.

* Minor fixes for NativeAPI.actor.cpp.

* Replace UID CheckpointMetaData::ssId with std::vector<UID>
CheckpointMetaData::src;

* Implemented getCheckpointMetaData() and completed checkpoint creation
and fetch in test.

* Refactoring CheckpointRequest and CheckpointMetaData

rename `dataMoveId` as `actionId` and make it Optional.

* Fixed ctor of CheckpointMetaData.

* Implemented ShardedRocksDB::restore().

* Tested checkpoint restore, and added range check for restore, so that
the target ranges can be a subset of the checkpoint ranges.

* Added test to partially restore a checkpoint.

* Refactor: added checkpointRestore().

* Sort ranges for comparison.

* Cleanups.

* Check restore ranges are empty; Add ranges in main thread.

* Resolved comments.

* Fixed GetCheckpointMetaData range check issue.

* Refactor CheckpointReader for CF checkpoint.

* Added CheckpointAsKeyValues as a parameter for newCheckpointReader.

* PhysicalShard::restoreKvs().

* Added `ranges` in fetchCheckpoint.

* Added RocksDBCheckpointKeyValues::ranges.

* Added ICheckpointIterator and implemented for RocksDBCheckpointReader.

* Refactored OpenAction for CheckpointReader, handled failure cases.

* Use RocksDBCheckpointIterator::end() in readRange.

* Set CheckpointReader timout and other Rocks read options.

* Implementing fetchCheckpointRange().

* Added more CheckpointReader tests.

* Cleanup.

* More cleanup.

* Resolved comments.

Co-authored-by: He Liu <heliu@apple.com>
This commit is contained in:
He Liu 2022-12-14 17:44:47 -08:00 committed by GitHub
parent a05649c620
commit 2024237e5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 557 additions and 193 deletions

View File

@ -388,6 +388,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_WRITER_THREAD_PRIORITY, 0 ); init( ROCKSDB_WRITER_THREAD_PRIORITY, 0 );
init( ROCKSDB_BACKGROUND_PARALLELISM, 4 ); init( ROCKSDB_BACKGROUND_PARALLELISM, 4 );
init( ROCKSDB_READ_PARALLELISM, 4 ); init( ROCKSDB_READ_PARALLELISM, 4 );
init( ROCKSDB_CHECKPOINT_READER_PARALLELISM, 4 );
// If true, do not process and store RocksDB logs // If true, do not process and store RocksDB logs
init( ROCKSDB_MUTE_LOGS, true ); init( ROCKSDB_MUTE_LOGS, true );
// Use a smaller memtable in simulation to avoid OOMs. // Use a smaller memtable in simulation to avoid OOMs.
@ -407,6 +408,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_READ_VALUE_TIMEOUT, 5.0 ); if (isSimulated) ROCKSDB_READ_VALUE_TIMEOUT = 5 * 60; init( ROCKSDB_READ_VALUE_TIMEOUT, 5.0 ); if (isSimulated) ROCKSDB_READ_VALUE_TIMEOUT = 5 * 60;
init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, 5.0 ); if (isSimulated) ROCKSDB_READ_VALUE_PREFIX_TIMEOUT = 5 * 60; init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, 5.0 ); if (isSimulated) ROCKSDB_READ_VALUE_PREFIX_TIMEOUT = 5 * 60;
init( ROCKSDB_READ_RANGE_TIMEOUT, 5.0 ); if (isSimulated) ROCKSDB_READ_RANGE_TIMEOUT = 5 * 60; init( ROCKSDB_READ_RANGE_TIMEOUT, 5.0 ); if (isSimulated) ROCKSDB_READ_RANGE_TIMEOUT = 5 * 60;
init( ROCKSDB_READ_CHECKPOINT_TIMEOUT, 60.0 ); if (isSimulated) ROCKSDB_READ_CHECKPOINT_TIMEOUT = 5 * 60;
init( ROCKSDB_CHECKPOINT_READ_AHEAD_SIZE, 2 << 20 ); // 2M
init( ROCKSDB_READ_QUEUE_WAIT, 1.0 ); init( ROCKSDB_READ_QUEUE_WAIT, 1.0 );
init( ROCKSDB_READ_QUEUE_HARD_MAX, 1000 ); init( ROCKSDB_READ_QUEUE_HARD_MAX, 1000 );
init( ROCKSDB_READ_QUEUE_SOFT_MAX, 500 ); init( ROCKSDB_READ_QUEUE_SOFT_MAX, 500 );

View File

@ -321,6 +321,7 @@ public:
int ROCKSDB_WRITER_THREAD_PRIORITY; int ROCKSDB_WRITER_THREAD_PRIORITY;
int ROCKSDB_BACKGROUND_PARALLELISM; int ROCKSDB_BACKGROUND_PARALLELISM;
int ROCKSDB_READ_PARALLELISM; int ROCKSDB_READ_PARALLELISM;
int ROCKSDB_CHECKPOINT_READER_PARALLELISM;
int64_t ROCKSDB_MEMTABLE_BYTES; int64_t ROCKSDB_MEMTABLE_BYTES;
bool ROCKSDB_LEVEL_STYLE_COMPACTION; bool ROCKSDB_LEVEL_STYLE_COMPACTION;
bool ROCKSDB_UNSAFE_AUTO_FSYNC; bool ROCKSDB_UNSAFE_AUTO_FSYNC;
@ -332,6 +333,8 @@ public:
double ROCKSDB_READ_VALUE_TIMEOUT; double ROCKSDB_READ_VALUE_TIMEOUT;
double ROCKSDB_READ_VALUE_PREFIX_TIMEOUT; double ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
double ROCKSDB_READ_RANGE_TIMEOUT; double ROCKSDB_READ_RANGE_TIMEOUT;
double ROCKSDB_READ_CHECKPOINT_TIMEOUT;
int64_t ROCKSDB_CHECKPOINT_READ_AHEAD_SIZE;
double ROCKSDB_READ_QUEUE_WAIT; double ROCKSDB_READ_QUEUE_WAIT;
int ROCKSDB_READ_QUEUE_SOFT_MAX; int ROCKSDB_READ_QUEUE_SOFT_MAX;
int ROCKSDB_READ_QUEUE_HARD_MAX; int ROCKSDB_READ_QUEUE_HARD_MAX;

View File

@ -31,6 +31,8 @@ enum CheckpointFormat {
DataMoveRocksCF = 1, DataMoveRocksCF = 1,
// For RocksDB, checkpoint generated via rocksdb::Checkpoint::CreateCheckpoint(). // For RocksDB, checkpoint generated via rocksdb::Checkpoint::CreateCheckpoint().
RocksDB = 2, RocksDB = 2,
// Checkpoint fetched as key-value pairs.
RocksDBKeyValues = 3,
}; };
// Metadata of a FDB checkpoint. // Metadata of a FDB checkpoint.

View File

@ -2595,15 +2595,23 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreKeyValues") {
std::string checkpointDir = cwd + "checkpoint"; std::string checkpointDir = cwd + "checkpoint";
CheckpointRequest request( CheckpointRequest request(
latestVersion, { allKeys }, RocksDB, deterministicRandom()->randomUniqueID(), checkpointDir); latestVersion, { allKeys }, DataMoveRocksCF, deterministicRandom()->randomUniqueID(), checkpointDir);
CheckpointMetaData metaData = wait(kvStore->checkpoint(request)); CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
state ICheckpointReader* cpReader = newCheckpointReader(metaData, deterministicRandom()->randomUniqueID()); TraceEvent(SevDebug, "RocksDBCreatedCheckpoint");
wait(cpReader->init(BinaryWriter::toValue(KeyRangeRef("foo"_sr, "foobar"_sr), IncludeVersion()))); state KeyRange testRange = KeyRangeRef("foo"_sr, "foobar"_sr);
state Standalone<StringRef> token = BinaryWriter::toValue(testRange, IncludeVersion());
state ICheckpointReader* cpReader =
newCheckpointReader(metaData, CheckpointAsKeyValues::True, deterministicRandom()->randomUniqueID());
TraceEvent(SevDebug, "RocksDBCheckpointReaderCreated");
ASSERT(cpReader != nullptr);
wait(cpReader->init(token));
TraceEvent(SevDebug, "RocksDBCheckpointReaderInited");
state std::unique_ptr<ICheckpointIterator> iter = cpReader->getIterator(testRange);
loop { loop {
try { try {
state RangeResult res = state RangeResult res =
wait(cpReader->nextKeyValues(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT)); wait(iter->nextBatch(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT));
state int i = 0; state int i = 0;
for (; i < res.size(); ++i) { for (; i < res.size(); ++i) {
Optional<Value> val = wait(kvStore->readValue(res[i].key)); Optional<Value> val = wait(kvStore->readValue(res[i].key));
@ -2618,6 +2626,7 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreKeyValues") {
} }
} }
iter.reset();
std::vector<Future<Void>> closes; std::vector<Future<Void>> closes;
closes.push_back(cpReader->close()); closes.push_back(cpReader->close());
closes.push_back(kvStore->onClosed()); closes.push_back(kvStore->onClosed());

View File

@ -80,15 +80,6 @@ struct ShardedRocksDBKeyValueStore;
using rocksdb::BackgroundErrorReason; using rocksdb::BackgroundErrorReason;
struct RangeLessThan {
inline bool operator()(const KeyRange& l, const KeyRange& r) {
if (l.begin == r.begin) {
return l.end < r.end;
}
return l.begin < r.begin;
}
};
// Returns string representation of RocksDB background error reason. // Returns string representation of RocksDB background error reason.
// Error reason code: // Error reason code:
// https://github.com/facebook/rocksdb/blob/12d798ac06bcce36be703b057d5f5f4dab3b270c/include/rocksdb/listener.h#L125 // https://github.com/facebook/rocksdb/blob/12d798ac06bcce36be703b057d5f5f4dab3b270c/include/rocksdb/listener.h#L125
@ -2247,8 +2238,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
rocksdb::WriteBatch writeBatch; rocksdb::WriteBatch writeBatch;
if (format == DataMoveRocksCF) { if (format == DataMoveRocksCF) {
CheckpointMetaData& checkpoint = a.checkpoints.front(); CheckpointMetaData& checkpoint = a.checkpoints.front();
std::sort(a.ranges.begin(), a.ranges.end(), RangeLessThan()); std::sort(a.ranges.begin(), a.ranges.end(), KeyRangeRef::ArbitraryOrder());
std::sort(checkpoint.ranges.begin(), checkpoint.ranges.end(), RangeLessThan()); std::sort(checkpoint.ranges.begin(), checkpoint.ranges.end(), KeyRangeRef::ArbitraryOrder());
if (a.ranges.empty() || checkpoint.ranges.empty() || a.ranges.size() > checkpoint.ranges.size() || if (a.ranges.empty() || checkpoint.ranges.empty() || a.ranges.size() > checkpoint.ranges.size() ||
a.ranges.front().begin != checkpoint.ranges.front().begin) { a.ranges.front().begin != checkpoint.ranges.front().begin) {
TraceEvent(SevError, "ShardedRocksDBRestoreFailed", logId) TraceEvent(SevError, "ShardedRocksDBRestoreFailed", logId)
@ -2301,6 +2292,9 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
writeBatch.DeleteRange(ps->cf, toSlice(cRange.begin), toSlice(cRange.end)); writeBatch.DeleteRange(ps->cf, toSlice(cRange.begin), toSlice(cRange.end));
} }
} }
} else if (format == RocksDBKeyValues) {
a.done.sendError(not_implemented());
return;
} else if (format == RocksDB) { } else if (format == RocksDB) {
a.done.sendError(not_implemented()); a.done.sendError(not_implemented());
return; return;
@ -3489,6 +3483,101 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
return Void(); return Void();
} }
TEST_CASE("noSim/ShardedRocksDB/CheckpointBasic") {
state std::string rocksDBTestDir = "sharded-rocks-checkpoint-restore";
state std::map<Key, Value> kvs({ { "a"_sr, "TestValueA"_sr },
{ "ab"_sr, "TestValueAB"_sr },
{ "ad"_sr, "TestValueAD"_sr },
{ "b"_sr, "TestValueB"_sr },
{ "ba"_sr, "TestValueBA"_sr },
{ "c"_sr, "TestValueC"_sr },
{ "d"_sr, "TestValueD"_sr },
{ "e"_sr, "TestValueE"_sr },
{ "h"_sr, "TestValueH"_sr },
{ "ha"_sr, "TestValueHA"_sr } });
platform::eraseDirectoryRecursive(rocksDBTestDir);
state IKeyValueStore* kvStore =
new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
// Add some ranges.
std::vector<Future<Void>> addRangeFutures;
addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "c"_sr), "shard-1"));
addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("c"_sr, "f"_sr), "shard-2"));
addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("h"_sr, "k"_sr), "shard-1"));
kvStore->persistRangeMapping(KeyRangeRef("a"_sr, "f"_sr), true);
wait(waitForAll(addRangeFutures) && kvStore->commit(false));
for (const auto& [k, v] : kvs) {
kvStore->set(KeyValueRef(k, v));
}
wait(kvStore->commit(false));
state std::string checkpointDir = "checkpoint";
platform::eraseDirectoryRecursive(checkpointDir);
// Checkpoint iterator returns only the desired keyrange, i.e., ["ab", "b"].
CheckpointRequest request(latestVersion,
{ KeyRangeRef("a"_sr, "c"_sr), KeyRangeRef("h"_sr, "k"_sr) },
DataMoveRocksCF,
deterministicRandom()->randomUniqueID(),
checkpointDir);
CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
state Standalone<StringRef> token = BinaryWriter::toValue(KeyRangeRef("a"_sr, "k"_sr), IncludeVersion());
state ICheckpointReader* cpReader =
newCheckpointReader(metaData, CheckpointAsKeyValues::True, deterministicRandom()->randomUniqueID());
ASSERT(cpReader != nullptr);
wait(cpReader->init(token));
state KeyRange testRange(KeyRangeRef("ab"_sr, "b"_sr));
state std::unique_ptr<ICheckpointIterator> iter0 = cpReader->getIterator(testRange);
state int numKeys = 0;
try {
loop {
RangeResult res = wait(iter0->nextBatch(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT));
for (const auto& kv : res) {
ASSERT(testRange.contains(kv.key));
ASSERT(kvs[kv.key] == kv.value);
++numKeys;
}
}
} catch (Error& e) {
ASSERT(e.code() == error_code_end_of_stream);
ASSERT(numKeys == 2);
}
testRange = KeyRangeRef("a"_sr, "k"_sr);
state std::unique_ptr<ICheckpointIterator> iter1 = cpReader->getIterator(testRange);
try {
numKeys = 0;
loop {
RangeResult res = wait(iter1->nextBatch(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT));
for (const auto& kv : res) {
ASSERT(testRange.contains(kv.key));
ASSERT(kvs[kv.key] == kv.value);
++numKeys;
}
}
} catch (Error& e) {
ASSERT(e.code() == error_code_end_of_stream);
ASSERT(numKeys == 7);
}
iter0.reset();
iter1.reset();
ASSERT(!cpReader->inUse());
TraceEvent(SevDebug, "ShardedRocksCheckpointReaaderTested");
std::vector<Future<Void>> closes;
closes.push_back(cpReader->close());
closes.push_back(kvStore->onClosed());
kvStore->dispose();
wait(waitForAll(closes));
platform::eraseDirectoryRecursive(rocksDBTestDir);
platform::eraseDirectoryRecursive(checkpointDir);
return Void();
}
} // namespace } // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL #endif // SSD_ROCKSDB_EXPERIMENTAL

View File

@ -42,6 +42,8 @@
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
FDB_DEFINE_BOOLEAN_PARAM(CheckpointAsKeyValues);
#ifdef SSD_ROCKSDB_EXPERIMENTAL #ifdef SSD_ROCKSDB_EXPERIMENTAL
// Enforcing rocksdb version to be 7.7.3. // Enforcing rocksdb version to be 7.7.3.
static_assert((ROCKSDB_MAJOR == 7 && ROCKSDB_MINOR == 7 && ROCKSDB_PATCH == 3), static_assert((ROCKSDB_MAJOR == 7 && ROCKSDB_MINOR == 7 && ROCKSDB_PATCH == 3),
@ -53,6 +55,47 @@ using DB = rocksdb::DB*;
using CF = rocksdb::ColumnFamilyHandle*; using CF = rocksdb::ColumnFamilyHandle*;
const KeyRef persistVersion = "\xff\xffVersion"_sr; const KeyRef persistVersion = "\xff\xffVersion"_sr;
const KeyRef readerInitialized = "\xff\xff/ReaderInitialized"_sr;
const std::string checkpointCf = "RocksDBCheckpoint";
const std::string checkpointReaderSubDir = "/reader";
const std::string rocksDefaultCf = "default";
rocksdb::ExportImportFilesMetaData getMetaData(const CheckpointMetaData& checkpoint) {
rocksdb::ExportImportFilesMetaData metaData;
if (checkpoint.getFormat() != DataMoveRocksCF) {
return metaData;
}
RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(checkpoint);
metaData.db_comparator_name = rocksCF.dbComparatorName;
for (const LiveFileMetaData& fileMetaData : rocksCF.sstFiles) {
rocksdb::LiveFileMetaData liveFileMetaData;
liveFileMetaData.size = fileMetaData.size;
liveFileMetaData.name = fileMetaData.name;
liveFileMetaData.file_number = fileMetaData.file_number;
liveFileMetaData.db_path = fileMetaData.db_path;
liveFileMetaData.smallest_seqno = fileMetaData.smallest_seqno;
liveFileMetaData.largest_seqno = fileMetaData.largest_seqno;
liveFileMetaData.smallestkey = fileMetaData.smallestkey;
liveFileMetaData.largestkey = fileMetaData.largestkey;
liveFileMetaData.num_reads_sampled = fileMetaData.num_reads_sampled;
liveFileMetaData.being_compacted = fileMetaData.being_compacted;
liveFileMetaData.num_entries = fileMetaData.num_entries;
liveFileMetaData.num_deletions = fileMetaData.num_deletions;
liveFileMetaData.temperature = static_cast<rocksdb::Temperature>(fileMetaData.temperature);
liveFileMetaData.oldest_blob_file_number = fileMetaData.oldest_blob_file_number;
liveFileMetaData.oldest_ancester_time = fileMetaData.oldest_ancester_time;
liveFileMetaData.file_creation_time = fileMetaData.file_creation_time;
liveFileMetaData.file_checksum = fileMetaData.file_checksum;
liveFileMetaData.file_checksum_func_name = fileMetaData.file_checksum_func_name;
liveFileMetaData.column_family_name = fileMetaData.column_family_name;
liveFileMetaData.level = fileMetaData.level;
metaData.files.push_back(liveFileMetaData);
}
return metaData;
}
rocksdb::Slice toSlice(StringRef s) { rocksdb::Slice toSlice(StringRef s) {
return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size()); return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
@ -69,7 +112,7 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
rocksdb::Options getOptions() { rocksdb::Options getOptions() {
rocksdb::Options options({}, getCFOptions()); rocksdb::Options options({}, getCFOptions());
options.create_if_missing = false; options.create_if_missing = true;
options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY; options.db_log_dir = SERVER_KNOBS->LOG_DIRECTORY;
return options; return options;
} }
@ -103,27 +146,65 @@ Error statusToError(const rocksdb::Status& s) {
// RocksDBCheckpointReader reads a RocksDB checkpoint, and returns the key-value pairs via nextKeyValues. // RocksDBCheckpointReader reads a RocksDB checkpoint, and returns the key-value pairs via nextKeyValues.
class RocksDBCheckpointReader : public ICheckpointReader { class RocksDBCheckpointReader : public ICheckpointReader {
public: public:
class RocksDBCheckpointIterator : public ICheckpointIterator {
public:
RocksDBCheckpointIterator(RocksDBCheckpointReader* reader, const KeyRange& range)
: reader(reader), range(range) {
ASSERT(reader != nullptr);
ASSERT(reader->db != nullptr);
ASSERT(reader->cf != nullptr);
this->beginSlice = toSlice(this->range.begin);
this->endSlice = toSlice(this->range.end);
rocksdb::ReadOptions options = getReadOptions();
options.iterate_lower_bound = &beginSlice;
options.iterate_upper_bound = &endSlice;
options.fill_cache = false; // Optimized for bulk scan.
options.readahead_size = SERVER_KNOBS->ROCKSDB_CHECKPOINT_READ_AHEAD_SIZE;
const uint64_t deadlineMicros =
reader->db->GetEnv()->NowMicros() + SERVER_KNOBS->ROCKSDB_READ_CHECKPOINT_TIMEOUT * 1000000;
options.deadline = std::chrono::microseconds(deadlineMicros);
this->iterator = std::unique_ptr<rocksdb::Iterator>(reader->db->NewIterator(options, reader->cf));
iterator->Seek(this->beginSlice);
}
~RocksDBCheckpointIterator() { this->reader->numIter--; }
Future<RangeResult> nextBatch(const int rowLimit, const int ByteLimit) override;
rocksdb::Iterator* getIterator() { return iterator.get(); }
const rocksdb::Slice& end() const { return this->endSlice; }
private:
RocksDBCheckpointReader* const reader;
const KeyRange range;
rocksdb::Slice beginSlice;
rocksdb::Slice endSlice;
std::unique_ptr<rocksdb::Iterator> iterator;
};
RocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID); RocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID);
Future<Void> init(StringRef token) override; Future<Void> init(StringRef token) override;
Future<RangeResult> nextKeyValues(const int rowLimit, const int byteLimit) override; Future<RangeResult> nextKeyValues(const int rowLimit, const int byteLimit) override { throw not_implemented(); }
Future<Standalone<StringRef>> nextChunk(const int byteLimit) override { throw not_implemented(); } Future<Standalone<StringRef>> nextChunk(const int byteLimit) override { throw not_implemented(); }
Future<Void> close() override { return doClose(this); } Future<Void> close() override { return doClose(this); }
std::unique_ptr<ICheckpointIterator> getIterator(KeyRange range) override;
bool inUse() const override { return this->numIter > 0; }
private: private:
struct Reader : IThreadPoolReceiver { struct Reader : IThreadPoolReceiver {
struct OpenAction : TypedAction<Reader, OpenAction> { struct OpenAction : TypedAction<Reader, OpenAction> {
OpenAction(std::string path, KeyRange range, Version version) OpenAction(CheckpointMetaData checkpoint) : checkpoint(std::move(checkpoint)) {}
: path(std::move(path)), range(range), version(version) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
const std::string path; const CheckpointMetaData checkpoint;
const KeyRange range;
const Version version;
ThreadReturnPromise<Void> done; ThreadReturnPromise<Void> done;
}; };
@ -137,17 +218,18 @@ private:
}; };
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> { struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
ReadRangeAction(int rowLimit, int byteLimit) ReadRangeAction(int rowLimit, int byteLimit, RocksDBCheckpointIterator* iterator)
: rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()) {} : rowLimit(rowLimit), byteLimit(byteLimit), iterator(iterator), startTime(timer_monotonic()) {}
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; } double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
const int rowLimit, byteLimit; const int rowLimit, byteLimit;
RocksDBCheckpointIterator* const iterator;
const double startTime; const double startTime;
ThreadReturnPromise<RangeResult> result; ThreadReturnPromise<RangeResult> result;
}; };
explicit Reader(DB& db); explicit Reader(DB& db, CF& cf);
~Reader() override {} ~Reader() override {}
void init() override {} void init() override {}
@ -158,35 +240,48 @@ private:
void action(ReadRangeAction& a); void action(ReadRangeAction& a);
rocksdb::Status tryOpenForRead(const std::string& path);
rocksdb::Status importCheckpoint(const std::string& path, const CheckpointMetaData& checkpoint);
rocksdb::Status closeInternal(const std::string& path, const bool deleteOnClose);
DB& db; DB& db;
CF cf; CF& cf;
Key begin;
Key end;
std::vector<rocksdb::ColumnFamilyHandle*> handles; std::vector<rocksdb::ColumnFamilyHandle*> handles;
double readRangeTimeout; double readRangeTimeout;
std::unique_ptr<rocksdb::Iterator> cursor;
}; };
Future<RangeResult> nextBatch(const int rowLimit, const int byteLimit, RocksDBCheckpointIterator* iterator);
ACTOR static Future<Void> doClose(RocksDBCheckpointReader* self); ACTOR static Future<Void> doClose(RocksDBCheckpointReader* self);
DB db = nullptr; DB db = nullptr;
CF cf = nullptr;
std::string path; std::string path;
const UID id; const UID id;
Version version; Version version;
Reference<IThreadPool> readThreads; CheckpointMetaData checkpoint;
Reference<IThreadPool> threads;
Future<Void> openFuture; Future<Void> openFuture;
int numIter;
}; };
Future<RangeResult> RocksDBCheckpointReader::RocksDBCheckpointIterator::nextBatch(const int rowLimit,
const int ByteLimit) {
return this->reader->nextBatch(rowLimit, ByteLimit, this);
}
RocksDBCheckpointReader::RocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) RocksDBCheckpointReader::RocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID)
: id(logID), version(checkpoint.version) { : id(logID), checkpoint(checkpoint), numIter(0) {
RocksDBCheckpoint rocksCheckpoint = getRocksCheckpoint(checkpoint);
this->path = rocksCheckpoint.checkpointDir;
if (g_network->isSimulated()) { if (g_network->isSimulated()) {
readThreads = CoroThreadPool::createThreadPool(); threads = CoroThreadPool::createThreadPool();
} else { } else {
readThreads = createGenericThreadPool(); threads = createGenericThreadPool();
}
for (int i = 0; i < SERVER_KNOBS->ROCKSDB_CHECKPOINT_READER_PARALLELISM; ++i) {
threads->addThread(new Reader(db, cf), "fdb-rocks-cr");
} }
readThreads->addThread(new Reader(db), "fdb-rocks-rd");
} }
Future<Void> RocksDBCheckpointReader::init(StringRef token) { Future<Void> RocksDBCheckpointReader::init(StringRef token) {
@ -194,143 +289,70 @@ Future<Void> RocksDBCheckpointReader::init(StringRef token) {
return openFuture; return openFuture;
} }
KeyRange range = BinaryReader::fromStringRef<KeyRange>(token, IncludeVersion()); auto a = std::make_unique<Reader::OpenAction>(this->checkpoint);
auto a = std::make_unique<Reader::OpenAction>(this->path, range, this->version);
openFuture = a->done.getFuture(); openFuture = a->done.getFuture();
readThreads->post(a.release()); threads->post(a.release());
return openFuture; return openFuture;
} }
Future<RangeResult> RocksDBCheckpointReader::nextKeyValues(const int rowLimit, const int byteLimit) { Future<RangeResult> RocksDBCheckpointReader::nextBatch(const int rowLimit,
auto a = std::make_unique<Reader::ReadRangeAction>(rowLimit, byteLimit); const int byteLimit,
RocksDBCheckpointIterator* iterator) {
auto a = std::make_unique<Reader::ReadRangeAction>(rowLimit, byteLimit, iterator);
auto res = a->result.getFuture(); auto res = a->result.getFuture();
readThreads->post(a.release()); threads->post(a.release());
return res; return res;
} }
RocksDBCheckpointReader::Reader::Reader(DB& db) : db(db), cf(nullptr) { std::unique_ptr<ICheckpointIterator> RocksDBCheckpointReader::getIterator(KeyRange range) {
if (g_network->isSimulated()) { ++this->numIter;
// In simulation, increasing the read operation timeouts to 5 minutes, as some of the tests have return std::unique_ptr<ICheckpointIterator>(new RocksDBCheckpointIterator(this, range));
// very high load and single read thread cannot process all the load within the timeouts.
readRangeTimeout = 5 * 60;
} else {
readRangeTimeout = SERVER_KNOBS->ROCKSDB_READ_RANGE_TIMEOUT;
}
} }
RocksDBCheckpointReader::Reader::Reader(DB& db, CF& cf) : db(db), cf(cf) {}
void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::OpenAction& a) { void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::OpenAction& a) {
TraceEvent(SevDebug, "RocksDBCheckpointReaderInitBegin").detail("Checkpoint", a.checkpoint.toString());
ASSERT(cf == nullptr); ASSERT(cf == nullptr);
std::vector<std::string> columnFamilies; const CheckpointMetaData& checkpoint = a.checkpoint;
rocksdb::Options options = getOptions(); const CheckpointFormat format = checkpoint.getFormat();
rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, a.path, &columnFamilies); if (format != DataMoveRocksCF) {
if (std::find(columnFamilies.begin(), columnFamilies.end(), "default") == columnFamilies.end()) { TraceEvent(SevDebug, "RocksDBCheckpointReaderError").detail("InvalidFormat", checkpoint.toString());
columnFamilies.push_back("default"); a.done.sendError(not_implemented());
}
rocksdb::ColumnFamilyOptions cfOptions = getCFOptions();
std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
for (const std::string& name : columnFamilies) {
descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, cfOptions });
}
status = rocksdb::DB::OpenForReadOnly(options, a.path, descriptors, &handles, &db);
if (!status.ok()) {
logRocksDBError(status, "OpenForReadOnly");
a.done.sendError(statusToError(status));
return; return;
} }
for (rocksdb::ColumnFamilyHandle* handle : handles) { RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(checkpoint);
if (handle->GetName() == SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY) { ASSERT(!rocksCF.sstFiles.empty());
cf = handle; const std::string path = rocksCF.sstFiles.front().db_path + checkpointReaderSubDir;
break;
rocksdb::Status status = tryOpenForRead(path);
if (!status.ok()) {
platform::eraseDirectoryRecursive(path);
status = importCheckpoint(path, checkpoint);
if (status.ok()) {
status = tryOpenForRead(path);
} }
} }
ASSERT(db != nullptr && cf != nullptr); if (!status.ok()) {
begin = a.range.begin;
end = a.range.end;
TraceEvent(SevInfo, "RocksDBCheckpointReaderInit")
.detail("Path", a.path)
.detail("Method", "OpenForReadOnly")
.detail("ColumnFamily", cf->GetName())
.detail("Begin", begin)
.detail("End", end);
rocksdb::PinnableSlice value;
rocksdb::ReadOptions readOptions = getReadOptions();
status = db->Get(readOptions, cf, toSlice(persistVersion), &value);
if (!status.ok() && !status.IsNotFound()) {
logRocksDBError(status, "Checkpoint");
a.done.sendError(statusToError(status)); a.done.sendError(statusToError(status));
return; return;
} }
const Version version =
status.IsNotFound() ? latestVersion : BinaryReader::fromStringRef<Version>(toStringRef(value), Unversioned());
ASSERT(version == a.version);
cursor = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(readOptions, cf));
cursor->Seek(toSlice(begin));
a.done.send(Void()); a.done.send(Void());
TraceEvent(SevDebug, "RocksDBCheckpointReaderInitEnd").detail("Path", path).detail("ColumnFamily", cf->GetName());
} }
void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::CloseAction& a) { void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::CloseAction& a) {
if (db == nullptr) { closeInternal(a.path, a.deleteOnClose);
a.done.send(Void());
return;
}
for (rocksdb::ColumnFamilyHandle* handle : handles) {
if (handle != nullptr) {
TraceEvent("RocksDBCheckpointReaderDestroyCF").detail("Path", a.path).detail("CF", handle->GetName());
db->DestroyColumnFamilyHandle(handle);
}
}
handles.clear();
rocksdb::Status s = db->Close();
if (!s.ok()) {
logRocksDBError(s, "Close");
}
if (a.deleteOnClose) {
std::set<std::string> columnFamilies{ "default" };
columnFamilies.insert(SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY);
std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
for (const std::string& name : columnFamilies) {
descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, getCFOptions() });
}
s = rocksdb::DestroyDB(a.path, getOptions(), descriptors);
if (!s.ok()) {
logRocksDBError(s, "Destroy");
} else {
TraceEvent("RocksDBCheckpointReader").detail("Path", a.path).detail("Method", "Destroy");
}
}
TraceEvent("RocksDBCheckpointReader").detail("Path", a.path).detail("Method", "Close");
a.done.send(Void()); a.done.send(Void());
} }
void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::ReadRangeAction& a) { void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::ReadRangeAction& a) {
const double readBeginTime = timer_monotonic(); TraceEvent(SevDebug, "RocksDBCheckpointReaderReadRangeBegin");
ASSERT(a.iterator != nullptr);
if (readBeginTime - a.startTime > readRangeTimeout) {
TraceEvent(SevWarn, "RocksDBCheckpointReaderError")
.detail("Error", "Read range request timedout")
.detail("Method", "ReadRangeAction")
.detail("Timeout value", readRangeTimeout);
a.result.sendError(timed_out());
return;
}
RangeResult result; RangeResult result;
if (a.rowLimit == 0 || a.byteLimit == 0) { if (a.rowLimit == 0 || a.byteLimit == 0) {
@ -341,55 +363,176 @@ void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::Re
// For now, only forward scan is supported. // For now, only forward scan is supported.
ASSERT(a.rowLimit > 0); ASSERT(a.rowLimit > 0);
rocksdb::Iterator* iter = a.iterator->getIterator();
int accumulatedBytes = 0; int accumulatedBytes = 0;
rocksdb::Status s; rocksdb::Status s;
while (cursor->Valid() && toStringRef(cursor->key()) < end) { while (iter->Valid() && iter->key().compare(a.iterator->end()) < 0) {
KeyValueRef kv(toStringRef(cursor->key()), toStringRef(cursor->value())); KeyValueRef kv(toStringRef(iter->key()), toStringRef(iter->value()));
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize(); accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back_deep(result.arena(), kv); result.push_back_deep(result.arena(), kv);
cursor->Next(); iter->Next();
if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) { if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
break; break;
} }
if (timer_monotonic() - a.startTime > readRangeTimeout) {
TraceEvent(SevWarn, "RocksDBCheckpointReaderError")
.detail("Error", "Read range request timedout")
.detail("Method", "ReadRangeAction")
.detail("Timeout value", readRangeTimeout);
a.result.sendError(transaction_too_old());
delete (cursor.release());
return;
}
} }
s = cursor->status(); s = iter->status();
if (!s.ok()) { if (!s.ok()) {
logRocksDBError(s, "ReadRange"); logRocksDBError(s, "ReadRange");
a.result.sendError(statusToError(s)); a.result.sendError(statusToError(s));
delete (cursor.release());
return; return;
} }
if (result.empty()) { if (result.empty()) {
delete (cursor.release());
a.result.sendError(end_of_stream()); a.result.sendError(end_of_stream());
} else { } else {
a.result.send(result); a.result.send(result);
} }
} }
rocksdb::Status RocksDBCheckpointReader::Reader::tryOpenForRead(const std::string& path) {
std::vector<std::string> columnFamilies;
const rocksdb::Options options = getOptions();
rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, path, &columnFamilies);
if (std::find(columnFamilies.begin(), columnFamilies.end(), rocksDefaultCf) == columnFamilies.end() ||
std::find(columnFamilies.begin(), columnFamilies.end(), checkpointCf) == columnFamilies.end()) {
return rocksdb::Status::Aborted();
}
const rocksdb::ColumnFamilyOptions cfOptions = getCFOptions();
std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
for (const std::string& name : columnFamilies) {
descriptors.emplace_back(name, cfOptions);
}
status = rocksdb::DB::OpenForReadOnly(options, path, descriptors, &handles, &db);
if (!status.ok()) {
logRocksDBError(status, "OpenForReadOnly");
return status;
}
rocksdb::PinnableSlice value;
rocksdb::ReadOptions readOptions = getReadOptions();
status = db->Get(readOptions, db->DefaultColumnFamily(), toSlice(readerInitialized), &value);
if (!status.ok() && !status.IsNotFound()) {
logRocksDBError(status, "CheckpointCheckInitState");
return status;
}
if (status.IsNotFound()) {
status = closeInternal(path, /*deleteOnClose=*/true);
if (!status.ok()) {
return status;
} else {
delete db;
TraceEvent(SevDebug, "RocksDBCheckpointReaderTryOpenError").detail("Path", path);
return rocksdb::Status::Aborted();
}
}
ASSERT(handles.size() == 2);
for (rocksdb::ColumnFamilyHandle* handle : handles) {
if (handle->GetName() == checkpointCf) {
TraceEvent(SevDebug, "RocksDBCheckpointCF").detail("Path", path).detail("ColumnFamily", handle->GetName());
cf = handle;
break;
}
}
ASSERT(db != nullptr && cf != nullptr);
return rocksdb::Status::OK();
}
rocksdb::Status RocksDBCheckpointReader::Reader::importCheckpoint(const std::string& path,
const CheckpointMetaData& checkpoint) {
std::vector<std::string> columnFamilies;
const rocksdb::Options options = getOptions();
rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, path, &columnFamilies);
if (std::find(columnFamilies.begin(), columnFamilies.end(), rocksDefaultCf) == columnFamilies.end()) {
columnFamilies.push_back(rocksDefaultCf);
}
const rocksdb::ColumnFamilyOptions cfOptions = getCFOptions();
std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
for (const std::string& name : columnFamilies) {
descriptors.emplace_back(name, cfOptions);
}
status = rocksdb::DB::Open(options, path, descriptors, &handles, &db);
if (!status.ok()) {
logRocksDBError(status, "CheckpointReaderOpen");
return status;
}
rocksdb::ExportImportFilesMetaData metaData = getMetaData(checkpoint);
rocksdb::ImportColumnFamilyOptions importOptions;
importOptions.move_files = false;
status = db->CreateColumnFamilyWithImport(cfOptions, checkpointCf, importOptions, metaData, &cf);
if (!status.ok()) {
logRocksDBError(status, "CheckpointReaderImportCheckpoint");
return status;
}
handles.push_back(cf);
TraceEvent(SevDebug, "RocksDBCheckpointReaderImportedCF");
rocksdb::WriteOptions writeOptions;
writeOptions.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
status = db->Put(writeOptions, toSlice(readerInitialized), toSlice("1"_sr));
if (!status.ok()) {
logRocksDBError(status, "CheckpointReaderPersistInitKey");
return status;
}
ASSERT(db != nullptr && cf != nullptr);
return closeInternal(path, /*deleteOnClose=*/false);
}
rocksdb::Status RocksDBCheckpointReader::Reader::closeInternal(const std::string& path, const bool deleteOnClose) {
if (db == nullptr) {
return rocksdb::Status::OK();
}
for (rocksdb::ColumnFamilyHandle* handle : handles) {
if (handle != nullptr) {
TraceEvent("RocksDBCheckpointReaderDestroyCF").detail("Path", path).detail("CF", handle->GetName());
db->DestroyColumnFamilyHandle(handle);
}
}
handles.clear();
rocksdb::Status s = db->Close();
if (!s.ok()) {
logRocksDBError(s, "Close");
}
if (deleteOnClose) {
rocksdb::ColumnFamilyOptions options;
std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
descriptors.emplace_back(rocksDefaultCf, options);
descriptors.emplace_back(checkpointCf, options);
s = rocksdb::DestroyDB(path, getOptions(), descriptors);
if (!s.ok()) {
logRocksDBError(s, "Destroy");
} else {
TraceEvent(SevDebug, "RocksDBCheckpointReader").detail("Path", path).detail("Method", "Destroy");
}
}
TraceEvent(SevDebug, "RocksDBCheckpointReader").detail("Path", path).detail("Method", "Close");
return s;
}
ACTOR Future<Void> RocksDBCheckpointReader::doClose(RocksDBCheckpointReader* self) { ACTOR Future<Void> RocksDBCheckpointReader::doClose(RocksDBCheckpointReader* self) {
if (self == nullptr) if (self == nullptr)
return Void(); return Void();
auto a = new RocksDBCheckpointReader::Reader::CloseAction(self->path, false); auto a = new RocksDBCheckpointReader::Reader::CloseAction(self->path, false);
auto f = a->done.getFuture(); auto f = a->done.getFuture();
self->readThreads->post(a); self->threads->post(a);
wait(f); wait(f);
if (self != nullptr) { if (self != nullptr) {
wait(self->readThreads->stop()); wait(self->threads->stop());
} }
if (self != nullptr) { if (self != nullptr) {
@ -602,13 +745,15 @@ ACTOR Future<Void> fetchCheckpointRange(Database cx,
std::shared_ptr<rocksdb::SstFileWriter> writer, std::shared_ptr<rocksdb::SstFileWriter> writer,
std::function<Future<Void>(const CheckpointMetaData&)> cFun, std::function<Future<Void>(const CheckpointMetaData&)> cFun,
int maxRetries = 3) { int maxRetries = 3) {
state std::string localFile = dir + "/" + metaData->checkpointID.toString() + ".sst"; state std::string localFile =
RocksDBCheckpoint rcp = getRocksCheckpoint(*metaData); dir + "/" + UID(metaData->checkpointID.first(), deterministicRandom()->randomUInt64()).toString() + ".sst";
RocksDBCheckpointKeyValues rkv = getRocksKeyValuesCheckpoint(*metaData);
TraceEvent("FetchCheckpointRange") TraceEvent("FetchCheckpointRange")
.detail("InitialState", metaData->toString()) .detail("InitialState", metaData->toString())
.detail("RocksCheckpoint", rcp.toString()); .detail("RocksCheckpointKeyValues", rkv.toString())
.detail("FilePath", localFile);
for (const auto& file : rcp.fetchedFiles) { for (const auto& file : rkv.fetchedFiles) {
ASSERT(!file.range.intersects(range)); ASSERT(!file.range.intersects(range));
} }
@ -745,6 +890,40 @@ ACTOR Future<Void> fetchCheckpointRange(Database cx,
return Void(); return Void();
} }
ACTOR Future<Void> fetchCheckpointRanges(Database cx,
std::shared_ptr<CheckpointMetaData> metaData,
std::string dir,
std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
RocksDBCheckpointKeyValues rkv = getRocksKeyValuesCheckpoint(*metaData);
TraceEvent("FetchCheckpointRanges")
.detail("InitialState", metaData->toString())
.detail("RocksCheckpointKeyValues", rkv.toString());
KeyRangeMap<CheckpointFile> fileMap;
for (const auto& file : rkv.fetchedFiles) {
fileMap.insert(file.range, file);
}
std::vector<Future<Void>> fs;
for (const auto& range : rkv.ranges) {
auto ranges = fileMap.intersectingRanges(range);
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
CheckpointFile& file = r->value();
KeyRangeRef currentRange = range & r->range();
if (!file.isValid()) {
std::shared_ptr<rocksdb::SstFileWriter> writer =
std::make_shared<rocksdb::SstFileWriter>(rocksdb::EnvOptions(), rocksdb::Options());
fs.push_back(fetchCheckpointRange(cx, metaData, currentRange, dir, writer, cFun));
}
}
}
wait(waitForAll(fs));
if (cFun) {
wait(cFun(*metaData));
}
return Void();
}
} // namespace } // namespace
ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx, ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
@ -759,7 +938,7 @@ ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
state std::shared_ptr<CheckpointMetaData> metaData = std::make_shared<CheckpointMetaData>(initialState); state std::shared_ptr<CheckpointMetaData> metaData = std::make_shared<CheckpointMetaData>(initialState);
if (metaData->format == DataMoveRocksCF) { if (metaData->getFormat() == DataMoveRocksCF) {
state RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(initialState); state RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(initialState);
TraceEvent(SevDebug, "RocksDBCheckpointMetaData").detail("RocksCF", rocksCF.toString()); TraceEvent(SevDebug, "RocksDBCheckpointMetaData").detail("RocksCF", rocksCF.toString());
@ -772,10 +951,10 @@ ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
.detail("Server", describe(metaData->src)); .detail("Server", describe(metaData->src));
} }
wait(waitForAll(fs)); wait(waitForAll(fs));
} else if (metaData->format == RocksDB) { } else if (metaData->getFormat() == RocksDBKeyValues) {
std::shared_ptr<rocksdb::SstFileWriter> writer = wait(fetchCheckpointRanges(cx, metaData, dir, cFun));
std::make_shared<rocksdb::SstFileWriter>(rocksdb::EnvOptions(), rocksdb::Options()); } else if (metaData->getFormat() == RocksDB) {
wait(fetchCheckpointRange(cx, metaData, metaData->ranges.front(), dir, writer, cFun)); throw not_implemented();
} }
return *metaData; return *metaData;
@ -846,12 +1025,14 @@ int64_t getTotalFetchedBytes(const std::vector<CheckpointMetaData>& checkpoints)
return totalBytes; return totalBytes;
} }
ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) { ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint,
const CheckpointAsKeyValues checkpointAsKeyValues,
UID logID) {
#ifdef SSD_ROCKSDB_EXPERIMENTAL #ifdef SSD_ROCKSDB_EXPERIMENTAL
const CheckpointFormat format = checkpoint.getFormat(); const CheckpointFormat format = checkpoint.getFormat();
if (format == DataMoveRocksCF) { if (format == DataMoveRocksCF && !checkpointAsKeyValues) {
return new RocksDBCFCheckpointReader(checkpoint, logID); return new RocksDBCFCheckpointReader(checkpoint, logID);
} else if (format == RocksDB) { } else {
return new RocksDBCheckpointReader(checkpoint, logID); return new RocksDBCheckpointReader(checkpoint, logID);
} }
#endif // SSD_ROCKSDB_EXPERIMENTAL #endif // SSD_ROCKSDB_EXPERIMENTAL
@ -871,3 +1052,10 @@ RocksDBCheckpoint getRocksCheckpoint(const CheckpointMetaData& checkpoint) {
reader.deserialize(rocksCheckpoint); reader.deserialize(rocksCheckpoint);
return rocksCheckpoint; return rocksCheckpoint;
} }
RocksDBCheckpointKeyValues getRocksKeyValuesCheckpoint(const CheckpointMetaData& checkpoint) {
RocksDBCheckpointKeyValues rocksCheckpoint;
ObjectReader reader(checkpoint.serializedCheckpoint.begin(), IncludeVersion());
reader.deserialize(rocksCheckpoint);
return rocksCheckpoint;
}

View File

@ -23,10 +23,12 @@
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) { ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint,
const CheckpointAsKeyValues checkpointAsKeyValues,
UID logID) {
const CheckpointFormat format = checkpoint.getFormat(); const CheckpointFormat format = checkpoint.getFormat();
if (format == DataMoveRocksCF || format == RocksDB) { if (format == DataMoveRocksCF || format == RocksDB) {
return newRocksDBCheckpointReader(checkpoint, logID); return newRocksDBCheckpointReader(checkpoint, checkpointAsKeyValues, logID);
} else { } else {
throw not_implemented(); throw not_implemented();
} }
@ -51,11 +53,12 @@ ACTOR Future<CheckpointMetaData> fetchCheckpoint(Database cx,
std::string dir, std::string dir,
std::function<Future<Void>(const CheckpointMetaData&)> cFun) { std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
TraceEvent("FetchCheckpointBegin", initialState.checkpointID).detail("CheckpointMetaData", initialState.toString()); TraceEvent("FetchCheckpointBegin", initialState.checkpointID).detail("CheckpointMetaData", initialState.toString());
state CheckpointMetaData result; state CheckpointMetaData result;
const CheckpointFormat format = initialState.getFormat(); const CheckpointFormat format = initialState.getFormat();
ASSERT(format != RocksDBKeyValues);
if (format == DataMoveRocksCF || format == RocksDB) { if (format == DataMoveRocksCF || format == RocksDB) {
CheckpointMetaData _result = wait(fetchRocksDBCheckpoint(cx, initialState, dir, cFun)); wait(store(result, fetchRocksDBCheckpoint(cx, initialState, dir, cFun)));
result = _result;
} else { } else {
throw not_implemented(); throw not_implemented();
} }
@ -64,15 +67,30 @@ ACTOR Future<CheckpointMetaData> fetchCheckpoint(Database cx,
return result; return result;
} }
ACTOR Future<std::vector<CheckpointMetaData>> fetchCheckpoints( ACTOR Future<CheckpointMetaData> fetchCheckpointRanges(Database cx,
Database cx, CheckpointMetaData initialState,
std::vector<CheckpointMetaData> initialStates, std::string dir,
std::string dir, std::vector<KeyRange> ranges,
std::function<Future<Void>(const CheckpointMetaData&)> cFun) { std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
std::vector<Future<CheckpointMetaData>> actors; TraceEvent(SevDebug, "FetchCheckpointRangesBegin", initialState.checkpointID)
for (const auto& checkpoint : initialStates) { .detail("CheckpointMetaData", initialState.toString())
actors.push_back(fetchCheckpoint(cx, checkpoint, dir, cFun)); .detail("Ranges", describe(ranges));
ASSERT(!ranges.empty());
state CheckpointMetaData result;
const CheckpointFormat format = initialState.getFormat();
if (format != RocksDBKeyValues) {
if (format != DataMoveRocksCF) {
throw not_implemented();
}
initialState.setFormat(RocksDBKeyValues);
initialState.serializedCheckpoint = ObjectWriter::toValue(RocksDBCheckpointKeyValues(ranges), IncludeVersion());
} }
std::vector<CheckpointMetaData> res = wait(getAll(actors));
return res; wait(store(result, fetchRocksDBCheckpoint(cx, initialState, dir, cFun)));
TraceEvent(SevDebug, "FetchCheckpointRangesEnd", initialState.checkpointID)
.detail("CheckpointMetaData", result.toString())
.detail("Ranges", describe(ranges));
return result;
} }

View File

@ -40,6 +40,8 @@ struct CheckpointFile {
CheckpointFile() = default; CheckpointFile() = default;
CheckpointFile(std::string path, KeyRange range, int64_t size) : path(path), range(range), size(size) {} CheckpointFile(std::string path, KeyRange range, int64_t size) : path(path), range(range), size(size) {}
bool isValid() const { return !path.empty(); }
std::string toString() const { std::string toString() const {
return "CheckpointFile:\nFile Name: " + this->path + "\nRange: " + range.toString() + return "CheckpointFile:\nFile Name: " + this->path + "\nRange: " + range.toString() +
"\nSize: " + std::to_string(size) + "\n"; "\nSize: " + std::to_string(size) + "\n";
@ -241,6 +243,30 @@ struct RocksDBCheckpoint {
} }
}; };
struct RocksDBCheckpointKeyValues {
constexpr static FileIdentifier file_identifier = 13804349;
std::vector<CheckpointFile> fetchedFiles; // Used for fetchCheckpoint, to record the progress.
std::vector<KeyRange> ranges; // The ranges we want to fetch.
RocksDBCheckpointKeyValues(std::vector<KeyRange> ranges) : ranges(ranges) {}
RocksDBCheckpointKeyValues() = default;
CheckpointFormat format() const { return RocksDBKeyValues; }
std::string toString() const {
std::string res = "RocksDBKeyValuesCheckpoint: [Target Ranges]: " + describe(ranges) + " [Fetched Files]: ";
for (const auto& file : fetchedFiles) {
res += file.toString();
}
return res;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, fetchedFiles, ranges);
}
};
// Fetch the checkpoint file(s) to local dir, the checkpoint is specified by initialState. // Fetch the checkpoint file(s) to local dir, the checkpoint is specified by initialState.
// If cFun is provided, the fetch progress can be checkpointed, so that next time, the fetch process // If cFun is provided, the fetch progress can be checkpointed, so that next time, the fetch process
// can be continued, in case of crash. // can be continued, in case of crash.
@ -255,12 +281,16 @@ int64_t getTotalFetchedBytes(const std::vector<CheckpointMetaData>& checkpoints)
// Clean up on-disk files associated with checkpoint. // Clean up on-disk files associated with checkpoint.
ACTOR Future<Void> deleteRocksCheckpoint(CheckpointMetaData checkpoint); ACTOR Future<Void> deleteRocksCheckpoint(CheckpointMetaData checkpoint);
ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID); ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint,
const CheckpointAsKeyValues checkpointAsKeyValues,
UID logID);
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint); RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint);
RocksDBCheckpoint getRocksCheckpoint(const CheckpointMetaData& checkpoint); RocksDBCheckpoint getRocksCheckpoint(const CheckpointMetaData& checkpoint);
RocksDBCheckpointKeyValues getRocksKeyValuesCheckpoint(const CheckpointMetaData& checkpoint);
#include "flow/unactorcompiler.h" #include "flow/unactorcompiler.h"
#endif #endif

View File

@ -31,6 +31,15 @@
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
FDB_DECLARE_BOOLEAN_PARAM(CheckpointAsKeyValues);
class ICheckpointIterator {
public:
virtual Future<RangeResult> nextBatch(const int rowLimit, const int ByteLimit) = 0;
virtual ~ICheckpointIterator() {}
};
// An ICheckpointReader can read the contents of a checkpoint created from a KV store, // An ICheckpointReader can read the contents of a checkpoint created from a KV store,
// i.e., by IKeyValueStore::checkpoint(). // i.e., by IKeyValueStore::checkpoint().
class ICheckpointReader { class ICheckpointReader {
@ -47,11 +56,17 @@ public:
virtual Future<Void> close() = 0; virtual Future<Void> close() = 0;
virtual std::unique_ptr<ICheckpointIterator> getIterator(KeyRange range) { throw not_implemented(); }
virtual bool inUse() const { return false; }
protected: protected:
virtual ~ICheckpointReader() {} virtual ~ICheckpointReader() {}
}; };
ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint, UID logID); ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint,
const CheckpointAsKeyValues checkpointAsKeyValues,
UID logID);
// Delete a checkpoint. // Delete a checkpoint.
ACTOR Future<Void> deleteCheckpoint(CheckpointMetaData checkpoint); ACTOR Future<Void> deleteCheckpoint(CheckpointMetaData checkpoint);
@ -64,10 +79,12 @@ ACTOR Future<CheckpointMetaData> fetchCheckpoint(Database cx,
std::string dir, std::string dir,
std::function<Future<Void>(const CheckpointMetaData&)> cFun = nullptr); std::function<Future<Void>(const CheckpointMetaData&)> cFun = nullptr);
ACTOR Future<std::vector<CheckpointMetaData>> fetchCheckpoints( // Same as above, except that the checkpoint is fetched as key-value pairs.
ACTOR Future<CheckpointMetaData> fetchCheckpointRanges(
Database cx, Database cx,
std::vector<CheckpointMetaData> initialStates, CheckpointMetaData initialState,
std::string dir, std::string dir,
std::vector<KeyRange> ranges,
std::function<Future<Void>(const CheckpointMetaData&)> cFun = nullptr); std::function<Future<Void>(const CheckpointMetaData&)> cFun = nullptr);
#include "flow/unactorcompiler.h" #include "flow/unactorcompiler.h"

View File

@ -2425,7 +2425,7 @@ ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest
} }
try { try {
reader = newCheckpointReader(it->second, deterministicRandom()->randomUniqueID()); reader = newCheckpointReader(it->second, CheckpointAsKeyValues::False, deterministicRandom()->randomUniqueID());
wait(reader->init(req.token)); wait(reader->init(req.token));
loop { loop {
@ -2479,13 +2479,15 @@ ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin
} }
state ICheckpointReader* reader = nullptr; state ICheckpointReader* reader = nullptr;
state std::unique_ptr<ICheckpointIterator> iter;
try { try {
reader = newCheckpointReader(it->second, self->thisServerID); reader = newCheckpointReader(it->second, CheckpointAsKeyValues::True, self->thisServerID);
wait(reader->init(BinaryWriter::toValue(req.range, IncludeVersion()))); wait(reader->init(BinaryWriter::toValue(req.range, IncludeVersion())));
iter = reader->getIterator(req.range);
loop { loop {
state RangeResult res = state RangeResult res =
wait(reader->nextKeyValues(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT)); wait(iter->nextBatch(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT));
if (!res.empty()) { if (!res.empty()) {
TraceEvent(SevDebug, "FetchCheckpontKeyValuesReadRange", self->thisServerID) TraceEvent(SevDebug, "FetchCheckpontKeyValuesReadRange", self->thisServerID)
.detail("CheckpointID", req.checkpointID) .detail("CheckpointID", req.checkpointID)
@ -2524,7 +2526,10 @@ ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin
} }
} }
wait(reader->close()); iter.reset();
if (!reader->inUse()) {
wait(reader->close());
}
return Void(); return Void();
} }