extract shouldBackup method

This commit is contained in:
Xiaoxi Wang 2023-02-08 13:25:46 -08:00
parent 145b81170c
commit e28fc664d0
1 changed files with 34 additions and 34 deletions

View File

@ -1359,17 +1359,16 @@ TEST_CASE("/CommitProxy/SplitRange/replaceRawClearRanges") {
// [s, 0], [cr, t0a, t0b], [s, 1], [c, t1a, randomSys], [s, 3] // [s, 0], [cr, t0a, t0b], [s, 1], [c, t1a, randomSys], [s, 3]
std::vector<std::pair<int, std::vector<MutationRef>>> idxSplitMutations; std::vector<std::pair<int, std::vector<MutationRef>>> idxSplitMutations;
int newMutationSize = mutations.size(); int newMutationSize = mutations.size();
for(int i = 0; i < mutations.size(); ++ i) { for (int i = 0; i < mutations.size(); ++i) {
if(mutations[i].type == MutationRef::ClearRange) { if (mutations[i].type == MutationRef::ClearRange) {
processClearRangeMutation( processClearRangeMutation(arena, tenantMap, mutations[i], i, newMutationSize, idxSplitMutations);
arena, tenantMap, mutations[i], i, newMutationSize, idxSplitMutations);
} }
} }
replaceRawClearRanges(arena, mutations, idxSplitMutations, newMutationSize); replaceRawClearRanges(arena, mutations, idxSplitMutations, newMutationSize);
// verify // verify
ASSERT_EQ(mutations.size(), targetMutations.size()); ASSERT_EQ(mutations.size(), targetMutations.size());
for(int i = 0; i < mutations.size(); ++ i) { for (int i = 0; i < mutations.size(); ++i) {
ASSERT_EQ(targetMutations[i].type, mutations[i].type); ASSERT_EQ(targetMutations[i].type, mutations[i].type);
ASSERT(targetMutations[i].param1 == mutations[i].param1); ASSERT(targetMutations[i].param1 == mutations[i].param1);
ASSERT(targetMutations[i].param2 == mutations[i].param2); ASSERT(targetMutations[i].param2 == mutations[i].param2);
@ -1377,8 +1376,6 @@ TEST_CASE("/CommitProxy/SplitRange/replaceRawClearRanges") {
return Void(); return Void();
} }
// Return success and properly split clear range mutations if all tenant check pass. Otherwise, return corresponding // Return success and properly split clear range mutations if all tenant check pass. Otherwise, return corresponding
// error // error
Error validateAndProcessTenantAccess(Arena& arena, Error validateAndProcessTenantAccess(Arena& arena,
@ -1694,13 +1691,10 @@ ACTOR Future<WriteMutationRefVar> writeMutationEncryptedMutation(CommitBatchCont
.detail("DeP2", decryptedMutation.param2) .detail("DeP2", decryptedMutation.param2)
.detail("EnP1", mutation->param1) .detail("EnP1", mutation->param1)
.detail("EnP2", mutation->param2); .detail("EnP2", mutation->param2);
ASSERT(decryptedMutation.type == mutation->type); ASSERT(decryptedMutation.type == mutation->type);
if (mutation->type != (int)MutationRef::ClearRange) { ASSERT(decryptedMutation.param1 == mutation->param1);
ASSERT(decryptedMutation.param1 == mutation->param1); ASSERT(decryptedMutation.param2 == mutation->param2);
ASSERT(decryptedMutation.param2 == mutation->param2);
} else {
// TODO: re-encrypt split clear ranges
}
CODE_PROBE(true, "encrypting non-metadata mutations"); CODE_PROBE(true, "encrypting non-metadata mutations");
self->toCommit.writeTypedMessage(encryptedMutation); self->toCommit.writeTypedMessage(encryptedMutation);
@ -1790,6 +1784,23 @@ Future<WriteMutationRefVar> writeMutation(CommitBatchContext* self,
} }
} }
// Check whether the mutation intersects any legal backup ranges
// If so, it will be clamped to the intersecting range(s) later
inline bool shouldBackup(MutationRef const& m) {
if (normalKeys.contains(m.param1) || m.param1 == metadataVersionKey) {
return true;
} else if (m.type != MutationRef::Type::ClearRange) {
return systemBackupMutationMask().rangeContaining(m.param1).value();
} else {
for (auto& r : systemBackupMutationMask().intersectingRanges(KeyRangeRef(m.param1, m.param2))) {
if (r->value()) {
return true;
}
}
}
return false;
}
/// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers' /// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers'
/// tags /// tags
ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) { ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
@ -1808,7 +1819,11 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations; state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
state VectorRef<Optional<MutationRef>>* encryptedMutations = state VectorRef<Optional<MutationRef>>* encryptedMutations =
&trs[self->transactionNum].transaction.encryptedMutations; &trs[self->transactionNum].transaction.encryptedMutations;
ASSERT(encryptedMutations->size() == 0 || encryptedMutations->size() == pMutations->size());
if (!encryptedMutations->empty()) {
ASSERT_EQ(encryptedMutations->size(), pMutations->size());
}
state int64_t encryptDomain = trs[self->transactionNum].tenantInfo.tenantId; state int64_t encryptDomain = trs[self->transactionNum].tenantInfo.tenantId;
if (self->pProxyCommitData->encryptMode.mode == EncryptionAtRestMode::CLUSTER_AWARE && if (self->pProxyCommitData->encryptMode.mode == EncryptionAtRestMode::CLUSTER_AWARE &&
encryptDomain != SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) { encryptDomain != SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) {
@ -1944,29 +1959,14 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
UNREACHABLE(); UNREACHABLE();
} }
if (pProxyCommitData->vecBackupKeys.size() <= 1) { if (pProxyCommitData->vecBackupKeys.size() <= 1 || !shouldBackup(m)) {
continue; continue;
} }
// Check whether the mutation intersects any legal backup ranges // In required tenant mode, the clear ranges are already split by tenant
// If so, it will be clamped to the intersecting range(s) later TraceEvent(SevDebug, "BackupMutationLog", pProxyCommitData->dbgid)
bool hasCandidateBackupKeys = false; .detail("TenantMode", (int)pProxyCommitData->getTenantMode());
if (normalKeys.contains(m.param1) || m.param1 == metadataVersionKey) { if (m.type != MutationRef::Type::ClearRange || pProxyCommitData->getTenantMode() == TenantMode::REQUIRED) {
hasCandidateBackupKeys = true;
} else if (m.type != MutationRef::Type::ClearRange) {
hasCandidateBackupKeys = systemBackupMutationMask().rangeContaining(m.param1).value();
} else {
for (auto& r : systemBackupMutationMask().intersectingRanges(KeyRangeRef(m.param1, m.param2))) {
if (r->value()) {
hasCandidateBackupKeys = true;
break;
}
}
}
if (!hasCandidateBackupKeys) {
continue;
}
if (m.type != MutationRef::Type::ClearRange) {
// Add the mutation to the relevant backup tag // Add the mutation to the relevant backup tag
for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) { for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) {
// If encryption is enabled make sure the mutation we are writing is also encrypted // If encryption is enabled make sure the mutation we are writing is also encrypted