Adding domain name to blob metadata requests (#8415)
This commit is contained in:
parent
8e616009ce
commit
06d9ebd620
|
@ -25,6 +25,8 @@
|
|||
#include "flow/FileIdentifier.h"
|
||||
|
||||
using BlobMetadataDomainId = int64_t;
|
||||
using BlobMetadataDomainNameRef = StringRef;
|
||||
using BlobMetadataDomainName = Standalone<BlobMetadataDomainNameRef>;
|
||||
|
||||
/*
|
||||
* There are 3 cases for blob metadata.
|
||||
|
@ -38,26 +40,31 @@ using BlobMetadataDomainId = int64_t;
|
|||
struct BlobMetadataDetailsRef {
|
||||
constexpr static FileIdentifier file_identifier = 6685526;
|
||||
BlobMetadataDomainId domainId;
|
||||
BlobMetadataDomainNameRef domainName;
|
||||
Optional<StringRef> base;
|
||||
VectorRef<StringRef> partitions;
|
||||
|
||||
BlobMetadataDetailsRef() {}
|
||||
BlobMetadataDetailsRef(Arena& arena, const BlobMetadataDetailsRef& from)
|
||||
: domainId(from.domainId), partitions(arena, from.partitions) {
|
||||
: domainId(from.domainId), domainName(arena, from.domainName), partitions(arena, from.partitions) {
|
||||
if (from.base.present()) {
|
||||
base = StringRef(arena, from.base.get());
|
||||
}
|
||||
}
|
||||
explicit BlobMetadataDetailsRef(BlobMetadataDomainId domainId,
|
||||
BlobMetadataDomainNameRef domainName,
|
||||
Optional<StringRef> base,
|
||||
VectorRef<StringRef> partitions)
|
||||
: domainId(domainId), base(base), partitions(partitions) {}
|
||||
: domainId(domainId), domainName(domainName), base(base), partitions(partitions) {}
|
||||
|
||||
int expectedSize() const { return sizeof(BlobMetadataDetailsRef) + partitions.expectedSize(); }
|
||||
int expectedSize() const {
|
||||
return sizeof(BlobMetadataDetailsRef) + domainName.size() + (base.present() ? base.get().size() : 0) +
|
||||
partitions.expectedSize();
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, domainId, base, partitions);
|
||||
serializer(ar, domainId, domainName, base, partitions);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -197,6 +197,7 @@ struct EKPGetLatestBaseCipherKeysReply {
|
|||
}
|
||||
};
|
||||
|
||||
// TODO: also used for blob metadata, fix name
|
||||
struct EKPGetLatestCipherKeysRequestInfo {
|
||||
constexpr static FileIdentifier file_identifier = 2180516;
|
||||
// Encryption domain identifier
|
||||
|
@ -206,7 +207,7 @@ struct EKPGetLatestCipherKeysRequestInfo {
|
|||
EncryptCipherDomainNameRef domainName;
|
||||
|
||||
EKPGetLatestCipherKeysRequestInfo() : domainId(INVALID_ENCRYPT_DOMAIN_ID) {}
|
||||
EKPGetLatestCipherKeysRequestInfo(const EncryptCipherDomainId dId, StringRef name, Arena& arena)
|
||||
explicit EKPGetLatestCipherKeysRequestInfo(Arena& arena, const EncryptCipherDomainId dId, StringRef name)
|
||||
: domainId(dId), domainName(StringRef(arena, name)) {}
|
||||
|
||||
bool operator==(const EKPGetLatestCipherKeysRequestInfo& info) const {
|
||||
|
@ -261,16 +262,15 @@ struct EKPGetLatestBlobMetadataReply {
|
|||
|
||||
struct EKPGetLatestBlobMetadataRequest {
|
||||
constexpr static FileIdentifier file_identifier = 3821549;
|
||||
std::vector<BlobMetadataDomainId> domainIds;
|
||||
Standalone<VectorRef<EKPGetLatestCipherKeysRequestInfo>> domainInfos;
|
||||
Optional<UID> debugId;
|
||||
ReplyPromise<EKPGetLatestBlobMetadataReply> reply;
|
||||
|
||||
EKPGetLatestBlobMetadataRequest() {}
|
||||
explicit EKPGetLatestBlobMetadataRequest(const std::vector<BlobMetadataDomainId>& ids) : domainIds(ids) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, domainIds, debugId, reply);
|
||||
serializer(ar, domainInfos, debugId, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getL
|
|||
cipherKeys[domain.first] = cachedCipherKey;
|
||||
} else {
|
||||
request.encryptDomainInfos.emplace_back(
|
||||
domain.first /*domainId*/, domain.second /*domainName*/, request.arena);
|
||||
request.arena, domain.first /*domainId*/, domain.second /*domainName*/);
|
||||
}
|
||||
}
|
||||
if (request.encryptDomainInfos.empty()) {
|
||||
|
|
|
@ -451,12 +451,14 @@ TEST_CASE("/blobgranule/server/common/granulesummary") {
|
|||
}
|
||||
|
||||
// FIXME: if credentials can expire, refresh periodically
|
||||
ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<TenantMapEntry> tenantMapEntries) {
|
||||
ACTOR Future<Void> loadBlobMetadataForTenants(
|
||||
BGTenantMap* self,
|
||||
std::vector<std::pair<BlobMetadataDomainId, BlobMetadataDomainName>> tenantsToLoad) {
|
||||
ASSERT(SERVER_KNOBS->BG_METADATA_SOURCE == "tenant");
|
||||
ASSERT(!tenantMapEntries.empty());
|
||||
state std::vector<BlobMetadataDomainId> domainIds;
|
||||
for (auto& entry : tenantMapEntries) {
|
||||
domainIds.push_back(entry.id);
|
||||
ASSERT(!tenantsToLoad.empty());
|
||||
state EKPGetLatestBlobMetadataRequest req;
|
||||
for (auto& tenant : tenantsToLoad) {
|
||||
req.domainInfos.emplace_back_deep(req.domainInfos.arena(), tenant.first, StringRef(tenant.second));
|
||||
}
|
||||
|
||||
// FIXME: if one tenant gets an error, don't kill whole process
|
||||
|
@ -464,8 +466,7 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Ten
|
|||
loop {
|
||||
Future<EKPGetLatestBlobMetadataReply> requestFuture;
|
||||
if (self->dbInfo.isValid() && self->dbInfo->get().encryptKeyProxy.present()) {
|
||||
EKPGetLatestBlobMetadataRequest req;
|
||||
req.domainIds = domainIds;
|
||||
req.reply.reset();
|
||||
requestFuture =
|
||||
brokenPromiseToNever(self->dbInfo->get().encryptKeyProxy.get().getLatestBlobMetadata.getReply(req));
|
||||
} else {
|
||||
|
@ -473,7 +474,7 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Ten
|
|||
}
|
||||
choose {
|
||||
when(EKPGetLatestBlobMetadataReply rep = wait(requestFuture)) {
|
||||
ASSERT(rep.blobMetadataDetails.size() == domainIds.size());
|
||||
ASSERT(rep.blobMetadataDetails.size() == req.domainInfos.size());
|
||||
// not guaranteed to be in same order in the request as the response
|
||||
for (auto& metadata : rep.blobMetadataDetails) {
|
||||
auto info = self->tenantInfoById.find(metadata.domainId);
|
||||
|
@ -493,7 +494,7 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Ten
|
|||
|
||||
// list of tenants that may or may not already exist
|
||||
void BGTenantMap::addTenants(std::vector<std::pair<TenantName, TenantMapEntry>> tenants) {
|
||||
std::vector<TenantMapEntry> tenantsToLoad;
|
||||
std::vector<std::pair<BlobMetadataDomainId, BlobMetadataDomainName>> tenantsToLoad;
|
||||
for (auto entry : tenants) {
|
||||
if (tenantInfoById.insert({ entry.second.id, entry.second }).second) {
|
||||
auto r = makeReference<GranuleTenantData>(entry.first, entry.second);
|
||||
|
@ -501,7 +502,7 @@ void BGTenantMap::addTenants(std::vector<std::pair<TenantName, TenantMapEntry>>
|
|||
if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
|
||||
r->bstoreLoaded.send(Void());
|
||||
} else {
|
||||
tenantsToLoad.push_back(entry.second);
|
||||
tenantsToLoad.push_back({ entry.second.id, entry.first });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -690,44 +690,43 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
|
|||
}
|
||||
|
||||
// Dedup the requested domainIds.
|
||||
std::unordered_set<BlobMetadataDomainId> dedupedDomainIds;
|
||||
for (auto id : req.domainIds) {
|
||||
dedupedDomainIds.emplace(id);
|
||||
std::unordered_map<BlobMetadataDomainId, BlobMetadataDomainName> dedupedDomainInfos;
|
||||
for (auto info : req.domainInfos) {
|
||||
dedupedDomainInfos.insert({ info.domainId, info.domainName });
|
||||
}
|
||||
|
||||
if (dbgTrace.present()) {
|
||||
dbgTrace.get().detail("NKeys", dedupedDomainIds.size());
|
||||
for (BlobMetadataDomainId id : dedupedDomainIds) {
|
||||
dbgTrace.get().detail("NKeys", dedupedDomainInfos.size());
|
||||
for (auto& info : dedupedDomainInfos) {
|
||||
// log domainids queried
|
||||
dbgTrace.get().detail("BMQ" + std::to_string(id), "");
|
||||
dbgTrace.get().detail("BMQ" + std::to_string(info.first), "");
|
||||
}
|
||||
}
|
||||
|
||||
// First, check if the requested information is already cached by the server.
|
||||
// Ensure the cached information is within SERVER_KNOBS->BLOB_METADATA_CACHE_TTL time window.
|
||||
std::vector<BlobMetadataDomainId> lookupDomains;
|
||||
for (BlobMetadataDomainId id : dedupedDomainIds) {
|
||||
const auto itr = ekpProxyData->blobMetadataDomainIdCache.find(id);
|
||||
state KmsConnBlobMetadataReq kmsReq;
|
||||
kmsReq.debugId = req.debugId;
|
||||
|
||||
for (auto& info : dedupedDomainInfos) {
|
||||
const auto itr = ekpProxyData->blobMetadataDomainIdCache.find(info.first);
|
||||
if (itr != ekpProxyData->blobMetadataDomainIdCache.end() && itr->second.isValid()) {
|
||||
metadataDetails.arena().dependsOn(itr->second.metadataDetails.arena());
|
||||
metadataDetails.push_back(metadataDetails.arena(), itr->second.metadataDetails);
|
||||
|
||||
if (dbgTrace.present()) {
|
||||
dbgTrace.get().detail("BMC" + std::to_string(id), "");
|
||||
dbgTrace.get().detail("BMC" + std::to_string(info.first), "");
|
||||
}
|
||||
++ekpProxyData->blobMetadataCacheHits;
|
||||
} else {
|
||||
lookupDomains.emplace_back(id);
|
||||
++ekpProxyData->blobMetadataCacheMisses;
|
||||
kmsReq.domainInfos.emplace_back(kmsReq.domainInfos.arena(), info.first, info.second);
|
||||
}
|
||||
}
|
||||
|
||||
ekpProxyData->baseCipherDomainIdCacheHits += metadataDetails.size();
|
||||
ekpProxyData->baseCipherDomainIdCacheMisses += lookupDomains.size();
|
||||
ekpProxyData->blobMetadataCacheHits += metadataDetails.size();
|
||||
|
||||
if (!lookupDomains.empty()) {
|
||||
if (!kmsReq.domainInfos.empty()) {
|
||||
ekpProxyData->blobMetadataCacheMisses += kmsReq.domainInfos.size();
|
||||
try {
|
||||
KmsConnBlobMetadataReq kmsReq(lookupDomains, req.debugId);
|
||||
state double startTime = now();
|
||||
KmsConnBlobMetadataRep kmsRep = wait(kmsConnectorInf.blobMetadataReq.getReply(kmsReq));
|
||||
ekpProxyData->kmsBlobMetadataReqLatency.addMeasurement(now() - startTime);
|
||||
|
@ -755,7 +754,6 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
|
|||
}
|
||||
|
||||
req.reply.send(EKPGetLatestBlobMetadataReply(metadataDetails));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -771,10 +769,11 @@ ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpPro
|
|||
try {
|
||||
KmsConnBlobMetadataReq req;
|
||||
req.debugId = debugId;
|
||||
req.domainIds.reserve(ekpProxyData->blobMetadataDomainIdCache.size());
|
||||
req.domainInfos.reserve(req.domainInfos.arena(), ekpProxyData->blobMetadataDomainIdCache.size());
|
||||
|
||||
// TODO add refresh + expire timestamp and filter to only ones that need refreshing
|
||||
for (auto& item : ekpProxyData->blobMetadataDomainIdCache) {
|
||||
req.domainIds.emplace_back(item.first);
|
||||
req.domainInfos.emplace_back(req.domainInfos.arena(), item.first, item.second.metadataDetails.domainName);
|
||||
}
|
||||
state double startTime = now();
|
||||
KmsConnBlobMetadataRep rep = wait(kmsConnectorInf.blobMetadataReq.getReply(req));
|
||||
|
|
|
@ -185,10 +185,13 @@ ACTOR Future<Void> ekLookupByDomainIds(Reference<SimKmsConnectorContext> ctx,
|
|||
|
||||
return Void();
|
||||
}
|
||||
|
||||
static Standalone<BlobMetadataDetailsRef> createBlobMetadata(BlobMetadataDomainId domainId) {
|
||||
// TODO: switch this to use bg_url instead of hardcoding file://fdbblob, so it works as FDBPerfKmsConnector
|
||||
// FIXME: make this (more) deterministic outside of simulation for FDBPerfKmsConnector
|
||||
static Standalone<BlobMetadataDetailsRef> createBlobMetadata(BlobMetadataDomainId domainId,
|
||||
BlobMetadataDomainName domainName) {
|
||||
Standalone<BlobMetadataDetailsRef> metadata;
|
||||
metadata.domainId = domainId;
|
||||
metadata.domainName = domainName;
|
||||
// 0 == no partition, 1 == suffix partitioned, 2 == storage location partitioned
|
||||
int type = deterministicRandom()->randomInt(0, 3);
|
||||
int partitionCount = (type == 0) ? 0 : deterministicRandom()->randomInt(2, 12);
|
||||
|
@ -234,17 +237,19 @@ ACTOR Future<Void> blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobM
|
|||
dbgDIdTrace.get().detail("DbgId", req.debugId.get());
|
||||
}
|
||||
|
||||
for (BlobMetadataDomainId domainId : req.domainIds) {
|
||||
auto it = simBlobMetadataStore.find(domainId);
|
||||
for (auto const& domainInfo : req.domainInfos) {
|
||||
auto it = simBlobMetadataStore.find(domainInfo.domainId);
|
||||
if (it == simBlobMetadataStore.end()) {
|
||||
// construct new blob metadata
|
||||
it = simBlobMetadataStore.insert({ domainId, createBlobMetadata(domainId) }).first;
|
||||
it = simBlobMetadataStore
|
||||
.insert({ domainInfo.domainId, createBlobMetadata(domainInfo.domainId, domainInfo.domainName) })
|
||||
.first;
|
||||
}
|
||||
rep.metadataDetails.arena().dependsOn(it->second.arena());
|
||||
rep.metadataDetails.push_back(rep.metadataDetails.arena(), it->second);
|
||||
}
|
||||
|
||||
wait(delayJittered(1.0)); // simulate network delay
|
||||
wait(delay(deterministicRandom()->random01())); // simulate network delay
|
||||
|
||||
req.reply.send(rep);
|
||||
|
||||
|
|
|
@ -232,17 +232,15 @@ struct KmsConnBlobMetadataRep {
|
|||
|
||||
struct KmsConnBlobMetadataReq {
|
||||
constexpr static FileIdentifier file_identifier = 3913147;
|
||||
std::vector<BlobMetadataDomainId> domainIds;
|
||||
Standalone<VectorRef<KmsConnLookupDomainIdsReqInfoRef>> domainInfos;
|
||||
Optional<UID> debugId;
|
||||
ReplyPromise<KmsConnBlobMetadataRep> reply;
|
||||
|
||||
KmsConnBlobMetadataReq() {}
|
||||
explicit KmsConnBlobMetadataReq(const std::vector<BlobMetadataDomainId>& ids, Optional<UID> dbgId)
|
||||
: domainIds(ids), debugId(dbgId) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, domainIds, debugId, reply);
|
||||
serializer(ar, domainInfos, debugId, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
|
|||
for (int i = 0; i < self->numDomains / 2; i++) {
|
||||
const EncryptCipherDomainId domainId = self->minDomainId + i;
|
||||
self->domainInfos.emplace_back(
|
||||
EKPGetLatestCipherKeysRequestInfo(domainId, StringRef(std::to_string(domainId)), self->arena));
|
||||
EKPGetLatestCipherKeysRequestInfo(self->arena, domainId, StringRef(std::to_string(domainId))));
|
||||
}
|
||||
|
||||
state int nAttempts = 0;
|
||||
|
@ -127,14 +127,14 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
|
|||
for (int i = 0; i < expectedHits; i++) {
|
||||
const EncryptCipherDomainId domainId = self->minDomainId + i;
|
||||
self->domainInfos.emplace_back(
|
||||
EKPGetLatestCipherKeysRequestInfo(domainId, StringRef(std::to_string(domainId)), self->arena));
|
||||
EKPGetLatestCipherKeysRequestInfo(self->arena, domainId, StringRef(std::to_string(domainId))));
|
||||
}
|
||||
|
||||
expectedMisses = deterministicRandom()->randomInt(1, self->numDomains / 2);
|
||||
for (int i = 0; i < expectedMisses; i++) {
|
||||
const EncryptCipherDomainId domainId = self->minDomainId + i + self->numDomains / 2 + 1;
|
||||
self->domainInfos.emplace_back(
|
||||
EKPGetLatestCipherKeysRequestInfo(domainId, StringRef(std::to_string(domainId)), self->arena));
|
||||
EKPGetLatestCipherKeysRequestInfo(self->arena, domainId, StringRef(std::to_string(domainId))));
|
||||
}
|
||||
|
||||
state int nAttempts = 0;
|
||||
|
@ -191,7 +191,7 @@ struct EncryptKeyProxyTestWorkload : TestWorkload {
|
|||
for (int i = 0; i < self->numDomains; i++) {
|
||||
const EncryptCipherDomainId domainId = self->minDomainId + i;
|
||||
self->domainInfos.emplace_back(
|
||||
EKPGetLatestCipherKeysRequestInfo(domainId, StringRef(std::to_string(domainId)), self->arena));
|
||||
EKPGetLatestCipherKeysRequestInfo(self->arena, domainId, StringRef(std::to_string(domainId))));
|
||||
}
|
||||
|
||||
EKPGetLatestBaseCipherKeysRequest req;
|
||||
|
|
Loading…
Reference in New Issue