add unit test for replaceRawClearRanges

This commit is contained in:
Xiaoxi Wang 2023-02-07 13:21:18 -08:00
parent 6df5f1fa56
commit 145b81170c
1 changed files with 102 additions and 14 deletions

View File

@ -1264,7 +1264,8 @@ TEST_CASE("/CommitProxy/SplitRange/SplitClearRangeByTenant") {
void replaceRawClearRanges(Arena& arena,
VectorRef<MutationRef>& mutations,
std::vector<std::pair<int, std::vector<MutationRef>>>& splitMutations,
size_t totalSize) {
size_t totalSize,
Optional<UID> debugId = Optional<UID>()) {
if (splitMutations.empty())
return;
@ -1298,6 +1299,86 @@ void replaceRawClearRanges(Arena& arena,
ASSERT_EQ(splitMutations.size(), 0);
}
size_t processClearRangeMutation(Arena& arena,
const std::map<int64_t, TenantName>& tenantMap,
MutationRef& mutation,
int mutationIdx,
int& newMutationSize,
std::vector<std::pair<int, std::vector<MutationRef>>>& idxSplitMutations) {
std::vector<MutationRef> newClears = splitClearRangeByTenant(arena, mutation, tenantMap);
if (newClears.size() == 1) {
mutation = newClears[0];
} else if (newClears.size() > 1) {
CODE_PROBE(true, "Clear Range raw access or cross multiple tenants");
idxSplitMutations.emplace_back(mutationIdx, newClears);
newMutationSize += newClears.size() - 1;
}
return newClears.size();
}
TEST_CASE("/CommitProxy/SplitRange/replaceRawClearRanges") {
int mapSize = 1000;
std::map<int64_t, TenantName> tenantMap;
for (int i = 0; i < mapSize; ++i) {
tenantMap[i * 2] = ""_sr;
}
Arena arena(15 << 10);
VectorRef<MutationRef> mutations;
VectorRef<MutationRef> targetMutations;
mutations.emplace_back_deep(arena, MutationRef::SetValue, "0"_sr, ""_sr);
targetMutations.emplace_back_deep(arena, MutationRef::SetValue, "0"_sr, ""_sr);
// single tenant
int64_t tenantId = deterministicRandom()->randomInt64(0, mapSize) * 2;
KeyRef prefix = TenantAPI::idToPrefix(arena, tenantId);
KeyRef param1 = prefix.withSuffix("a"_sr, arena);
KeyRef param2 = prefix.withSuffix("b"_sr, arena);
mutations.emplace_back(arena, MutationRef::ClearRange, param1, param2);
targetMutations.emplace_back_deep(arena, MutationRef::ClearRange, param1, param2);
// other op
mutations.emplace_back_deep(arena, MutationRef::SetValue, "1"_sr, ""_sr);
targetMutations.emplace_back_deep(arena, MutationRef::SetValue, "1"_sr, ""_sr);
// multiple tenants
int64_t tid1 = deterministicRandom()->randomInt64(0, mapSize - 1) * 2;
KeyRef prefix1 = TenantAPI::idToPrefix(arena, tid1);
param1 = deterministicRandom()->coinflip() ? prefix1 : prefix1.withSuffix("a"_sr, arena); // align or not
// with system keys
int targetSize = mapSize - tid1 / 2 + 1;
Key randomSysKey = systemKeys.begin.withSuffix("sf"_sr, arena);
mutations.emplace_back(arena, MutationRef::ClearRange, param1, randomSysKey);
auto sMutations = splitClearRangeByTenant(arena, mutations.back(), tenantMap);
ASSERT_EQ(targetSize, sMutations.size());
targetMutations.append(arena, sMutations.begin(), sMutations.size());
// other op
mutations.emplace_back_deep(arena, MutationRef::SetValue, "3"_sr, ""_sr);
targetMutations.emplace_back_deep(arena, MutationRef::SetValue, "3"_sr, ""_sr);
// [s, 0], [cr, t0a, t0b], [s, 1], [c, t1a, randomSys], [s, 3]
std::vector<std::pair<int, std::vector<MutationRef>>> idxSplitMutations;
int newMutationSize = mutations.size();
for(int i = 0; i < mutations.size(); ++ i) {
if(mutations[i].type == MutationRef::ClearRange) {
processClearRangeMutation(
arena, tenantMap, mutations[i], i, newMutationSize, idxSplitMutations);
}
}
replaceRawClearRanges(arena, mutations, idxSplitMutations, newMutationSize);
// verify
ASSERT_EQ(mutations.size(), targetMutations.size());
for(int i = 0; i < mutations.size(); ++ i) {
ASSERT_EQ(targetMutations[i].type, mutations[i].type);
ASSERT(targetMutations[i].param1 == mutations[i].param1);
ASSERT(targetMutations[i].param2 == mutations[i].param2);
}
return Void();
}
// Return success and properly split clear range mutations if all tenant check pass. Otherwise, return corresponding
// error
Error validateAndProcessTenantAccess(Arena& arena,
@ -1309,7 +1390,7 @@ Error validateAndProcessTenantAccess(Arena& arena,
bool writeNormalKey = false;
std::vector<std::pair<int, std::vector<MutationRef>>> idxSplitMutations;
size_t newMutationSize = mutations.size();
int newMutationSize = mutations.size();
for (int i = 0; i < mutations.size(); ++i) {
auto& mutation = mutations[i];
Optional<int64_t> tenantId;
@ -1317,14 +1398,8 @@ Error validateAndProcessTenantAccess(Arena& arena,
changeTenant = changeTenant || tenantMapChanging(mutation);
if (mutation.type == MutationRef::ClearRange) {
std::vector<MutationRef> newClears = splitClearRangeByTenant(arena, mutation, pProxyCommitData->tenantMap);
if (newClears.size() == 1) {
mutation = newClears[0];
} else if (newClears.size() > 1) {
CODE_PROBE(true, "Clear Range raw access or cross multiple tenants");
idxSplitMutations.emplace_back(i, newClears);
newMutationSize += newClears.size() - 1;
}
int newClearSize = processClearRangeMutation(
arena, pProxyCommitData->tenantMap, mutation, i, newMutationSize, idxSplitMutations);
if (debugId.present()) {
TraceEvent(SevDebug, "SplitTenantClearRange", pProxyCommitData->dbgid)
@ -1332,7 +1407,7 @@ Error validateAndProcessTenantAccess(Arena& arena,
.detail("Idx", i)
.detail("NewMutationSize", newMutationSize)
.detail("OldMutationSize", mutations.size())
.detail("NewClears", newClears.size());
.detail("NewClears", newClearSize);
}
} else if (!isSystemKey(mutation.param1)) {
validAccess = validTenantAccess(mutation, pProxyCommitData->tenantMap, tenantId);
@ -1368,7 +1443,7 @@ Error validateAndProcessTenantAccess(Arena& arena,
}
}
// replaceRawClearRanges(arena, mutations, idxSplitMutations, newMutationSize);
replaceRawClearRanges(arena, mutations, idxSplitMutations, newMutationSize);
return success();
}
@ -1612,8 +1687,21 @@ ACTOR Future<WriteMutationRefVar> writeMutationEncryptedMutation(CommitBatchCont
TextAndHeaderCipherKeys cipherKeys = wait(getEncryptCipherKeys(dbInfo, *header, BlobCipherMetrics::TLOG));
MutationRef decryptedMutation = encryptedMutation.decrypt(cipherKeys, *arena, BlobCipherMetrics::TLOG);
ASSERT(decryptedMutation.param1 == mutation->param1 && decryptedMutation.param2 == mutation->param2 &&
decryptedMutation.type == mutation->type);
TraceEvent(SevDebug, "MutationEncryptedMutation")
.detail("DeType", decryptedMutation.type)
.detail("EnType", mutation->type)
.detail("DeP1", decryptedMutation.param1)
.detail("DeP2", decryptedMutation.param2)
.detail("EnP1", mutation->param1)
.detail("EnP2", mutation->param2);
ASSERT(decryptedMutation.type == mutation->type);
if (mutation->type != (int)MutationRef::ClearRange) {
ASSERT(decryptedMutation.param1 == mutation->param1);
ASSERT(decryptedMutation.param2 == mutation->param2);
} else {
// TODO: re-encrypt split clear ranges
}
CODE_PROBE(true, "encrypting non-metadata mutations");
self->toCommit.writeTypedMessage(encryptedMutation);
return encryptedMutation;