Resolving comments.

This commit is contained in:
He Liu 2022-05-09 12:07:13 -07:00 committed by He Liu
parent 54c566e57d
commit 78dc7c5d77
4 changed files with 39 additions and 41 deletions

View File

@ -1938,7 +1938,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
.detail("Dir", dir);
}
} else if (checkpoint.format == RocksDB) {
std::cout << "3" << std::endl;
throw not_implemented();
} else {
throw internal_error();
@ -1982,12 +1981,11 @@ void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
// TODO: set the range as the actual shard range.
CheckpointMetaData res(version, a.request.range, a.request.format, a.request.checkpointID);
const std::string& checkpointDir = a.request.checkpointDir;
const std::string& checkpointDir = abspath(a.request.checkpointDir);
if (a.request.format == RocksDBColumnFamily) {
rocksdb::ExportImportFilesMetaData* pMetadata;
platform::eraseDirectoryRecursive(checkpointDir);
const std::string cwd = platform::getWorkingDirectory() + "/";
s = checkpoint->ExportColumnFamily(cf, checkpointDir, &pMetadata);
if (!s.ok()) {
logRocksDBError(s, "ExportColumnFamily");

View File

@ -44,15 +44,10 @@
#ifdef SSD_ROCKSDB_EXPERIMENTAL
// Enforcing rocksdb version to be 6.22.1 or greater.
static_assert(ROCKSDB_MAJOR >= 6, "Unsupported rocksdb version. Update the rocksdb to 6.22.1 version");
static_assert(ROCKSDB_MAJOR == 6 ? ROCKSDB_MINOR >= 22 : true,
"Unsupported rocksdb version. Update the rocksdb to 6.22.1 version");
static_assert((ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR == 22) ? ROCKSDB_PATCH >= 1 : true,
"Unsupported rocksdb version. Update the rocksdb to 6.22.1 version");
#endif // SSD_ROCKSDB_EXPERIMENTAL
static_assert(ROCKSDB_MAJOR == 6 && ROCKSDB_MINOR >= 22 && ROCKSDB_PATCH >= 1,
"Unsupported rocksdb version. Update the rocksdb to at least 6.22.1 version");
namespace {
#ifdef SSD_ROCKSDB_EXPERIMENTAL
using DB = rocksdb::DB*;
using CF = rocksdb::ColumnFamilyHandle*;
@ -221,6 +216,7 @@ RocksDBCheckpointReader::Reader::Reader(DB& db) : db(db), cf(nullptr) {
readRangeTimeout = SERVER_KNOBS->ROCKSDB_READ_RANGE_TIMEOUT;
}
}
void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::OpenAction& a) {
ASSERT(cf == nullptr);
@ -324,7 +320,7 @@ void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::Re
.detail("Error", "Read range request timedout")
.detail("Method", "ReadRangeAction")
.detail("Timeout value", readRangeTimeout);
a.result.sendError(transaction_too_old());
a.result.sendError(timed_out());
return;
}
@ -334,6 +330,7 @@ void RocksDBCheckpointReader::Reader::action(RocksDBCheckpointReader::Reader::Re
return;
}
// For now, only forward scan is supported.
ASSERT(a.rowLimit > 0);
int accumulatedBytes = 0;
@ -393,7 +390,6 @@ ACTOR Future<Void> RocksDBCheckpointReader::doClose(RocksDBCheckpointReader* sel
return Void();
}
#endif // SSD_ROCKSDB_EXPERIMENTAL
// RocksDBCFCheckpointReader reads an exported RocksDB Column Family checkpoint, and returns the serialized
// checkpoint via nextChunk.
@ -582,7 +578,6 @@ ACTOR Future<Void> fetchCheckpointFile(Database cx,
}
}
#ifdef SSD_ROCKSDB_EXPERIMENTAL
// TODO: Return when a file exceeds a limit.
ACTOR Future<Void> fetchCheckpointRange(Database cx,
std::shared_ptr<CheckpointMetaData> metaData,
@ -732,11 +727,9 @@ ACTOR Future<Void> fetchCheckpointRange(Database cx,
return Void();
}
#endif // SSD_ROCKSDB_EXPERIMENTAL
} // namespace
#ifdef SSD_ROCKSDB_EXPERIMENTAL
ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
CheckpointMetaData initialState,
std::string dir,
@ -768,17 +761,7 @@ ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
return *metaData;
}
#else
ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
CheckpointMetaData initialState,
std::string dir,
std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
wait(delay(0));
return initialState;
}
#endif // SSD_ROCKSDB_EXPERIMENTAL
#ifdef SSD_ROCKSDB_EXPERIMENTAL
ACTOR Future<Void> deleteRocksCheckpoint(CheckpointMetaData checkpoint) {
state CheckpointFormat format = checkpoint.getFormat();
state std::unordered_set<std::string> dirs;
@ -814,6 +797,14 @@ ACTOR Future<Void> deleteRocksCheckpoint(CheckpointMetaData checkpoint) {
return Void();
}
#else
ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
CheckpointMetaData initialState,
std::string dir,
std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
wait(delay(0));
return initialState;
}
ACTOR Future<Void> deleteRocksCheckpoint(CheckpointMetaData checkpoint) {
wait(delay(0));
return Void();
@ -825,6 +816,7 @@ int64_t getTotalFetchedBytes(const std::vector<CheckpointMetaData>& checkpoints)
for (const auto& checkpoint : checkpoints) {
const CheckpointFormat format = checkpoint.getFormat();
if (format == RocksDBColumnFamily) {
// TODO: Returns the checkpoint size of a RocksDB Column Family.
} else if (format == RocksDB) {
auto rcp = getRocksCheckpoint(checkpoint);
for (const auto& file : rcp.fetchedFiles) {

View File

@ -1960,7 +1960,7 @@ ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin
req.reply.setByteLimit(SERVER_KNOBS->CHECKPOINT_TRANSFER_BLOCK_BYTES);
// Returns error is the checkpoint cannot be found.
// Returns error if the checkpoint cannot be found.
const auto it = self->checkpoints.find(req.checkpointID);
if (it == self->checkpoints.end()) {
req.reply.sendError(checkpoint_not_found());
@ -1978,11 +1978,11 @@ ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin
if (!res.empty()) {
TraceEvent(SevDebug, "FetchCheckpontKeyValuesReadRange", self->thisServerID)
.detail("CheckpointID", req.checkpointID)
.detail("Begin", res.front().key)
.detail("End", res.back().key)
.detail("FirstReturnedKey", res.front().key)
.detail("LastReturnedKey", res.back().key)
.detail("Size", res.size());
} else {
TraceEvent(SevWarn, "FetchCheckpontKeyValuesEmptyRange", self->thisServerID)
TraceEvent(SevInfo, "FetchCheckpontKeyValuesEmptyRange", self->thisServerID)
.detail("CheckpointID", req.checkpointID);
}
@ -7636,10 +7636,17 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion);
if (requireCheckpoint) {
// `pendingCheckpoints` is a queue of checkpoint requests ordered by their versoins, and
// `newOldestVersion` is chosen such that it is no larger than the smallest pending checkpoing
// version. When the exact desired checkpoint version is committed, updateStorage() is blocked
// and a checkpoint will be created at that version from the underlying storage engine.
// Note a pending checkpoint is only dequeued after the corresponding checkpoint is created
// successfully.
TraceEvent(SevDebug, "CheckpointVersionDurable", data->thisServerID)
.detail("NewDurableVersion", newOldestVersion)
.detail("DesiredVersion", desiredVersion)
.detail("SmallestCheckPointVersion", data->pendingCheckpoints.begin()->first);
// newOldestVersion could be smaller than the desired version due to byte limit.
ASSERT(newOldestVersion <= data->pendingCheckpoints.begin()->first);
if (newOldestVersion == data->pendingCheckpoints.begin()->first) {
std::vector<Future<Void>> createCheckpoints;
@ -7648,6 +7655,8 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
createCheckpoints.push_back(createCheckpoint(data, data->pendingCheckpoints.begin()->second[idx]));
}
wait(waitForAll(createCheckpoints));
// Erase the pending checkpoint after the checkpoint has been created successfully.
ASSERT(newOldestVersion == data->pendingCheckpoints.begin()->first);
data->pendingCheckpoints.erase(data->pendingCheckpoints.begin());
}
requireCheckpoint = false;

View File

@ -71,18 +71,19 @@ struct SSCheckpointWorkload : TestWorkload {
state Key key = "TestKey"_sr;
state Key endKey = "TestKey0"_sr;
state Value oldValue = "TestValue"_sr;
state KeyRange testRange = KeyRangeRef(key, endKey);
int ignore = wait(setDDMode(cx, 0));
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
// Create checkpoint.
state Transaction tr(cx);
state CheckpointFormat format = deterministicRandom()->random01() < 0.5 ? RocksDBColumnFamily : RocksDB;
state CheckpointFormat format = deterministicRandom()->coinflip() ? RocksDBColumnFamily : RocksDB;
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(createCheckpoint(&tr, KeyRangeRef(key, endKey), format));
wait(createCheckpoint(&tr, testRange, format));
wait(tr.commit());
version = tr.getCommittedVersion();
break;
@ -91,20 +92,18 @@ struct SSCheckpointWorkload : TestWorkload {
}
}
TraceEvent("TestCheckpointCreated")
.detail("Range", KeyRangeRef(key, endKey).toString())
.detail("Version", version);
TraceEvent("TestCheckpointCreated").detail("Range", testRange).detail("Version", version);
// Fetch checkpoint meta data.
loop {
try {
state std::vector<CheckpointMetaData> records =
wait(getCheckpointMetaData(cx, KeyRangeRef(key, endKey), version, format));
wait(getCheckpointMetaData(cx, testRange, version, format));
break;
} catch (Error& e) {
TraceEvent("TestFetchCheckpointMetadataError")
.errorUnsuppressed(e)
.detail("Range", KeyRangeRef(key, endKey).toString())
.detail("Range", testRange)
.detail("Version", version);
// The checkpoint was just created, we don't expect this error.
@ -113,7 +112,7 @@ struct SSCheckpointWorkload : TestWorkload {
}
TraceEvent("TestCheckpointFetched")
.detail("Range", KeyRangeRef(key, endKey).toString())
.detail("Range", testRange)
.detail("Version", version)
.detail("Checkpoints", describe(records));
@ -170,10 +169,10 @@ struct SSCheckpointWorkload : TestWorkload {
}
}
for (i = 0; i < res.size(); ++i) {
Optional<Value> value = wait(kvStore->readValue(res[i].key));
ASSERT(value.present());
ASSERT(value.get() == res[i].value);
RangeResult kvRange = wait(kvStore->readRange(testRange));
ASSERT(res.size() == kvRange.size());
for (int i = 0; i < res.size(); ++i) {
ASSERT(res[i] == kvRange[i]);
}
int ignore = wait(setDDMode(cx, 1));