Implemented checkpoint(), restore() for KeyValueStoreRocksDB, for the

new format.
This commit is contained in:
He Liu 2022-04-08 11:13:34 -07:00 committed by He Liu
parent 5233d2a6ac
commit 05bbe174c3
1 changed files with 247 additions and 135 deletions

View File

@ -846,8 +846,10 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
}
}
void logRocksDBError(const rocksdb::Status& status, const std::string& method) {
auto level = status.IsTimedOut() ? SevWarn : SevError;
void logRocksDBError(const rocksdb::Status& status,
const std::string& method,
Optional<Severity> sev = Optional<Severity>()) {
Severity level = sev.present() ? sev.get() : (status.IsTimedOut() ? SevWarn : SevError);
TraceEvent e(level, "RocksDBError");
e.detail("Error", status.ToString()).detail("Method", method).detail("RocksDBSeverity", status.severity());
if (status.IsIOError()) {
@ -867,9 +869,28 @@ Error statusToError(const rocksdb::Status& s) {
struct RocksDBKeyValueStore : IKeyValueStore {
struct Writer : IThreadPoolReceiver {
struct CheckpointAction : TypedAction<Writer, CheckpointAction> {
CheckpointAction(const CheckpointRequest& request) : request(request) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
const CheckpointRequest request;
ThreadReturnPromise<CheckpointMetaData> reply;
};
struct RestoreAction : TypedAction<Writer, RestoreAction> {
RestoreAction(const std::string& path, const std::vector<CheckpointMetaData>& checkpoints)
: path(path), checkpoints(checkpoints) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
const std::string path;
const std::vector<CheckpointMetaData> checkpoints;
ThreadReturnPromise<Void> done;
};
DB& db;
CF& cf;
UID id;
std::shared_ptr<rocksdb::RateLimiter> rateLimiter;
std::shared_ptr<ReadIteratorPool> readIterPool;
@ -1153,127 +1174,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
a.done.send(Void());
}
struct CheckpointAction : TypedAction<Writer, CheckpointAction> {
CheckpointAction(const CheckpointRequest& request) : request(request) {}
void action(CheckpointAction& a);
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
const CheckpointRequest request;
ThreadReturnPromise<CheckpointMetaData> reply;
};
void action(CheckpointAction& a) {
TraceEvent("RocksDBServeCheckpointBegin", id)
.detail("MinVersion", a.request.version)
.detail("Range", a.request.range.toString())
.detail("Format", static_cast<int>(a.request.format))
.detail("CheckpointDir", a.request.checkpointDir);
rocksdb::Checkpoint* checkpoint;
rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
if (!s.ok()) {
logRocksDBError(s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
rocksdb::PinnableSlice value;
rocksdb::ReadOptions readOptions = getReadOptions();
s = db->Get(readOptions, cf, toSlice(persistVersion), &value);
if (!s.ok() && !s.IsNotFound()) {
logRocksDBError(s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
const Version version = s.IsNotFound()
? latestVersion
: BinaryReader::fromStringRef<Version>(toStringRef(value), Unversioned());
TraceEvent("RocksDBServeCheckpointVersion", id)
.detail("CheckpointVersion", a.request.version)
.detail("PersistVersion", version);
// TODO: set the range as the actual shard range.
CheckpointMetaData res(version, a.request.range, a.request.format, a.request.checkpointID);
const std::string& checkpointDir = a.request.checkpointDir;
if (a.request.format == RocksDBColumnFamily) {
rocksdb::ExportImportFilesMetaData* pMetadata;
platform::eraseDirectoryRecursive(checkpointDir);
const std::string cwd = platform::getWorkingDirectory() + "/";
s = checkpoint->ExportColumnFamily(cf, checkpointDir, &pMetadata);
if (!s.ok()) {
logRocksDBError(s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
populateMetaData(&res, *pMetadata);
delete pMetadata;
TraceEvent("RocksDBServeCheckpointSuccess", id)
.detail("CheckpointMetaData", res.toString())
.detail("RocksDBCF", getRocksCF(res).toString());
} else {
throw not_implemented();
}
res.setState(CheckpointMetaData::Complete);
a.reply.send(res);
}
struct RestoreAction : TypedAction<Writer, RestoreAction> {
RestoreAction(const std::string& path, const std::vector<CheckpointMetaData>& checkpoints)
: path(path), checkpoints(checkpoints) {}
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
const std::string path;
const std::vector<CheckpointMetaData> checkpoints;
ThreadReturnPromise<Void> done;
};
void action(RestoreAction& a) {
TraceEvent("RocksDBServeRestoreBegin", id).detail("Path", a.path);
// TODO: Fail gracefully.
ASSERT(!a.checkpoints.empty());
if (a.checkpoints[0].format == RocksDBColumnFamily) {
ASSERT_EQ(a.checkpoints.size(), 1);
TraceEvent("RocksDBServeRestoreCF", id)
.detail("Path", a.path)
.detail("Checkpoint", a.checkpoints[0].toString())
.detail("RocksDBCF", getRocksCF(a.checkpoints[0]).toString());
auto options = getOptions();
rocksdb::Status status = rocksdb::DB::Open(options, a.path, &db);
if (!status.ok()) {
logRocksDBError(status, "Restore");
a.done.sendError(statusToError(status));
return;
}
rocksdb::ExportImportFilesMetaData metaData = getMetaData(a.checkpoints[0]);
rocksdb::ImportColumnFamilyOptions importOptions;
importOptions.move_files = true;
status = db->CreateColumnFamilyWithImport(
getCFOptions(), SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, importOptions, metaData, &cf);
if (!status.ok()) {
logRocksDBError(status, "Restore");
a.done.sendError(statusToError(status));
} else {
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Restore");
a.done.send(Void());
}
} else {
throw not_implemented();
}
}
void action(RestoreAction& a);
};
struct Reader : IThreadPoolReceiver {
@ -2035,6 +1938,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
.detail("Dir", dir);
}
} else if (checkpoint.format == RocksDB) {
std::cout << "3" << std::endl;
throw not_implemented();
} else {
throw internal_error();
@ -2043,6 +1947,172 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
};
void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
TraceEvent("RocksDBServeCheckpointBegin", id)
.detail("MinVersion", a.request.version)
.detail("Range", a.request.range.toString())
.detail("Format", static_cast<int>(a.request.format))
.detail("CheckpointDir", a.request.checkpointDir);
rocksdb::Checkpoint* checkpoint;
rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
if (!s.ok()) {
logRocksDBError(s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
rocksdb::PinnableSlice value;
rocksdb::ReadOptions readOptions = getReadOptions();
s = db->Get(readOptions, cf, toSlice(persistVersion), &value);
if (!s.ok() && !s.IsNotFound()) {
logRocksDBError(s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
const Version version =
s.IsNotFound() ? latestVersion : BinaryReader::fromStringRef<Version>(toStringRef(value), Unversioned());
ASSERT(a.request.version == version || a.request.version == latestVersion);
TraceEvent(SevDebug, "RocksDBServeCheckpointVersion", id)
.detail("CheckpointVersion", a.request.version)
.detail("PersistVersion", version);
// TODO: set the range as the actual shard range.
CheckpointMetaData res(version, a.request.range, a.request.format, a.request.checkpointID);
const std::string& checkpointDir = a.request.checkpointDir;
if (a.request.format == RocksDBColumnFamily) {
rocksdb::ExportImportFilesMetaData* pMetadata;
platform::eraseDirectoryRecursive(checkpointDir);
const std::string cwd = platform::getWorkingDirectory() + "/";
s = checkpoint->ExportColumnFamily(cf, checkpointDir, &pMetadata);
if (!s.ok()) {
logRocksDBError(s, "ExportColumnFamily");
a.reply.sendError(statusToError(s));
return;
}
populateMetaData(&res, *pMetadata);
delete pMetadata;
TraceEvent("RocksDBServeCheckpointSuccess", id)
.detail("CheckpointMetaData", res.toString())
.detail("RocksDBCF", getRocksCF(res).toString());
} else if (a.request.format == RocksDB) {
platform::eraseDirectoryRecursive(checkpointDir);
uint64_t debugCheckpointSeq = -1;
s = checkpoint->CreateCheckpoint(checkpointDir, /*log_size_for_flush=*/0, &debugCheckpointSeq);
if (!s.ok()) {
logRocksDBError(s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
RocksDBCheckpoint rcp;
rcp.checkpointDir = checkpointDir;
rcp.sstFiles = platform::listFiles(checkpointDir, ".sst");
res.serializedCheckpoint = ObjectWriter::toValue(rcp, IncludeVersion());
TraceEvent("RocksDBCheckpointCreated", id)
.detail("CheckpointVersion", a.request.version)
.detail("RocksSequenceNumber", debugCheckpointSeq)
.detail("CheckpointDir", checkpointDir);
} else {
throw not_implemented();
}
res.setState(CheckpointMetaData::Complete);
a.reply.send(res);
}
void RocksDBKeyValueStore::Writer::action(RestoreAction& a) {
TraceEvent("RocksDBRestoreBegin", id).detail("Path", a.path).detail("Checkpoints", describe(a.checkpoints));
ASSERT(db != nullptr);
ASSERT(!a.checkpoints.empty());
const CheckpointFormat format = a.checkpoints[0].getFormat();
for (int i = 1; i < a.checkpoints.size(); ++i) {
if (a.checkpoints[i].getFormat() != format) {
throw invalid_checkpoint_format();
}
}
rocksdb::Status status;
if (format == RocksDBColumnFamily) {
ASSERT_EQ(a.checkpoints.size(), 1);
TraceEvent("RocksDBServeRestoreCF", id)
.detail("Path", a.path)
.detail("Checkpoint", a.checkpoints[0].toString())
.detail("RocksDBCF", getRocksCF(a.checkpoints[0]).toString());
if (g_network->isSimulated() && cf != nullptr) {
ASSERT(db->DropColumnFamily(cf).ok());
}
rocksdb::ExportImportFilesMetaData metaData = getMetaData(a.checkpoints[0]);
rocksdb::ImportColumnFamilyOptions importOptions;
importOptions.move_files = true;
status = db->CreateColumnFamilyWithImport(
getCFOptions(), SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, importOptions, metaData, &cf);
if (!status.ok()) {
logRocksDBError(status, "Restore");
a.done.sendError(statusToError(status));
} else {
TraceEvent(SevInfo, "RocksDBRestoreCFSuccess")
.detail("Path", a.path)
.detail("Checkpoint", a.checkpoints[0].toString());
a.done.send(Void());
}
} else if (format == RocksDB) {
if (cf == nullptr) {
status = db->CreateColumnFamily(getCFOptions(), SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, &cf);
TraceEvent("RocksDBServeRestoreRange", id)
.detail("Path", a.path)
.detail("Checkpoint", describe(a.checkpoints));
if (!status.ok()) {
logRocksDBError(status, "CreateColumnFamily");
a.done.sendError(statusToError(status));
return;
}
}
std::vector<std::string> sstFiles;
for (const auto& checkpoint : a.checkpoints) {
const RocksDBCheckpoint rocksCheckpoint = getRocksCheckpoint(checkpoint);
for (const auto& file : rocksCheckpoint.fetchedFiles) {
TraceEvent("RocksDBRestoreFile", id)
.detail("Checkpoint", rocksCheckpoint.toString())
.detail("File", file.toString());
sstFiles.push_back(file.path);
}
}
if (!sstFiles.empty()) {
rocksdb::IngestExternalFileOptions ingestOptions;
ingestOptions.move_files = true;
ingestOptions.write_global_seqno = false;
ingestOptions.verify_checksums_before_ingest = true;
status = db->IngestExternalFile(cf, sstFiles, ingestOptions);
if (!status.ok()) {
logRocksDBError(status, "IngestExternalFile", SevWarnAlways);
a.done.sendError(statusToError(status));
return;
}
} else {
TraceEvent(SevDebug, "RocksDBServeRestoreEmptyRange", id)
.detail("Path", a.path)
.detail("Checkpoint", describe(a.checkpoints));
}
TraceEvent("RocksDBServeRestoreEnd", id).detail("Path", a.path).detail("Checkpoint", describe(a.checkpoints));
a.done.send(Void());
} else {
throw not_implemented();
}
}
} // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL
@ -2155,7 +2225,7 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/RocksDBReopen") {
return Void();
}
TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestore") {
TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreColumnFamily") {
state std::string cwd = platform::getWorkingDirectory() + "/";
state std::string rocksDBTestDir = "rocksdb-kvstore-br-test-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);
@ -2169,6 +2239,13 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestore") {
Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);
state std::string rocksDBRestoreDir = "rocksdb-kvstore-br-restore-db";
platform::eraseDirectoryRecursive(rocksDBRestoreDir);
state IKeyValueStore* kvStoreCopy =
new RocksDBKeyValueStore(rocksDBRestoreDir, deterministicRandom()->randomUniqueID());
wait(kvStoreCopy->init());
platform::eraseDirectoryRecursive("checkpoint");
state std::string checkpointDir = cwd + "checkpoint";
@ -2176,12 +2253,6 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestore") {
latestVersion, allKeys, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), checkpointDir);
CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
state std::string rocksDBRestoreDir = "rocksdb-kvstore-br-restore-db";
platform::eraseDirectoryRecursive(rocksDBRestoreDir);
state IKeyValueStore* kvStoreCopy =
new RocksDBKeyValueStore(rocksDBRestoreDir, deterministicRandom()->randomUniqueID());
std::vector<CheckpointMetaData> checkpoints;
checkpoints.push_back(metaData);
wait(kvStoreCopy->restore(checkpoints));
@ -2202,11 +2273,52 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestore") {
return Void();
}
TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/RocksDBTypes") {
// If the following assertion fails, update SstFileMetaData and LiveFileMetaData in RocksDBCheckpointUtils.actor.h
// to be the same as rocksdb::SstFileMetaData and rocksdb::LiveFileMetaData.
ASSERT_EQ(sizeof(rocksdb::LiveFileMetaData), 184);
ASSERT_EQ(sizeof(rocksdb::ExportImportFilesMetaData), 32);
TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestoreKeyValues") {
state std::string cwd = platform::getWorkingDirectory() + "/";
state std::string rocksDBTestDir = "rocksdb-kvstore-brsst-test-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);
state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
kvStore->set({ LiteralStringRef("foo"), LiteralStringRef("bar") });
wait(kvStore->commit(false));
Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);
platform::eraseDirectoryRecursive("checkpoint");
std::string checkpointDir = cwd + "checkpoint";
CheckpointRequest request(latestVersion, allKeys, RocksDB, deterministicRandom()->randomUniqueID(), checkpointDir);
CheckpointMetaData metaData = wait(kvStore->checkpoint(request));
state ICheckpointReader* cpReader = newCheckpointReader(metaData, deterministicRandom()->randomUniqueID());
wait(cpReader->init(BinaryWriter::toValue(KeyRangeRef("foo"_sr, "foobar"_sr), IncludeVersion())));
loop {
try {
state RangeResult res =
wait(cpReader->nextKeyValues(CLIENT_KNOBS->REPLY_BYTE_LIMIT, CLIENT_KNOBS->REPLY_BYTE_LIMIT));
state int i = 0;
for (; i < res.size(); ++i) {
Optional<Value> val = wait(kvStore->readValue(res[i].key));
ASSERT(val.present() && val.get() == res[i].value);
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
break;
} else {
TraceEvent(SevError, "TestFailed").error(e);
}
}
}
std::vector<Future<Void>> closes;
closes.push_back(cpReader->close());
closes.push_back(kvStore->onClosed());
kvStore->close();
wait(waitForAll(closes));
platform::eraseDirectoryRecursive(rocksDBTestDir);
return Void();
}