From 61490022b4f12dcead5ccbe0c569f7d41e4f3a2e Mon Sep 17 00:00:00 2001 From: He Liu Date: Fri, 8 Apr 2022 15:19:25 -0700 Subject: [PATCH] Enabled checkpoint, restore test for RocksDB format. --- fdbclient/NativeAPI.actor.cpp | 6 ++++-- fdbserver/RocksDBCheckpointUtils.actor.cpp | 5 +++++ fdbserver/storageserver.actor.cpp | 3 +-- fdbserver/workloads/PhysicalShardMove.actor.cpp | 11 +++++++---- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8fbc9fdd0d..bfc4403685 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -8149,8 +8149,10 @@ ACTOR Future> getCheckpointMetaData(Database cx, futures.clear(); for (index = 0; index < locations.size(); ++index) { - futures.push_back(getCheckpointMetaDataInternal( - GetCheckpointRequest(version, keys, format), locations[index].locations, timeout)); + futures.push_back( + getCheckpointMetaDataInternal(GetCheckpointRequest(version, locations[index].range, format), + locations[index].locations, + timeout)); TraceEvent("GetCheckpointShardBegin") .detail("Range", locations[index].range) .detail("Version", version) diff --git a/fdbserver/RocksDBCheckpointUtils.actor.cpp b/fdbserver/RocksDBCheckpointUtils.actor.cpp index 4268bef508..f42b491526 100644 --- a/fdbserver/RocksDBCheckpointUtils.actor.cpp +++ b/fdbserver/RocksDBCheckpointUtils.actor.cpp @@ -605,9 +605,14 @@ ACTOR Future fetchCheckpointRange(Database cx, state Transaction tr(cx); state StorageServerInterface ssi; loop { + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { Optional ss = wait(tr.get(serverListKeyFor(ssID))); if (!ss.present()) { + TraceEvent(SevWarnAlways, "FetchCheckpointRangeStorageServerNotFound") + .detail("SSID", ssID) + .detail("InitialState", metaData->toString()); throw checkpoint_not_found(); } ssi = decodeServerListValue(ss.get()); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 397ed0ed2a..b19dc11d29 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1055,7 +1055,7 @@ public: fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM), fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM), fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false), - serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM), + serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM), instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false), versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0), lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()), counters(this), @@ -1989,7 +1989,6 @@ ACTOR Future fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin wait(req.reply.onReady()); FetchCheckpointKeyValuesStreamReply reply; reply.arena.dependsOn(res.arena()); - // reply.data.reserve(reply.arena, res.size()); for (int i = 0; i < res.size(); ++i) { reply.data.push_back(reply.arena, res[i]); } diff --git a/fdbserver/workloads/PhysicalShardMove.actor.cpp b/fdbserver/workloads/PhysicalShardMove.actor.cpp index 7cc6149646..be0a8541cf 100644 --- a/fdbserver/workloads/PhysicalShardMove.actor.cpp +++ b/fdbserver/workloads/PhysicalShardMove.actor.cpp @@ -77,7 +77,7 @@ struct SSCheckpointWorkload : TestWorkload { // Create checkpoint. state Transaction tr(cx); - state CheckpointFormat format = RocksDBColumnFamily; + state CheckpointFormat format = deterministicRandom()->random01() < 0.5 ? RocksDBColumnFamily : RocksDB; loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); @@ -115,7 +115,7 @@ struct SSCheckpointWorkload : TestWorkload { TraceEvent("TestCheckpointFetched") .detail("Range", KeyRangeRef(key, endKey).toString()) .detail("Version", version) - .detail("Shards", records.size()); + .detail("Checkpoints", describe(records)); state std::string pwd = platform::getWorkingDirectory(); state std::string folder = pwd + "/checkpoints"; @@ -123,13 +123,15 @@ struct SSCheckpointWorkload : TestWorkload { ASSERT(platform::createDirectory(folder)); // Fetch checkpoint. + state std::vector fetchedCheckpoints; state int i = 0; for (; i < records.size(); ++i) { loop { TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString()); try { state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[0], folder)); - TraceEvent("TestCheckpointFetched").detail("Checkpoint", records[i].toString()); + fetchedCheckpoints.push_back(record); + TraceEvent("TestCheckpointFetched").detail("Checkpoint", record.toString()); break; } catch (Error& e) { TraceEvent("TestFetchCheckpointError") @@ -146,8 +148,9 @@ struct SSCheckpointWorkload : TestWorkload { // Restore KVS. state IKeyValueStore* kvStore = keyValueStoreRocksDB( rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1); + wait(kvStore->init()); try { - wait(kvStore->restore(records)); + wait(kvStore->restore(fetchedCheckpoints)); } catch (Error& e) { TraceEvent(SevError, "TestRestoreCheckpointError") .errorUnsuppressed(e)