blob: refactor blob get tenant code
This commit is contained in:
parent
1ab432e49d
commit
f7b608e53f
|
@ -7994,6 +7994,21 @@ ACTOR Future<TenantMapEntry> blobGranuleGetTenantEntry(Transaction* self,
|
|||
return tme;
|
||||
}
|
||||
|
||||
// Tenant's are supposed to be unique and therefore can be loaded once.
|
||||
// There is an assumption that a tenant exists as long as operations are happening against said tenant.
|
||||
ACTOR Future<TenantMapEntry> blobLoadTenantMapEntry(Database* db, Key rangeStartKey, Optional<TenantName> tenantName) {
|
||||
state Transaction tr(*db);
|
||||
|
||||
loop {
|
||||
try {
|
||||
TenantMapEntry tme = wait(blobGranuleGetTenantEntry(&tr, rangeStartKey, tenantName));
|
||||
return tme;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<Standalone<VectorRef<KeyRef>>> Transaction::getRangeSplitPoints(KeyRange const& keys, int64_t chunkSize) {
|
||||
return ::getRangeSplitPoints(
|
||||
trState, keys, chunkSize, readVersion.isValid() && readVersion.isReady() ? readVersion.get() : latestVersion);
|
||||
|
@ -8465,7 +8480,6 @@ ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx,
|
|||
state Version readVersionOut = invalidVersion;
|
||||
state int batchSize = BUGGIFY ? deterministicRandom()->randomInt(2, 10) : CLIENT_KNOBS->BG_TOO_MANY_GRANULES / 2;
|
||||
state int loadSize = (BUGGIFY ? deterministicRandom()->randomInt(1, 20) : 20) * batchSize;
|
||||
state bool loadedTenantEntry = false;
|
||||
|
||||
if (version.present()) {
|
||||
if (version.get() == latestVersion) {
|
||||
|
@ -8485,16 +8499,16 @@ ACTOR Future<Version> verifyBlobRangeActor(Reference<DatabaseContext> cx,
|
|||
}
|
||||
}
|
||||
|
||||
if (tenantName.present()) {
|
||||
TenantMapEntry tme = wait(blobLoadTenantMapEntry(&db, range.begin, tenantName));
|
||||
range = range.withPrefix(tme.prefix);
|
||||
curRegion = KeyRangeRef(range.begin, range.begin);
|
||||
}
|
||||
|
||||
loop {
|
||||
if (curRegion.begin >= range.end) {
|
||||
return readVersionOut;
|
||||
}
|
||||
if (tenantName.present() && !loadedTenantEntry) {
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin, tenantName));
|
||||
loadedTenantEntry = true;
|
||||
range = range.withPrefix(tenantEntry.prefix);
|
||||
curRegion = KeyRangeRef(range.begin, range.begin);
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
wait(store(allRanges, tr.getBlobGranuleRanges(KeyRangeRef(curRegion.begin, range.end), loadSize)));
|
||||
|
@ -10651,7 +10665,6 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
|
|||
state Transaction tr(cx);
|
||||
state Key purgeKey;
|
||||
state KeyRange purgeRange = range;
|
||||
state bool loadedTenantPrefix = false;
|
||||
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
if (purgeVersion == latestVersion) {
|
||||
|
@ -10671,18 +10684,17 @@ ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
|
|||
throw unsupported_operation();
|
||||
}
|
||||
|
||||
if (tenant.present()) {
|
||||
TenantMapEntry tme = wait(blobLoadTenantMapEntry(&cx, range.begin, tenant));
|
||||
purgeRange = purgeRange.withPrefix(tme.prefix);
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
if (tenant.present() && !loadedTenantPrefix) {
|
||||
TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(&tr, range.begin, tenant));
|
||||
loadedTenantPrefix = true;
|
||||
purgeRange = purgeRange.withPrefix(tenantEntry.prefix);
|
||||
}
|
||||
|
||||
// must be aligned to blob range(s)
|
||||
state Future<Standalone<VectorRef<KeyRangeRef>>> blobbifiedBegin =
|
||||
getBlobRanges(&tr, KeyRangeRef(purgeRange.begin, purgeRange.begin), 2);
|
||||
|
@ -10773,7 +10785,11 @@ ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx,
|
|||
Optional<TenantName> tenantName) {
|
||||
state Database db(cx);
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
|
||||
state bool loadedTenantEntry = false;
|
||||
|
||||
if (tenantName.present()) {
|
||||
TenantMapEntry tme = wait(blobLoadTenantMapEntry(&db, range.begin, tenantName));
|
||||
range = range.withPrefix(tme.prefix);
|
||||
}
|
||||
|
||||
state Value value = active ? blobRangeActive : blobRangeInactive;
|
||||
loop {
|
||||
|
@ -10781,13 +10797,6 @@ ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx,
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
if (tenantName.present() && !loadedTenantEntry) {
|
||||
TenantMapEntry tenantEntry =
|
||||
wait(blobGranuleGetTenantEntry(&tr->getTransaction(), range.begin, tenantName));
|
||||
loadedTenantEntry = true;
|
||||
range = range.withPrefix(tenantEntry.prefix);
|
||||
}
|
||||
|
||||
Standalone<VectorRef<KeyRangeRef>> startBlobRanges = wait(getBlobRanges(&tr->getTransaction(), range, 1));
|
||||
|
||||
if (active) {
|
||||
|
@ -10842,16 +10851,9 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRangesActor(Refer
|
|||
state TenantMapEntry tme;
|
||||
state Standalone<VectorRef<KeyRangeRef>> blobRanges;
|
||||
|
||||
loop {
|
||||
try {
|
||||
if (tenantName.present()) {
|
||||
wait(store(tme, blobGranuleGetTenantEntry(&tr, range.begin, tenantName)));
|
||||
range = range.withPrefix(tme.prefix);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
if (tenantName.present()) {
|
||||
wait(store(tme, blobLoadTenantMapEntry(&db, range.begin, tenantName)));
|
||||
range = range.withPrefix(tme.prefix);
|
||||
}
|
||||
|
||||
loop {
|
||||
|
|
Loading…
Reference in New Issue