Enabled checkpoint, restore test for RocksDB format.

This commit is contained in:
He Liu 2022-04-08 15:19:25 -07:00 committed by He Liu
parent bc509d9572
commit 61490022b4
4 changed files with 17 additions and 8 deletions

View File

@ -8149,8 +8149,10 @@ ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
futures.clear();
for (index = 0; index < locations.size(); ++index) {
futures.push_back(getCheckpointMetaDataInternal(
GetCheckpointRequest(version, keys, format), locations[index].locations, timeout));
futures.push_back(
getCheckpointMetaDataInternal(GetCheckpointRequest(version, locations[index].range, format),
locations[index].locations,
timeout));
TraceEvent("GetCheckpointShardBegin")
.detail("Range", locations[index].range)
.detail("Version", version)

View File

@ -605,9 +605,14 @@ ACTOR Future<Void> fetchCheckpointRange(Database cx,
state Transaction tr(cx);
state StorageServerInterface ssi;
loop {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
Optional<Value> ss = wait(tr.get(serverListKeyFor(ssID)));
if (!ss.present()) {
TraceEvent(SevWarnAlways, "FetchCheckpointRangeStorageServerNotFound")
.detail("SSID", ssID)
.detail("InitialState", metaData->toString());
throw checkpoint_not_found();
}
ssi = decodeServerListValue(ss.get());

View File

@ -1055,7 +1055,7 @@ public:
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this),
@ -1989,7 +1989,6 @@ ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin
wait(req.reply.onReady());
FetchCheckpointKeyValuesStreamReply reply;
reply.arena.dependsOn(res.arena());
// reply.data.reserve(reply.arena, res.size());
for (int i = 0; i < res.size(); ++i) {
reply.data.push_back(reply.arena, res[i]);
}

View File

@ -77,7 +77,7 @@ struct SSCheckpointWorkload : TestWorkload {
// Create checkpoint.
state Transaction tr(cx);
state CheckpointFormat format = RocksDBColumnFamily;
state CheckpointFormat format = deterministicRandom()->random01() < 0.5 ? RocksDBColumnFamily : RocksDB;
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
@ -115,7 +115,7 @@ struct SSCheckpointWorkload : TestWorkload {
TraceEvent("TestCheckpointFetched")
.detail("Range", KeyRangeRef(key, endKey).toString())
.detail("Version", version)
.detail("Shards", records.size());
.detail("Checkpoints", describe(records));
state std::string pwd = platform::getWorkingDirectory();
state std::string folder = pwd + "/checkpoints";
@ -123,13 +123,15 @@ struct SSCheckpointWorkload : TestWorkload {
ASSERT(platform::createDirectory(folder));
// Fetch checkpoint.
state std::vector<CheckpointMetaData> fetchedCheckpoints;
state int i = 0;
for (; i < records.size(); ++i) {
loop {
TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
try {
state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[0], folder));
TraceEvent("TestCheckpointFetched").detail("Checkpoint", records[i].toString());
fetchedCheckpoints.push_back(record);
TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString());
break;
} catch (Error& e) {
TraceEvent("TestFetchCheckpointError")
@ -146,8 +148,9 @@ struct SSCheckpointWorkload : TestWorkload {
// Restore KVS.
state IKeyValueStore* kvStore = keyValueStoreRocksDB(
rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
wait(kvStore->init());
try {
wait(kvStore->restore(records));
wait(kvStore->restore(fetchedCheckpoints));
} catch (Error& e) {
TraceEvent(SevError, "TestRestoreCheckpointError")
.errorUnsuppressed(e)