Restore kv (#9030)

* Allow multiple keyranges in CheckpointRequest.
Include DataMove ID in CheckpointMetaData.

* Use UID dataMoveId instead of Optional<UID>.

* Implemented ShardedRocks::checkpoint().

* Implementing createCheckpoint().

* Attempted to change getCheckpointMetaData*() for a single keyrange.

* Added getCheckpointMetaDataForRange.

* Minor fixes for NativeAPI.actor.cpp.

* Replace UID CheckpointMetaData::ssId with std::vector<UID>
CheckpointMetaData::src;

* Implemented getCheckpointMetaData() and completed checkpoint creation
and fetch in test.

* Refactoring CheckpointRequest and CheckpointMetaData

Rename `dataMoveId` to `actionId` and make it Optional.

* Fixed ctor of CheckpointMetaData.

* Implemented ShardedRocksDB::restore().

* Tested checkpoint restore, and added a range check for restore so that
the target ranges can be a subset of the checkpoint ranges.

* Added test to partially restore a checkpoint.

* Refactor: added checkpointRestore().

* Sort ranges for comparison.

* Cleanups.

* Check that restore ranges are empty; add ranges in the main thread.

* Resolved comments.

* Fixed GetCheckpointMetaData range check issue.

* Refactor CheckpointReader for CF checkpoint.

* Added CheckpointAsKeyValues as a parameter for newCheckpointReader.

* PhysicalShard::restoreKvs().

* Added `ranges` in fetchCheckpoint.

* Added RocksDBCheckpointKeyValues::ranges.

* Added ICheckpointIterator and implemented for RocksDBCheckpointReader.

* Refactored OpenAction for CheckpointReader, handled failure cases.

* Use RocksDBCheckpointIterator::end() in readRange.

* Set CheckpointReader timeout and other RocksDB read options.

* Implementing fetchCheckpointRange().

* Added more CheckpointReader tests.

* Cleanup.

* More cleanup.

* Added fetchCheckpointRanges test.

* Implemented restore of kv-based checkpoint.

* Improved the CheckpointRestore test for kv, non-kv, and partial restore.

* Added test of merge.

* Fixed merge test.

* Cleanup.

* Resolved comments.

Co-authored-by: He Liu <heliu@apple.com>
Commit ec6716ff2e (parent 827c3b63ce), authored by He Liu on 2023-01-20 09:58:34 -08:00 and committed by GitHub.
5 changed files with 237 additions and 67 deletions
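
The bullets above add an end-to-end path: create a checkpoint over a set of key ranges, look up its metadata, fetch it (either as a whole RocksDB column-family checkpoint or as plain key-values for a subset of ranges), and restore it into a ShardedRocksDB store. The sketch below condenses that flow from the PhysicalShardMove workload changes further down; it only uses signatures that appear in this commit (createCheckpoint, getCheckpointMetaData, fetchCheckpoint, fetchCheckpointRanges, IKeyValueStore::restore), but ACTOR boilerplate, retries, and error handling are omitted, so treat it as illustrative pseudocode rather than compilable code.

// Illustrative flow-style pseudocode condensed from the PhysicalShardMove workload below.
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(createCheckpoint(&tr, checkpointRanges, DataMoveRocksCF, dataMoveId));
wait(tr.commit());
state Version version = tr.getCommittedVersion();

// One CheckpointMetaData is returned per source shard covering the requested ranges.
state std::vector<CheckpointMetaData> records =
    wait(getCheckpointMetaData(cx, checkpointRanges, version, DataMoveRocksCF, Optional<UID>(dataMoveId)));

state std::vector<CheckpointMetaData> fetched;
for (const auto& record : records) {
    CheckpointMetaData f;
    if (asKeyValues) {
        // Fetch only the sub-ranges needed for a partial restore; the result has format RocksDBKeyValues.
        wait(store(f, fetchCheckpointRanges(cx, record, checkpointDir, restoreRanges)));
    } else {
        // Fetch the whole column-family checkpoint; the format stays DataMoveRocksCF.
        wait(store(f, fetchCheckpoint(cx, record, checkpointDir)));
    }
    fetched.push_back(f);
}

// Restore into an SSD_SHARDED_ROCKSDB kv-store; restoreRanges may be a subset of the
// checkpoint ranges.
wait(kvStore->restore(shardId, restoreRanges, fetched));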


@@ -8794,32 +8794,39 @@ static Future<Void> createCheckpointImpl(T tr,
ASSERT(actionId.present());
TraceEvent(SevDebug, "CreateCheckpointTransactionBegin").detail("Ranges", describe(ranges));
// Only the first range is used to look up the location, since we assume all ranges are hosted by a single shard.
// The operation will fail on the storage server otherwise.
state RangeResult keyServers = wait(krmGetRanges(tr, keyServersPrefix, ranges[0]));
ASSERT(!keyServers.more);
state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
state std::unordered_map<UID, std::vector<KeyRange>> rangeMap;
state std::unordered_map<UID, std::vector<UID>> srcMap;
for (const auto& range : ranges) {
RangeResult keyServers = wait(krmGetRanges(tr, keyServersPrefix, range));
ASSERT(!keyServers.more);
for (int i = 0; i < keyServers.size() - 1; ++i) {
const KeyRangeRef currentRange(keyServers[i].key, keyServers[i + 1].key);
std::vector<UID> src;
std::vector<UID> dest;
UID srcId;
UID destId;
decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest, srcId, destId);
rangeMap[srcId].push_back(currentRange);
srcMap.emplace(srcId, src);
}
}
if (format == DataMoveRocksCF) {
std::vector<UID> src;
std::vector<UID> dest;
UID srcId;
UID destId;
decodeKeyServersValue(UIDtoTagMap, keyServers[0].value, src, dest, srcId, destId);
for (const auto& [srcId, ranges] : rangeMap) {
// The checkpoint request is sent to all replicas, in case any of them is unhealthy.
// An alternative is to choose a healthy replica.
const UID checkpointID = UID(srcId.first(), deterministicRandom()->randomUInt64());
CheckpointMetaData checkpoint(ranges, format, srcMap[srcId], checkpointID, actionId.get());
checkpoint.setState(CheckpointMetaData::Pending);
tr->set(checkpointKeyFor(checkpointID), checkpointValue(checkpoint));
// The checkpoint request is sent to all replicas, in case any of them is unhealthy.
// An alternative is to choose a healthy replica.
const UID checkpointID = UID(srcId.first(), deterministicRandom()->randomUInt64());
CheckpointMetaData checkpoint(ranges, format, src, checkpointID, actionId.get());
checkpoint.setState(CheckpointMetaData::Pending);
tr->set(checkpointKeyFor(checkpointID), checkpointValue(checkpoint));
TraceEvent(SevDebug, "CreateCheckpointTransactionShard")
.detail("CheckpointKey", checkpointKeyFor(checkpointID))
.detail("CheckpointMetaData", checkpoint.toString())
.detail("ReadVersion", tr->getReadVersion().get());
TraceEvent(SevDebug, "CreateCheckpointTransactionShard")
.detail("CheckpointKey", checkpointKeyFor(checkpointID))
.detail("CheckpointMetaData", checkpoint.toString());
}
} else {
throw not_implemented();
}
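
The rewritten createCheckpointImpl above drops the old single-shard assumption: each requested range is resolved to its current source shard, ranges are grouped per shard in rangeMap/srcMap, and one pending CheckpointMetaData is written per shard. A small standalone illustration of that grouping step, using simplified stand-in types rather than the FDB ones:

#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct KeyRange {
    std::string begin, end;
};

int main() {
    // Pretend each requested range has already been resolved (via krmGetRanges and
    // decodeKeyServersValue) to the ID of the shard currently hosting it.
    std::vector<std::pair<KeyRange, uint64_t>> resolved = {
        { { "TestKeyA", "TestKeyB" }, 1 },
        { { "TestKeyD", "TestKeyE" }, 1 },
        { { "TestKeyM", "TestKeyN" }, 2 },
    };

    // Group ranges by source shard; one pending checkpoint record would then be written
    // per entry, covering all of that shard's ranges.
    std::unordered_map<uint64_t, std::vector<KeyRange>> rangesPerShard;
    for (const auto& [range, srcId] : resolved) {
        rangesPerShard[srcId].push_back(range);
    }

    // Shard 1 owns two of the requested ranges, shard 2 owns one.
    return (rangesPerShard[1].size() == 2 && rangesPerShard[2].size() == 1) ? 0 : 1;
}
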
@@ -8975,6 +8982,7 @@ ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
double timeout) {
state std::vector<Future<std::vector<CheckpointMetaData>>> futures;
// TODO(heliu): Avoid sending requests to the same shard.
for (const auto& range : ranges) {
futures.push_back(getCheckpointMetaDataForRange(cx, range, version, format, actionId, timeout));
}


@@ -545,18 +545,55 @@ struct PhysicalShard {
// Restore from the checkpoint.
rocksdb::Status restore(const CheckpointMetaData& checkpoint) {
rocksdb::ExportImportFilesMetaData metaData = getMetaData(checkpoint);
rocksdb::ImportColumnFamilyOptions importOptions;
importOptions.move_files = true;
rocksdb::Status status = db->CreateColumnFamilyWithImport(getCFOptions(), id, importOptions, metaData, &cf);
const CheckpointFormat format = checkpoint.getFormat();
rocksdb::Status status;
if (format == DataMoveRocksCF) {
rocksdb::ExportImportFilesMetaData metaData = getMetaData(checkpoint);
rocksdb::ImportColumnFamilyOptions importOptions;
importOptions.move_files = true;
status = db->CreateColumnFamilyWithImport(getCFOptions(), id, importOptions, metaData, &cf);
if (!status.ok()) {
logRocksDBError(status, "Restore");
return status;
if (!status.ok()) {
logRocksDBError(status, "RocksImportColumnFamily");
}
} else if (format == RocksDBKeyValues) {
std::vector<std::string> sstFiles;
const RocksDBCheckpointKeyValues rcp = getRocksKeyValuesCheckpoint(checkpoint);
for (const auto& file : rcp.fetchedFiles) {
TraceEvent(SevDebug, "RocksDBRestoreFile")
.detail("Shard", id)
.detail("CheckpointID", checkpoint.checkpointID)
.detail("File", file.toString());
sstFiles.push_back(file.path);
}
if (!sstFiles.empty()) {
ASSERT(cf != nullptr);
rocksdb::IngestExternalFileOptions ingestOptions;
ingestOptions.move_files = true;
ingestOptions.write_global_seqno = false;
ingestOptions.verify_checksums_before_ingest = true;
status = db->IngestExternalFile(cf, sstFiles, ingestOptions);
if (!status.ok()) {
logRocksDBError(status, "RocksIngestExternalFile");
}
} else {
TraceEvent(SevWarn, "RocksDBServeRestoreEmptyRange")
.detail("Shard", id)
.detail("RocksKeyValuesCheckpoint", rcp.toString())
.detail("Checkpoint", checkpoint.toString());
}
TraceEvent(SevInfo, "RocksDBServeRestoreEnd")
.detail("Shard", id)
.detail("Checkpoint", checkpoint.toString());
} else {
throw not_implemented();
}
readIterPool = std::make_shared<ReadIteratorPool>(db, cf, id);
this->isInitialized.store(true);
if (status.ok() && !this->isInitialized) {
readIterPool = std::make_shared<ReadIteratorPool>(db, cf, id);
this->isInitialized.store(true);
}
return status;
}
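
PhysicalShard::restore() above now has two paths: a DataMoveRocksCF checkpoint is imported as a whole column family via CreateColumnFamilyWithImport, while a RocksDBKeyValues checkpoint is applied by ingesting the fetched SST files with the ingestion options shown (move_files, no global sequence number rewrite, checksum verification before ingest). A minimal standalone sketch of that RocksDB ingestion API, with made-up paths and keys and no FoundationDB types:

#include <cassert>
#include <string>
#include <vector>

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/sst_file_writer.h>

int main() {
    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::DB* db = nullptr;
    rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/ingest_demo_db", &db);
    assert(s.ok());

    // Write a sorted SST file outside the DB, standing in for a fetched checkpoint file.
    rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);
    s = writer.Open("/tmp/ingest_demo.sst");
    assert(s.ok());
    s = writer.Put("TestKeyA", "TestValueA");
    assert(s.ok());
    s = writer.Put("TestKeyB", "TestValueB");
    assert(s.ok());
    s = writer.Finish();
    assert(s.ok());

    // Ingest it with the same options the restore path uses: move the file instead of
    // copying it, do not rewrite a global sequence number, and verify checksums first.
    rocksdb::IngestExternalFileOptions ingestOptions;
    ingestOptions.move_files = true;
    ingestOptions.write_global_seqno = false;
    ingestOptions.verify_checksums_before_ingest = true;
    s = db->IngestExternalFile({ "/tmp/ingest_demo.sst" }, ingestOptions);
    assert(s.ok());

    std::string value;
    s = db->Get(rocksdb::ReadOptions(), "TestKeyA", &value);
    assert(s.ok() && value == "TestValueA");

    delete db;
    return 0;
}
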
@@ -2236,6 +2273,9 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
rocksdb::Status status;
rocksdb::WriteBatch writeBatch;
rocksdb::WriteOptions options;
options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
if (format == DataMoveRocksCF) {
CheckpointMetaData& checkpoint = a.checkpoints.front();
std::sort(a.ranges.begin(), a.ranges.end(), KeyRangeRef::ArbitraryOrder());
@@ -2293,8 +2333,92 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
}
} else if (format == RocksDBKeyValues) {
a.done.sendError(not_implemented());
return;
// Make sure the files are complete for the desired ranges.
std::vector<KeyRange> fetchedRanges;
std::vector<KeyRange> intendedRanges(a.ranges.begin(), a.ranges.end());
std::vector<RocksDBCheckpointKeyValues> rkvs;
for (const auto& checkpoint : a.checkpoints) {
rkvs.push_back(getRocksKeyValuesCheckpoint(checkpoint));
for (const auto& file : rkvs.back().fetchedFiles) {
fetchedRanges.push_back(file.range);
}
}
// Verify that the collective fetchedRanges is the same as the collective intendedRanges.
std::sort(fetchedRanges.begin(), fetchedRanges.end(), KeyRangeRef::ArbitraryOrder());
std::sort(intendedRanges.begin(), intendedRanges.end(), KeyRangeRef::ArbitraryOrder());
int i = 0, j = 0;
while (i < fetchedRanges.size() && j < intendedRanges.size()) {
if (fetchedRanges[i].begin != intendedRanges[j].begin) {
break;
} else if (fetchedRanges[i] == intendedRanges[j]) {
++i;
++j;
} else if (fetchedRanges[i].contains(intendedRanges[j])) {
fetchedRanges[i] = KeyRangeRef(intendedRanges[j].end, fetchedRanges[i].end);
++j;
} else if (intendedRanges[j].contains(fetchedRanges[i])) {
intendedRanges[j] = KeyRangeRef(fetchedRanges[i].end, intendedRanges[j].end);
++i;
} else {
break;
}
}
if (i != fetchedRanges.size() || j != intendedRanges.size()) {
TraceEvent(SevError, "ShardedRocksDBRestoreFailed", logId)
.detail("Reason", "RestoreFilesRangesMismatch")
.detail("Ranges", describe(a.ranges))
.detail("FetchedFiles", describe(rkvs));
a.done.sendError(failed_to_restore_checkpoint());
return;
}
if (!ps->initialized()) {
TraceEvent(SevDebug, "ShardedRocksRestoreInitPS", logId)
.detail("Path", a.path)
.detail("Checkpoints", describe(a.checkpoints))
.detail("PhysicalShard", ps->toString());
status = ps->init();
}
if (!status.ok()) {
logRocksDBError(status, "RestoreInitPhysicalShard");
a.done.sendError(statusToError(status));
return;
}
for (const auto& checkpoint : a.checkpoints) {
status = ps->restore(checkpoint);
if (!status.ok()) {
TraceEvent(SevWarnAlways, "ShardedRocksIngestFileError", logId)
.detail("Error", status.ToString())
.detail("Path", a.path)
.detail("Checkpoint", checkpoint.toString())
.detail("PhysicalShard", ps->toString());
break;
}
}
if (!status.ok()) {
logRocksDBError(status, "RestoreIngestFile");
for (const auto& range : a.ranges) {
writeBatch.DeleteRange(ps->cf, toSlice(range.begin), toSlice(range.end));
}
TraceEvent(SevInfo, "ShardedRocksRevertRestore", logId)
.detail("Path", a.path)
.detail("Checkpoints", describe(a.checkpoints))
.detail("PhysicalShard", ps->toString())
.detail("RestoreRanges", describe(a.ranges));
rocksdb::Status s = a.shardManager->getDb()->Write(options, &writeBatch);
if (!s.ok()) {
TraceEvent(SevError, "ShardedRocksRevertRestoreError", logId)
.detail("Error", s.ToString())
.detail("Path", a.path)
.detail("PhysicalShard", ps->toString())
.detail("RestoreRanges", describe(a.ranges));
}
a.done.sendError(statusToError(status));
return;
}
} else if (format == RocksDB) {
a.done.sendError(not_implemented());
return;
@@ -2305,9 +2429,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
a.shardManager->populateRangeMappingMutations(&writeBatch, range, /*isAdd=*/true);
}
rocksdb::WriteOptions options;
options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
status = a.shardManager->getDb()->Write(options, &writeBatch);
if (!status.ok()) {
logRocksDBError(status, "RestorePersistMetaData");
@@ -2316,8 +2437,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
TraceEvent(SevDebug, "ShardedRocksRestoredMetaDataPersisted", logId)
.detail("Path", a.path)
.detail("Checkpoints", describe(a.checkpoints))
.detail("RocksDBCF", getRocksCF(a.checkpoints[0]).toString());
.detail("Checkpoints", describe(a.checkpoints));
a.shardManager->getMetaDataShard()->refreshReadIteratorPool();
a.done.send(Void());
TraceEvent(SevInfo, "ShardedRocksDBRestoreEnd", logId)
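
The two-pointer sweep in the RocksDBKeyValues branch above verifies that the sorted fetched file ranges collectively equal the sorted intended restore ranges, tolerating either side being split into finer pieces but rejecting gaps, misalignment, or partial overlaps. A standalone rendition of the same check with simplified string-keyed ranges (not FDB code):

#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

struct Range {
    std::string begin, end; // half-open [begin, end)
    bool contains(const Range& r) const { return begin <= r.begin && r.end <= end; }
    bool operator==(const Range& r) const { return begin == r.begin && end == r.end; }
};

// Returns true iff the union of `fetched` equals the union of `intended`, where each list
// holds disjoint half-open ranges; mirrors the verification in the restore path above.
bool coversExactly(std::vector<Range> fetched, std::vector<Range> intended) {
    auto byBegin = [](const Range& a, const Range& b) { return a.begin < b.begin; };
    std::sort(fetched.begin(), fetched.end(), byBegin);
    std::sort(intended.begin(), intended.end(), byBegin);
    size_t i = 0, j = 0;
    while (i < fetched.size() && j < intended.size()) {
        if (fetched[i].begin != intended[j].begin) {
            break; // gap or misalignment
        } else if (fetched[i] == intended[j]) {
            ++i;
            ++j;
        } else if (fetched[i].contains(intended[j])) {
            fetched[i].begin = intended[j].end; // consume the matched prefix of fetched[i]
            ++j;
        } else if (intended[j].contains(fetched[i])) {
            intended[j].begin = fetched[i].end; // consume the matched prefix of intended[j]
            ++i;
        } else {
            break; // partial overlap that neither side fully contains
        }
    }
    return i == fetched.size() && j == intended.size();
}

int main() {
    // Two fetched files covering [a,c) and [c,f) satisfy an intended restore of [a,f).
    assert(coversExactly({ { "a", "c" }, { "c", "f" } }, { { "a", "f" } }));
    // A gap between [a,b) and [c,f) does not.
    assert(!coversExactly({ { "a", "b" }, { "c", "f" } }, { { "a", "f" } }));
    return 0;
}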


@@ -856,9 +856,8 @@ ACTOR Future<Void> fetchCheckpointRange(Database cx,
}
} else {
if (totalBytes > 0) {
RocksDBCheckpoint rcp = getRocksCheckpoint(*metaData);
RocksDBCheckpointKeyValues rcp = getRocksKeyValuesCheckpoint(*metaData);
rcp.fetchedFiles.emplace_back(localFile, range, totalBytes);
rcp.checkpointDir = dir;
metaData->serializedCheckpoint = ObjectWriter::toValue(rcp, IncludeVersion());
}
if (!fileExists(localFile)) {


@@ -783,6 +783,7 @@ public:
std::map<Version, std::vector<CheckpointMetaData>> pendingCheckpoints; // Pending checkpoint requests
std::unordered_map<UID, CheckpointMetaData> checkpoints; // Existing and deleting checkpoints
std::unordered_map<UID, ICheckpointReader*> liveCheckpointReaders; // Active checkpoint readers
VersionedMap<int64_t, TenantName> tenantMap;
std::map<Version, std::vector<PendingNewShard>>
pendingAddRanges; // Pending requests to add ranges to physical shards
@@ -2476,9 +2477,15 @@ ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin
}
state ICheckpointReader* reader = nullptr;
auto crIt = self->liveCheckpointReaders.find(req.checkpointID);
if (crIt != self->liveCheckpointReaders.end()) {
reader = crIt->second;
} else {
reader = newCheckpointReader(it->second, CheckpointAsKeyValues::True, self->thisServerID);
}
state std::unique_ptr<ICheckpointIterator> iter;
try {
reader = newCheckpointReader(it->second, CheckpointAsKeyValues::True, self->thisServerID);
wait(reader->init(BinaryWriter::toValue(req.range, IncludeVersion())));
iter = reader->getIterator(req.range);
@@ -2525,6 +2532,7 @@ ACTOR Future<Void> fetchCheckpointKeyValuesQ(StorageServer* self, FetchCheckpoin
iter.reset();
if (!reader->inUse()) {
self->liveCheckpointReaders.erase(req.checkpointID);
wait(reader->close());
}
return Void();
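
The storage-server change above lets concurrent FetchCheckpointKeyValues requests for different ranges of the same checkpoint share one ICheckpointReader: the reader is looked up in liveCheckpointReaders, each request takes its own ICheckpointIterator for its range, and the reader is closed and erased only once no request still uses it. A standalone sketch of that sharing pattern, using hypothetical Reader/Iterator/Registry stand-ins rather than the FDB interfaces:

#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical stand-ins for ICheckpointReader and ICheckpointIterator.
struct Reader {
    int activeIterators = 0;
    bool inUse() const { return activeIterators > 0; }
};

struct Iterator {
    Reader* reader;
    explicit Iterator(Reader* r) : reader(r) { ++reader->activeIterators; }
    ~Iterator() { --reader->activeIterators; }
};

struct Registry {
    std::unordered_map<std::string, std::unique_ptr<Reader>> live;

    // Reuse an existing reader for this checkpoint, or open a new one.
    Reader* acquire(const std::string& checkpointId) {
        auto it = live.find(checkpointId);
        if (it != live.end()) {
            return it->second.get();
        }
        return live.emplace(checkpointId, std::make_unique<Reader>()).first->second.get();
    }

    // Close and erase the reader only once the last iterator over it has been released.
    void release(const std::string& checkpointId, Reader* reader) {
        if (!reader->inUse()) {
            live.erase(checkpointId);
        }
    }
};

int main() {
    Registry registry;
    Reader* reader = registry.acquire("checkpoint-1");
    {
        Iterator rangeA(reader); // first range being streamed
        Iterator rangeB(registry.acquire("checkpoint-1")); // a concurrent range reuses the reader
        assert(registry.live.size() == 1 && reader->inUse());
    } // both iterators released here
    registry.release("checkpoint-1", reader);
    assert(registry.live.empty());
    return 0;
}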


@@ -82,6 +82,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
{ "TestKeyAB"_sr, "TestValueAB"_sr },
{ "TestKeyAD"_sr, "TestValueAD"_sr },
{ "TestKeyB"_sr, "TestValueB"_sr },
{ "TestKeyBA"_sr, "TestValueBA"_sr },
{ "TestKeyC"_sr, "TestValueC"_sr },
{ "TestKeyD"_sr, "TestValueD"_sr },
{ "TestKeyE"_sr, "TestValueE"_sr },
@@ -117,12 +118,13 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
teamSize,
includes,
excludes)));
state std::vector<KeyRange> testRanges;
testRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyAC"_sr));
wait(self->checkpointRestore(self, cx, testRanges, &kvs));
TraceEvent(SevDebug, "TestMovedRange").detail("Range", KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr));
state std::vector<KeyRange> checkpointRanges;
checkpointRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyAC"_sr));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::True, &kvs));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::False, &kvs));
// Move range [TestKeyD, TestKeyF) to sh0;
includes.insert(teamA.begin(), teamA.end());
state std::vector<UID> teamE = wait(self->moveShard(self,
@@ -142,13 +144,15 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
wait(self->getStorageServerShards(cx, teamA[teamIdx], KeyRangeRef("TestKeyD"_sr, "TestKeyF"_sr)));
ASSERT(shards.size() == 1);
ASSERT(shards[0].desiredId == sh0);
ASSERT(shards[0].id == sh0);
TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
}
testRanges.clear();
testRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr));
testRanges.push_back(KeyRangeRef("TestKeyD"_sr, "TestKeyF"_sr));
wait(self->checkpointRestore(self, cx, testRanges, &kvs));
checkpointRanges.clear();
checkpointRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr));
checkpointRanges.push_back(KeyRangeRef("TestKeyD"_sr, "TestKeyE"_sr));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::True, &kvs));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::False, &kvs));
// Move range [TestKeyB, TestKeyC) to sh1, on the same server.
includes.insert(teamA.begin(), teamA.end());
@@ -171,6 +175,14 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
TraceEvent("TestStorageServerShards", teamA[teamIdx]).detail("Shards", describe(shards));
}
checkpointRanges.clear();
checkpointRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr));
checkpointRanges.push_back(KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr));
std::vector<KeyRange> restoreRanges;
restoreRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr));
restoreRanges.push_back(KeyRangeRef("TestKeyB"_sr, "TestKeyC"_sr));
wait(self->checkpointRestore(self, cx, checkpointRanges, restoreRanges, CheckpointAsKeyValues::True, &kvs));
state std::vector<UID> teamC = wait(self->moveShard(self,
cx,
UID(sh2, deterministicRandom()->randomUInt64()),
@@ -202,11 +214,13 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
ACTOR Future<Void> checkpointRestore(PhysicalShardMoveWorkLoad* self,
Database cx,
std::vector<KeyRange> testRanges,
std::vector<KeyRange> checkpointRanges,
std::vector<KeyRange> restoreRanges,
CheckpointAsKeyValues asKeyValues,
std::map<Key, Value>* kvs) {
// Create checkpoint.
TraceEvent(SevDebug, "TestCreatingCheckpoint").detail("Ranges", describe(testRanges));
TraceEvent(SevDebug, "TestCreatingCheckpoint").detail("Ranges", describe(checkpointRanges));
state Transaction tr(cx);
state CheckpointFormat format = DataMoveRocksCF;
state UID dataMoveId = deterministicRandom()->randomUniqueID();
@@ -216,7 +230,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(createCheckpoint(&tr, testRanges, format, dataMoveId));
wait(createCheckpoint(&tr, checkpointRanges, format, dataMoveId));
wait(tr.commit());
version = tr.getCommittedVersion();
break;
@@ -230,9 +244,10 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
loop {
records.clear();
try {
wait(store(records, getCheckpointMetaData(cx, testRanges, version, format, Optional<UID>(dataMoveId))));
wait(store(records,
getCheckpointMetaData(cx, checkpointRanges, version, format, Optional<UID>(dataMoveId))));
TraceEvent(SevDebug, "TestCheckpointMetaDataFetched")
.detail("Range", describe(testRanges))
.detail("Range", describe(checkpointRanges))
.detail("Version", version)
.detail("Checkpoints", describe(records));
@@ -240,7 +255,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
} catch (Error& e) {
TraceEvent("TestFetchCheckpointMetadataError")
.errorUnsuppressed(e)
.detail("Range", describe(testRanges))
.detail("Range", describe(checkpointRanges))
.detail("Version", version);
// The checkpoint was just created, we don't expect this error.
@@ -249,17 +264,33 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
}
// Fetch checkpoint.
state std::string pwd = platform::getWorkingDirectory();
state std::string folder = pwd + "/checkpoints";
platform::eraseDirectoryRecursive(folder);
ASSERT(platform::createDirectory(folder));
state std::string checkpointDir = abspath("checkpoints");
platform::eraseDirectoryRecursive(checkpointDir);
ASSERT(platform::createDirectory(checkpointDir));
state std::vector<CheckpointMetaData> fetchedCheckpoints;
state int i = 0;
for (; i < records.size(); ++i) {
loop {
TraceEvent(SevDebug, "TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
try {
CheckpointMetaData record = wait(fetchCheckpoint(cx, records[i], folder));
state CheckpointMetaData record;
if (asKeyValues) {
std::vector<KeyRange> fetchRanges;
for (const auto& range : restoreRanges) {
for (const auto& cRange : records[i].ranges) {
if (cRange.contains(range)) {
fetchRanges.push_back(range);
break;
}
}
}
ASSERT(!fetchRanges.empty());
wait(store(record, fetchCheckpointRanges(cx, records[i], checkpointDir, fetchRanges)));
ASSERT(record.getFormat() == RocksDBKeyValues);
} else {
wait(store(record, fetchCheckpoint(cx, records[i], checkpointDir)));
ASSERT(record.getFormat() == format);
}
fetchedCheckpoints.push_back(record);
TraceEvent(SevDebug, "TestCheckpointFetched").detail("Checkpoint", record.toString());
break;
@@ -280,7 +311,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_SHARDED_ROCKSDB);
wait(kvStore->init());
try {
wait(kvStore->restore(shardId, testRanges, fetchedCheckpoints));
wait(kvStore->restore(shardId, restoreRanges, fetchedCheckpoints));
} catch (Error& e) {
TraceEvent(SevError, "TestRestoreCheckpointError")
.errorUnsuppressed(e)
@@ -291,6 +322,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
// Validate the restored kv-store.
RangeResult kvRange = wait(kvStore->readRange(normalKeys));
ASSERT(!kvRange.more);
std::unordered_map<Key, Value> kvsKvs;
for (int i = 0; i < kvRange.size(); ++i) {
kvsKvs[kvRange[i].key] = kvRange[i].value;
@@ -305,22 +337,25 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
return false;
};
int count = 0;
for (const auto& [key, value] : *kvs) {
auto it = kvsKvs.find(key);
if (containsKey(testRanges, key)) {
TraceEvent(SevVerbose, "TestExpectKeyValueMatch").detail("Key", key).detail("Value", value);
ASSERT(it->second == value);
} else {
TraceEvent(SevVerbose, "TestExpectKeyNotExist").detail("Key", key);
ASSERT(it == kvsKvs.end());
if (containsKey(restoreRanges, key)) {
TraceEvent(SevDebug, "TestExpectKeyValueMatch").detail("Key", key).detail("Value", value);
auto it = kvsKvs.find(key);
ASSERT(it != kvsKvs.end() && it->second == value);
++count;
}
}
ASSERT(kvsKvs.size() == count);
TraceEvent(SevDebug, "TestCheckpointVerified").detail("Checkpoint", describe(fetchedCheckpoints));
Future<Void> close = kvStore->onClosed();
kvStore->dispose();
wait(close);
platform::eraseDirectoryRecursive(rocksDBTestDir);
platform::eraseDirectoryRecursive(checkpointDir);
TraceEvent(SevDebug, "TestRocksDBClosed").detail("Checkpoint", describe(fetchedCheckpoints));