diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index eae5d8e9d8..a350ae632e 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -870,6 +870,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BG_URL, isSimulated ? "file://fdbblob/" : "" ); // TODO: store in system key space or something, eventually // BlobGranuleVerify* simulation tests use blobRangeKeys, BlobGranuleCorrectness* use tenant, default in real clusters is tenant init( BG_RANGE_SOURCE, "tenant" ); + // BlobGranuleVerify* simulation tests use knobs, BlobGranuleCorrectness* use tenant, default in real clusters is knobs + init( BG_METADATA_SOURCE, "knobs" ); init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards ) BG_SNAPSHOT_FILE_TARGET_BYTES = 100000; else if (simulationMediumShards || (randomize && BUGGIFY) ) BG_SNAPSHOT_FILE_TARGET_BYTES = 1000000; init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 ); init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 ); @@ -893,6 +895,12 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 ); init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 ); + // Blob Metadata + init( BLOB_METADATA_CACHE_TTL, isSimulated ? 120 : 24 * 60 * 60 ); + if ( randomize && BUGGIFY) { BLOB_METADATA_CACHE_TTL = deterministicRandom()->randomInt(50, 100); } + init( BLOB_METADATA_REFRESH_INTERVAL, isSimulated ? 
60 : 12 * 60 * 60 ); + if ( randomize && BUGGIFY) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(20, 40); } + // HTTP KMS Connector init( REST_KMS_CONNECTOR_KMS_DISCOVERY_URL_MODE, "file"); init( REST_KMS_CONNECTOR_VALIDATION_TOKEN_MODE, "file"); diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index ee5600bb4b..68884fd631 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -836,6 +836,9 @@ public: // whether to use blobRangeKeys or tenants for blob granule range sources std::string BG_RANGE_SOURCE; + // Whether to use knobs or EKP for blob metadata and credentials + std::string BG_METADATA_SOURCE; + int BG_SNAPSHOT_FILE_TARGET_BYTES; int BG_DELTA_FILE_TARGET_BYTES; int BG_DELTA_BYTES_BEFORE_COMPACT; @@ -856,6 +859,10 @@ public: double BGCC_TIMEOUT; double BGCC_MIN_INTERVAL; + // Blob metadata + int64_t BLOB_METADATA_CACHE_TTL; + int64_t BLOB_METADATA_REFRESH_INTERVAL; + // HTTP KMS Connector std::string REST_KMS_CONNECTOR_KMS_DISCOVERY_URL_MODE; std::string REST_KMS_CONNECTOR_DISCOVER_KMS_URL_FILE; diff --git a/fdbserver/BlobConnectionProvider.cpp b/fdbserver/BlobConnectionProvider.cpp new file mode 100644 index 0000000000..bc2abbf2f9 --- /dev/null +++ b/fdbserver/BlobConnectionProvider.cpp @@ -0,0 +1,113 @@ +/* + * BlobConnectionProvider.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "flow/IRandom.h"
+#include "fdbserver/BlobConnectionProvider.h"
+
+// Provider for a non-partitioned blob store: every file is read from and written to the one container opened
+// from the given url, and file names are passed through unchanged.
+struct SingleBlobConnectionProvider : BlobConnectionProvider {
+public:
+	// Returns the single container together with the unmodified file name.
+	std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) override {
+		return std::pair(conn, newFileName);
+	}
+
+	// All files live in the single container, so filePath is not inspected.
+	Reference<BackupContainerFileSystem> getForRead(std::string filePath) override { return conn; }
+
+	explicit SingleBlobConnectionProvider(std::string url) {
+		conn = BackupContainerFileSystem::openContainerFS(url, {}, {});
+	}
+
+private:
+	Reference<BackupContainerFileSystem> conn;
+};
+
+// Provider for a sub-path partitioned blob store: one container is opened from metadata.base, and each written
+// file name is prefixed with one of the partition sub-paths.
+struct PartitionedBlobConnectionProvider : BlobConnectionProvider {
+	// Returns the shared container and newFileName prefixed with a randomly chosen partition sub-path.
+	std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) override {
+		// choose a partition randomly, to distribute load
+		int writePartition = deterministicRandom()->randomInt(0, metadata.partitions.size());
+		return std::pair(conn, metadata.partitions[writePartition].toString() + newFileName);
+	}
+
+	// The partition prefix is already part of filePath, and all partitions share one container.
+	Reference<BackupContainerFileSystem> getForRead(std::string filePath) override { return conn; }
+
+	// Requires a base url and at least two partitions, none of which may be a full url.
+	explicit PartitionedBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef>& metadata)
+	  : metadata(metadata) {
+		ASSERT(metadata.base.present());
+		ASSERT(metadata.partitions.size() >= 2);
+		conn = BackupContainerFileSystem::openContainerFS(metadata.base.get().toString(), {}, {});
+		for (auto& it : metadata.partitions) {
+			// these should be suffixes, not whole blob urls
+			ASSERT(it.toString().find("://") == std::string::npos);
+		}
+	}
+
+private:
+	Standalone<BlobMetadataDetailsRef> metadata;
+	Reference<BackupContainerFileSystem> conn;
+};
+
+// Could always include number of partitions as validation in sanity check or something? 
+// Ex: partition_numPartitions/filename instead of partition/filename
+// Provider for a separate-storage-location partitioned blob store: each partition is a full blob url with its
+// own container, and written file names are prefixed with "<partition index>/" so reads can find the container.
+struct StorageLocationBlobConnectionProvider : BlobConnectionProvider {
+	// Returns a randomly chosen partition's container and newFileName prefixed with that partition's index.
+	std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) override {
+		// choose a partition randomly, to distribute load
+		int writePartition = deterministicRandom()->randomInt(0, partitions.size());
+		// include partition information in the filename
+		return std::pair(partitions[writePartition], std::to_string(writePartition) + "/" + newFileName);
+	}
+
+	// Returns the container of the partition whose index prefixes filePath. filePath must have the
+	// "<partition index>/..." form produced by createForWrite; anything else fails an ASSERT.
+	Reference<BackupContainerFileSystem> getForRead(std::string filePath) override {
+		size_t slash = filePath.find('/');
+		ASSERT(slash != std::string::npos);
+		ASSERT(slash > 0);
+		// Parse the index by hand: stoi would throw a raw std::invalid_argument / std::out_of_range on a
+		// malformed path instead of failing an assertion, and would silently accept signs and trailing garbage.
+		size_t partition = 0;
+		for (size_t i = 0; i < slash; i++) {
+			ASSERT(filePath[i] >= '0' && filePath[i] <= '9');
+			partition = partition * 10 + (filePath[i] - '0');
+			// checking the bound on every digit also keeps the accumulator from overflowing
+			ASSERT(partition < partitions.size());
+		}
+		return partitions[partition];
+	}
+
+	// Requires no base url and at least two partitions, each of which must be a full blob url.
+	explicit StorageLocationBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef>& metadata) {
+		ASSERT(!metadata.base.present());
+		ASSERT(metadata.partitions.size() >= 2);
+		partitions.reserve(metadata.partitions.size());
+		for (auto& it : metadata.partitions) {
+			// these should be whole blob urls
+			ASSERT(it.toString().find("://") != std::string::npos);
+			partitions.push_back(BackupContainerFileSystem::openContainerFS(it.toString(), {}, {}));
+		}
+	}
+
+private:
+	std::vector<Reference<BackupContainerFileSystem>> partitions;
+};
+
+// Creates a provider for a single, non-partitioned blob url.
+Reference<BlobConnectionProvider> BlobConnectionProvider::newBlobConnectionProvider(std::string blobUrl) {
+	return makeReference<SingleBlobConnectionProvider>(blobUrl);
+}
+
+// Picks the provider matching the shape of blobMetadata: no partitions -> single container at base; partitions
+// plus base -> sub-path partitioned; partitions without base -> one container per partition url.
+Reference<BlobConnectionProvider> BlobConnectionProvider::newBlobConnectionProvider(
+    Standalone<BlobMetadataDetailsRef> blobMetadata) {
+	if (blobMetadata.partitions.empty()) {
+		// a non-partitioned store is only usable if it has a base url
+		ASSERT(blobMetadata.base.present());
+		return makeReference<SingleBlobConnectionProvider>(blobMetadata.base.get().toString());
+	} else {
+		ASSERT(blobMetadata.partitions.size() >= 2);
+		if (blobMetadata.base.present()) {
+			return makeReference<PartitionedBlobConnectionProvider>(blobMetadata);
+		} else {
+			return makeReference<StorageLocationBlobConnectionProvider>(blobMetadata);
+		}
+	}
+}
\ No newline at end of file
diff --git a/fdbserver/BlobConnectionProvider.h b/fdbserver/BlobConnectionProvider.h
new file mode 100644
index 0000000000..6c97f260a2
--- /dev/null
+++ b/fdbserver/BlobConnectionProvider.h
@@ -0,0 +1,43 @@
+/*
+ * BlobConnectionProvider.h
+ *
+ * This 
source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BLOB_CONNECTION_PROVIDER_H
+#define BLOB_CONNECTION_PROVIDER_H
+
+#include <string>
+#include <utility>
+
+#include "fdbclient/BackupContainerFileSystem.h"
+#include "fdbserver/BlobMetadataUtils.h"
+
+// Abstract source of blob store containers. Obtain an instance through one of the newBlobConnectionProvider
+// factories below.
+struct BlobConnectionProvider : NonCopyable, ReferenceCounted<BlobConnectionProvider> {
+	// chooses a partition and prepends the necessary prefix to the filename (if necessary) for writing a file, and
+	// returns it and the backup container
+	virtual std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) = 0;
+
+	// given a file path, return the backup container that holds it. File path should be something returned from
+	// createForWrite, and is also the path to use within the returned container
+	virtual Reference<BackupContainerFileSystem> getForRead(std::string filePath) = 0;
+
+	virtual ~BlobConnectionProvider() = default;
+
+	// creates a provider for a single blob url
+	static Reference<BlobConnectionProvider> newBlobConnectionProvider(std::string blobUrl);
+
+	// creates a provider from blob metadata, which may describe a partitioned or a non-partitioned blob store
+	static Reference<BlobConnectionProvider> newBlobConnectionProvider(Standalone<BlobMetadataDetailsRef> blobMetadata);
+};
+
+#endif
\ No newline at end of file
diff --git a/fdbserver/BlobMetadataUtils.h b/fdbserver/BlobMetadataUtils.h
new file mode 100644
index 0000000000..d9ed6dd897
--- /dev/null
+++ b/fdbserver/BlobMetadataUtils.h
@@ -0,0 +1,64 @@
+/*
+ * BlobMetadataUtils.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. 
and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef BLOB_METADATA_UTILS_H +#define BLOB_METADATA_UTILS_H + +#include "flow/Arena.h" +#include "flow/FileIdentifier.h" + +using BlobMetadataDomainId = int64_t; + +/* + * There are 3 cases for blob metadata. + * 1. A non-partitioned blob store. baseUrl is set, and partitions is empty. Files will be written with this prefix. + * 2. A sub-path partitioned blob store. baseUrl is set, and partitions contains 2 or more sub-paths. Files will be + * written with a prefix of the base url and then one of the sub-paths. + * 3. A separate-storage-location partitioned blob store. baseUrl is NOT set, and partitions contains 2 or more full + * fdb blob urls. Files will be written with one of the partition prefixes. + * Partitioning is desired in blob stores such as s3 that can run into metadata hotspotting issues. 
+ */ +struct BlobMetadataDetailsRef { + constexpr static FileIdentifier file_identifier = 6685526; + BlobMetadataDomainId domainId; + Optional base; + VectorRef partitions; + + BlobMetadataDetailsRef() {} + BlobMetadataDetailsRef(Arena& arena, const BlobMetadataDetailsRef& from) + : domainId(from.domainId), partitions(arena, from.partitions) { + if (from.base.present()) { + base = StringRef(arena, from.base.get()); + } + } + explicit BlobMetadataDetailsRef(BlobMetadataDomainId domainId, + Optional base, + VectorRef partitions) + : domainId(domainId), base(base), partitions(partitions) {} + + int expectedSize() const { return sizeof(BlobMetadataDetailsRef) + partitions.expectedSize(); } + + template + void serialize(Ar& ar) { + serializer(ar, domainId, base, partitions); + } +}; + +#endif \ No newline at end of file diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index ef41ad82b8..ccd769e396 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -36,8 +36,10 @@ #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" #include "fdbclient/Tenant.h" +#include "fdbserver/BlobMetadataUtils.h" #include "fdbserver/Knobs.h" #include "fdbserver/BlobGranuleServerCommon.actor.h" +#include "fdbserver/BlobConnectionProvider.h" #include "fdbserver/MutationTracking.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/ServerDBInfo.h" @@ -161,6 +163,8 @@ struct GranuleTenantData : NonCopyable, ReferenceCounted { TenantName name; TenantMapEntry entry; // TODO add other useful stuff like per-tenant blob connection, if necessary + Reference conn; + Promise connLoaded; GranuleTenantData() {} GranuleTenantData(TenantName name, TenantMapEntry entry) : name(name), entry(entry) {} @@ -3111,6 +3115,72 @@ ACTOR Future monitorRemoval(Reference bwData) { } } +// FIXME: if credentials can expire, refresh periodically +ACTOR Future loadBlobMetadataForTenants(Reference bwData, + Reference const> dbInfo, + std::vector 
tenantMapEntries) { + ASSERT(SERVER_KNOBS->BG_METADATA_SOURCE == "tenant"); + ASSERT(!tenantMapEntries.empty()); + state std::vector domainIds; + for (auto& entry : tenantMapEntries) { + if (BW_DEBUG) { + fmt::print( + "BW {0} loading blob metadata for tenant {1}\n", bwData->id.shortString().substr(0, 5), entry.id); + } + domainIds.push_back(entry.id); + } + + // FIXME: if one tenant gets an error, don't kill whole blob worker + // TODO: add latency metrics + loop { + Future requestFuture; + if (dbInfo.isValid() && dbInfo->get().encryptKeyProxy.present()) { + EKPGetLatestBlobMetadataRequest req; + req.domainIds = domainIds; + requestFuture = + brokenPromiseToNever(dbInfo->get().encryptKeyProxy.get().getLatestBlobMetadata.getReply(req)); + } else { + requestFuture = Never(); + } + choose { + when(EKPGetLatestBlobMetadataReply rep = wait(requestFuture)) { + ASSERT(rep.blobMetadataDetails.size() == domainIds.size()); + // not guaranteed to be in same order in the request as the response + for (auto& metadata : rep.blobMetadataDetails) { + auto info = bwData->tenantInfoById.find(metadata.domainId); + if (info == bwData->tenantInfoById.end()) { + TraceEvent(SevWarn, "BlobWorkerTenantDeletedWhileLoadMetadata", bwData->id) + .detail("TenantId", metadata.domainId); + continue; + } + auto dataEntry = bwData->tenantData.rangeContaining(info->second.prefix); + ASSERT(dataEntry.begin() == info->second.prefix); + dataEntry.cvalue()->conn = BlobConnectionProvider::newBlobConnectionProvider(metadata); + dataEntry.cvalue()->connLoaded.send(Void()); + TraceEvent(SevDebug, "BlobWorkerTenantMetadataLoaded", bwData->id) + .detail("TenantId", metadata.domainId); + if (BW_DEBUG) { + fmt::print("BW {0} loaded blob metadata for {1}: {2}", + bwData->id.shortString().substr(0, 5), + metadata.domainId, + metadata.base.present() ? 
metadata.base.get().toString() : ""); + if (metadata.partitions.empty()) { + fmt::print("\n"); + } else { + fmt::print(" ({0})\n", metadata.partitions.size()); + for (auto& it : metadata.partitions) { + fmt::print(" {0}\n", it.toString()); + } + } + } + } + return Void(); + } + when(wait(dbInfo->onChange())) {} + } + } +} + // Because change feeds send uncommitted data and explicit rollback messages, we speculatively buffer/write // uncommitted data. This means we must ensure the data is actually committed before "committing" those writes in // the blob granule. The simplest way to do this is to have the blob worker do a periodic GRV, which is guaranteed @@ -3143,7 +3213,7 @@ ACTOR Future runGRVChecks(Reference bwData) { // FIXME: better way to do this? // monitor system keyspace for new tenants -ACTOR Future monitorTenants(Reference bwData) { +ACTOR Future monitorTenants(Reference bwData, Reference const> dbInfo) { loop { state Reference tr = makeReference(bwData->db); loop { @@ -3163,6 +3233,7 @@ ACTOR Future monitorTenants(Reference bwData) { throw internal_error(); } + std::vector tenantsToLoad; for (auto& it : tenantResults) { // FIXME: handle removing/moving tenants! 
StringRef tenantName = it.key.removePrefix(tenantMapPrefix); @@ -3176,10 +3247,19 @@ ACTOR Future monitorTenants(Reference bwData) { entry.id, entry.prefix.printable()); } + auto r = makeReference(tenantName, entry); bwData->tenantData.insert(KeyRangeRef(entry.prefix, entry.prefix.withSuffix(normalKeys.end)), - makeReference(tenantName, entry)); + r); + if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") { + r->connLoaded.send(Void()); + } else { + tenantsToLoad.push_back(entry); + } } } + if (!tenantsToLoad.empty()) { + bwData->addActor.send(loadBlobMetadataForTenants(bwData, dbInfo, tenantsToLoad)); + } state Future watchChange = tr->watch(tenantLastIdKey); wait(tr->commit()); @@ -3262,7 +3342,7 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, self->addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture())); self->addActor.send(runGRVChecks(self)); if (SERVER_KNOBS->BG_RANGE_SOURCE == "tenant") { - self->addActor.send(monitorTenants(self)); + self->addActor.send(monitorTenants(self, dbInfo)); } state Future selfRemoved = monitorRemoval(self); diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 9cce0f862d..5325cd68a6 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -5,12 +5,15 @@ set(FDBSERVER_SRCS BackupProgress.actor.cpp BackupProgress.actor.h BackupWorker.actor.cpp + BlobConnectionProvider.h + BlobConnectionProvider.cpp BlobGranuleServerCommon.actor.cpp BlobGranuleServerCommon.actor.h BlobGranuleValidation.actor.cpp BlobGranuleValidation.actor.h BlobManager.actor.cpp BlobManagerInterface.h + BlobMetadataUtils.h BlobWorker.actor.cpp ClusterController.actor.cpp ClusterController.actor.h diff --git a/fdbserver/EncryptKeyProxy.actor.cpp b/fdbserver/EncryptKeyProxy.actor.cpp index d0d401bcee..274e442359 100644 --- a/fdbserver/EncryptKeyProxy.actor.cpp +++ b/fdbserver/EncryptKeyProxy.actor.cpp @@ -79,6 +79,18 @@ struct EncryptBaseCipherKey { bool isValid() { return noExpiry ? 
true : ((now() - creationTimeSec) < FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL); } }; +// TODO: could refactor both into CacheEntry with T data, creationTimeSec, and noExpiry +struct BlobMetadataCacheEntry { + Standalone metadataDetails; + uint64_t creationTimeSec; + + BlobMetadataCacheEntry() : creationTimeSec(0) {} + explicit BlobMetadataCacheEntry(Standalone metadataDetails) + : metadataDetails(metadataDetails), creationTimeSec(now()) {} + + bool isValid() { return (now() - creationTimeSec) < SERVER_KNOBS->BLOB_METADATA_CACHE_TTL; } +}; + using EncryptBaseDomainIdCache = std::unordered_map; using EncryptBaseCipherDomainIdKeyIdCacheKey = std::pair; @@ -86,15 +98,18 @@ using EncryptBaseCipherDomainIdKeyIdCacheKeyHash = boost::hash; +using BlobMetadataDomainIdCache = std::unordered_map; struct EncryptKeyProxyData : NonCopyable, ReferenceCounted { public: UID myId; PromiseStream> addActor; Future encryptionKeyRefresher; + Future blobMetadataRefresher; EncryptBaseDomainIdCache baseCipherDomainIdCache; EncryptBaseCipherDomainIdKeyIdCache baseCipherDomainIdKeyIdCache; + BlobMetadataDomainIdCache blobMetadataDomainIdCache; std::unique_ptr kmsConnector; @@ -107,6 +122,10 @@ public: Counter baseCipherKeysRefreshed; Counter numResponseWithErrors; Counter numEncryptionKeyRefreshErrors; + Counter blobMetadataCacheHits; + Counter blobMetadataCacheMisses; + Counter blobMetadataRefreshed; + Counter numBlobMetadataRefreshErrors; explicit EncryptKeyProxyData(UID id) : myId(id), ekpCacheMetrics("EKPMetrics", myId.toString()), @@ -116,7 +135,11 @@ public: baseCipherDomainIdCacheHits("EKPCipherDomainIdCacheHits", ekpCacheMetrics), baseCipherKeysRefreshed("EKPCipherKeysRefreshed", ekpCacheMetrics), numResponseWithErrors("EKPNumResponseWithErrors", ekpCacheMetrics), - numEncryptionKeyRefreshErrors("EKPNumEncryptionKeyRefreshErrors", ekpCacheMetrics) {} + numEncryptionKeyRefreshErrors("EKPNumEncryptionKeyRefreshErrors", ekpCacheMetrics), + 
+	      blobMetadataCacheHits("EKPBlobMetadataCacheHits", ekpCacheMetrics),
+	      blobMetadataCacheMisses("EKPBlobMetadataCacheMisses", ekpCacheMetrics),
+	      blobMetadataRefreshed("EKPBlobMetadataRefreshed", ekpCacheMetrics),
+	      numBlobMetadataRefreshErrors("EKPBlobMetadataRefreshErrors", ekpCacheMetrics) {}
 
 	EncryptBaseCipherDomainIdKeyIdCacheKey getBaseCipherDomainIdKeyIdCacheKey(
 	    const EncryptCipherDomainId domainId,
@@ -146,6 +169,12 @@ public:
 		baseCipherDomainIdKeyIdCache[cacheKey] = EncryptBaseCipherKey(domainId, baseCipherId, baseCipherKey, true);
 	}
 
+	// Stores (or overwrites) the cached blob metadata for domainId; the new cache entry is stamped with now().
+	void insertIntoBlobMetadataCache(const BlobMetadataDomainId domainId,
+	                                 const Standalone<BlobMetadataDetailsRef>& entry) {
+		blobMetadataDomainIdCache[domainId] = BlobMetadataCacheEntry(entry);
+	}
+
 	template
 	using isEKPGetLatestBaseCipherKeysReply = std::is_base_of;
 	template
@@ -413,6 +441,139 @@ void refreshEncryptionKeys(Reference ekpProxyData, KmsConne
 	Future ignored = refreshEncryptionKeysCore(ekpProxyData, kmsConnectorInf);
 }
 
+// Replies to req with the blob metadata of every distinct requested domain id. Valid cache entries are served
+// locally; the remaining ids are fetched from the KMS connector and cached. Errors accepted by canReplyWith() are
+// sent back on req.reply, any other error is rethrown.
+ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxyData,
+                                         KmsConnectorInterface kmsConnectorInf,
+                                         EKPGetLatestBlobMetadataRequest req) {
+	// Scan the blob metadata cache and serve the domain ids that are cached and still valid;
+	// for the rest, reach out to the KMS connector to fetch the required details
+	state Standalone<VectorRef<BlobMetadataDetailsRef>> metadataDetails;
+	state Optional<TraceEvent> dbgTrace =
+	    req.debugId.present() ? TraceEvent("GetBlobMetadata", ekpProxyData->myId) : Optional<TraceEvent>();
+
+	if (dbgTrace.present()) {
+		dbgTrace.get().setMaxEventLength(SERVER_KNOBS->ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH);
+		dbgTrace.get().detail("DbgId", req.debugId.get());
+	}
+
+	// Dedup the requested domainIds.
+	std::unordered_set<BlobMetadataDomainId> dedupedDomainIds;
+	for (auto id : req.domainIds) {
+		dedupedDomainIds.emplace(id);
+	}
+
+	if (dbgTrace.present()) {
+		dbgTrace.get().detail("NKeys", dedupedDomainIds.size());
+		for (BlobMetadataDomainId id : dedupedDomainIds) {
+			// log blob metadata domain ids queried
+			dbgTrace.get().detail("BMQ" + std::to_string(id), "");
+		}
+	}
+
+	// First, check if the requested information is already cached by the server.
+	// Ensure the cached information is within SERVER_KNOBS->BLOB_METADATA_CACHE_TTL time window.
+	std::vector<BlobMetadataDomainId> lookupDomains;
+	for (BlobMetadataDomainId id : dedupedDomainIds) {
+		const auto itr = ekpProxyData->blobMetadataDomainIdCache.find(id);
+		if (itr != ekpProxyData->blobMetadataDomainIdCache.end() && itr->second.isValid()) {
+			metadataDetails.arena().dependsOn(itr->second.metadataDetails.arena());
+			metadataDetails.push_back(metadataDetails.arena(), itr->second.metadataDetails);
+
+			if (dbgTrace.present()) {
+				dbgTrace.get().detail("BMC" + std::to_string(id), "");
+			}
+			++ekpProxyData->blobMetadataCacheHits;
+		} else {
+			lookupDomains.emplace_back(id);
+			++ekpProxyData->blobMetadataCacheMisses;
+		}
+	}
+
+	// NOTE: hits and misses are already counted per id in the loop above. The cipher-key counters
+	// (baseCipherDomainIdCacheHits/Misses) must not be touched here: they describe the encryption key cache.
+
+	if (!lookupDomains.empty()) {
+		try {
+			KmsConnBlobMetadataReq kmsReq(lookupDomains, req.debugId);
+			KmsConnBlobMetadataRep kmsRep = wait(kmsConnectorInf.blobMetadataReq.getReply(kmsReq));
+			metadataDetails.arena().dependsOn(kmsRep.metadataDetails.arena());
+
+			for (auto& item : kmsRep.metadataDetails) {
+				metadataDetails.push_back(metadataDetails.arena(), item);
+
+				// Record the fetched blob metadata in the local cache for future requests
+				ekpProxyData->insertIntoBlobMetadataCache(item.domainId, item);
+
+				if (dbgTrace.present()) {
+					// log blob metadata domain ids fetched from the KMS connector and inserted into the cache
+					dbgTrace.get().detail("BMI" + std::to_string(item.domainId), "");
+				}
+			}
+		} catch (Error& e) {
+			if (!canReplyWith(e)) {
+				TraceEvent("GetLatestBlobMetadataUnexpectedError", ekpProxyData->myId).error(e);
+				throw;
+			}
+			TraceEvent("GetLatestBlobMetadataExpectedError", ekpProxyData->myId).error(e);
+			req.reply.sendError(e);
+			return Void();
+		}
+	}
+
+	req.reply.send(EKPGetLatestBlobMetadataReply(metadataDetails));
+
+	return Void();
+}
+
+// Re-fetches the blob metadata of every domain id currently in the cache from the KMS connector and overwrites
+// the cached entries with the results. Errors accepted by canReplyWith() are only counted and logged; any other
+// error is rethrown.
+ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpProxyData,
+                                           KmsConnectorInterface kmsConnectorInf) {
+	state UID debugId = deterministicRandom()->randomUniqueID();
+
+	state TraceEvent t("RefreshBlobMetadata_Start", ekpProxyData->myId);
+	t.setMaxEventLength(SERVER_KNOBS->ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH);
+	t.detail("KmsConnInf", kmsConnectorInf.id());
+	t.detail("DebugId", debugId);
+
+	try {
+		KmsConnBlobMetadataReq req;
+		req.debugId = debugId;
+		req.domainIds.reserve(ekpProxyData->blobMetadataDomainIdCache.size());
+
+		for (auto& item : ekpProxyData->blobMetadataDomainIdCache) {
+			req.domainIds.emplace_back(item.first);
+		}
+		KmsConnBlobMetadataRep rep = wait(kmsConnectorInf.blobMetadataReq.getReply(req));
+		for (auto& item : rep.metadataDetails) {
+			ekpProxyData->insertIntoBlobMetadataCache(item.domainId, item);
+			t.detail("BM" + std::to_string(item.domainId), "");
+		}
+
+		ekpProxyData->blobMetadataRefreshed += rep.metadataDetails.size();
+
+		t.detail("nKeys", rep.metadataDetails.size());
+	} catch (Error& e) {
+		if (!canReplyWith(e)) {
+			TraceEvent("RefreshBlobMetadata_Error").error(e);
+			throw e;
+		}
+		TraceEvent("RefreshBlobMetadata").detail("ErrorCode", e.code());
+		++ekpProxyData->numBlobMetadataRefreshErrors;
+	}
+
+	return Void();
+}
+
+// Starts one blob metadata refresh without waiting for it; the returned future is intentionally not kept. Used
+// as the callback of the recurring blobMetadataRefresher task.
+void refreshBlobMetadata(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
+	Future<Void> ignored = refreshBlobMetadataCore(ekpProxyData, kmsConnectorInf);
+}
+
 void activateKmsConnector(Reference ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
 	if (g_network->isSimulated()) {
 		ekpProxyData->kmsConnector = std::make_unique();
@@ 
-438,7 +591,7 @@ ACTOR Future encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface, activateKmsConnector(self, kmsConnectorInf); - // Register a recurring task to refresh the cached Encryption keys. + // Register a recurring task to refresh the cached Encryption keys and blob metadata. // Approach avoids external RPCs due to EncryptionKey refreshes for the inline write encryption codepath such as: // CPs, Redwood Storage Server node flush etc. The process doing the encryption refresh the cached cipher keys based // on FLOW_KNOB->ENCRYPTION_CIPHER_KEY_CACHE_TTL_SEC interval which is intentionally kept longer than @@ -449,13 +602,20 @@ ACTOR Future encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface, FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL, TaskPriority::Worker); + self->blobMetadataRefresher = recurring([&]() { refreshBlobMetadata(self, kmsConnectorInf); }, + SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL, + TaskPriority::Worker); + try { loop choose { when(EKPGetBaseCipherKeysByIdsRequest req = waitNext(ekpInterface.getBaseCipherKeysByIds.getFuture())) { - wait(getCipherKeysByBaseCipherKeyIds(self, kmsConnectorInf, req)); + self->addActor.send(getCipherKeysByBaseCipherKeyIds(self, kmsConnectorInf, req)); } when(EKPGetLatestBaseCipherKeysRequest req = waitNext(ekpInterface.getLatestBaseCipherKeys.getFuture())) { - wait(getLatestCipherKeys(self, kmsConnectorInf, req)); + self->addActor.send(getLatestCipherKeys(self, kmsConnectorInf, req)); + } + when(EKPGetLatestBlobMetadataRequest req = waitNext(ekpInterface.getLatestBlobMetadata.getFuture())) { + self->addActor.send(getLatestBlobMetadata(self, kmsConnectorInf, req)); } when(HaltEncryptKeyProxyRequest req = waitNext(ekpInterface.haltEncryptKeyProxy.getFuture())) { TraceEvent("EKP_Halted", self->myId).detail("ReqID", req.requesterID); diff --git a/fdbserver/EncryptKeyProxyInterface.h b/fdbserver/EncryptKeyProxyInterface.h index 52bb9e1245..f06e3008ae 100644 --- a/fdbserver/EncryptKeyProxyInterface.h +++ 
b/fdbserver/EncryptKeyProxyInterface.h @@ -29,6 +29,7 @@ #include "fdbclient/FDBTypes.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" +#include "fdbserver/BlobMetadataUtils.h" struct EncryptKeyProxyInterface { constexpr static FileIdentifier file_identifier = 1303419; @@ -38,6 +39,7 @@ struct EncryptKeyProxyInterface { RequestStream haltEncryptKeyProxy; RequestStream getBaseCipherKeysByIds; RequestStream getLatestBaseCipherKeys; + RequestStream getLatestBlobMetadata; EncryptKeyProxyInterface() {} explicit EncryptKeyProxyInterface(const struct LocalityData& loc, UID id) : locality(loc), myId(id) {} @@ -63,6 +65,8 @@ struct EncryptKeyProxyInterface { waitFailure.getEndpoint().getAdjustedEndpoint(2)); getLatestBaseCipherKeys = RequestStream( waitFailure.getEndpoint().getAdjustedEndpoint(3)); + getLatestBlobMetadata = + RequestStream(waitFailure.getEndpoint().getAdjustedEndpoint(4)); } } @@ -72,6 +76,7 @@ struct EncryptKeyProxyInterface { streams.push_back(haltEncryptKeyProxy.getReceiver(TaskPriority::DefaultPromiseEndpoint)); streams.push_back(getBaseCipherKeysByIds.getReceiver(TaskPriority::Worker)); streams.push_back(getLatestBaseCipherKeys.getReceiver(TaskPriority::Worker)); + streams.push_back(getLatestBlobMetadata.getReceiver(TaskPriority::Worker)); FlowTransport::transport().addEndpoints(streams); } }; @@ -170,4 +175,35 @@ struct EKPGetLatestBaseCipherKeysRequest { } }; +// partition and credentials information for a given blob domain + +struct EKPGetLatestBlobMetadataReply { + constexpr static FileIdentifier file_identifier = 5761581; + Standalone> blobMetadataDetails; + + EKPGetLatestBlobMetadataReply() {} + explicit EKPGetLatestBlobMetadataReply(const Standalone>& blobMetadataDetails) + : blobMetadataDetails(blobMetadataDetails) {} + + template + void serialize(Ar& ar) { + serializer(ar, blobMetadataDetails); + } +}; + +struct EKPGetLatestBlobMetadataRequest { + constexpr static FileIdentifier file_identifier = 3821549; + std::vector domainIds; + 
Optional debugId; + ReplyPromise reply; + + EKPGetLatestBlobMetadataRequest() {} + explicit EKPGetLatestBlobMetadataRequest(const std::vector& ids) : domainIds(ids) {} + + template + void serialize(Ar& ar) { + serializer(ar, domainIds, debugId, reply); + } +}; + #endif \ No newline at end of file diff --git a/fdbserver/KmsConnectorInterface.h b/fdbserver/KmsConnectorInterface.h index 6f0a408e05..eaf02f3a74 100644 --- a/fdbserver/KmsConnectorInterface.h +++ b/fdbserver/KmsConnectorInterface.h @@ -28,12 +28,14 @@ #include "flow/Trace.h" #include "flow/flow.h" #include "flow/network.h" +#include "fdbserver/BlobMetadataUtils.h" struct KmsConnectorInterface { constexpr static FileIdentifier file_identifier = 2416711; RequestStream> waitFailure; RequestStream ekLookupByIds; RequestStream ekLookupByDomainIds; + RequestStream blobMetadataReq; KmsConnectorInterface() {} @@ -49,6 +51,8 @@ struct KmsConnectorInterface { RequestStream(waitFailure.getEndpoint().getAdjustedEndpoint(1)); ekLookupByDomainIds = RequestStream(waitFailure.getEndpoint().getAdjustedEndpoint(2)); + blobMetadataReq = + RequestStream(waitFailure.getEndpoint().getAdjustedEndpoint(3)); } } @@ -57,6 +61,7 @@ struct KmsConnectorInterface { streams.push_back(waitFailure.getReceiver()); streams.push_back(ekLookupByIds.getReceiver(TaskPriority::Worker)); streams.push_back(ekLookupByDomainIds.getReceiver(TaskPriority::Worker)); + streams.push_back(blobMetadataReq.getReceiver(TaskPriority::Worker)); FlowTransport::transport().addEndpoints(streams); } }; @@ -145,4 +150,32 @@ struct KmsConnLookupEKsByDomainIdsReq { } }; +struct KmsConnBlobMetadataRep { + constexpr static FileIdentifier file_identifier = 2919714; + Standalone> metadataDetails; + + KmsConnBlobMetadataRep() {} + + template + void serialize(Ar& ar) { + serializer(ar, metadataDetails); + } +}; + +struct KmsConnBlobMetadataReq { + constexpr static FileIdentifier file_identifier = 3913147; + std::vector domainIds; + Optional debugId; + ReplyPromise reply; 
+ + KmsConnBlobMetadataReq() {} + explicit KmsConnBlobMetadataReq(const std::vector& ids, Optional dbgId) + : domainIds(ids), debugId(dbgId) {} + + template + void serialize(Ar& ar) { + serializer(ar, domainIds, debugId, reply); + } +}; + #endif diff --git a/fdbserver/RESTKmsConnector.actor.cpp b/fdbserver/RESTKmsConnector.actor.cpp index e5c2b05542..4270704736 100644 --- a/fdbserver/RESTKmsConnector.actor.cpp +++ b/fdbserver/RESTKmsConnector.actor.cpp @@ -808,6 +808,11 @@ ACTOR Future connectorCore_impl(KmsConnectorInterface interf) { byDomainIdReq.reply.sendError(e); } } + when(KmsConnBlobMetadataReq req = waitNext(interf.blobMetadataReq.getFuture())) { + // TODO: implement! + TraceEvent(SevWarn, "RESTKMSBlobMetadataNotImplemented!", interf.id()); + req.reply.sendError(not_implemented()); + } } } } diff --git a/fdbserver/SimKmsConnector.actor.cpp b/fdbserver/SimKmsConnector.actor.cpp index 8282c11906..e6ff02f219 100644 --- a/fdbserver/SimKmsConnector.actor.cpp +++ b/fdbserver/SimKmsConnector.actor.cpp @@ -47,7 +47,11 @@ struct SimEncryptKeyCtx { explicit SimEncryptKeyCtx(EncryptCipherBaseKeyId kId, const char* data) : id(kId), key(data, AES_256_KEY_LENGTH) {} }; -struct SimKmsConnectorContext { +// The credentials may be allowed to change, but the storage locations and partitioning cannot change, even across +// restarts. Keep it as global static state in simulation. +static std::unordered_map> simBlobMetadataStore; + +struct SimKmsConnectorContext : NonCopyable, ReferenceCounted { uint32_t maxEncryptionKeys; std::unordered_map> simEncryptKeyStore; @@ -66,90 +70,155 @@ struct SimKmsConnectorContext { } }; +ACTOR Future ekLookupByIds(Reference ctx, + KmsConnectorInterface interf, + KmsConnLookupEKsByKeyIdsReq req) { + state KmsConnLookupEKsByKeyIdsRep rep; + state bool success = true; + state Optional dbgKIdTrace = + req.debugId.present() ? 
TraceEvent("SimKmsGetByKeyIds", interf.id()) : Optional(); + + if (dbgKIdTrace.present()) { + dbgKIdTrace.get().setMaxEventLength(100000); + dbgKIdTrace.get().detail("DbgId", req.debugId.get()); + } + + // Lookup corresponding EncryptKeyCtx for input keyId + for (const auto& item : req.encryptKeyIds) { + const auto& itr = ctx->simEncryptKeyStore.find(item.first); + if (itr != ctx->simEncryptKeyStore.end()) { + rep.cipherKeyDetails.emplace_back( + item.second, itr->first, StringRef(rep.arena, itr->second.get()->key), rep.arena); + + if (dbgKIdTrace.present()) { + // {encryptDomainId, baseCipherId} forms a unique tuple across encryption domains + dbgKIdTrace.get().detail( + getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, item.second, itr->first), ""); + } + } else { + success = false; + break; + } + } + + wait(delayJittered(1.0)); // simulate network delay + + success ? req.reply.send(rep) : req.reply.sendError(encrypt_key_not_found()); + + return Void(); +} + +ACTOR Future ekLookupByDomainIds(Reference ctx, + KmsConnectorInterface interf, + KmsConnLookupEKsByDomainIdsReq req) { + state KmsConnLookupEKsByDomainIdsRep rep; + state bool success = true; + state Optional dbgDIdTrace = + req.debugId.present() ? TraceEvent("SimKmsGetsByDomIds", interf.id()) : Optional(); + + if (dbgDIdTrace.present()) { + dbgDIdTrace.get().detail("DbgId", req.debugId.get()); + } + + // Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This + // would mean multiple domains gets mapped to the same encryption key which is fine, the + // EncryptKeyStore guarantees that keyId -> plaintext encryptKey mapping is idempotent. 
+ for (EncryptCipherDomainId domainId : req.encryptDomainIds) { + EncryptCipherBaseKeyId keyId = 1 + abs(domainId) % SERVER_KNOBS->SIM_KMS_MAX_KEYS; + const auto& itr = ctx->simEncryptKeyStore.find(keyId); + if (itr != ctx->simEncryptKeyStore.end()) { + rep.cipherKeyDetails.emplace_back(domainId, keyId, StringRef(itr->second.get()->key), rep.arena); + if (dbgDIdTrace.present()) { + // {encryptId, baseCipherId} forms a unique tuple across encryption domains + dbgDIdTrace.get().detail(getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, domainId, keyId), ""); + } + } else { + success = false; + break; + } + } + + wait(delayJittered(1.0)); // simulate network delay + + success ? req.reply.send(rep) : req.reply.sendError(encrypt_key_not_found()); + + return Void(); +} + +static Standalone createBlobMetadata(BlobMetadataDomainId domainId) { + Standalone metadata; + metadata.domainId = domainId; + // 0 == no partition, 1 == suffix partitioned, 2 == storage location partitioned + int type = deterministicRandom()->randomInt(0, 3); + int partitionCount = (type == 0) ? 
0 : deterministicRandom()->randomInt(2, 12); + if (type == 0) { + // single storage location + metadata.base = StringRef(metadata.arena(), "file://fdbblob/" + std::to_string(domainId) + "/"); + } + if (type == 1) { + // simulate hash prefixing in s3 + metadata.base = StringRef(metadata.arena(), "file://fdbblob/"); + for (int i = 0; i < partitionCount; i++) { + metadata.partitions.push_back_deep(metadata.arena(), + deterministicRandom()->randomUniqueID().shortString() + "-" + + std::to_string(domainId) + "/"); + } + } + if (type == 2) { + // simulate separate storage location per partition + for (int i = 0; i < partitionCount; i++) { + metadata.partitions.push_back_deep( + metadata.arena(), "file://fdbblob" + std::to_string(domainId) + "_" + std::to_string(i) + "/"); + } + } + return metadata; +} + +ACTOR Future blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobMetadataReq req) { + state KmsConnBlobMetadataRep rep; + state Optional dbgDIdTrace = + req.debugId.present() ? TraceEvent("SimKmsBlobMetadataLookup", interf.id()) : Optional(); + if (dbgDIdTrace.present()) { + dbgDIdTrace.get().detail("DbgId", req.debugId.get()); + } + + for (BlobMetadataDomainId domainId : req.domainIds) { + auto it = simBlobMetadataStore.find(domainId); + if (it == simBlobMetadataStore.end()) { + // construct new blob metadata + it = simBlobMetadataStore.insert({ domainId, createBlobMetadata(domainId) }).first; + } + rep.metadataDetails.arena().dependsOn(it->second.arena()); + rep.metadataDetails.push_back(rep.metadataDetails.arena(), it->second); + } + + wait(delayJittered(1.0)); // simulate network delay + + req.reply.send(rep); + + return Void(); +} + ACTOR Future simKmsConnectorCore_impl(KmsConnectorInterface interf) { TraceEvent("SimEncryptKmsProxy_Init", interf.id()).detail("MaxEncryptKeys", SERVER_KNOBS->SIM_KMS_MAX_KEYS); - state bool success = true; - state std::unique_ptr ctx = - std::make_unique(SERVER_KNOBS->SIM_KMS_MAX_KEYS); + state Reference ctx = 
makeReference(SERVER_KNOBS->SIM_KMS_MAX_KEYS); ASSERT_EQ(ctx->simEncryptKeyStore.size(), SERVER_KNOBS->SIM_KMS_MAX_KEYS); + state PromiseStream> addActor; + state Future collection = actorCollection(addActor.getFuture()); + loop { choose { when(KmsConnLookupEKsByKeyIdsReq req = waitNext(interf.ekLookupByIds.getFuture())) { - state KmsConnLookupEKsByKeyIdsReq keysByIdsReq = req; - state KmsConnLookupEKsByKeyIdsRep keysByIdsRep; - state Optional dbgKIdTrace = keysByIdsReq.debugId.present() - ? TraceEvent("SimKmsGetByKeyIds", interf.id()) - : Optional(); - - if (dbgKIdTrace.present()) { - dbgKIdTrace.get().setMaxEventLength(100000); - dbgKIdTrace.get().detail("DbgId", keysByIdsReq.debugId.get()); - } - - // Lookup corresponding EncryptKeyCtx for input keyId - for (const auto& item : req.encryptKeyIds) { - const auto& itr = ctx->simEncryptKeyStore.find(item.first); - if (itr != ctx->simEncryptKeyStore.end()) { - keysByIdsRep.cipherKeyDetails.emplace_back( - item.second, - itr->first, - StringRef(keysByIdsRep.arena, itr->second.get()->key), - keysByIdsRep.arena); - - if (dbgKIdTrace.present()) { - // {encryptDomainId, baseCipherId} forms a unique tuple across encryption domains - dbgKIdTrace.get().detail( - getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, item.second, itr->first), ""); - } - } else { - success = false; - break; - } - } - - wait(delayJittered(1.0)); // simulate network delay - - success ? keysByIdsReq.reply.send(keysByIdsRep) : keysByIdsReq.reply.sendError(encrypt_key_not_found()); + addActor.send(ekLookupByIds(ctx, interf, req)); } when(KmsConnLookupEKsByDomainIdsReq req = waitNext(interf.ekLookupByDomainIds.getFuture())) { - state KmsConnLookupEKsByDomainIdsReq keysByDomainIdReq = req; - state KmsConnLookupEKsByDomainIdsRep keysByDomainIdRep; - state Optional dbgDIdTrace = keysByDomainIdReq.debugId.present() - ? 
TraceEvent("SimKmsGetsByDomIds", interf.id()) - : Optional(); - - if (dbgDIdTrace.present()) { - dbgDIdTrace.get().detail("DbgId", keysByDomainIdReq.debugId.get()); - } - - // Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This - // would mean multiple domains gets mapped to the same encryption key which is fine, the - // EncryptKeyStore guarantees that keyId -> plaintext encryptKey mapping is idempotent. - for (EncryptCipherDomainId domainId : req.encryptDomainIds) { - EncryptCipherBaseKeyId keyId = 1 + abs(domainId) % SERVER_KNOBS->SIM_KMS_MAX_KEYS; - const auto& itr = ctx->simEncryptKeyStore.find(keyId); - if (itr != ctx->simEncryptKeyStore.end()) { - keysByDomainIdRep.cipherKeyDetails.emplace_back( - domainId, keyId, StringRef(itr->second.get()->key), keysByDomainIdRep.arena); - - if (dbgDIdTrace.present()) { - // {encryptId, baseCipherId} forms a unique tuple across encryption domains - dbgDIdTrace.get().detail( - getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, domainId, keyId), ""); - } - } else { - success = false; - break; - } - } - - wait(delayJittered(1.0)); // simulate network delay - - success ? 
keysByDomainIdReq.reply.send(keysByDomainIdRep) - : keysByDomainIdReq.reply.sendError(encrypt_key_not_found()); + addActor.send(ekLookupByDomainIds(ctx, interf, req)); + } + when(KmsConnBlobMetadataReq req = waitNext(interf.blobMetadataReq.getFuture())) { + addActor.send(blobMetadataLookup(interf, req)); } } } diff --git a/tests/slow/BlobGranuleCorrectness.toml b/tests/slow/BlobGranuleCorrectness.toml index 37f9c171b8..f7ab5204a3 100644 --- a/tests/slow/BlobGranuleCorrectness.toml +++ b/tests/slow/BlobGranuleCorrectness.toml @@ -7,6 +7,8 @@ storageEngineExcludeTypes = [4] [[knobs]] bg_range_source = "tenant" +bg_metadata_source = "tenant" +enable_encryption = true [[test]] testTitle = 'BlobGranuleCorrectness' diff --git a/tests/slow/BlobGranuleCorrectnessClean.toml b/tests/slow/BlobGranuleCorrectnessClean.toml index b15bb1661b..bf58ef1630 100644 --- a/tests/slow/BlobGranuleCorrectnessClean.toml +++ b/tests/slow/BlobGranuleCorrectnessClean.toml @@ -7,6 +7,8 @@ storageEngineExcludeTypes = [4] [[knobs]] bg_range_source = "tenant" +bg_metadata_source = "tenant" +enable_encryption = true [[test]] testTitle = 'BlobGranuleCorrectness'