Commit Proxy Encryption Code Probes (#8618)
* add commit proxy encryption code probes * fix comment * address pr comments * address pr comments * address pr comments * address pr comments * Trigger Build
This commit is contained in:
parent
d385fc5af1
commit
24ce8c0fd0
|
@ -232,10 +232,10 @@ void validateEncryptionHeaderDetails(const BlobGranuleFileEncryptionKeys& eKeys,
|
|||
.detail("ExpectedHeaderSalt", header.cipherHeaderDetails.salt);
|
||||
throw encrypt_header_metadata_mismatch();
|
||||
}
|
||||
// Validate encryption header 'cipherHeader' details sanity
|
||||
if (!(header.cipherHeaderDetails.baseCipherId == eKeys.headerCipherKey->getBaseCipherId() &&
|
||||
header.cipherHeaderDetails.encryptDomainId == eKeys.headerCipherKey->getDomainId() &&
|
||||
header.cipherHeaderDetails.salt == eKeys.headerCipherKey->getSalt())) {
|
||||
// Validate encryption header 'cipherText' details sanity
|
||||
if (!(header.cipherTextDetails.baseCipherId == eKeys.textCipherKey->getBaseCipherId() &&
|
||||
header.cipherTextDetails.encryptDomainId == eKeys.textCipherKey->getDomainId() &&
|
||||
header.cipherTextDetails.salt == eKeys.textCipherKey->getSalt())) {
|
||||
TraceEvent(SevError, "EncryptionHeader_CipherTextMismatch")
|
||||
.detail("TextDomainId", eKeys.textCipherKey->getDomainId())
|
||||
.detail("ExpectedTextDomainId", header.cipherTextDetails.encryptDomainId)
|
||||
|
|
|
@ -944,9 +944,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( ENCRYPTION_MODE, "AES-256-CTR" );
|
||||
init( SIM_KMS_MAX_KEYS, 4096 );
|
||||
init( ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH, 100000 );
|
||||
init( ENABLE_TLOG_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION && !PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ) ENABLE_TLOG_ENCRYPTION = true;
|
||||
init( ENABLE_STORAGE_SERVER_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY) ENABLE_STORAGE_SERVER_ENCRYPTION = !ENABLE_STORAGE_SERVER_ENCRYPTION;
|
||||
init( ENABLE_BLOB_GRANULE_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY) ENABLE_BLOB_GRANULE_ENCRYPTION = !ENABLE_BLOB_GRANULE_ENCRYPTION;
|
||||
init( ENABLE_TLOG_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION ) ENABLE_TLOG_ENCRYPTION = false;
|
||||
init( ENABLE_STORAGE_SERVER_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION) ENABLE_STORAGE_SERVER_ENCRYPTION = false;
|
||||
init( ENABLE_BLOB_GRANULE_ENCRYPTION, ENABLE_ENCRYPTION ); if ( randomize && BUGGIFY && ENABLE_ENCRYPTION) ENABLE_BLOB_GRANULE_ENCRYPTION = false;
|
||||
|
||||
// encrypt key proxy
|
||||
init( ENABLE_BLOB_GRANULE_COMPRESSION, false ); if ( randomize && BUGGIFY ) { ENABLE_BLOB_GRANULE_COMPRESSION = deterministicRandom()->coinflip(); }
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "flow/Error.h"
|
||||
|
@ -87,9 +88,10 @@ public:
|
|||
|
||||
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
|
||||
ResolverData& resolverData_,
|
||||
const VectorRef<MutationRef>& mutations_)
|
||||
const VectorRef<MutationRef>& mutations_,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys_)
|
||||
: spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_),
|
||||
txnStateStore(resolverData_.txnStateStore), toCommit(resolverData_.toCommit),
|
||||
cipherKeys(cipherKeys_), txnStateStore(resolverData_.txnStateStore), toCommit(resolverData_.toCommit),
|
||||
confChange(resolverData_.confChanges), logSystem(resolverData_.logSystem), popVersion(resolverData_.popVersion),
|
||||
keyInfo(resolverData_.keyInfo), storageCache(resolverData_.storageCache),
|
||||
initialCommit(resolverData_.initialCommit), forResolver(true) {}
|
||||
|
@ -160,11 +162,13 @@ private:
|
|||
|
||||
private:
|
||||
void writeMutation(const MutationRef& m) {
|
||||
if (forResolver || !isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
||||
if (!isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
||||
toCommit->writeTypedMessage(m);
|
||||
} else {
|
||||
ASSERT(cipherKeys != nullptr);
|
||||
Arena arena;
|
||||
CODE_PROBE(!forResolver, "encrypting metadata mutations");
|
||||
CODE_PROBE(forResolver, "encrypting resolver mutations");
|
||||
toCommit->writeTypedMessage(m.encryptMetadata(*cipherKeys, arena, BlobCipherMetrics::TLOG));
|
||||
}
|
||||
}
|
||||
|
@ -1343,8 +1347,9 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
|||
|
||||
void applyMetadataMutations(SpanContext const& spanContext,
|
||||
ResolverData& resolverData,
|
||||
const VectorRef<MutationRef>& mutations) {
|
||||
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations).apply();
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys) {
|
||||
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations, pCipherKeys).apply();
|
||||
}
|
||||
|
||||
void applyMetadataMutations(SpanContext const& spanContext,
|
||||
|
|
|
@ -1266,8 +1266,14 @@ ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
|
|||
if (self->pProxyCommitData->isEncryptionEnabled) {
|
||||
state EncryptCipherDomainId domainId = tenantId;
|
||||
state MutationRef encryptedMutation;
|
||||
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::DISABLED,
|
||||
"using disabled tenant mode");
|
||||
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::OPTIONAL_TENANT,
|
||||
"using optional tenant mode");
|
||||
CODE_PROBE(self->pProxyCommitData->db->get().client.tenantMode == TenantMode::REQUIRED,
|
||||
"using required tenant mode");
|
||||
|
||||
if (encryptedMutationOpt->present()) {
|
||||
if (encryptedMutationOpt && encryptedMutationOpt->present()) {
|
||||
CODE_PROBE(true, "using already encrypted mutation");
|
||||
encryptedMutation = encryptedMutationOpt->get();
|
||||
ASSERT(encryptedMutation.isEncrypted());
|
||||
|
@ -1299,6 +1305,8 @@ ACTOR Future<MutationRef> writeMutation(CommitBatchContext* self,
|
|||
ASSERT_NE(domainId, INVALID_ENCRYPT_DOMAIN_ID);
|
||||
encryptedMutation = mutation->encrypt(self->cipherKeys, domainId, *arena, BlobCipherMetrics::TLOG);
|
||||
}
|
||||
ASSERT(encryptedMutation.isEncrypted());
|
||||
CODE_PROBE(true, "encrypting non-metadata mutations");
|
||||
self->toCommit.writeTypedMessage(encryptedMutation);
|
||||
return encryptedMutation;
|
||||
} else {
|
||||
|
@ -1473,12 +1481,12 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
if (!hasCandidateBackupKeys) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (m.type != MutationRef::Type::ClearRange) {
|
||||
// Add the mutation to the relevant backup tag
|
||||
for (auto backupName : pProxyCommitData->vecBackupKeys[m.param1]) {
|
||||
// If encryption is enabled make sure the mutation we are writing is also encrypted
|
||||
ASSERT(!self->pProxyCommitData->isEncryptionEnabled || writtenMutation.isEncrypted());
|
||||
CODE_PROBE(writtenMutation.isEncrypted(), "using encrypted backup mutation");
|
||||
self->logRangeMutations[backupName].push_back_deep(self->logRangeMutationsArena, writtenMutation);
|
||||
}
|
||||
} else {
|
||||
|
@ -1500,6 +1508,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
// TODO (Nim): Currently clear ranges are encrypted using the default encryption key, this must be
|
||||
// changed to account for clear ranges which span tenant boundaries
|
||||
if (self->pProxyCommitData->isEncryptionEnabled) {
|
||||
CODE_PROBE(true, "encrypting clear range backup mutation");
|
||||
if (backupMutation.param1 == m.param1 && backupMutation.param2 == m.param2 &&
|
||||
encryptedMutation.present()) {
|
||||
backupMutation = encryptedMutation.get();
|
||||
|
@ -1510,6 +1519,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
backupMutation =
|
||||
backupMutation.encrypt(self->cipherKeys, domainId, arena, BlobCipherMetrics::BACKUP);
|
||||
}
|
||||
ASSERT(backupMutation.isEncrypted());
|
||||
}
|
||||
|
||||
// Add the mutation to the relevant backup tag
|
||||
|
@ -1613,14 +1623,25 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
|||
idempotencyIdSet.param2 = kv.value;
|
||||
auto& tags = pProxyCommitData->tagsForKey(kv.key);
|
||||
self->toCommit.addTags(tags);
|
||||
self->toCommit.writeTypedMessage(idempotencyIdSet);
|
||||
if (self->pProxyCommitData->isEncryptionEnabled) {
|
||||
CODE_PROBE(true, "encrypting idempotency mutation");
|
||||
std::pair<EncryptCipherDomainName, EncryptCipherDomainId> p =
|
||||
getEncryptDetailsFromMutationRef(self->pProxyCommitData, idempotencyIdSet);
|
||||
Arena arena;
|
||||
MutationRef encryptedMutation = idempotencyIdSet.encrypt(
|
||||
self->cipherKeys, p.second, arena, BlobCipherMetrics::TLOG);
|
||||
self->toCommit.writeTypedMessage(encryptedMutation);
|
||||
} else {
|
||||
self->toCommit.writeTypedMessage(idempotencyIdSet);
|
||||
}
|
||||
});
|
||||
|
||||
for (const auto& m : pProxyCommitData->idempotencyClears) {
|
||||
state int i = 0;
|
||||
for (i = 0; i < pProxyCommitData->idempotencyClears.size(); i++) {
|
||||
MutationRef& m = pProxyCommitData->idempotencyClears[i];
|
||||
auto& tags = pProxyCommitData->tagsForKey(m.param1);
|
||||
self->toCommit.addTags(tags);
|
||||
// TODO(nwijetunga): Encrypt these mutations
|
||||
self->toCommit.writeTypedMessage(m);
|
||||
Arena arena;
|
||||
wait(success(writeMutation(self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &m, nullptr, &arena)));
|
||||
}
|
||||
pProxyCommitData->idempotencyClears = Standalone<VectorRef<MutationRef>>();
|
||||
|
||||
|
|
|
@ -205,6 +205,17 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
|||
req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress();
|
||||
state ProxyRequestsInfo& proxyInfo = self->proxyInfoMap[proxyAddress];
|
||||
|
||||
state std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys;
|
||||
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
||||
static const std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> metadataDomains = {
|
||||
{ SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME },
|
||||
{ ENCRYPT_HEADER_DOMAIN_ID, FDB_ENCRYPT_HEADER_DOMAIN_NAME }
|
||||
};
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cks =
|
||||
wait(getLatestEncryptCipherKeys(db, metadataDomains, BlobCipherMetrics::TLOG));
|
||||
cipherKeys = cks;
|
||||
}
|
||||
|
||||
++self->resolveBatchIn;
|
||||
|
||||
if (req.debugID.present()) {
|
||||
|
@ -351,7 +362,11 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
|||
SpanContext spanContext =
|
||||
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();
|
||||
|
||||
applyMetadataMutations(spanContext, *resolverData, req.transactions[t].mutations);
|
||||
applyMetadataMutations(spanContext,
|
||||
*resolverData,
|
||||
req.transactions[t].mutations,
|
||||
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) ? &cipherKeys
|
||||
: nullptr);
|
||||
}
|
||||
CODE_PROBE(self->forceRecovery, "Resolver detects forced recovery");
|
||||
}
|
||||
|
@ -506,8 +521,10 @@ struct TransactionStateResolveContext {
|
|||
}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolveContext* pContext,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||
ACTOR Future<Void> processCompleteTransactionStateRequest(
|
||||
TransactionStateResolveContext* pContext,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db,
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys) {
|
||||
state KeyRange txnKeys = allKeys;
|
||||
state std::map<Tag, UID> tag_uid;
|
||||
|
||||
|
@ -574,7 +591,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
|
|||
bool confChanges; // Ignore configuration changes for initial commits.
|
||||
ResolverData resolverData(
|
||||
pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges);
|
||||
applyMetadataMutations(SpanContext(), resolverData, mutations);
|
||||
applyMetadataMutations(SpanContext(), resolverData, mutations, cipherKeys);
|
||||
} // loop
|
||||
|
||||
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
|
||||
|
@ -615,7 +632,18 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
|
|||
if (pContext->receivedSequences.size() == pContext->maxSequence) {
|
||||
// Received all components of the txnStateRequest
|
||||
ASSERT(!pContext->processed);
|
||||
wait(processCompleteTransactionStateRequest(pContext, db));
|
||||
state std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys;
|
||||
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION)) {
|
||||
static const std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> metadataDomains = {
|
||||
{ SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME },
|
||||
{ ENCRYPT_HEADER_DOMAIN_ID, FDB_ENCRYPT_HEADER_DOMAIN_NAME }
|
||||
};
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cks =
|
||||
wait(getLatestEncryptCipherKeys(db, metadataDomains, BlobCipherMetrics::TLOG));
|
||||
cipherKeys = cks;
|
||||
}
|
||||
wait(processCompleteTransactionStateRequest(
|
||||
pContext, db, isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) ? &cipherKeys : nullptr));
|
||||
pContext->processed = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -144,6 +144,7 @@ inline bool containsMetadataMutation(const VectorRef<MutationRef>& mutations) {
|
|||
// Resolver's version
|
||||
void applyMetadataMutations(SpanContext const& spanContext,
|
||||
ResolverData& resolverData,
|
||||
const VectorRef<MutationRef>& mutations);
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -8567,6 +8567,11 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
} else {
|
||||
MutationRef msg;
|
||||
cloneReader >> msg;
|
||||
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) && !msg.isEncrypted() &&
|
||||
!(isSingleKeyMutation((MutationRef::Type)msg.type) &&
|
||||
(backupLogKeys.contains(msg.param1) || (applyLogKeys.contains(msg.param1))))) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (msg.isEncrypted()) {
|
||||
if (!cipherKeys.present()) {
|
||||
const BlobCipherEncryptHeader* header = msg.encryptionHeader();
|
||||
|
@ -8720,6 +8725,11 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
} else {
|
||||
MutationRef msg;
|
||||
rd >> msg;
|
||||
if (isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION) && !msg.isEncrypted() &&
|
||||
!(isSingleKeyMutation((MutationRef::Type)msg.type) &&
|
||||
(backupLogKeys.contains(msg.param1) || (applyLogKeys.contains(msg.param1))))) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (msg.isEncrypted()) {
|
||||
ASSERT(cipherKeys.present());
|
||||
msg = msg.decrypt(cipherKeys.get(), rd.arena(), BlobCipherMetrics::TLOG);
|
||||
|
|
Loading…
Reference in New Issue