made blob metadata load lazily from EKP (#10463)

This commit is contained in:
Josh Slocum 2023-06-15 13:36:52 -05:00 committed by GitHub
parent 01ce1ab24d
commit 367088b831
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 24 deletions

View File

@ -468,14 +468,19 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Blo
// FIXME: if one tenant gets an error, don't kill whole process
state double startTime = now();
state UID prevEKPID = UID();
state Future<EKPGetLatestBlobMetadataReply> requestFuture = Never();
loop {
try {
Future<EKPGetLatestBlobMetadataReply> requestFuture;
if (self->dbInfo.isValid() && self->dbInfo->get().client.encryptKeyProxy.present()) {
req.reply.reset();
requestFuture = brokenPromiseToNever(
self->dbInfo->get().client.encryptKeyProxy.get().getLatestBlobMetadata.getReply(req));
if (self->dbInfo->get().client.encryptKeyProxy.get().myId != prevEKPID) {
prevEKPID = self->dbInfo->get().client.encryptKeyProxy.get().myId;
req.reply.reset();
requestFuture = brokenPromiseToNever(
self->dbInfo->get().client.encryptKeyProxy.get().getLatestBlobMetadata.getReply(req));
}
} else {
prevEKPID = UID();
requestFuture = Never();
}
choose {
@ -517,6 +522,11 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Blo
for (auto& id : missingIds) {
req.domainIds.push_back(id);
}
// redo request
// reset request on error
prevEKPID = UID();
requestFuture = Never();
}
when(wait(self->dbInfo->onChange())) {
// reset retry sleep
@ -529,6 +539,9 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Blo
}
CODE_PROBE(true, "blob metadata fetch error");
TraceEvent(SevWarn, "BlobMetadataFetchError").errorUnsuppressed(e).suppressFor(30.0);
// need to reset request on error
prevEKPID = UID();
requestFuture = Never();
}
wait(delay(retrySleep));
retrySleep = std::min(10.0, retrySleep * 1.5);
@ -543,22 +556,16 @@ Future<Void> loadBlobMetadataForTenant(BGTenantMap* self, BlobMetadataDomainId d
// Registers each (id, TenantMapEntry) pair from a list of tenants that may or may not already exist.
// Pairs whose id is already present in tenantInfoById are skipped.
void BGTenantMap::addTenants(std::vector<std::pair<int64_t, TenantMapEntry>> tenants) {
// ids of newly added tenants whose blob metadata load is requested at the end of this call
std::vector<BlobMetadataDomainId> tenantsToLoad;
// NOTE(review): `auto entry` copies each pair per iteration; `const auto&` would avoid the copy
for (auto entry : tenants) {
// only act when the insert actually added a new element (.second of the insert result)
if (tenantInfoById.insert({ entry.first, entry.second }).second) {
auto r = makeReference<GranuleTenantData>(entry.second);
// associate the new tenant data with the key range built from the tenant prefix and
// that prefix with normalKeys.end appended as a suffix
tenantData.insert(KeyRangeRef(entry.second.prefix, entry.second.prefix.withSuffix(normalKeys.end)), r);
if (SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
// metadata source is not "tenant": flag the bstore load as started and signal bstoreLoaded right away
r->startedLoadingBStore = true;
r->bstoreLoaded.send(Void());
} else {
// metadata source is "tenant": queue this tenant id for the batched load below
tenantsToLoad.push_back(entry.second.id);
}
}
}
if (!tenantsToLoad.empty()) {
// a single loadBlobMetadataForTenants call covers every queued id; its future is handed to addActor
addActor.send(loadBlobMetadataForTenants(this, tenantsToLoad));
}
}
// TODO: implement
@ -582,28 +589,39 @@ ACTOR Future<Reference<GranuleTenantData>> getDataForGranuleActor(BGTenantMap* s
state int loopCount = 0;
loop {
loopCount++;
auto tenant = self->tenantData.rangeContaining(keyRange.begin);
ASSERT(tenant.begin() <= keyRange.begin);
ASSERT(tenant.end() >= keyRange.end);
auto tenantRange = self->tenantData.rangeContaining(keyRange.begin);
ASSERT(tenantRange.begin() <= keyRange.begin);
ASSERT(tenantRange.end() >= keyRange.end);
state Reference<GranuleTenantData> tenant = tenantRange.cvalue();
if (!tenant.cvalue().isValid() || !tenant.cvalue()->bstore.isValid()) {
return tenant.cvalue();
} else if (tenant.cvalue()->bstore->isExpired()) {
if (!tenant.isValid()) {
return tenant;
} else if (!tenant->startedLoadingBStore || (tenant->bstore.isValid() && tenant->bstore->isExpired())) {
tenant->startedLoadingBStore = true;
CODE_PROBE(true, "re-fetching expired blob metadata");
// fetch again
Future<Void> reload = loadBlobMetadataForTenant(self, tenant.cvalue()->entry.id);
wait(reload);
if (loopCount > 1) {
// even if this actor gets cancelled, we marked it as startedLoading, so finish the load in the actor
// collection
Future<Void> loadFuture = loadBlobMetadataForTenant(self, tenant->entry.id);
self->addActor.send(loadFuture);
wait(loadFuture);
ASSERT(tenant->bstore.isValid());
if (!tenant->bstore->isExpired()) {
return tenant;
}
if (tenant->bstore->isExpired() && loopCount > 1) {
TraceEvent(SevWarn, "BlobMetadataStillExpired").suppressFor(5.0).detail("LoopCount", loopCount);
wait(delay(0.001));
}
} else {
// handle refresh in background if tenant needs refresh
if (tenant.cvalue()->bstore->needsRefresh()) {
Future<Void> reload = loadBlobMetadataForTenant(self, tenant.cvalue()->entry.id);
if (tenant->bstore.isValid() && tenant->bstore->needsRefresh()) {
Future<Void> reload = loadBlobMetadataForTenant(self, tenant->entry.id);
self->addActor.send(reload);
}
return tenant.cvalue();
return tenant;
}
}
}

View File

@ -4215,6 +4215,11 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
req.managerSeqno);
}
TraceEvent("GranuleOpenStart", bwData->id)
.detail("Granule", req.keyRange)
.detail("Epoch", req.managerEpoch)
.detail("Seqno", req.managerSeqno);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

View File

@ -108,12 +108,14 @@ ACTOR Future<ForcedPurgeState> getForcePurgedState(Transaction* tr, KeyRange key
struct GranuleTenantData : NonCopyable, ReferenceCounted<GranuleTenantData> {
TenantMapEntry entry;
Reference<BlobConnectionProvider> bstore;
bool startedLoadingBStore = false;
Promise<Void> bstoreLoaded;
GranuleTenantData() {}
GranuleTenantData(TenantMapEntry entry) : entry(entry) {}
void updateBStore(const BlobMetadataDetailsRef& metadata) {
ASSERT(startedLoadingBStore);
if (bstoreLoaded.canBeSet()) {
// new
bstore = BlobConnectionProvider::newBlobConnectionProvider(metadata);