Added blob metadata concept as a new secret type, and verified blob workers can load it
parent cd2a575e02
commit ffa4255c65
@@ -870,6 +870,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( BG_URL, isSimulated ? "file://fdbblob/" : "" ); // TODO: store in system key space or something, eventually
    // BlobGranuleVerify* simulation tests use blobRangeKeys, BlobGranuleCorrectness* use tenant, default in real clusters is tenant
    init( BG_RANGE_SOURCE, "tenant" );
    // BlobGranuleVerify* simulation tests use knobs, BlobGranuleCorrectness* use tenant, default in real clusters is knobs
    init( BG_METADATA_SOURCE, "knobs" );
    init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( buggifySmallShards ) BG_SNAPSHOT_FILE_TARGET_BYTES = 100000; else if (simulationMediumShards || (randomize && BUGGIFY) ) BG_SNAPSHOT_FILE_TARGET_BYTES = 1000000;
    init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 );
    init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 );

@@ -893,6 +895,12 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
    init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );

    // Blob Metadata
    init( BLOB_METADATA_CACHE_TTL, isSimulated ? 120 : 24 * 60 * 60 );
    if ( randomize && BUGGIFY) { BLOB_METADATA_CACHE_TTL = deterministicRandom()->randomInt(50, 100); }
    init( BLOB_METADATA_REFRESH_INTERVAL, isSimulated ? 60 : 12 * 60 * 60 );
    if ( randomize && BUGGIFY) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(20, 40); }

    // HTTP KMS Connector
    init( REST_KMS_CONNECTOR_KMS_DISCOVERY_URL_MODE, "file");
    init( REST_KMS_CONNECTOR_VALIDATION_TOKEN_MODE, "file");

@@ -836,6 +836,9 @@ public:

    // whether to use blobRangeKeys or tenants for blob granule range sources
    std::string BG_RANGE_SOURCE;
    // Whether to use knobs or EKP for blob metadata and credentials
    std::string BG_METADATA_SOURCE;

    int BG_SNAPSHOT_FILE_TARGET_BYTES;
    int BG_DELTA_FILE_TARGET_BYTES;
    int BG_DELTA_BYTES_BEFORE_COMPACT;

@@ -856,6 +859,10 @@ public:
    double BGCC_TIMEOUT;
    double BGCC_MIN_INTERVAL;

    // Blob metadata
    int64_t BLOB_METADATA_CACHE_TTL;
    int64_t BLOB_METADATA_REFRESH_INTERVAL;

    // HTTP KMS Connector
    std::string REST_KMS_CONNECTOR_KMS_DISCOVERY_URL_MODE;
    std::string REST_KMS_CONNECTOR_DISCOVER_KMS_URL_FILE;

@@ -0,0 +1,113 @@
/*
 * BlobConnectionProvider.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <string>

#include "flow/IRandom.h"
#include "fdbserver/BlobConnectionProvider.h"

struct SingleBlobConnectionProvider : BlobConnectionProvider {
public:
    std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) {
        return std::pair(conn, newFileName);
    }

    Reference<BackupContainerFileSystem> getForRead(std::string filePath) { return conn; }

    SingleBlobConnectionProvider(std::string url) { conn = BackupContainerFileSystem::openContainerFS(url, {}, {}); }

private:
    Reference<BackupContainerFileSystem> conn;
};

struct PartitionedBlobConnectionProvider : BlobConnectionProvider {
    std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) {
        // choose a partition randomly, to distribute load
        int writePartition = deterministicRandom()->randomInt(0, metadata.partitions.size());
        return std::pair(conn, metadata.partitions[writePartition].toString() + newFileName);
    }

    Reference<BackupContainerFileSystem> getForRead(std::string filePath) { return conn; }

    PartitionedBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef> metadata) : metadata(metadata) {
        ASSERT(metadata.base.present());
        ASSERT(metadata.partitions.size() >= 2);
        conn = BackupContainerFileSystem::openContainerFS(metadata.base.get().toString(), {}, {});
        for (auto& it : metadata.partitions) {
            // these should be suffixes, not whole blob urls
            ASSERT(it.toString().find("://") == std::string::npos);
        }
    }

private:
    Standalone<BlobMetadataDetailsRef> metadata;
    Reference<BackupContainerFileSystem> conn;
};

// Could always include number of partitions as validation in sanity check or something?
// Ex: partition_numPartitions/filename instead of partition/filename
struct StorageLocationBlobConnectionProvider : BlobConnectionProvider {
    std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) {
        // choose a partition randomly, to distribute load
        int writePartition = deterministicRandom()->randomInt(0, partitions.size());
        // include partition information in the filename
        return std::pair(partitions[writePartition], std::to_string(writePartition) + "/" + newFileName);
    }

    Reference<BackupContainerFileSystem> getForRead(std::string filePath) {
        size_t slash = filePath.find("/");
        ASSERT(slash != std::string::npos);
        int partition = stoi(filePath.substr(0, slash));
        ASSERT(partition >= 0);
        ASSERT(partition < partitions.size());
        return partitions[partition];
    }

    StorageLocationBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef> metadata) {
        ASSERT(!metadata.base.present());
        ASSERT(metadata.partitions.size() >= 2);
        for (auto& it : metadata.partitions) {
            // these should be whole blob urls
            ASSERT(it.toString().find("://") != std::string::npos);
            partitions.push_back(BackupContainerFileSystem::openContainerFS(it.toString(), {}, {}));
        }
    }

private:
    std::vector<Reference<BackupContainerFileSystem>> partitions;
};

Reference<BlobConnectionProvider> BlobConnectionProvider::newBlobConnectionProvider(std::string blobUrl) {
    return makeReference<SingleBlobConnectionProvider>(blobUrl);
}

Reference<BlobConnectionProvider> BlobConnectionProvider::newBlobConnectionProvider(
    Standalone<BlobMetadataDetailsRef> blobMetadata) {
    if (blobMetadata.partitions.empty()) {
        return makeReference<SingleBlobConnectionProvider>(blobMetadata.base.get().toString());
    } else {
        ASSERT(blobMetadata.partitions.size() >= 2);
        if (blobMetadata.base.present()) {
            return makeReference<PartitionedBlobConnectionProvider>(blobMetadata);
        } else {
            return makeReference<StorageLocationBlobConnectionProvider>(blobMetadata);
        }
    }
}

@@ -0,0 +1,43 @@
/*
 * BlobConnectionProvider.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef BLOB_CONNECTION_PROVIDER_H
#define BLOB_CONNECTION_PROVIDER_H

#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbserver/BlobMetadataUtils.h"

struct BlobConnectionProvider : NonCopyable, ReferenceCounted<BlobConnectionProvider> {
    // chooses a partition (if necessary) and prepends the appropriate prefix to the filename for
    // writing a file, returning it along with the backup container
    virtual std::pair<Reference<BackupContainerFileSystem>, std::string> createForWrite(std::string newFileName) = 0;

    // given a file, return the backup container and full file path necessary to access it. File path should be
    // something returned from createForWrite
    virtual Reference<BackupContainerFileSystem> getForRead(std::string filePath) = 0;

    virtual ~BlobConnectionProvider() {}

    static Reference<BlobConnectionProvider> newBlobConnectionProvider(std::string blobUrl);

    static Reference<BlobConnectionProvider> newBlobConnectionProvider(Standalone<BlobMetadataDetailsRef> blobMetadata);
};

#endif

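For orientation, a minimal usage sketch of this interface (illustrative only, not part of the commit; the file name and the elided read/write calls are made up):

    // Hypothetical caller: write a granule file through the provider, then locate it for reading.
    void roundTripSketch(Reference<BlobConnectionProvider> provider) {
        // createForWrite picks a partition (if any) and returns the backup container plus the
        // partition-qualified path the file must be stored under.
        auto [writeBstore, path] = provider->createForWrite("example_granule.snapshot");
        // ... write the file to writeBstore at 'path' ...

        // The stored 'path' is self-describing: getForRead maps it back to the right container.
        Reference<BackupContainerFileSystem> readBstore = provider->getForRead(path);
        // ... read the file from readBstore at 'path' ...
    }

Callers must persist the returned path verbatim, since getForRead() re-derives the partition from that path.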
@@ -0,0 +1,64 @@
/*
 * BlobMetadataUtils.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef BLOB_METADATA_UTILS_H
#define BLOB_METADATA_UTILS_H

#include "flow/Arena.h"
#include "flow/FileIdentifier.h"

using BlobMetadataDomainId = int64_t;

/*
 * There are 3 cases for blob metadata.
 * 1. A non-partitioned blob store. baseUrl is set, and partitions is empty. Files will be written with this prefix.
 * 2. A sub-path partitioned blob store. baseUrl is set, and partitions contains 2 or more sub-paths. Files will be
 *    written with a prefix of the base url and then one of the sub-paths.
 * 3. A separate-storage-location partitioned blob store. baseUrl is NOT set, and partitions contains 2 or more full
 *    fdb blob urls. Files will be written with one of the partition prefixes.
 * Partitioning is desired in blob stores such as s3 that can run into metadata hotspotting issues.
 */
struct BlobMetadataDetailsRef {
    constexpr static FileIdentifier file_identifier = 6685526;
    BlobMetadataDomainId domainId;
    Optional<StringRef> base;
    VectorRef<StringRef> partitions;

    BlobMetadataDetailsRef() {}
    BlobMetadataDetailsRef(Arena& arena, const BlobMetadataDetailsRef& from)
      : domainId(from.domainId), partitions(arena, from.partitions) {
        if (from.base.present()) {
            base = StringRef(arena, from.base.get());
        }
    }
    explicit BlobMetadataDetailsRef(BlobMetadataDomainId domainId,
                                    Optional<StringRef> base,
                                    VectorRef<StringRef> partitions)
      : domainId(domainId), base(base), partitions(partitions) {}

    int expectedSize() const { return sizeof(BlobMetadataDetailsRef) + partitions.expectedSize(); }

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, domainId, base, partitions);
    }
};

#endif

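To make the three cases concrete, a hand-built example of each shape (illustrative values only; real metadata comes from the KMS):

    // Case 1: non-partitioned. base set, partitions empty; file "f" lands at "file://fdbblob/f".
    Standalone<BlobMetadataDetailsRef> c1;
    c1.domainId = 1;
    c1.base = StringRef(c1.arena(), std::string("file://fdbblob/"));

    // Case 2: sub-path partitioned. base plus >= 2 sub-path suffixes; file "f" lands at
    // "file://fdbblob/p0/f" or "file://fdbblob/p1/f".
    Standalone<BlobMetadataDetailsRef> c2;
    c2.domainId = 2;
    c2.base = StringRef(c2.arena(), std::string("file://fdbblob/"));
    c2.partitions.push_back_deep(c2.arena(), std::string("p0/"));
    c2.partitions.push_back_deep(c2.arena(), std::string("p1/"));

    // Case 3: separate storage locations. base unset, partitions are >= 2 full urls; file "f"
    // is stored as "<partitionIndex>/f" inside the chosen location.
    Standalone<BlobMetadataDetailsRef> c3;
    c3.domainId = 3;
    c3.partitions.push_back_deep(c3.arena(), std::string("file://fdbblob3_0/"));
    c3.partitions.push_back_deep(c3.arena(), std::string("file://fdbblob3_1/"));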
@@ -36,8 +36,10 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/BlobMetadataUtils.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/BlobConnectionProvider.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/ServerDBInfo.h"

@@ -161,6 +163,8 @@ struct GranuleTenantData : NonCopyable, ReferenceCounted<GranuleTenantData> {
    TenantName name;
    TenantMapEntry entry;
    // TODO add other useful stuff like per-tenant blob connection, if necessary
    Reference<BlobConnectionProvider> conn;
    Promise<Void> connLoaded;

    GranuleTenantData() {}
    GranuleTenantData(TenantName name, TenantMapEntry entry) : name(name), entry(entry) {}

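The conn/connLoaded pair gives consumers a rendezvous point: metadata loading is asynchronous, so a hypothetical consumer would wait on connLoaded before touching conn (sketch only, not part of this commit):

    ACTOR Future<Reference<BlobConnectionProvider>> waitForTenantConn(Reference<GranuleTenantData> data) {
        // connLoaded is fulfilled either immediately (when BG_METADATA_SOURCE != "tenant") or by
        // loadBlobMetadataForTenants below once the EKP reply arrives.
        wait(data->connLoaded.getFuture());
        ASSERT(data->conn.isValid());
        return data->conn;
    }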
@@ -3111,6 +3115,72 @@ ACTOR Future<Void> monitorRemoval(Reference<BlobWorkerData> bwData) {
    }
}

// FIXME: if credentials can expire, refresh periodically
ACTOR Future<Void> loadBlobMetadataForTenants(Reference<BlobWorkerData> bwData,
                                              Reference<AsyncVar<ServerDBInfo> const> dbInfo,
                                              std::vector<TenantMapEntry> tenantMapEntries) {
    ASSERT(SERVER_KNOBS->BG_METADATA_SOURCE == "tenant");
    ASSERT(!tenantMapEntries.empty());
    state std::vector<BlobMetadataDomainId> domainIds;
    for (auto& entry : tenantMapEntries) {
        if (BW_DEBUG) {
            fmt::print(
                "BW {0} loading blob metadata for tenant {1}\n", bwData->id.shortString().substr(0, 5), entry.id);
        }
        domainIds.push_back(entry.id);
    }

    // FIXME: if one tenant gets an error, don't kill whole blob worker
    // TODO: add latency metrics
    loop {
        Future<EKPGetLatestBlobMetadataReply> requestFuture;
        if (dbInfo.isValid() && dbInfo->get().encryptKeyProxy.present()) {
            EKPGetLatestBlobMetadataRequest req;
            req.domainIds = domainIds;
            requestFuture =
                brokenPromiseToNever(dbInfo->get().encryptKeyProxy.get().getLatestBlobMetadata.getReply(req));
        } else {
            requestFuture = Never();
        }
        choose {
            when(EKPGetLatestBlobMetadataReply rep = wait(requestFuture)) {
                ASSERT(rep.blobMetadataDetails.size() == domainIds.size());
                // not guaranteed to be in same order in the request as the response
                for (auto& metadata : rep.blobMetadataDetails) {
                    auto info = bwData->tenantInfoById.find(metadata.domainId);
                    if (info == bwData->tenantInfoById.end()) {
                        TraceEvent(SevWarn, "BlobWorkerTenantDeletedWhileLoadMetadata", bwData->id)
                            .detail("TenantId", metadata.domainId);
                        continue;
                    }
                    auto dataEntry = bwData->tenantData.rangeContaining(info->second.prefix);
                    ASSERT(dataEntry.begin() == info->second.prefix);
                    dataEntry.cvalue()->conn = BlobConnectionProvider::newBlobConnectionProvider(metadata);
                    dataEntry.cvalue()->connLoaded.send(Void());
                    TraceEvent(SevDebug, "BlobWorkerTenantMetadataLoaded", bwData->id)
                        .detail("TenantId", metadata.domainId);
                    if (BW_DEBUG) {
                        fmt::print("BW {0} loaded blob metadata for {1}: {2}",
                                   bwData->id.shortString().substr(0, 5),
                                   metadata.domainId,
                                   metadata.base.present() ? metadata.base.get().toString() : "");
                        if (metadata.partitions.empty()) {
                            fmt::print("\n");
                        } else {
                            fmt::print(" ({0})\n", metadata.partitions.size());
                            for (auto& it : metadata.partitions) {
                                fmt::print("  {0}\n", it.toString());
                            }
                        }
                    }
                }
                return Void();
            }
            when(wait(dbInfo->onChange())) {}
        }
    }
}

// Because change feeds send uncommitted data and explicit rollback messages, we speculatively buffer/write
// uncommitted data. This means we must ensure the data is actually committed before "committing" those writes in
// the blob granule. The simplest way to do this is to have the blob worker do a periodic GRV, which is guaranteed

@@ -3143,7 +3213,7 @@ ACTOR Future<Void> runGRVChecks(Reference<BlobWorkerData> bwData) {

// FIXME: better way to do this?
// monitor system keyspace for new tenants
-ACTOR Future<Void> monitorTenants(Reference<BlobWorkerData> bwData) {
+ACTOR Future<Void> monitorTenants(Reference<BlobWorkerData> bwData, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
    loop {
        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
        loop {

@@ -3163,6 +3233,7 @@ ACTOR Future<Void> monitorTenants(Reference<BlobWorkerData> bwData) {
                throw internal_error();
            }

+           std::vector<TenantMapEntry> tenantsToLoad;
            for (auto& it : tenantResults) {
                // FIXME: handle removing/moving tenants!
                StringRef tenantName = it.key.removePrefix(tenantMapPrefix);

@@ -3176,10 +3247,19 @@ ACTOR Future<Void> monitorTenants(Reference<BlobWorkerData> bwData) {
                               entry.id,
                               entry.prefix.printable());
                }
+               auto r = makeReference<GranuleTenantData>(tenantName, entry);
                bwData->tenantData.insert(KeyRangeRef(entry.prefix, entry.prefix.withSuffix(normalKeys.end)),
-                                         makeReference<GranuleTenantData>(tenantName, entry));
+                                         r);
+               if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
+                   r->connLoaded.send(Void());
+               } else {
+                   tenantsToLoad.push_back(entry);
+               }
                }
            }
+           if (!tenantsToLoad.empty()) {
+               bwData->addActor.send(loadBlobMetadataForTenants(bwData, dbInfo, tenantsToLoad));
+           }

            state Future<Void> watchChange = tr->watch(tenantLastIdKey);
            wait(tr->commit());

@@ -3262,7 +3342,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
    self->addActor.send(waitFailureServer(bwInterf.waitFailure.getFuture()));
    self->addActor.send(runGRVChecks(self));
    if (SERVER_KNOBS->BG_RANGE_SOURCE == "tenant") {
-       self->addActor.send(monitorTenants(self));
+       self->addActor.send(monitorTenants(self, dbInfo));
    }
    state Future<Void> selfRemoved = monitorRemoval(self);

@@ -5,12 +5,15 @@ set(FDBSERVER_SRCS
  BackupProgress.actor.cpp
  BackupProgress.actor.h
  BackupWorker.actor.cpp
  BlobConnectionProvider.h
  BlobConnectionProvider.cpp
  BlobGranuleServerCommon.actor.cpp
  BlobGranuleServerCommon.actor.h
  BlobGranuleValidation.actor.cpp
  BlobGranuleValidation.actor.h
  BlobManager.actor.cpp
  BlobManagerInterface.h
  BlobMetadataUtils.h
  BlobWorker.actor.cpp
  ClusterController.actor.cpp
  ClusterController.actor.h

@@ -79,6 +79,18 @@ struct EncryptBaseCipherKey {
    bool isValid() { return noExpiry ? true : ((now() - creationTimeSec) < FLOW_KNOBS->ENCRYPT_CIPHER_KEY_CACHE_TTL); }
};

// TODO: could refactor both into CacheEntry<T> with T data, creationTimeSec, and noExpiry
struct BlobMetadataCacheEntry {
    Standalone<BlobMetadataDetailsRef> metadataDetails;
    uint64_t creationTimeSec;

    BlobMetadataCacheEntry() : creationTimeSec(0) {}
    explicit BlobMetadataCacheEntry(Standalone<BlobMetadataDetailsRef> metadataDetails)
      : metadataDetails(metadataDetails), creationTimeSec(now()) {}

    bool isValid() { return (now() - creationTimeSec) < SERVER_KNOBS->BLOB_METADATA_CACHE_TTL; }
};

using EncryptBaseDomainIdCache = std::unordered_map<EncryptCipherDomainId, EncryptBaseCipherKey>;

using EncryptBaseCipherDomainIdKeyIdCacheKey = std::pair<EncryptCipherDomainId, EncryptCipherBaseKeyId>;

@@ -86,15 +98,18 @@ using EncryptBaseCipherDomainIdKeyIdCacheKeyHash = boost::hash<EncryptBaseCipher
using EncryptBaseCipherDomainIdKeyIdCache = std::unordered_map<EncryptBaseCipherDomainIdKeyIdCacheKey,
                                                               EncryptBaseCipherKey,
                                                               EncryptBaseCipherDomainIdKeyIdCacheKeyHash>;
using BlobMetadataDomainIdCache = std::unordered_map<BlobMetadataDomainId, BlobMetadataCacheEntry>;

struct EncryptKeyProxyData : NonCopyable, ReferenceCounted<EncryptKeyProxyData> {
public:
    UID myId;
    PromiseStream<Future<Void>> addActor;
    Future<Void> encryptionKeyRefresher;
    Future<Void> blobMetadataRefresher;

    EncryptBaseDomainIdCache baseCipherDomainIdCache;
    EncryptBaseCipherDomainIdKeyIdCache baseCipherDomainIdKeyIdCache;
    BlobMetadataDomainIdCache blobMetadataDomainIdCache;

    std::unique_ptr<KmsConnector> kmsConnector;

@@ -107,6 +122,10 @@ public:
    Counter baseCipherKeysRefreshed;
    Counter numResponseWithErrors;
    Counter numEncryptionKeyRefreshErrors;
    Counter blobMetadataCacheHits;
    Counter blobMetadataCacheMisses;
    Counter blobMetadataRefreshed;
    Counter numBlobMetadataRefreshErrors;

    explicit EncryptKeyProxyData(UID id)
      : myId(id), ekpCacheMetrics("EKPMetrics", myId.toString()),

@@ -116,7 +135,11 @@ public:
        baseCipherDomainIdCacheHits("EKPCipherDomainIdCacheHits", ekpCacheMetrics),
        baseCipherKeysRefreshed("EKPCipherKeysRefreshed", ekpCacheMetrics),
        numResponseWithErrors("EKPNumResponseWithErrors", ekpCacheMetrics),
-       numEncryptionKeyRefreshErrors("EKPNumEncryptionKeyRefreshErrors", ekpCacheMetrics) {}
+       numEncryptionKeyRefreshErrors("EKPNumEncryptionKeyRefreshErrors", ekpCacheMetrics),
+       blobMetadataCacheHits("EKPBlobMetadataCacheHits", ekpCacheMetrics),
+       blobMetadataCacheMisses("EKPBlobMetadataCacheMisses", ekpCacheMetrics),
+       blobMetadataRefreshed("EKPBlobMetadataRefreshed", ekpCacheMetrics),
+       numBlobMetadataRefreshErrors("EKPBlobMetadataRefreshErrors", ekpCacheMetrics) {}

    EncryptBaseCipherDomainIdKeyIdCacheKey getBaseCipherDomainIdKeyIdCacheKey(
        const EncryptCipherDomainId domainId,

@@ -146,6 +169,11 @@ public:
        baseCipherDomainIdKeyIdCache[cacheKey] = EncryptBaseCipherKey(domainId, baseCipherId, baseCipherKey, true);
    }

    void insertIntoBlobMetadataCache(const BlobMetadataDomainId domainId,
                                     const Standalone<BlobMetadataDetailsRef>& entry) {
        blobMetadataDomainIdCache[domainId] = BlobMetadataCacheEntry(entry);
    }

    template <class Reply>
    using isEKPGetLatestBaseCipherKeysReply = std::is_base_of<EKPGetLatestBaseCipherKeysReply, Reply>;
    template <class Reply>

@@ -413,6 +441,131 @@ void refreshEncryptionKeys(Reference<EncryptKeyProxyData> ekpProxyData, KmsConne
    Future<Void> ignored = refreshEncryptionKeysCore(ekpProxyData, kmsConnectorInf);
}

ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxyData,
                                         KmsConnectorInterface kmsConnectorInf,
                                         EKPGetLatestBlobMetadataRequest req) {
    // Scan the cache and filter out the domainIds whose blob metadata is locally cached;
    // for the rest, reach out to the KMS to fetch the required details
    state Standalone<VectorRef<BlobMetadataDetailsRef>> metadataDetails;
    state Optional<TraceEvent> dbgTrace =
        req.debugId.present() ? TraceEvent("GetBlobMetadata", ekpProxyData->myId) : Optional<TraceEvent>();

    if (dbgTrace.present()) {
        dbgTrace.get().setMaxEventLength(SERVER_KNOBS->ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH);
        dbgTrace.get().detail("DbgId", req.debugId.get());
    }

    // Dedup the requested domainIds.
    std::unordered_set<BlobMetadataDomainId> dedupedDomainIds;
    for (auto id : req.domainIds) {
        dedupedDomainIds.emplace(id);
    }

    if (dbgTrace.present()) {
        dbgTrace.get().detail("NKeys", dedupedDomainIds.size());
        for (BlobMetadataDomainId id : dedupedDomainIds) {
            // log the blob metadata domainIds queried
            dbgTrace.get().detail("BMQ" + std::to_string(id), "");
        }
    }

    // First, check if the requested information is already cached by the server.
    // Ensure the cached information is within SERVER_KNOBS->BLOB_METADATA_CACHE_TTL time window.
    std::vector<BlobMetadataDomainId> lookupDomains;
    for (BlobMetadataDomainId id : dedupedDomainIds) {
        const auto itr = ekpProxyData->blobMetadataDomainIdCache.find(id);
        if (itr != ekpProxyData->blobMetadataDomainIdCache.end() && itr->second.isValid()) {
            metadataDetails.arena().dependsOn(itr->second.metadataDetails.arena());
            metadataDetails.push_back(metadataDetails.arena(), itr->second.metadataDetails);

            if (dbgTrace.present()) {
                dbgTrace.get().detail("BMC" + std::to_string(id), "");
            }
            ++ekpProxyData->blobMetadataCacheHits;
        } else {
            lookupDomains.emplace_back(id);
            ++ekpProxyData->blobMetadataCacheMisses;
        }
    }

    if (!lookupDomains.empty()) {
        try {
            KmsConnBlobMetadataReq kmsReq(lookupDomains, req.debugId);
            KmsConnBlobMetadataRep kmsRep = wait(kmsConnectorInf.blobMetadataReq.getReply(kmsReq));
            metadataDetails.arena().dependsOn(kmsRep.metadataDetails.arena());

            for (auto& item : kmsRep.metadataDetails) {
                metadataDetails.push_back(metadataDetails.arena(), item);

                // Record the fetched metadata in the local cache for future reference
                ekpProxyData->insertIntoBlobMetadataCache(item.domainId, item);

                if (dbgTrace.present()) {
                    // log the domainIds fetched from the KMS
                    dbgTrace.get().detail("BMI" + std::to_string(item.domainId), "");
                }
            }
        } catch (Error& e) {
            if (!canReplyWith(e)) {
                TraceEvent("GetLatestBlobMetadataUnexpectedError", ekpProxyData->myId).error(e);
                throw;
            }
            TraceEvent("GetLatestBlobMetadataExpectedError", ekpProxyData->myId).error(e);
            req.reply.sendError(e);
            return Void();
        }
    }

    req.reply.send(EKPGetLatestBlobMetadataReply(metadataDetails));

    return Void();
}

ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpProxyData,
                                           KmsConnectorInterface kmsConnectorInf) {
    state UID debugId = deterministicRandom()->randomUniqueID();

    state TraceEvent t("RefreshBlobMetadata_Start", ekpProxyData->myId);
    t.setMaxEventLength(SERVER_KNOBS->ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH);
    t.detail("KmsConnInf", kmsConnectorInf.id());
    t.detail("DebugId", debugId);

    try {
        KmsConnBlobMetadataReq req;
        req.debugId = debugId;
        req.domainIds.reserve(ekpProxyData->blobMetadataDomainIdCache.size());

        for (auto& item : ekpProxyData->blobMetadataDomainIdCache) {
            req.domainIds.emplace_back(item.first);
        }
        KmsConnBlobMetadataRep rep = wait(kmsConnectorInf.blobMetadataReq.getReply(req));
        for (auto& item : rep.metadataDetails) {
            ekpProxyData->insertIntoBlobMetadataCache(item.domainId, item);
            t.detail("BM" + std::to_string(item.domainId), "");
        }

        ekpProxyData->blobMetadataRefreshed += rep.metadataDetails.size();

        t.detail("NKeys", rep.metadataDetails.size());
    } catch (Error& e) {
        if (!canReplyWith(e)) {
            TraceEvent("RefreshBlobMetadata_Error").error(e);
            throw e;
        }
        TraceEvent("RefreshBlobMetadata").detail("ErrorCode", e.code());
        ++ekpProxyData->numBlobMetadataRefreshErrors;
    }

    return Void();
}

void refreshBlobMetadata(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
    Future<Void> ignored = refreshBlobMetadataCore(ekpProxyData, kmsConnectorInf);
}

void activateKmsConnector(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
    if (g_network->isSimulated()) {
        ekpProxyData->kmsConnector = std::make_unique<SimKmsConnector>();

@@ -438,7 +591,7 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,

    activateKmsConnector(self, kmsConnectorInf);

-   // Register a recurring task to refresh the cached Encryption keys.
+   // Register a recurring task to refresh the cached Encryption keys and blob metadata.
    // Approach avoids external RPCs due to EncryptionKey refreshes for the inline write encryption codepath such as:
    // CPs, Redwood Storage Server node flush etc. The process doing the encryption refreshes the cached cipher keys based
    // on FLOW_KNOBS->ENCRYPTION_CIPHER_KEY_CACHE_TTL_SEC interval which is intentionally kept longer than

@@ -449,13 +602,20 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
                                            FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL,
                                            TaskPriority::Worker);

+   self->blobMetadataRefresher = recurring([&]() { refreshBlobMetadata(self, kmsConnectorInf); },
+                                           SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL,
+                                           TaskPriority::Worker);
+
    try {
        loop choose {
            when(EKPGetBaseCipherKeysByIdsRequest req = waitNext(ekpInterface.getBaseCipherKeysByIds.getFuture())) {
-               wait(getCipherKeysByBaseCipherKeyIds(self, kmsConnectorInf, req));
+               self->addActor.send(getCipherKeysByBaseCipherKeyIds(self, kmsConnectorInf, req));
            }
            when(EKPGetLatestBaseCipherKeysRequest req = waitNext(ekpInterface.getLatestBaseCipherKeys.getFuture())) {
-               wait(getLatestCipherKeys(self, kmsConnectorInf, req));
+               self->addActor.send(getLatestCipherKeys(self, kmsConnectorInf, req));
            }
+           when(EKPGetLatestBlobMetadataRequest req = waitNext(ekpInterface.getLatestBlobMetadata.getFuture())) {
+               self->addActor.send(getLatestBlobMetadata(self, kmsConnectorInf, req));
+           }
            when(HaltEncryptKeyProxyRequest req = waitNext(ekpInterface.haltEncryptKeyProxy.getFuture())) {
                TraceEvent("EKP_Halted", self->myId).detail("ReqID", req.requesterID);

@@ -29,6 +29,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
#include "fdbserver/BlobMetadataUtils.h"

struct EncryptKeyProxyInterface {
    constexpr static FileIdentifier file_identifier = 1303419;

@@ -38,6 +39,7 @@ struct EncryptKeyProxyInterface {
    RequestStream<struct HaltEncryptKeyProxyRequest> haltEncryptKeyProxy;
    RequestStream<struct EKPGetBaseCipherKeysByIdsRequest> getBaseCipherKeysByIds;
    RequestStream<struct EKPGetLatestBaseCipherKeysRequest> getLatestBaseCipherKeys;
    RequestStream<struct EKPGetLatestBlobMetadataRequest> getLatestBlobMetadata;

    EncryptKeyProxyInterface() {}
    explicit EncryptKeyProxyInterface(const struct LocalityData& loc, UID id) : locality(loc), myId(id) {}

@@ -63,6 +65,8 @@ struct EncryptKeyProxyInterface {
                waitFailure.getEndpoint().getAdjustedEndpoint(2));
            getLatestBaseCipherKeys = RequestStream<struct EKPGetLatestBaseCipherKeysRequest>(
                waitFailure.getEndpoint().getAdjustedEndpoint(3));
            getLatestBlobMetadata =
                RequestStream<struct EKPGetLatestBlobMetadataRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
        }
    }

@@ -72,6 +76,7 @@ struct EncryptKeyProxyInterface {
        streams.push_back(haltEncryptKeyProxy.getReceiver(TaskPriority::DefaultPromiseEndpoint));
        streams.push_back(getBaseCipherKeysByIds.getReceiver(TaskPriority::Worker));
        streams.push_back(getLatestBaseCipherKeys.getReceiver(TaskPriority::Worker));
        streams.push_back(getLatestBlobMetadata.getReceiver(TaskPriority::Worker));
        FlowTransport::transport().addEndpoints(streams);
    }
};

@@ -170,4 +175,35 @@ struct EKPGetLatestBaseCipherKeysRequest {
    }
};

// partition and credentials information for a given blob domain
struct EKPGetLatestBlobMetadataReply {
    constexpr static FileIdentifier file_identifier = 5761581;
    Standalone<VectorRef<BlobMetadataDetailsRef>> blobMetadataDetails;

    EKPGetLatestBlobMetadataReply() {}
    explicit EKPGetLatestBlobMetadataReply(const Standalone<VectorRef<BlobMetadataDetailsRef>>& blobMetadataDetails)
      : blobMetadataDetails(blobMetadataDetails) {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, blobMetadataDetails);
    }
};

struct EKPGetLatestBlobMetadataRequest {
    constexpr static FileIdentifier file_identifier = 3821549;
    std::vector<BlobMetadataDomainId> domainIds;
    Optional<UID> debugId;
    ReplyPromise<EKPGetLatestBlobMetadataReply> reply;

    EKPGetLatestBlobMetadataRequest() {}
    explicit EKPGetLatestBlobMetadataRequest(const std::vector<BlobMetadataDomainId>& ids) : domainIds(ids) {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, domainIds, debugId, reply);
    }
};

#endif

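Usage mirrors the blob worker code above; a minimal sketch of issuing the request from a process that has a ServerDBInfo handle:

    // Request the latest metadata for two hypothetical domains from the EncryptKeyProxy.
    EKPGetLatestBlobMetadataRequest req(std::vector<BlobMetadataDomainId>{ 1, 2 });
    Future<EKPGetLatestBlobMetadataReply> rep =
        brokenPromiseToNever(dbInfo->get().encryptKeyProxy.get().getLatestBlobMetadata.getReply(req));
    // Replies are keyed by domainId; ordering is not guaranteed to match the request.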
@@ -28,12 +28,14 @@
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/network.h"
#include "fdbserver/BlobMetadataUtils.h"

struct KmsConnectorInterface {
    constexpr static FileIdentifier file_identifier = 2416711;
    RequestStream<ReplyPromise<Void>> waitFailure;
    RequestStream<struct KmsConnLookupEKsByKeyIdsReq> ekLookupByIds;
    RequestStream<struct KmsConnLookupEKsByDomainIdsReq> ekLookupByDomainIds;
    RequestStream<struct KmsConnBlobMetadataReq> blobMetadataReq;

    KmsConnectorInterface() {}

@@ -49,6 +51,8 @@ struct KmsConnectorInterface {
                RequestStream<struct KmsConnLookupEKsByKeyIdsReq>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
            ekLookupByDomainIds =
                RequestStream<struct KmsConnLookupEKsByDomainIdsReq>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
            blobMetadataReq =
                RequestStream<struct KmsConnBlobMetadataReq>(waitFailure.getEndpoint().getAdjustedEndpoint(3));
        }
    }

@@ -57,6 +61,7 @@ struct KmsConnectorInterface {
        streams.push_back(waitFailure.getReceiver());
        streams.push_back(ekLookupByIds.getReceiver(TaskPriority::Worker));
        streams.push_back(ekLookupByDomainIds.getReceiver(TaskPriority::Worker));
        streams.push_back(blobMetadataReq.getReceiver(TaskPriority::Worker));
        FlowTransport::transport().addEndpoints(streams);
    }
};

@@ -145,4 +150,32 @@ struct KmsConnLookupEKsByDomainIdsReq {
    }
};

struct KmsConnBlobMetadataRep {
    constexpr static FileIdentifier file_identifier = 2919714;
    Standalone<VectorRef<BlobMetadataDetailsRef>> metadataDetails;

    KmsConnBlobMetadataRep() {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, metadataDetails);
    }
};

struct KmsConnBlobMetadataReq {
    constexpr static FileIdentifier file_identifier = 3913147;
    std::vector<BlobMetadataDomainId> domainIds;
    Optional<UID> debugId;
    ReplyPromise<KmsConnBlobMetadataRep> reply;

    KmsConnBlobMetadataReq() {}
    explicit KmsConnBlobMetadataReq(const std::vector<BlobMetadataDomainId>& ids, Optional<UID> dbgId)
      : domainIds(ids), debugId(dbgId) {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, domainIds, debugId, reply);
    }
};

#endif

@@ -808,6 +808,11 @@ ACTOR Future<Void> connectorCore_impl(KmsConnectorInterface interf) {
                    byDomainIdReq.reply.sendError(e);
                }
            }
            when(KmsConnBlobMetadataReq req = waitNext(interf.blobMetadataReq.getFuture())) {
                // TODO: implement!
                TraceEvent(SevWarn, "RESTKMSBlobMetadataNotImplemented", interf.id());
                req.reply.sendError(not_implemented());
            }
        }
    }
}

@@ -47,7 +47,11 @@ struct SimEncryptKeyCtx {
    explicit SimEncryptKeyCtx(EncryptCipherBaseKeyId kId, const char* data) : id(kId), key(data, AES_256_KEY_LENGTH) {}
};

+// The credentials may be allowed to change, but the storage locations and partitioning cannot change, even across
+// restarts. Keep it as global static state in simulation.
+static std::unordered_map<BlobMetadataDomainId, Standalone<BlobMetadataDetailsRef>> simBlobMetadataStore;
+
-struct SimKmsConnectorContext {
+struct SimKmsConnectorContext : NonCopyable, ReferenceCounted<SimKmsConnectorContext> {
    uint32_t maxEncryptionKeys;
    std::unordered_map<EncryptCipherBaseKeyId, std::unique_ptr<SimEncryptKeyCtx>> simEncryptKeyStore;

@@ -66,90 +70,155 @@ struct SimKmsConnectorContext {
    }
};

ACTOR Future<Void> ekLookupByIds(Reference<SimKmsConnectorContext> ctx,
                                 KmsConnectorInterface interf,
                                 KmsConnLookupEKsByKeyIdsReq req) {
    state KmsConnLookupEKsByKeyIdsRep rep;
    state bool success = true;
    state Optional<TraceEvent> dbgKIdTrace =
        req.debugId.present() ? TraceEvent("SimKmsGetByKeyIds", interf.id()) : Optional<TraceEvent>();

    if (dbgKIdTrace.present()) {
        dbgKIdTrace.get().setMaxEventLength(100000);
        dbgKIdTrace.get().detail("DbgId", req.debugId.get());
    }

    // Lookup corresponding EncryptKeyCtx for input keyId
    for (const auto& item : req.encryptKeyIds) {
        const auto& itr = ctx->simEncryptKeyStore.find(item.first);
        if (itr != ctx->simEncryptKeyStore.end()) {
            rep.cipherKeyDetails.emplace_back(
                item.second, itr->first, StringRef(rep.arena, itr->second.get()->key), rep.arena);

            if (dbgKIdTrace.present()) {
                // {encryptDomainId, baseCipherId} forms a unique tuple across encryption domains
                dbgKIdTrace.get().detail(
                    getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, item.second, itr->first), "");
            }
        } else {
            success = false;
            break;
        }
    }

    wait(delayJittered(1.0)); // simulate network delay

    success ? req.reply.send(rep) : req.reply.sendError(encrypt_key_not_found());

    return Void();
}

ACTOR Future<Void> ekLookupByDomainIds(Reference<SimKmsConnectorContext> ctx,
                                       KmsConnectorInterface interf,
                                       KmsConnLookupEKsByDomainIdsReq req) {
    state KmsConnLookupEKsByDomainIdsRep rep;
    state bool success = true;
    state Optional<TraceEvent> dbgDIdTrace =
        req.debugId.present() ? TraceEvent("SimKmsGetsByDomIds", interf.id()) : Optional<TraceEvent>();

    if (dbgDIdTrace.present()) {
        dbgDIdTrace.get().detail("DbgId", req.debugId.get());
    }

    // Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This
    // would mean multiple domains get mapped to the same encryption key which is fine, the
    // EncryptKeyStore guarantees that keyId -> plaintext encryptKey mapping is idempotent.
    for (EncryptCipherDomainId domainId : req.encryptDomainIds) {
        EncryptCipherBaseKeyId keyId = 1 + abs(domainId) % SERVER_KNOBS->SIM_KMS_MAX_KEYS;
        const auto& itr = ctx->simEncryptKeyStore.find(keyId);
        if (itr != ctx->simEncryptKeyStore.end()) {
            rep.cipherKeyDetails.emplace_back(domainId, keyId, StringRef(itr->second.get()->key), rep.arena);
            if (dbgDIdTrace.present()) {
                // {encryptId, baseCipherId} forms a unique tuple across encryption domains
                dbgDIdTrace.get().detail(getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, domainId, keyId), "");
            }
        } else {
            success = false;
            break;
        }
    }

    wait(delayJittered(1.0)); // simulate network delay

    success ? req.reply.send(rep) : req.reply.sendError(encrypt_key_not_found());

    return Void();
}

static Standalone<BlobMetadataDetailsRef> createBlobMetadata(BlobMetadataDomainId domainId) {
    Standalone<BlobMetadataDetailsRef> metadata;
    metadata.domainId = domainId;
    // 0 == no partition, 1 == suffix partitioned, 2 == storage location partitioned
    int type = deterministicRandom()->randomInt(0, 3);
    int partitionCount = (type == 0) ? 0 : deterministicRandom()->randomInt(2, 12);
    if (type == 0) {
        // single storage location
        metadata.base = StringRef(metadata.arena(), "file://fdbblob/" + std::to_string(domainId) + "/");
    }
    if (type == 1) {
        // simulate hash prefixing in s3
        metadata.base = StringRef(metadata.arena(), "file://fdbblob/");
        for (int i = 0; i < partitionCount; i++) {
            metadata.partitions.push_back_deep(metadata.arena(),
                                               deterministicRandom()->randomUniqueID().shortString() + "-" +
                                                   std::to_string(domainId) + "/");
        }
    }
    if (type == 2) {
        // simulate separate storage location per partition
        for (int i = 0; i < partitionCount; i++) {
            metadata.partitions.push_back_deep(
                metadata.arena(), "file://fdbblob" + std::to_string(domainId) + "_" + std::to_string(i) + "/");
        }
    }
    return metadata;
}

ACTOR Future<Void> blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobMetadataReq req) {
    state KmsConnBlobMetadataRep rep;
    state Optional<TraceEvent> dbgDIdTrace =
        req.debugId.present() ? TraceEvent("SimKmsBlobMetadataLookup", interf.id()) : Optional<TraceEvent>();
    if (dbgDIdTrace.present()) {
        dbgDIdTrace.get().detail("DbgId", req.debugId.get());
    }

    for (BlobMetadataDomainId domainId : req.domainIds) {
        auto it = simBlobMetadataStore.find(domainId);
        if (it == simBlobMetadataStore.end()) {
            // construct new blob metadata
            it = simBlobMetadataStore.insert({ domainId, createBlobMetadata(domainId) }).first;
        }
        rep.metadataDetails.arena().dependsOn(it->second.arena());
        rep.metadataDetails.push_back(rep.metadataDetails.arena(), it->second);
    }

    wait(delayJittered(1.0)); // simulate network delay

    req.reply.send(rep);

    return Void();
}

ACTOR Future<Void> simKmsConnectorCore_impl(KmsConnectorInterface interf) {
    TraceEvent("SimEncryptKmsProxy_Init", interf.id()).detail("MaxEncryptKeys", SERVER_KNOBS->SIM_KMS_MAX_KEYS);

-   state bool success = true;
-   state std::unique_ptr<SimKmsConnectorContext> ctx =
-       std::make_unique<SimKmsConnectorContext>(SERVER_KNOBS->SIM_KMS_MAX_KEYS);
+   state Reference<SimKmsConnectorContext> ctx = makeReference<SimKmsConnectorContext>(SERVER_KNOBS->SIM_KMS_MAX_KEYS);

    ASSERT_EQ(ctx->simEncryptKeyStore.size(), SERVER_KNOBS->SIM_KMS_MAX_KEYS);

+   state PromiseStream<Future<Void>> addActor;
+   state Future<Void> collection = actorCollection(addActor.getFuture());
+
    loop {
        choose {
            when(KmsConnLookupEKsByKeyIdsReq req = waitNext(interf.ekLookupByIds.getFuture())) {
-               state KmsConnLookupEKsByKeyIdsReq keysByIdsReq = req;
-               state KmsConnLookupEKsByKeyIdsRep keysByIdsRep;
-               state Optional<TraceEvent> dbgKIdTrace = keysByIdsReq.debugId.present()
-                                                            ? TraceEvent("SimKmsGetByKeyIds", interf.id())
-                                                            : Optional<TraceEvent>();
-
-               if (dbgKIdTrace.present()) {
-                   dbgKIdTrace.get().setMaxEventLength(100000);
-                   dbgKIdTrace.get().detail("DbgId", keysByIdsReq.debugId.get());
-               }
-
-               // Lookup corresponding EncryptKeyCtx for input keyId
-               for (const auto& item : req.encryptKeyIds) {
-                   const auto& itr = ctx->simEncryptKeyStore.find(item.first);
-                   if (itr != ctx->simEncryptKeyStore.end()) {
-                       keysByIdsRep.cipherKeyDetails.emplace_back(
-                           item.second,
-                           itr->first,
-                           StringRef(keysByIdsRep.arena, itr->second.get()->key),
-                           keysByIdsRep.arena);
-
-                       if (dbgKIdTrace.present()) {
-                           // {encryptDomainId, baseCipherId} forms a unique tuple across encryption domains
-                           dbgKIdTrace.get().detail(
-                               getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, item.second, itr->first), "");
-                       }
-                   } else {
-                       success = false;
-                       break;
-                   }
-               }
-
-               wait(delayJittered(1.0)); // simulate network delay
-
-               success ? keysByIdsReq.reply.send(keysByIdsRep) : keysByIdsReq.reply.sendError(encrypt_key_not_found());
+               addActor.send(ekLookupByIds(ctx, interf, req));
            }
            when(KmsConnLookupEKsByDomainIdsReq req = waitNext(interf.ekLookupByDomainIds.getFuture())) {
-               state KmsConnLookupEKsByDomainIdsReq keysByDomainIdReq = req;
-               state KmsConnLookupEKsByDomainIdsRep keysByDomainIdRep;
-               state Optional<TraceEvent> dbgDIdTrace = keysByDomainIdReq.debugId.present()
-                                                            ? TraceEvent("SimKmsGetsByDomIds", interf.id())
-                                                            : Optional<TraceEvent>();
-
-               if (dbgDIdTrace.present()) {
-                   dbgDIdTrace.get().detail("DbgId", keysByDomainIdReq.debugId.get());
-               }
-
-               // Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This
-               // would mean multiple domains gets mapped to the same encryption key which is fine, the
-               // EncryptKeyStore guarantees that keyId -> plaintext encryptKey mapping is idempotent.
-               for (EncryptCipherDomainId domainId : req.encryptDomainIds) {
-                   EncryptCipherBaseKeyId keyId = 1 + abs(domainId) % SERVER_KNOBS->SIM_KMS_MAX_KEYS;
-                   const auto& itr = ctx->simEncryptKeyStore.find(keyId);
-                   if (itr != ctx->simEncryptKeyStore.end()) {
-                       keysByDomainIdRep.cipherKeyDetails.emplace_back(
-                           domainId, keyId, StringRef(itr->second.get()->key), keysByDomainIdRep.arena);
-
-                       if (dbgDIdTrace.present()) {
-                           // {encryptId, baseCipherId} forms a unique tuple across encryption domains
-                           dbgDIdTrace.get().detail(
-                               getEncryptDbgTraceKey(ENCRYPT_DBG_TRACE_RESULT_PREFIX, domainId, keyId), "");
-                       }
-                   } else {
-                       success = false;
-                       break;
-                   }
-               }
-
-               wait(delayJittered(1.0)); // simulate network delay
-
-               success ? keysByDomainIdReq.reply.send(keysByDomainIdRep)
-                       : keysByDomainIdReq.reply.sendError(encrypt_key_not_found());
+               addActor.send(ekLookupByDomainIds(ctx, interf, req));
            }
+           when(KmsConnBlobMetadataReq req = waitNext(interf.blobMetadataReq.getFuture())) {
+               addActor.send(blobMetadataLookup(interf, req));
+           }
        }
    }

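The randomized shapes line up with the asserts in the BlobConnectionProvider constructors above; a small sanity-check sketch of that invariant (assumption: every generated entry satisfies exactly one of the three cases):

    void checkSimMetadataShape(const Standalone<BlobMetadataDetailsRef>& m) {
        if (m.partitions.empty()) {
            ASSERT(m.base.present()); // type 0: single storage location
        } else {
            ASSERT(m.partitions.size() >= 2);
            for (auto& p : m.partitions) {
                bool fullUrl = p.toString().find("://") != std::string::npos;
                // type 1: base present, partitions are sub-path suffixes;
                // type 2: base absent, partitions are full blob urls
                ASSERT(fullUrl != m.base.present());
            }
        }
    }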
@@ -7,6 +7,8 @@ storageEngineExcludeTypes = [4]

[[knobs]]
bg_range_source = "tenant"
bg_metadata_source = "tenant"
enable_encryption = true

[[test]]
testTitle = 'BlobGranuleCorrectness'

@@ -7,6 +7,8 @@ storageEngineExcludeTypes = [4]

[[knobs]]
bg_range_source = "tenant"
bg_metadata_source = "tenant"
enable_encryption = true

[[test]]
testTitle = 'BlobGranuleCorrectness'