Blob metadata refresh (#8456)
* Adding EKP refresh of blob metadata * Adding re-fetching blob metadata from BGTenantMap * adding buggifies from code review comments
This commit is contained in:
parent
3609fc8aef
commit
7cec0a5249
|
@ -33,6 +33,12 @@ public:
|
|||
|
||||
SingleBlobConnectionProvider(std::string url) { conn = BackupContainerFileSystem::openContainerFS(url, {}, {}); }
|
||||
|
||||
bool needsRefresh() const { return false; }
|
||||
|
||||
bool isExpired() const { return false; }
|
||||
|
||||
void update(Standalone<BlobMetadataDetailsRef> newBlobMetadata) { ASSERT(false); }
|
||||
|
||||
private:
|
||||
Reference<BackupContainerFileSystem> conn;
|
||||
};
|
||||
|
@ -44,18 +50,42 @@ struct PartitionedBlobConnectionProvider : BlobConnectionProvider {
|
|||
return std::pair(conn, metadata.partitions[writePartition].toString() + newFileName);
|
||||
}
|
||||
|
||||
Reference<BackupContainerFileSystem> getForRead(std::string filePath) { return conn; }
|
||||
Reference<BackupContainerFileSystem> getForRead(std::string filePath) {
|
||||
CODE_PROBE(isExpired(), "partitioned blob connection using expired blob metadata for read!");
|
||||
return conn;
|
||||
}
|
||||
|
||||
PartitionedBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef> metadata) : metadata(metadata) {
|
||||
ASSERT(metadata.base.present());
|
||||
ASSERT(metadata.partitions.size() >= 2);
|
||||
conn = BackupContainerFileSystem::openContainerFS(metadata.base.get().toString(), {}, {});
|
||||
for (auto& it : metadata.partitions) {
|
||||
void updateMetadata(const Standalone<BlobMetadataDetailsRef>& newMetadata, bool checkPrevious) {
|
||||
ASSERT(newMetadata.base.present());
|
||||
ASSERT(newMetadata.partitions.size() >= 2);
|
||||
for (auto& it : newMetadata.partitions) {
|
||||
// these should be suffixes, not whole blob urls
|
||||
ASSERT(it.toString().find("://") == std::string::npos);
|
||||
}
|
||||
if (checkPrevious) {
|
||||
if (newMetadata.expireAt <= metadata.expireAt) {
|
||||
return;
|
||||
}
|
||||
// FIXME: validate only the credentials changed and the location is the same
|
||||
ASSERT(newMetadata.partitions.size() == metadata.partitions.size());
|
||||
for (int i = 0; i < newMetadata.partitions.size(); i++) {
|
||||
ASSERT(newMetadata.partitions[i] == metadata.partitions[i]);
|
||||
}
|
||||
}
|
||||
metadata = newMetadata;
|
||||
conn = BackupContainerFileSystem::openContainerFS(metadata.base.get().toString(), {}, {});
|
||||
}
|
||||
|
||||
PartitionedBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef> metadata) {
|
||||
updateMetadata(metadata, false);
|
||||
}
|
||||
|
||||
bool needsRefresh() const { return now() >= metadata.refreshAt; }
|
||||
|
||||
bool isExpired() const { return now() >= metadata.expireAt; }
|
||||
|
||||
void update(Standalone<BlobMetadataDetailsRef> newBlobMetadata) { updateMetadata(newBlobMetadata, true); }
|
||||
|
||||
private:
|
||||
Standalone<BlobMetadataDetailsRef> metadata;
|
||||
Reference<BackupContainerFileSystem> conn;
|
||||
|
@ -72,6 +102,7 @@ struct StorageLocationBlobConnectionProvider : BlobConnectionProvider {
|
|||
}
|
||||
|
||||
Reference<BackupContainerFileSystem> getForRead(std::string filePath) {
|
||||
CODE_PROBE(isExpired(), "storage location blob connection using expired blob metadata for read!");
|
||||
size_t slash = filePath.find("/");
|
||||
ASSERT(slash != std::string::npos);
|
||||
int partition = stoi(filePath.substr(0, slash));
|
||||
|
@ -80,9 +111,18 @@ struct StorageLocationBlobConnectionProvider : BlobConnectionProvider {
|
|||
return partitions[partition];
|
||||
}
|
||||
|
||||
StorageLocationBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef> metadata) {
|
||||
ASSERT(!metadata.base.present());
|
||||
ASSERT(metadata.partitions.size() >= 2);
|
||||
void updateMetadata(const Standalone<BlobMetadataDetailsRef>& newMetadata, bool checkPrevious) {
|
||||
ASSERT(!newMetadata.base.present());
|
||||
ASSERT(newMetadata.partitions.size() >= 2);
|
||||
if (checkPrevious) {
|
||||
// FIXME: validate only the credentials changed and the locations are the same
|
||||
ASSERT(newMetadata.partitions.size() == partitions.size());
|
||||
if (newMetadata.expireAt <= metadata.expireAt) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
metadata = newMetadata;
|
||||
partitions.clear();
|
||||
for (auto& it : metadata.partitions) {
|
||||
// these should be whole blob urls
|
||||
ASSERT(it.toString().find("://") != std::string::npos);
|
||||
|
@ -90,7 +130,18 @@ struct StorageLocationBlobConnectionProvider : BlobConnectionProvider {
|
|||
}
|
||||
}
|
||||
|
||||
StorageLocationBlobConnectionProvider(const Standalone<BlobMetadataDetailsRef> metadata) {
|
||||
updateMetadata(metadata, false);
|
||||
}
|
||||
|
||||
bool needsRefresh() const { return now() >= metadata.refreshAt; }
|
||||
|
||||
bool isExpired() const { return now() >= metadata.expireAt; }
|
||||
|
||||
void update(Standalone<BlobMetadataDetailsRef> newBlobMetadata) { updateMetadata(newBlobMetadata, true); }
|
||||
|
||||
private:
|
||||
Standalone<BlobMetadataDetailsRef> metadata;
|
||||
std::vector<Reference<BackupContainerFileSystem>> partitions;
|
||||
};
|
||||
|
||||
|
|
|
@ -989,8 +989,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
// Blob Metadata
|
||||
init( BLOB_METADATA_CACHE_TTL, isSimulated ? 120 : 24 * 60 * 60 );
|
||||
if ( randomize && BUGGIFY) { BLOB_METADATA_CACHE_TTL = deterministicRandom()->randomInt(50, 100); }
|
||||
init( BLOB_METADATA_REFRESH_INTERVAL, isSimulated ? 60 : 12 * 60 * 60 );
|
||||
if ( randomize && BUGGIFY) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(20, 40); }
|
||||
init( BLOB_METADATA_REFRESH_INTERVAL, isSimulated ? 60 : 60 * 60 );
|
||||
if ( randomize && BUGGIFY) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(5, 120); }
|
||||
|
||||
// HTTP KMS Connector
|
||||
init( REST_KMS_CONNECTOR_KMS_DISCOVERY_URL_MODE, "file");
|
||||
|
|
|
@ -33,13 +33,15 @@ struct BlobConnectionProvider : NonCopyable, ReferenceCounted<BlobConnectionProv
|
|||
// something returned from createForWrite
|
||||
virtual Reference<BackupContainerFileSystem> getForRead(std::string filePath) = 0;
|
||||
|
||||
virtual bool isExpired() const = 0;
|
||||
virtual bool needsRefresh() const = 0;
|
||||
virtual void update(Standalone<BlobMetadataDetailsRef> newBlobMetadata) = 0;
|
||||
|
||||
virtual ~BlobConnectionProvider() {}
|
||||
|
||||
static Reference<BlobConnectionProvider> newBlobConnectionProvider(std::string blobUrl);
|
||||
|
||||
static Reference<BlobConnectionProvider> newBlobConnectionProvider(Standalone<BlobMetadataDetailsRef> blobMetadata);
|
||||
|
||||
// TODO add update impl
|
||||
};
|
||||
|
||||
#endif
|
|
@ -44,9 +44,14 @@ struct BlobMetadataDetailsRef {
|
|||
Optional<StringRef> base;
|
||||
VectorRef<StringRef> partitions;
|
||||
|
||||
// cache options
|
||||
double refreshAt;
|
||||
double expireAt;
|
||||
|
||||
BlobMetadataDetailsRef() {}
|
||||
BlobMetadataDetailsRef(Arena& arena, const BlobMetadataDetailsRef& from)
|
||||
: domainId(from.domainId), domainName(arena, from.domainName), partitions(arena, from.partitions) {
|
||||
: domainId(from.domainId), domainName(arena, from.domainName), partitions(arena, from.partitions),
|
||||
refreshAt(from.refreshAt), expireAt(from.expireAt) {
|
||||
if (from.base.present()) {
|
||||
base = StringRef(arena, from.base.get());
|
||||
}
|
||||
|
@ -54,8 +59,11 @@ struct BlobMetadataDetailsRef {
|
|||
explicit BlobMetadataDetailsRef(BlobMetadataDomainId domainId,
|
||||
BlobMetadataDomainNameRef domainName,
|
||||
Optional<StringRef> base,
|
||||
VectorRef<StringRef> partitions)
|
||||
: domainId(domainId), domainName(domainName), base(base), partitions(partitions) {}
|
||||
VectorRef<StringRef> partitions,
|
||||
double refreshAt,
|
||||
double expireAt)
|
||||
: domainId(domainId), domainName(domainName), base(base), partitions(partitions), refreshAt(refreshAt),
|
||||
expireAt(expireAt) {}
|
||||
|
||||
int expectedSize() const {
|
||||
return sizeof(BlobMetadataDetailsRef) + domainName.size() + (base.present() ? base.get().size() : 0) +
|
||||
|
@ -64,7 +72,7 @@ struct BlobMetadataDetailsRef {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, domainId, domainName, base, partitions);
|
||||
serializer(ar, domainId, domainName, base, partitions, refreshAt, expireAt);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -483,7 +483,7 @@ ACTOR Future<Void> loadBlobMetadataForTenants(
|
|||
}
|
||||
auto dataEntry = self->tenantData.rangeContaining(info->second.prefix);
|
||||
ASSERT(dataEntry.begin() == info->second.prefix);
|
||||
dataEntry.cvalue()->setBStore(BlobConnectionProvider::newBlobConnectionProvider(metadata));
|
||||
dataEntry.cvalue()->updateBStore(metadata);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -492,6 +492,14 @@ ACTOR Future<Void> loadBlobMetadataForTenants(
|
|||
}
|
||||
}
|
||||
|
||||
Future<Void> loadBlobMetadataForTenant(BGTenantMap* self,
|
||||
BlobMetadataDomainId domainId,
|
||||
BlobMetadataDomainName domainName) {
|
||||
std::vector<std::pair<BlobMetadataDomainId, BlobMetadataDomainName>> toLoad;
|
||||
toLoad.push_back({ domainId, domainName });
|
||||
return loadBlobMetadataForTenants(self, toLoad);
|
||||
}
|
||||
|
||||
// list of tenants that may or may not already exist
|
||||
void BGTenantMap::addTenants(std::vector<std::pair<TenantName, TenantMapEntry>> tenants) {
|
||||
std::vector<std::pair<BlobMetadataDomainId, BlobMetadataDomainName>> tenantsToLoad;
|
||||
|
@ -526,11 +534,41 @@ Optional<TenantMapEntry> BGTenantMap::getTenantById(int64_t id) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: handle case where tenant isn't loaded yet
|
||||
Reference<GranuleTenantData> BGTenantMap::getDataForGranule(const KeyRangeRef& keyRange) {
|
||||
auto tenant = tenantData.rangeContaining(keyRange.begin);
|
||||
ASSERT(tenant.begin() <= keyRange.begin);
|
||||
ASSERT(tenant.end() >= keyRange.end);
|
||||
// FIXME: batch requests for refresh?
|
||||
// FIXME: don't double fetch if multiple accesses to refreshing/expired metadata
|
||||
// FIXME: log warning if after refresh, data is still expired!
|
||||
ACTOR Future<Reference<GranuleTenantData>> getDataForGranuleActor(BGTenantMap* self, KeyRange keyRange) {
|
||||
state int loopCount = 0;
|
||||
loop {
|
||||
loopCount++;
|
||||
auto tenant = self->tenantData.rangeContaining(keyRange.begin);
|
||||
ASSERT(tenant.begin() <= keyRange.begin);
|
||||
ASSERT(tenant.end() >= keyRange.end);
|
||||
|
||||
return tenant.cvalue();
|
||||
}
|
||||
if (!tenant.cvalue().isValid() || !tenant.cvalue()->bstore.isValid()) {
|
||||
return tenant.cvalue();
|
||||
} else if (tenant.cvalue()->bstore->isExpired()) {
|
||||
CODE_PROBE(true, "re-fetching expired blob metadata");
|
||||
// fetch again
|
||||
Future<Void> reload = loadBlobMetadataForTenant(self, tenant.cvalue()->entry.id, tenant->cvalue()->name);
|
||||
wait(reload);
|
||||
if (loopCount > 1) {
|
||||
TraceEvent(SevWarn, "BlobMetadataStillExpired").suppressFor(5.0).detail("LoopCount", loopCount);
|
||||
wait(delay(0.001));
|
||||
}
|
||||
} else {
|
||||
// handle refresh in background if tenant needs refres
|
||||
if (tenant.cvalue()->bstore->needsRefresh()) {
|
||||
Future<Void> reload =
|
||||
loadBlobMetadataForTenant(self, tenant.cvalue()->entry.id, tenant->cvalue()->name);
|
||||
self->addActor.send(reload);
|
||||
}
|
||||
return tenant.cvalue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: handle case where tenant isn't loaded yet
|
||||
Future<Reference<GranuleTenantData>> BGTenantMap::getDataForGranule(const KeyRangeRef& keyRange) {
|
||||
return getDataForGranuleActor(this, keyRange);
|
||||
}
|
||||
|
|
|
@ -562,11 +562,12 @@ ACTOR Future<BlobGranuleSplitPoints> alignKeys(Reference<BlobManagerData> bmData
|
|||
|
||||
state Transaction tr = Transaction(bmData->db);
|
||||
state int idx = 1;
|
||||
state Reference<GranuleTenantData> tenantData = bmData->tenantData.getDataForGranule(granuleRange);
|
||||
state Reference<GranuleTenantData> tenantData;
|
||||
wait(store(tenantData, bmData->tenantData.getDataForGranule(granuleRange)));
|
||||
while (SERVER_KNOBS->BG_METADATA_SOURCE == "tenant" && !tenantData.isValid()) {
|
||||
// this is a bit of a hack, but if we know this range is supposed to have a tenant, and it doesn't, just wait
|
||||
wait(delay(1.0));
|
||||
tenantData = bmData->tenantData.getDataForGranule(granuleRange);
|
||||
wait(store(tenantData, bmData->tenantData.getDataForGranule(granuleRange)));
|
||||
}
|
||||
for (; idx < splits.size() - 1; idx++) {
|
||||
loop {
|
||||
|
@ -4212,7 +4213,8 @@ ACTOR Future<Reference<BlobConnectionProvider>> getBStoreForGranule(Reference<Bl
|
|||
return self->bstore;
|
||||
}
|
||||
loop {
|
||||
state Reference<GranuleTenantData> data = self->tenantData.getDataForGranule(granuleRange);
|
||||
state Reference<GranuleTenantData> data;
|
||||
wait(store(data, self->tenantData.getDataForGranule(granuleRange)));
|
||||
if (data.isValid()) {
|
||||
wait(data->bstoreLoaded.getFuture());
|
||||
wait(delay(0));
|
||||
|
|
|
@ -366,7 +366,7 @@ ACTOR Future<BlobGranuleCipherKeysCtx> getLatestGranuleCipherKeys(Reference<Blob
|
|||
KeyRange keyRange,
|
||||
Arena* arena) {
|
||||
state BlobGranuleCipherKeysCtx cipherKeysCtx;
|
||||
state Reference<GranuleTenantData> tenantData = bwData->tenantData.getDataForGranule(keyRange);
|
||||
state Reference<GranuleTenantData> tenantData = wait(bwData->tenantData.getDataForGranule(keyRange));
|
||||
|
||||
ASSERT(tenantData.isValid());
|
||||
|
||||
|
@ -4095,7 +4095,8 @@ ACTOR Future<Reference<BlobConnectionProvider>> loadBStoreForTenant(Reference<Bl
|
|||
KeyRange keyRange) {
|
||||
state int retryCount = 0;
|
||||
loop {
|
||||
state Reference<GranuleTenantData> data = bwData->tenantData.getDataForGranule(keyRange);
|
||||
state Reference<GranuleTenantData> data;
|
||||
wait(store(data, bwData->tenantData.getDataForGranule(keyRange)));
|
||||
if (data.isValid()) {
|
||||
wait(data->bstoreLoaded.getFuture());
|
||||
wait(delay(0));
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/BlobMetadataUtils.h"
|
||||
#include "fdbclient/EncryptKeyProxyInterface.h"
|
||||
|
||||
#include "fdbrpc/Locality.h"
|
||||
|
@ -594,10 +595,21 @@ bool isCipherKeyEligibleForRefresh(const EncryptBaseCipherKey& cipherKey, int64_
|
|||
// Candidate eligible for refresh iff either is true:
|
||||
// 1. CipherKey cell is either expired/needs-refresh right now.
|
||||
// 2. CipherKey cell 'will' be expired/needs-refresh before next refresh cycle interval (proactive refresh)
|
||||
if (BUGGIFY_WITH_PROB(0.01)) {
|
||||
return true;
|
||||
}
|
||||
int64_t nextRefreshCycleTS = currTS + FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL;
|
||||
return nextRefreshCycleTS > cipherKey.expireAt || nextRefreshCycleTS > cipherKey.refreshAt;
|
||||
}
|
||||
|
||||
bool isBlobMetadataEligibleForRefresh(const BlobMetadataDetailsRef& blobMetadata, int64_t currTS) {
|
||||
if (BUGGIFY_WITH_PROB(0.01)) {
|
||||
return true;
|
||||
}
|
||||
int64_t nextRefreshCycleTS = currTS + SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
|
||||
return nextRefreshCycleTS > blobMetadata.expireAt || nextRefreshCycleTS > blobMetadata.refreshAt;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> refreshEncryptionKeysCore(Reference<EncryptKeyProxyData> ekpProxyData,
|
||||
KmsConnectorInterface kmsConnectorInf) {
|
||||
state UID debugId = deterministicRandom()->randomUniqueID();
|
||||
|
@ -710,7 +722,8 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
|
|||
|
||||
for (auto& info : dedupedDomainInfos) {
|
||||
const auto itr = ekpProxyData->blobMetadataDomainIdCache.find(info.first);
|
||||
if (itr != ekpProxyData->blobMetadataDomainIdCache.end() && itr->second.isValid()) {
|
||||
if (itr != ekpProxyData->blobMetadataDomainIdCache.end() && itr->second.isValid() &&
|
||||
now() <= itr->second.metadataDetails.expireAt) {
|
||||
metadataDetails.arena().dependsOn(itr->second.metadataDetails.arena());
|
||||
metadataDetails.push_back(metadataDetails.arena(), itr->second.metadataDetails);
|
||||
|
||||
|
@ -760,6 +773,7 @@ ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxy
|
|||
ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpProxyData,
|
||||
KmsConnectorInterface kmsConnectorInf) {
|
||||
state UID debugId = deterministicRandom()->randomUniqueID();
|
||||
state double startTime;
|
||||
|
||||
state TraceEvent t("RefreshBlobMetadata_Start", ekpProxyData->myId);
|
||||
t.setMaxEventLength(SERVER_KNOBS->ENCRYPT_PROXY_MAX_DBG_TRACE_LENGTH);
|
||||
|
@ -769,13 +783,28 @@ ACTOR Future<Void> refreshBlobMetadataCore(Reference<EncryptKeyProxyData> ekpPro
|
|||
try {
|
||||
KmsConnBlobMetadataReq req;
|
||||
req.debugId = debugId;
|
||||
req.domainInfos.reserve(req.domainInfos.arena(), ekpProxyData->blobMetadataDomainIdCache.size());
|
||||
|
||||
// TODO add refresh + expire timestamp and filter to only ones that need refreshing
|
||||
for (auto& item : ekpProxyData->blobMetadataDomainIdCache) {
|
||||
req.domainInfos.emplace_back(req.domainInfos.arena(), item.first, item.second.metadataDetails.domainName);
|
||||
int64_t currTS = (int64_t)now();
|
||||
for (auto itr = ekpProxyData->blobMetadataDomainIdCache.begin();
|
||||
itr != ekpProxyData->blobMetadataDomainIdCache.end();) {
|
||||
if (isBlobMetadataEligibleForRefresh(itr->second.metadataDetails, currTS)) {
|
||||
req.domainInfos.emplace_back_deep(
|
||||
req.domainInfos.arena(), itr->first, itr->second.metadataDetails.domainName);
|
||||
}
|
||||
|
||||
// Garbage collect expired cached Blob Metadata
|
||||
if (itr->second.metadataDetails.expireAt >= currTS) {
|
||||
itr = ekpProxyData->blobMetadataDomainIdCache.erase(itr);
|
||||
} else {
|
||||
itr++;
|
||||
}
|
||||
}
|
||||
state double startTime = now();
|
||||
|
||||
if (req.domainInfos.empty()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
startTime = now();
|
||||
KmsConnBlobMetadataRep rep = wait(kmsConnectorInf.blobMetadataReq.getReply(req));
|
||||
ekpProxyData->kmsBlobMetadataReqLatency.addMeasurement(now() - startTime);
|
||||
for (auto& item : rep.metadataDetails) {
|
||||
|
|
|
@ -38,6 +38,7 @@
|
|||
#include "flow/network.h"
|
||||
#include "flow/UnitTest.h"
|
||||
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
@ -227,6 +228,17 @@ static Standalone<BlobMetadataDetailsRef> createBlobMetadata(BlobMetadataDomainI
|
|||
ev.detail("P" + std::to_string(i), metadata.partitions.back());
|
||||
}
|
||||
}
|
||||
|
||||
// set random refresh + expire time
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
metadata.refreshAt = now() + deterministicRandom()->random01() * SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
|
||||
metadata.expireAt =
|
||||
metadata.refreshAt + deterministicRandom()->random01() * SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL;
|
||||
} else {
|
||||
metadata.refreshAt = std::numeric_limits<double>::max();
|
||||
metadata.expireAt = metadata.refreshAt;
|
||||
}
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
|
@ -245,6 +257,10 @@ ACTOR Future<Void> blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobM
|
|||
it = simBlobMetadataStore
|
||||
.insert({ domainInfo.domainId, createBlobMetadata(domainInfo.domainId, domainInfo.domainName) })
|
||||
.first;
|
||||
} else if (now() >= it->second.expireAt) {
|
||||
// update random refresh and expire time
|
||||
it->second.refreshAt = now() + deterministicRandom()->random01() * 30;
|
||||
it->second.expireAt = it->second.refreshAt + deterministicRandom()->random01() * 10;
|
||||
}
|
||||
rep.metadataDetails.arena().dependsOn(it->second.arena());
|
||||
rep.metadataDetails.push_back(rep.metadataDetails.arena(), it->second);
|
||||
|
|
|
@ -105,10 +105,15 @@ struct GranuleTenantData : NonCopyable, ReferenceCounted<GranuleTenantData> {
|
|||
GranuleTenantData() {}
|
||||
GranuleTenantData(TenantName name, TenantMapEntry entry) : name(name), entry(entry) {}
|
||||
|
||||
void setBStore(Reference<BlobConnectionProvider> bs) {
|
||||
ASSERT(bstoreLoaded.canBeSet());
|
||||
bstore = bs;
|
||||
bstoreLoaded.send(Void());
|
||||
void updateBStore(const BlobMetadataDetailsRef& metadata) {
|
||||
if (bstoreLoaded.canBeSet()) {
|
||||
// new
|
||||
bstore = BlobConnectionProvider::newBlobConnectionProvider(metadata);
|
||||
bstoreLoaded.send(Void());
|
||||
} else {
|
||||
// update existing
|
||||
bstore->update(metadata);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -119,7 +124,7 @@ public:
|
|||
void removeTenants(std::vector<int64_t> tenantIds);
|
||||
|
||||
Optional<TenantMapEntry> getTenantById(int64_t id);
|
||||
Reference<GranuleTenantData> getDataForGranule(const KeyRangeRef& keyRange);
|
||||
Future<Reference<GranuleTenantData>> getDataForGranule(const KeyRangeRef& keyRange);
|
||||
|
||||
KeyRangeMap<Reference<GranuleTenantData>> tenantData;
|
||||
std::unordered_map<int64_t, TenantMapEntry> tenantInfoById;
|
||||
|
|
|
@ -281,6 +281,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
state int directoryIdx = 0;
|
||||
state std::vector<std::pair<TenantName, TenantMapEntry>> tenants;
|
||||
state BGTenantMap tenantData(self->dbInfo);
|
||||
state Reference<GranuleTenantData> data;
|
||||
for (; directoryIdx < self->directories.size(); directoryIdx++) {
|
||||
// Set up the blob range first
|
||||
TenantMapEntry tenantEntry = wait(self->setUpTenant(cx, self->directories[directoryIdx]->tenantName));
|
||||
|
@ -297,8 +298,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
// wait for tenant data to be loaded
|
||||
|
||||
for (directoryIdx = 0; directoryIdx < self->directories.size(); directoryIdx++) {
|
||||
state Reference<GranuleTenantData> data =
|
||||
tenantData.getDataForGranule(self->directories[directoryIdx]->directoryRange);
|
||||
wait(store(data, tenantData.getDataForGranule(self->directories[directoryIdx]->directoryRange)));
|
||||
wait(data->bstoreLoaded.getFuture());
|
||||
wait(delay(0));
|
||||
self->directories[directoryIdx]->bstore = data->bstore;
|
||||
|
|
Loading…
Reference in New Issue