Fix psm test (#10273)

This commit is contained in:
He Liu 2023-05-18 14:54:26 -07:00 committed by GitHub
parent 2916a11a86
commit a5f639f859
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 82 additions and 36 deletions

View File

@ -1512,23 +1512,25 @@ ACTOR static Future<Void> startMoveShards(Database occ,
physicalShardMap[ssId].emplace_back(rangeIntersectKeys, srcId);
}
const UID checkpointId = UID(deterministicRandom()->randomUInt64(), srcId.first());
CheckpointMetaData checkpoint(std::vector<KeyRange>{ rangeIntersectKeys },
DataMoveRocksCF,
src,
checkpointId,
dataMoveId);
checkpoint.setState(CheckpointMetaData::Pending);
tr.set(checkpointKeyFor(checkpointId), checkpointValue(checkpoint));
TraceEvent(sevDm, "InitiatedCheckpoint")
.detail("CheckpointID", checkpointId.toString())
.detail("Range", rangeIntersectKeys)
.detail("DataMoveID", dataMoveId)
.detail("SrcServers", describe(src))
.detail("ReadVersion", tr.getReadVersion().get());
dataMove.src.insert(src.begin(), src.end());
dataMove.checkpoints.insert(checkpointId);
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
const UID checkpointId = UID(deterministicRandom()->randomUInt64(), srcId.first());
CheckpointMetaData checkpoint(std::vector<KeyRange>{ rangeIntersectKeys },
DataMoveRocksCF,
src,
checkpointId,
dataMoveId);
checkpoint.setState(CheckpointMetaData::Pending);
tr.set(checkpointKeyFor(checkpointId), checkpointValue(checkpoint));
TraceEvent(sevDm, "InitiatedCheckpoint")
.detail("CheckpointID", checkpointId.toString())
.detail("Range", rangeIntersectKeys)
.detail("DataMoveID", dataMoveId)
.detail("SrcServers", describe(src))
.detail("ReadVersion", tr.getReadVersion().get());
dataMove.checkpoints.insert(checkpointId);
}
}
// Remove old dests from serverKeys.
@ -1945,7 +1947,9 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
wait(waitForAll(actors));
if (range.end == dataMove.ranges.front().end) {
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
}
tr.clear(dataMoveKeyFor(dataMoveId));
complete = true;
TraceEvent(SevDebug, "FinishMoveShardsDeleteMetaData", dataMoveId)
@ -2697,7 +2701,9 @@ ACTOR Future<Void> cleanUpDataMoveCore(Database occ,
}
if (range.end == dataMove.ranges.front().end) {
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
if (SERVER_KNOBS->ENABLE_DD_PHYSICAL_SHARD_MOVE) {
wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId));
}
tr.clear(dataMoveKeyFor(dataMoveId));
complete = true;
TraceEvent(SevVerbose, "CleanUpDataMoveDeleteMetaData", dataMoveId)
@ -2757,15 +2763,18 @@ ACTOR Future<Void> cleanUpDataMove(Database occ,
wait(cleanUpDataMoveCore(occ, dataMoveId, lock, cleanUpDataMoveParallelismLock, keys, ddEnabledState));
} catch (Error& e) {
if (e.code() == error_code_retry_clean_up_datamove_tombstone_added) {
TraceEvent(SevDebug, "CleanUpDataMoveTriggerBackground", dataMoveId).detail("DataMoveID", dataMoveId);
ASSERT_WE_THINK(addCleanUpDataMoveActor.present());
addCleanUpDataMoveActor.get().send(cleanUpDataMoveBackground(occ,
dataMoveId,
lock,
cleanUpDataMoveParallelismLock,
keys,
ddEnabledState,
/*backgroundDelaySeconds=*/10));
if (addCleanUpDataMoveActor.present()) {
TraceEvent(SevDebug, "CleanUpDataMoveTriggerBackground", dataMoveId).detail("DataMoveID", dataMoveId);
addCleanUpDataMoveActor.get().send(cleanUpDataMoveBackground(occ,
dataMoveId,
lock,
cleanUpDataMoveParallelismLock,
keys,
ddEnabledState,
/*backgroundDelaySeconds=*/10));
} else {
TraceEvent(SevWarn, "CleanUpDataMoveNotFound", dataMoveId).errorUnsuppressed(e);
}
} else {
TraceEvent(SevWarn, "CleanUpDataMoveFail", dataMoveId).errorUnsuppressed(e);
throw e;

View File

@ -123,7 +123,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
state std::vector<KeyRange> checkpointRanges;
checkpointRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyAC"_sr));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::True, &kvs));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::False, &kvs));
// Move range [TestKeyD, TestKeyF) to sh0;
includes.insert(teamA.begin(), teamA.end());
@ -152,7 +151,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
checkpointRanges.push_back(KeyRangeRef("TestKeyA"_sr, "TestKeyB"_sr));
checkpointRanges.push_back(KeyRangeRef("TestKeyD"_sr, "TestKeyE"_sr));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::True, &kvs));
wait(self->checkpointRestore(self, cx, checkpointRanges, checkpointRanges, CheckpointAsKeyValues::False, &kvs));
// Move range [TestKeyB, TestKeyC) to sh1, on the same server.
includes.insert(teamA.begin(), teamA.end());
@ -212,6 +210,43 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
return Void();
}
ACTOR Future<Void> deleteCheckpoints(Database cx, std::vector<UID> checkpointIds) {
TraceEvent(SevDebug, "DataMoveDeleteCheckpoints").detail("Checkpoints", describe(checkpointIds));
state Transaction tr(cx);
loop {
try {
std::vector<Future<Optional<Value>>> checkpointEntries;
for (const UID& id : checkpointIds) {
checkpointEntries.push_back(tr.get(checkpointKeyFor(id)));
}
std::vector<Optional<Value>> checkpointValues = wait(getAll(checkpointEntries));
for (int i = 0; i < checkpointIds.size(); ++i) {
const auto& value = checkpointValues[i];
if (!value.present()) {
TraceEvent(SevWarnAlways, "CheckpointNotFound");
continue;
}
CheckpointMetaData checkpoint = decodeCheckpointValue(value.get());
const Key key = checkpointKeyFor(checkpoint.checkpointID);
// Setting the state as CheckpointMetaData::Deleting will trigger private mutations to instruct
// all storage servers to delete their local checkpoints.
checkpoint.setState(CheckpointMetaData::Deleting);
tr.set(key, checkpointValue(checkpoint));
tr.clear(singleKeyRange(key));
TraceEvent(SevDebug, "DataMoveDeleteCheckpoint").detail("Checkpoint", checkpoint.toString());
}
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<Void> checkpointRestore(PhysicalShardMoveWorkLoad* self,
Database cx,
std::vector<KeyRange> checkpointRanges,
@ -300,6 +335,12 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
}
}
std::vector<UID> checkpointIds;
for (const auto& it : records) {
checkpointIds.push_back(it.second.checkpointID);
}
wait(self->deleteCheckpoints(cx, checkpointIds));
// Restore KVS.
state std::string rocksDBTestDir = "rocksdb-kvstore-test-restored-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);
@ -479,9 +520,8 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
return version;
}
// Move keys to a random selected team consisting of a single SS, after disabling DD, so that keys won't be
// kept in the new team until DD is enabled.
// Returns the address of the single SS of the new team.
// Move keys to a random selected team consisting of a single SS, this requires DD is disabled to prevent shards
// being moved by DD automatically. Returns the address of the single SS of the new team.
ACTOR Future<std::vector<UID>> moveShard(PhysicalShardMoveWorkLoad* self,
Database cx,
UID dataMoveId,
@ -489,9 +529,6 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
int teamSize,
std::unordered_set<UID> includes,
std::unordered_set<UID> excludes) {
// Disable DD to avoid DD undoing of our move.
int ignore = wait(setDDMode(cx, 0));
// Pick a random SS as the dest, keys will reside on a single server after the move.
std::vector<StorageServerInterface> interfs = wait(getStorageServers(cx));
ASSERT(interfs.size() > teamSize - includes.size());

View File

@ -9,7 +9,7 @@ allowDefaultTenant = false
[[knobs]]
shard_encode_location_metadata = true
enable_dd_physical_shard = true
min_byte_sampling_probability = 0.99
# min_byte_sampling_probability = 0.99
[[test]]
testTitle = 'PhysicalShardMove'