Merge pull request #7758 from sfc-gh-nwijetunga/nim/refactor-encryption-flag
Refactor use of Encryption Flags
This commit is contained in:
commit
fdf8e431ee
|
@ -26,6 +26,7 @@
|
|||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
|
@ -59,9 +60,10 @@ public:
|
|||
const UID& dbgid_,
|
||||
Arena& arena_,
|
||||
const VectorRef<MutationRef>& mutations_,
|
||||
IKeyValueStore* txnStateStore_)
|
||||
IKeyValueStore* txnStateStore_,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db)
|
||||
: spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_),
|
||||
confChange(dummyConfChange) {}
|
||||
confChange(dummyConfChange), dbInfo(db) {}
|
||||
|
||||
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
|
||||
Arena& arena_,
|
||||
|
@ -82,17 +84,18 @@ public:
|
|||
uid_applyMutationsData(proxyCommitData_.firstProxy ? &proxyCommitData_.uid_applyMutationsData : nullptr),
|
||||
commit(proxyCommitData_.commit), cx(proxyCommitData_.cx), committedVersion(&proxyCommitData_.committedVersion),
|
||||
storageCache(&proxyCommitData_.storageCache), tag_popped(&proxyCommitData_.tag_popped),
|
||||
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
|
||||
initialCommit(initialCommit_) {}
|
||||
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap), initialCommit(initialCommit_),
|
||||
dbInfo(proxyCommitData_.db) {}
|
||||
|
||||
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
|
||||
ResolverData& resolverData_,
|
||||
const VectorRef<MutationRef>& mutations_)
|
||||
const VectorRef<MutationRef>& mutations_,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db)
|
||||
: spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_),
|
||||
txnStateStore(resolverData_.txnStateStore), toCommit(resolverData_.toCommit),
|
||||
confChange(resolverData_.confChanges), logSystem(resolverData_.logSystem), popVersion(resolverData_.popVersion),
|
||||
keyInfo(resolverData_.keyInfo), storageCache(resolverData_.storageCache),
|
||||
initialCommit(resolverData_.initialCommit), forResolver(true) {}
|
||||
initialCommit(resolverData_.initialCommit), forResolver(true), dbInfo(db) {}
|
||||
|
||||
private:
|
||||
// The following variables are incoming parameters
|
||||
|
@ -139,6 +142,8 @@ private:
|
|||
// true if called from Resolver
|
||||
bool forResolver = false;
|
||||
|
||||
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
|
||||
|
||||
private:
|
||||
// The following variables are used internally
|
||||
|
||||
|
@ -159,7 +164,7 @@ private:
|
|||
|
||||
private:
|
||||
void writeMutation(const MutationRef& m) {
|
||||
if (forResolver || !SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
|
||||
if (forResolver || !isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION, dbInfo->get().client)) {
|
||||
toCommit->writeTypedMessage(m);
|
||||
} else {
|
||||
ASSERT(cipherKeys != nullptr);
|
||||
|
@ -1281,7 +1286,6 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
|||
Version version,
|
||||
Version popVersion,
|
||||
bool initialCommit) {
|
||||
|
||||
ApplyMetadataMutationsImpl(spanContext,
|
||||
arena,
|
||||
mutations,
|
||||
|
@ -1298,14 +1302,16 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
|||
|
||||
void applyMetadataMutations(SpanContext const& spanContext,
|
||||
ResolverData& resolverData,
|
||||
const VectorRef<MutationRef>& mutations) {
|
||||
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations).apply();
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
ApplyMetadataMutationsImpl(spanContext, resolverData, mutations, dbInfo).apply();
|
||||
}
|
||||
|
||||
void applyMetadataMutations(SpanContext const& spanContext,
|
||||
const UID& dbgid,
|
||||
Arena& arena,
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
IKeyValueStore* txnStateStore) {
|
||||
ApplyMetadataMutationsImpl(spanContext, dbgid, arena, mutations, txnStateStore).apply();
|
||||
IKeyValueStore* txnStateStore,
|
||||
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
ApplyMetadataMutationsImpl(spanContext, dbgid, arena, mutations, txnStateStore, dbInfo).apply();
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#include "fdbclient/Notified.h"
|
||||
|
||||
#include "fdbserver/BlobGranuleServerCommon.actor.h"
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
#include "fdbserver/GetEncryptCipherKeys.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/MutationTracking.h"
|
||||
|
@ -207,9 +208,13 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
|
|||
|
||||
int changeFeedStreamReplyBufferSize = SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES / 2;
|
||||
|
||||
bool isEncryptionEnabled = false;
|
||||
|
||||
BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInf, Database db)
|
||||
: id(id), db(db), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL), tenantData(BGTenantMap(dbInf)), dbInfo(dbInf),
|
||||
initialSnapshotLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM) {}
|
||||
initialSnapshotLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM),
|
||||
isEncryptionEnabled(
|
||||
isEncryptionOpSupported(EncryptOperationType::BLOB_GRANULE_ENCRYPTION, db->clientInfo->get())) {}
|
||||
|
||||
bool managerEpochOk(int64_t epoch) {
|
||||
if (epoch < currentManagerEpoch) {
|
||||
|
@ -234,11 +239,6 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
|
|||
};
|
||||
|
||||
namespace {
|
||||
bool isBlobFileEncryptionSupported() {
|
||||
bool supported = SERVER_KNOBS->ENABLE_BLOB_GRANULE_ENCRYPTION && SERVER_KNOBS->BG_METADATA_SOURCE == "tenant";
|
||||
ASSERT((supported && SERVER_KNOBS->ENABLE_ENCRYPTION) || !supported);
|
||||
return supported;
|
||||
}
|
||||
|
||||
Optional<CompressionFilter> getBlobFileCompressFilter() {
|
||||
Optional<CompressionFilter> compFilter;
|
||||
|
@ -630,7 +630,7 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
|
|||
state Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
state Arena arena;
|
||||
|
||||
if (isBlobFileEncryptionSupported()) {
|
||||
if (bwData->isEncryptionEnabled) {
|
||||
BlobGranuleCipherKeysCtx ciphKeysCtx = wait(getLatestGranuleCipherKeys(bwData, keyRange, &arena));
|
||||
cipherKeysCtx = std::move(ciphKeysCtx);
|
||||
cipherKeysMeta = BlobGranuleCipherKeysCtx::toCipherKeysMeta(cipherKeysCtx.get());
|
||||
|
@ -830,7 +830,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
|
|||
state Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
state Arena arena;
|
||||
|
||||
if (isBlobFileEncryptionSupported()) {
|
||||
if (bwData->isEncryptionEnabled) {
|
||||
BlobGranuleCipherKeysCtx ciphKeysCtx = wait(getLatestGranuleCipherKeys(bwData, keyRange, &arena));
|
||||
cipherKeysCtx = std::move(ciphKeysCtx);
|
||||
cipherKeysMeta = BlobGranuleCipherKeysCtx::toCipherKeysMeta(cipherKeysCtx.get());
|
||||
|
@ -1057,7 +1057,7 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
|
|||
|
||||
state Optional<BlobGranuleCipherKeysCtx> snapCipherKeysCtx;
|
||||
if (snapshotF.cipherKeysMeta.present()) {
|
||||
ASSERT(isBlobFileEncryptionSupported());
|
||||
ASSERT(bwData->isEncryptionEnabled);
|
||||
|
||||
BlobGranuleCipherKeysCtx keysCtx =
|
||||
wait(getGranuleCipherKeysFromKeysMeta(bwData, snapshotF.cipherKeysMeta.get(), &filenameArena));
|
||||
|
@ -1085,7 +1085,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
|
|||
deltaF = files.deltaFiles[deltaIdx];
|
||||
|
||||
if (deltaF.cipherKeysMeta.present()) {
|
||||
ASSERT(isBlobFileEncryptionSupported());
|
||||
ASSERT(isEncryptionOpSupported(EncryptOperationType::BLOB_GRANULE_ENCRYPTION,
|
||||
bwData->dbInfo->get().client));
|
||||
|
||||
BlobGranuleCipherKeysCtx keysCtx =
|
||||
wait(getGranuleCipherKeysFromKeysMeta(bwData, deltaF.cipherKeysMeta.get(), &filenameArena));
|
||||
|
@ -3372,7 +3373,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
}
|
||||
|
||||
if (encrypted) {
|
||||
ASSERT(isBlobFileEncryptionSupported());
|
||||
ASSERT(bwData->isEncryptionEnabled);
|
||||
ASSERT(!chunk.snapshotFile.get().cipherKeysCtx.present());
|
||||
|
||||
snapCipherKeysCtx = getGranuleCipherKeysFromKeysMetaRef(
|
||||
|
@ -3390,7 +3391,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
}
|
||||
|
||||
if (encrypted) {
|
||||
ASSERT(isBlobFileEncryptionSupported());
|
||||
ASSERT(bwData->isEncryptionEnabled);
|
||||
ASSERT(!chunk.deltaFiles[deltaIdx].cipherKeysCtx.present());
|
||||
|
||||
deltaCipherKeysCtxs.emplace(
|
||||
|
@ -4645,4 +4646,4 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
|
|||
return Void();
|
||||
}
|
||||
|
||||
// TODO add unit tests for assign/revoke range, especially version ordering
|
||||
// TODO add unit tests for assign/revoke range, especially version ordering
|
|
@ -23,6 +23,7 @@
|
|||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/BackupProgress.actor.h"
|
||||
#include "fdbserver/ClusterRecovery.actor.h"
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
#include "fdbserver/MasterInterface.h"
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
|
||||
|
@ -1058,14 +1059,15 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
|
|||
if (self->txnStateStore)
|
||||
self->txnStateStore->close();
|
||||
self->txnStateLogAdapter = openDiskQueueAdapter(oldLogSystem, myLocality, txsPoppedVersion);
|
||||
self->txnStateStore = keyValueStoreLogSystem(self->txnStateLogAdapter,
|
||||
self->dbInfo,
|
||||
self->dbgid,
|
||||
self->memoryLimit,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION);
|
||||
self->txnStateStore = keyValueStoreLogSystem(
|
||||
self->txnStateLogAdapter,
|
||||
self->dbInfo,
|
||||
self->dbgid,
|
||||
self->memoryLimit,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION, self->dbInfo->get().client));
|
||||
|
||||
// Version 0 occurs at the version epoch. The version epoch is the number
|
||||
// of microseconds since the Unix epoch. It can be set through fdbcli.
|
||||
|
@ -1661,7 +1663,8 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
|
|||
self->dbgid,
|
||||
recoveryCommitRequest.arena,
|
||||
tr.mutations.slice(mmApplied, tr.mutations.size()),
|
||||
self->txnStateStore);
|
||||
self->txnStateStore,
|
||||
self->dbInfo);
|
||||
mmApplied = tr.mutations.size();
|
||||
|
||||
tr.read_snapshot = self->recoveryTransactionVersion; // lastEpochEnd would make more sense, but isn't in the initial
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/DataDistributorInterface.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
#include "fdbserver/FDBExecHelper.actor.h"
|
||||
#include "fdbserver/GetEncryptCipherKeys.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
|
@ -878,7 +879,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
state double resolutionStart = now();
|
||||
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
|
||||
// resolution processing but is still using CPU
|
||||
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
|
||||
state ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
|
||||
std::vector<CommitTransactionRequest>& trs = self->trs;
|
||||
state Span span("MP:getResolution"_loc, self->span.context);
|
||||
|
||||
|
@ -916,7 +917,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
|
||||
// Fetch cipher keys if needed.
|
||||
state Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getCipherKeys;
|
||||
if (SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
|
||||
if (pProxyCommitData->isEncryptionEnabled) {
|
||||
static std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> defaultDomains = {
|
||||
{ SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME },
|
||||
{ ENCRYPT_HEADER_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME }
|
||||
|
@ -959,8 +960,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
g_traceBatch.addEvent(
|
||||
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
|
||||
}
|
||||
|
||||
if (SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
|
||||
if (pProxyCommitData->isEncryptionEnabled) {
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys = wait(getCipherKeys);
|
||||
self->cipherKeys = cipherKeys;
|
||||
}
|
||||
|
@ -1104,7 +1104,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
|
|||
pProxyCommitData->logSystem,
|
||||
trs[t].transaction.mutations,
|
||||
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ? nullptr : &self->toCommit,
|
||||
SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION ? &self->cipherKeys : nullptr,
|
||||
pProxyCommitData->isEncryptionEnabled ? &self->cipherKeys : nullptr,
|
||||
self->forceRecovery,
|
||||
self->commitVersion,
|
||||
self->commitVersion + 1,
|
||||
|
@ -1159,7 +1159,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
|
|||
|
||||
void writeMutation(CommitBatchContext* self, int64_t tenantId, const MutationRef& mutation) {
|
||||
static_assert(TenantInfo::INVALID_TENANT == ENCRYPT_INVALID_DOMAIN_ID);
|
||||
if (!SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION || tenantId == TenantInfo::INVALID_TENANT) {
|
||||
if (!self->pProxyCommitData->isEncryptionEnabled || tenantId == TenantInfo::INVALID_TENANT) {
|
||||
// TODO(yiwu): In raw access mode, use tenant prefix to figure out tenant id for user data
|
||||
bool isRawAccess = tenantId == TenantInfo::INVALID_TENANT && !isSystemKey(mutation.param1) &&
|
||||
!(mutation.type == MutationRef::ClearRange && isSystemKey(mutation.param2)) &&
|
||||
|
@ -2470,7 +2470,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
|||
// Wait until we can load the "real" logsystem, since we don't support switching them currently
|
||||
while (!(masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
|
||||
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION &&
|
||||
(!SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION || commitData.db->get().encryptKeyProxy.present()))) {
|
||||
(!isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION, db->get().client) ||
|
||||
commitData.db->get().encryptKeyProxy.present()))) {
|
||||
//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
|
||||
wait(commitData.db->onChange());
|
||||
}
|
||||
|
@ -2496,8 +2497,15 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
|||
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
|
||||
commitData.logAdapter =
|
||||
new LogSystemDiskQueueAdapter(commitData.logSystem, Reference<AsyncVar<PeekTxsInfo>>(), 1, false);
|
||||
commitData.txnStateStore = keyValueStoreLogSystem(
|
||||
commitData.logAdapter, commitData.db, proxy.id(), 2e9, true, true, true, SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION);
|
||||
commitData.txnStateStore =
|
||||
keyValueStoreLogSystem(commitData.logAdapter,
|
||||
commitData.db,
|
||||
proxy.id(),
|
||||
2e9,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION, db->get().client));
|
||||
createWhitelistBinPathVec(whitelistBinPaths, commitData.whitelistedBinPathVec);
|
||||
|
||||
commitData.updateLatencyBandConfig(commitData.db->get().latencyBandConfig);
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
|
@ -193,7 +194,9 @@ struct Resolver : ReferenceCounted<Resolver> {
|
|||
};
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatchRequest req) {
|
||||
ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
|
||||
ResolveTransactionBatchRequest req,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||
state Optional<UID> debugID;
|
||||
state Span span("R:resolveBatch"_loc, req.spanContext);
|
||||
|
||||
|
@ -348,7 +351,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
|
|||
SpanContext spanContext =
|
||||
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();
|
||||
|
||||
applyMetadataMutations(spanContext, *resolverData, req.transactions[t].mutations);
|
||||
applyMetadataMutations(spanContext, *resolverData, req.transactions[t].mutations, db);
|
||||
}
|
||||
CODE_PROBE(self->forceRecovery, "Resolver detects forced recovery");
|
||||
}
|
||||
|
@ -503,7 +506,8 @@ struct TransactionStateResolveContext {
|
|||
}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolveContext* pContext) {
|
||||
ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolveContext* pContext,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||
state KeyRange txnKeys = allKeys;
|
||||
state std::map<Tag, UID> tag_uid;
|
||||
|
||||
|
@ -570,8 +574,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
|
|||
bool confChanges; // Ignore configuration changes for initial commits.
|
||||
ResolverData resolverData(
|
||||
pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges);
|
||||
|
||||
applyMetadataMutations(SpanContext(), resolverData, mutations);
|
||||
applyMetadataMutations(SpanContext(), resolverData, mutations, db);
|
||||
} // loop
|
||||
|
||||
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
|
||||
|
@ -584,7 +587,8 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
|
|||
}
|
||||
|
||||
ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveContext* pContext,
|
||||
TxnStateRequest request) {
|
||||
TxnStateRequest request,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db) {
|
||||
ASSERT(pContext->pResolverData.getPtr() != nullptr);
|
||||
ASSERT(pContext->pActors != nullptr);
|
||||
|
||||
|
@ -611,7 +615,7 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
|
|||
if (pContext->receivedSequences.size() == pContext->maxSequence) {
|
||||
// Received all components of the txnStateRequest
|
||||
ASSERT(!pContext->processed);
|
||||
wait(processCompleteTransactionStateRequest(pContext));
|
||||
wait(processCompleteTransactionStateRequest(pContext, db));
|
||||
pContext->processed = true;
|
||||
}
|
||||
|
||||
|
@ -649,8 +653,15 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver,
|
|||
state TransactionStateResolveContext transactionStateResolveContext;
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
|
||||
self->logAdapter = new LogSystemDiskQueueAdapter(self->logSystem, Reference<AsyncVar<PeekTxsInfo>>(), 1, false);
|
||||
self->txnStateStore = keyValueStoreLogSystem(
|
||||
self->logAdapter, db, resolver.id(), 2e9, true, true, true, SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION);
|
||||
self->txnStateStore =
|
||||
keyValueStoreLogSystem(self->logAdapter,
|
||||
db,
|
||||
resolver.id(),
|
||||
2e9,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION, db->get().client));
|
||||
|
||||
// wait for txnStateStore recovery
|
||||
wait(success(self->txnStateStore->readValue(StringRef())));
|
||||
|
@ -667,7 +678,7 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver,
|
|||
|
||||
loop choose {
|
||||
when(ResolveTransactionBatchRequest batch = waitNext(resolver.resolve.getFuture())) {
|
||||
actors.add(resolveBatch(self, batch));
|
||||
actors.add(resolveBatch(self, batch, db));
|
||||
}
|
||||
when(ResolutionMetricsRequest req = waitNext(resolver.metrics.getFuture())) {
|
||||
++self->metricsRequests;
|
||||
|
@ -690,7 +701,7 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver,
|
|||
}
|
||||
when(TxnStateRequest request = waitNext(resolver.txnState.getFuture())) {
|
||||
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
|
||||
addActor.send(processTransactionStateRequestPart(&transactionStateResolveContext, request));
|
||||
addActor.send(processTransactionStateRequestPart(&transactionStateResolveContext, request, db));
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
|
|
@ -103,7 +103,8 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
|||
const UID& dbgid,
|
||||
Arena& arena,
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
IKeyValueStore* txnStateStore);
|
||||
IKeyValueStore* txnStateStore,
|
||||
Reference<AsyncVar<ServerDBInfo> const> dbInfo);
|
||||
|
||||
inline bool isSystemKey(KeyRef key) {
|
||||
return key.size() && key[0] == systemKeys.begin[0];
|
||||
|
@ -144,6 +145,7 @@ inline bool containsMetadataMutation(const VectorRef<MutationRef>& mutations) {
|
|||
// Resolver's version
|
||||
void applyMetadataMutations(SpanContext const& spanContext,
|
||||
ResolverData& resolverData,
|
||||
const VectorRef<MutationRef>& mutations);
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
Reference<AsyncVar<ServerDBInfo> const> dbInfo);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -3314,6 +3314,7 @@ public:
|
|||
serverInfo.masterLifetime.ccID = id;
|
||||
serverInfo.clusterInterface = ccInterface;
|
||||
serverInfo.myLocality = locality;
|
||||
serverInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
|
||||
db.serverInfo->set(serverInfo);
|
||||
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, LockAware::True);
|
||||
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* EncryptionOpUtils.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_ENCRYPTION_OPS_UTIL_H
|
||||
#define FDBSERVER_ENCRYPTION_OPS_UTIL_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
|
||||
typedef enum { TLOG_ENCRYPTION = 0, STORAGE_SERVER_ENCRYPTION = 1, BLOB_GRANULE_ENCRYPTION = 2 } EncryptOperationType;
|
||||
|
||||
inline bool isEncryptionOpSupported(EncryptOperationType operation_type, ClientDBInfo dbInfo) {
|
||||
if (!dbInfo.isEncryptionEnabled) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (operation_type == TLOG_ENCRYPTION) {
|
||||
return SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION;
|
||||
} else if (operation_type == BLOB_GRANULE_ENCRYPTION) {
|
||||
bool supported = SERVER_KNOBS->ENABLE_BLOB_GRANULE_ENCRYPTION && SERVER_KNOBS->BG_METADATA_SOURCE == "tenant";
|
||||
ASSERT((supported && SERVER_KNOBS->ENABLE_ENCRYPTION) || !supported);
|
||||
return supported;
|
||||
} else {
|
||||
// TODO (Nim): Add once storage server encryption knob is created
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
#endif // FDBSERVER_ENCRYPTION_OPS_UTIL_H
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
|
||||
#pragma once
|
||||
#include "fdbserver/EncryptionOpsUtils.h"
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_PROXYCOMMITDATA_ACTOR_G_H)
|
||||
#define FDBSERVER_PROXYCOMMITDATA_ACTOR_G_H
|
||||
#include "fdbserver/ProxyCommitData.actor.g.h"
|
||||
|
@ -231,6 +232,8 @@ struct ProxyCommitData {
|
|||
double lastResolverReset;
|
||||
int localTLogCount = -1;
|
||||
|
||||
bool isEncryptionEnabled = false;
|
||||
|
||||
// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
|
||||
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
|
||||
// signify they must be repopulated. We do not repopulate them immediately to avoid a slow task.
|
||||
|
@ -299,7 +302,8 @@ struct ProxyCommitData {
|
|||
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db),
|
||||
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), lastTxsPop(0), popRemoteTxs(false),
|
||||
lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0),
|
||||
lastMasterReset(now()), lastResolverReset(now()) {
|
||||
lastMasterReset(now()), lastResolverReset(now()),
|
||||
isEncryptionEnabled(isEncryptionOpSupported(EncryptOperationType::TLOG_ENCRYPTION, db->get().client)) {
|
||||
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -3312,7 +3312,9 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
|
|||
auto ci = makeReference<AsyncVar<Optional<ClusterInterface>>>();
|
||||
auto asyncPriorityInfo =
|
||||
makeReference<AsyncVar<ClusterControllerPriorityInfo>>(getCCPriorityInfo(fitnessFilePath, processClass));
|
||||
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>();
|
||||
auto serverDBInfo = ServerDBInfo();
|
||||
serverDBInfo.client.isEncryptionEnabled = SERVER_KNOBS->ENABLE_ENCRYPTION;
|
||||
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>(serverDBInfo);
|
||||
|
||||
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
|
||||
"MonitorAndWriteCCPriorityInfo"));
|
||||
|
|
Loading…
Reference in New Issue