dump-checkpoint-meta-data-to-sstfile

This commit is contained in:
Zhe Wang 2023-02-08 21:07:36 -08:00 committed by Zhe Wang
parent 3d9f37d1d1
commit 2e68b44579
6 changed files with 255 additions and 5 deletions

View File

@ -476,6 +476,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD, 21600 ); // 6h, RocksDB default.
init( ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY, isSimulated ? 10.0 : 300.0 ); // Delays shard clean up, must be larger than ROCKSDB_READ_VALUE_TIMEOUT to prevent reading deleted shard.
init( ROCKSDB_EMPTY_RANGE_CHECK, isSimulated ? true : false);
init( ROCKSDB_CREATE_SST_FILE_RETRY_COUNT_MAX, 50 );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;

View File

@ -389,6 +389,7 @@ public:
int64_t ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD;
double ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY;
bool ROCKSDB_EMPTY_RANGE_CHECK;
int ROCKSDB_CREATE_SST_FILE_RETRY_COUNT_MAX;
// Leader election
int MAX_NOTIFICATIONS;

View File

@ -3763,6 +3763,87 @@ TEST_CASE("noSim/ShardedRocksDB/CheckpointBasic") {
return Void();
}
TEST_CASE("noSim/ShardedRocksDB/RocksDBSstFileWriter") {
state std::string localFile = "rocksdb-sst-file-dump.sst";
// Write kvs1 to sst file
state std::map<Key, Value> kvs1({ { "a"_sr, "1"_sr },
{ "ab"_sr, "12"_sr },
{ "ad"_sr, "14"_sr },
{ "b"_sr, "2"_sr },
{ "ba"_sr, "21"_sr },
{ "c"_sr, "3"_sr },
{ "d"_sr, "4"_sr },
{ "e"_sr, "5"_sr },
{ "h"_sr, "8"_sr },
{ "ha"_sr, "81"_sr } });
state IRocksDBSstFileWriter* sstWriter = nullptr;
sstWriter = beginRocksDBSstFileWriter(localFile);
for (const auto& [key, value] : kvs1) {
sstWriter->write(key, value);
}
endRocksDBSstFileWriter(sstWriter);
// Write kvs2 to the same sst file where kvs2 keys are different from kvs1
state std::map<Key, Value> kvs2({ { "fa"_sr, "61"_sr },
{ "fab"_sr, "612"_sr },
{ "fad"_sr, "614"_sr },
{ "fb"_sr, "62"_sr },
{ "fba"_sr, "621"_sr },
{ "fc"_sr, "63"_sr },
{ "fd"_sr, "64"_sr },
{ "fe"_sr, "65"_sr },
{ "fh"_sr, "68"_sr },
{ "fha"_sr, "681"_sr } });
sstWriter = beginRocksDBSstFileWriter(localFile);
for (const auto& [key, value] : kvs2) {
sstWriter->write(key, value);
}
endRocksDBSstFileWriter(sstWriter);
// Write kvs3 to the same sst file where kvs3 modifies values of kvs2
state std::map<Key, Value> kvs3({ { "fa"_sr, "1"_sr },
{ "fab"_sr, "12"_sr },
{ "fad"_sr, "14"_sr },
{ "fb"_sr, "2"_sr },
{ "fba"_sr, "21"_sr },
{ "fc"_sr, "3"_sr },
{ "fd"_sr, "4"_sr },
{ "fe"_sr, "5"_sr },
{ "fh"_sr, "8"_sr },
{ "fha"_sr, "81"_sr } });
sstWriter = beginRocksDBSstFileWriter(localFile);
for (const auto& [key, value] : kvs3) {
sstWriter->write(key, value);
}
endRocksDBSstFileWriter(sstWriter);
// Check: sst only contains kv of kvs3
rocksdb::Status status;
rocksdb::IngestExternalFileOptions ingestOptions;
rocksdb::DB* db;
rocksdb::Options options;
options.create_if_missing = true;
status = rocksdb::DB::Open(options, "testdb", &db);
ASSERT(status.ok());
status = db->IngestExternalFile({ localFile }, ingestOptions);
ASSERT(status.ok());
std::string value;
for (const auto& [key, targetValue] : kvs1) {
status = db->Get(rocksdb::ReadOptions(), key.toString(), &value);
ASSERT(status.IsNotFound());
}
for (const auto& [key, targetValue] : kvs2) {
status = db->Get(rocksdb::ReadOptions(), key.toString(), &value);
ASSERT(value != targetValue.toString());
}
for (const auto& [key, targetValue] : kvs3) {
std::string value;
status = db->Get(rocksdb::ReadOptions(), key.toString(), &value);
ASSERT(status.ok());
ASSERT(value == targetValue.toString());
}
delete db;
return Void();
}
} // namespace
#endif // SSD_ROCKSDB_EXPERIMENTAL

View File

@ -545,6 +545,66 @@ ACTOR Future<Void> RocksDBCheckpointReader::doClose(RocksDBCheckpointReader* sel
return Void();
}
class RocksDBSstFileWriter : public IRocksDBSstFileWriter {
public:
RocksDBSstFileWriter()
: writer(std::make_shared<rocksdb::SstFileWriter>(rocksdb::EnvOptions(), rocksdb::Options())), hasData(false){};
void open(const std::string localFile) override;
void write(const KeyRef key, const ValueRef value) override;
void finish() override;
private:
std::shared_ptr<rocksdb::SstFileWriter> writer;
std::string localFile;
bool hasData;
};
void RocksDBSstFileWriter::open(const std::string localFile) {
rocksdb::Status status;
this->localFile = localFile;
status = this->writer->Open(this->localFile);
if (!status.ok()) {
Error e = statusToError(status);
TraceEvent(SevError, "RocksDBSstFileWriterWrapperOpenFileError")
.detail("LocalFile", this->localFile)
.detail("Status", status.ToString());
throw e;
}
}
void RocksDBSstFileWriter::write(const KeyRef key, const ValueRef value) {
rocksdb::Status status;
status = this->writer->Put(toSlice(key), toSlice(value));
if (!status.ok()) {
Error e = statusToError(status);
TraceEvent(SevError, "RocksDBSstFileWriterWrapperWriteError")
.detail("LocalFile", this->localFile)
.detail("Key", key)
.detail("Value", value)
.detail("Status", status.ToString());
throw e;
}
this->hasData = true;
}
void RocksDBSstFileWriter::finish() {
if (!this->hasData) {
return;
}
rocksdb::Status status;
status = this->writer->Finish();
if (!status.ok()) {
Error e = statusToError(status);
TraceEvent(SevError, "RocksDBSstFileWriterWrapperCloseError")
.detail("LocalFile", this->localFile)
.detail("Status", status.ToString());
throw e;
}
}
// RocksDBCFCheckpointReader reads an exported RocksDB Column Family checkpoint, and returns the serialized
// checkpoint via nextChunk.
class RocksDBCFCheckpointReader : public ICheckpointReader {
@ -1038,6 +1098,21 @@ ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoi
return nullptr;
}
IRocksDBSstFileWriter* beginRocksDBSstFileWriter(std::string localFile) {
#ifdef SSD_ROCKSDB_EXPERIMENTAL
IRocksDBSstFileWriter* sstWriter = new RocksDBSstFileWriter();
sstWriter->open(localFile);
return sstWriter;
#endif // SSD_ROCKSDB_EXPERIMENTAL
return nullptr;
}
void endRocksDBSstFileWriter(IRocksDBSstFileWriter* sstWriter) {
ASSERT(sstWriter != nullptr);
sstWriter->finish();
delete sstWriter;
}
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint) {
RocksDBColumnFamilyCheckpoint rocksCF;
ObjectReader reader(checkpoint.serializedCheckpoint.begin(), IncludeVersion());

View File

@ -31,6 +31,17 @@
#include "flow/actorcompiler.h" // has to be last include
class IRocksDBSstFileWriter {
public:
virtual void open(const std::string localFile) = 0;
virtual void write(const KeyRef key, const ValueRef value) = 0;
virtual void finish() = 0;
virtual ~IRocksDBSstFileWriter() {}
};
struct CheckpointFile {
constexpr static FileIdentifier file_identifier = 13804348;
std::string path;
@ -285,6 +296,10 @@ ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoi
const CheckpointAsKeyValues checkpointAsKeyValues,
UID logID);
IRocksDBSstFileWriter* beginRocksDBSstFileWriter(std::string localFile);
void endRocksDBSstFileWriter(IRocksDBSstFileWriter* sstWriter);
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint);
RocksDBCheckpoint getRocksCheckpoint(const CheckpointMetaData& checkpoint);

View File

@ -9623,14 +9623,77 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
}
}
ACTOR Future<Void> createSstFileForCheckPointMetadata(StorageServer* data,
CheckpointMetaData metaData,
std::string dir) {
state int failureCount = 0;
loop {
try {
ASSERT(metaData.ranges.size() > 0);
state std::string localFile = dir + "/metadata_bytes.sst";
state IRocksDBSstFileWriter* sstWriter = nullptr;
sstWriter = beginRocksDBSstFileWriter(localFile);
if (sstWriter == nullptr) {
return Void();
}
state std::vector<KeyRange>::iterator iter = metaData.ranges.begin();
std::sort(metaData.ranges.begin(), metaData.ranges.end(), [](KeyRange a, KeyRange b) {
return a.begin > b.begin;
}); // inplacement sorting acceptable?
state int64_t numGetRangeQueries = 0;
state int64_t numSampledKeys = 0;
while (iter != metaData.ranges.end()) {
state KeyRange range = *iter;
state Key readBegin = range.begin.withPrefix(persistByteSampleKeys.begin);
state Key readEnd = range.end.withPrefix(persistByteSampleKeys.begin);
loop {
try {
state RangeResult result = wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd),
SERVER_KNOBS->STORAGE_LIMIT_BYTES,
SERVER_KNOBS->STORAGE_LIMIT_BYTES));
numGetRangeQueries++;
for (int i = 0; i < result.size(); i++) {
numSampledKeys++;
sstWriter->write(result[i].key, result[i].value);
}
if (result.more) {
ASSERT(result.readThrough.present());
readBegin = keyAfter(result.readThrough.get());
} else {
break;
}
} catch (Error& e) {
if (failureCount < SERVER_KNOBS->ROCKSDB_CREATE_SST_FILE_RETRY_COUNT_MAX) {
failureCount++;
continue;
} else {
throw e;
}
}
}
iter++;
}
endRocksDBSstFileWriter(sstWriter);
TraceEvent("DumpCheckPointMetaData", data->thisServerID)
.detail("NumSampledKeys", numSampledKeys)
.detail("NumGetRangeQueries", numGetRangeQueries)
.detail("CheckpointID", metaData.checkpointID);
break;
} catch (Error& e) {
throw e;
}
}
return Void();
}
ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
ASSERT(std::find(metaData.src.begin(), metaData.src.end(), data->thisServerID) != metaData.src.end() &&
!metaData.ranges.empty());
const CheckpointRequest req(metaData.version,
metaData.ranges,
static_cast<CheckpointFormat>(metaData.format),
metaData.checkpointID,
data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString());
state std::string dir = data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString();
const CheckpointRequest req(
metaData.version, metaData.ranges, static_cast<CheckpointFormat>(metaData.format), metaData.checkpointID, dir);
state CheckpointMetaData checkpointResult;
try {
wait(store(checkpointResult, data->storage.checkpoint(req)));
@ -9647,6 +9710,20 @@ ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData meta
.detail("PendingCheckpoint", checkpointResult.toString());
}
// Dump the checkpoint meta data to the sst file of metadata.
try {
wait(createSstFileForCheckPointMetadata(data, metaData, dir));
TraceEvent("StorageCreateCheckpointMetaDataSstFileDumped", data->thisServerID)
.detail("Checkpoint", checkpointResult.toString());
} catch (Error& e) {
// If the checkpoint meta data is not dumped successfully, remove the checkpoint.
TraceEvent(SevWarn, "StorageCreateCheckpointMetaDataSstFileDumped", data->thisServerID)
.errorUnsuppressed(e)
.detail("Checkpoint", checkpointResult.toString());
data->checkpoints[checkpointResult.checkpointID].setState(CheckpointMetaData::Deleting);
data->actors.add(deleteCheckpointQ(data, metaData.version, checkpointResult));
}
// Persist the checkpoint meta data.
try {
Key pendingCheckpointKey(persistPendingCheckpointKeys.begin.toString() +