Merge pull request #9128 from sfc-gh-ajbeamon/transactions-use-tenant-object
Transactions take a tenant object rather than a tenant name
This commit is contained in:
commit
d47a2ab60f
|
@ -59,7 +59,7 @@ Reference<ISingleThreadTransaction> ISingleThreadTransaction::create(Type type,
|
|||
|
||||
Reference<ISingleThreadTransaction> ISingleThreadTransaction::create(Type type,
|
||||
Database const& cx,
|
||||
TenantName const& tenant) {
|
||||
Reference<Tenant> const& tenant) {
|
||||
Reference<ISingleThreadTransaction> result;
|
||||
if (type == Type::RYW) {
|
||||
result = makeReference<ReadYourWritesTransaction>();
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1475,8 +1475,8 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx, Optional<TenantName> tenantName)
|
||||
: ISingleThreadTransaction(cx->deferredError), tr(cx, tenantName), cache(&arena), writes(&arena), retries(0),
|
||||
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx, Optional<Reference<Tenant>> const& tenant)
|
||||
: ISingleThreadTransaction(cx->deferredError), tr(cx, tenant), cache(&arena), writes(&arena), retries(0),
|
||||
approximateSize(0), creationTime(now()), commitStarted(false), versionStampFuture(tr.getVersionstamp()),
|
||||
specialKeySpaceWriteMap(std::make_pair(false, Optional<Value>()), specialKeys.end), options(tr) {
|
||||
std::copy(
|
||||
|
@ -1485,11 +1485,11 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx, Optiona
|
|||
}
|
||||
|
||||
void ReadYourWritesTransaction::construct(Database const& cx) {
|
||||
*this = ReadYourWritesTransaction(cx, Optional<TenantName>());
|
||||
*this = ReadYourWritesTransaction(cx);
|
||||
}
|
||||
|
||||
void ReadYourWritesTransaction::construct(Database const& cx, TenantName const& tenantName) {
|
||||
*this = ReadYourWritesTransaction(cx, tenantName);
|
||||
void ReadYourWritesTransaction::construct(Database const& cx, Reference<Tenant> const& tenant) {
|
||||
*this = ReadYourWritesTransaction(cx, tenant);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
|
||||
|
@ -1828,7 +1828,6 @@ Future<Standalone<VectorRef<BlobGranuleChunkRef>>> ReadYourWritesTransaction::re
|
|||
Version begin,
|
||||
Optional<Version> readVersion,
|
||||
Version* readVersionOut) {
|
||||
|
||||
if (!options.readYourWritesDisabled) {
|
||||
return blob_granule_no_ryw();
|
||||
}
|
||||
|
@ -1851,7 +1850,6 @@ Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> ReadYourWritesTransaction::
|
|||
const KeyRange& range,
|
||||
Optional<Version> summaryVersion,
|
||||
int rangeLimit) {
|
||||
|
||||
if (checkUsedDuringCommit()) {
|
||||
return used_during_commit();
|
||||
}
|
||||
|
|
|
@ -239,7 +239,8 @@ ThreadFuture<Key> ThreadSafeTenant::purgeBlobGranules(const KeyRangeRef& keyRang
|
|||
TenantName tenantName = this->name;
|
||||
KeyRange range = keyRange;
|
||||
return onMainThread([db, range, purgeVersion, tenantName, force]() -> Future<Key> {
|
||||
return db->purgeBlobGranules(range, purgeVersion, tenantName, force);
|
||||
db->addref();
|
||||
return db->purgeBlobGranules(range, purgeVersion, makeReference<Tenant>(Database(db), tenantName), force);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -258,7 +259,8 @@ ThreadFuture<bool> ThreadSafeTenant::blobbifyRange(const KeyRangeRef& keyRange)
|
|||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<bool> {
|
||||
db->checkDeferredError();
|
||||
return db->blobbifyRange(range, tenantName);
|
||||
db->addref();
|
||||
return db->blobbifyRange(range, makeReference<Tenant>(Database(db), tenantName));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -268,7 +270,8 @@ ThreadFuture<bool> ThreadSafeTenant::unblobbifyRange(const KeyRangeRef& keyRange
|
|||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<bool> {
|
||||
db->checkDeferredError();
|
||||
return db->unblobbifyRange(range, tenantName);
|
||||
db->addref();
|
||||
return db->unblobbifyRange(range, makeReference<Tenant>(Database(db), tenantName));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -279,7 +282,8 @@ ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> ThreadSafeTenant::listBlobbifie
|
|||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<Standalone<VectorRef<KeyRangeRef>>> {
|
||||
db->checkDeferredError();
|
||||
return db->listBlobbifiedRanges(range, rangeLimit, tenantName);
|
||||
db->addref();
|
||||
return db->listBlobbifiedRanges(range, rangeLimit, makeReference<Tenant>(Database(db), tenantName));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -289,7 +293,8 @@ ThreadFuture<Version> ThreadSafeTenant::verifyBlobRange(const KeyRangeRef& keyRa
|
|||
KeyRange range = keyRange;
|
||||
return onMainThread([=]() -> Future<Version> {
|
||||
db->checkDeferredError();
|
||||
return db->verifyBlobRange(range, version, tenantName);
|
||||
db->addref();
|
||||
return db->verifyBlobRange(range, version, makeReference<Tenant>(Database(db), tenantName));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -297,8 +302,8 @@ ThreadSafeTenant::~ThreadSafeTenant() {}
|
|||
|
||||
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
|
||||
ISingleThreadTransaction::Type type,
|
||||
Optional<TenantName> tenant)
|
||||
: tenantName(tenant), initialized(std::make_shared<std::atomic_bool>(false)) {
|
||||
Optional<TenantName> tenantName)
|
||||
: tenantName(tenantName), initialized(std::make_shared<std::atomic_bool>(false)) {
|
||||
// Allocate memory for the transaction from this thread (so the pointer is known for subsequent method calls)
|
||||
// but run its constructor on the main thread
|
||||
|
||||
|
@ -309,19 +314,21 @@ ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
|
|||
auto tr = this->tr = ISingleThreadTransaction::allocateOnForeignThread(type);
|
||||
auto init = this->initialized;
|
||||
// No deferred error -- if the construction of the RYW transaction fails, we have no where to put it
|
||||
onMainThreadVoid([tr, cx, type, tenant, init]() {
|
||||
onMainThreadVoid([tr, cx, type, tenantName, init]() {
|
||||
cx->addref();
|
||||
if (tenant.present()) {
|
||||
Database db(cx);
|
||||
if (tenantName.present()) {
|
||||
Reference<Tenant> tenant = makeReference<Tenant>(db, tenantName.get());
|
||||
if (type == ISingleThreadTransaction::Type::RYW) {
|
||||
new (tr) ReadYourWritesTransaction(Database(cx), tenant.get());
|
||||
new (tr) ReadYourWritesTransaction(db, tenant);
|
||||
} else {
|
||||
tr->construct(Database(cx), tenant.get());
|
||||
tr->construct(db, tenant);
|
||||
}
|
||||
} else {
|
||||
if (type == ISingleThreadTransaction::Type::RYW) {
|
||||
new (tr) ReadYourWritesTransaction(Database(cx));
|
||||
new (tr) ReadYourWritesTransaction(db);
|
||||
} else {
|
||||
tr->construct(Database(cx));
|
||||
tr->construct(db);
|
||||
}
|
||||
}
|
||||
*init = true;
|
||||
|
|
|
@ -379,7 +379,6 @@ struct GetTenantIdReply {
|
|||
|
||||
struct GetTenantIdRequest {
|
||||
constexpr static FileIdentifier file_identifier = 11299717;
|
||||
SpanContext spanContext;
|
||||
TenantName tenantName;
|
||||
ReplyPromise<GetTenantIdReply> reply;
|
||||
|
||||
|
@ -390,14 +389,14 @@ struct GetTenantIdRequest {
|
|||
Version minTenantVersion;
|
||||
|
||||
GetTenantIdRequest() : minTenantVersion(latestVersion) {}
|
||||
GetTenantIdRequest(SpanContext spanContext, TenantNameRef const& tenantName, Version minTenantVersion)
|
||||
: spanContext(spanContext), tenantName(tenantName), minTenantVersion(minTenantVersion) {}
|
||||
GetTenantIdRequest(TenantNameRef const& tenantName, Version minTenantVersion)
|
||||
: tenantName(tenantName), minTenantVersion(minTenantVersion) {}
|
||||
|
||||
bool verify() const { return true; }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, reply, spanContext, tenantName, minTenantVersion);
|
||||
serializer(ar, reply, tenantName, minTenantVersion);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -400,18 +400,18 @@ public:
|
|||
// BlobGranule API.
|
||||
Future<Key> purgeBlobGranules(KeyRange keyRange,
|
||||
Version purgeVersion,
|
||||
Optional<TenantName> tenant,
|
||||
Optional<Reference<Tenant>> tenant,
|
||||
bool force = false);
|
||||
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
|
||||
|
||||
Future<bool> blobbifyRange(KeyRange range, Optional<TenantName> tenantName = {});
|
||||
Future<bool> unblobbifyRange(KeyRange range, Optional<TenantName> tenantName = {});
|
||||
Future<bool> blobbifyRange(KeyRange range, Optional<Reference<Tenant>> tenant = {});
|
||||
Future<bool> unblobbifyRange(KeyRange range, Optional<Reference<Tenant>> tenant = {});
|
||||
Future<Standalone<VectorRef<KeyRangeRef>>> listBlobbifiedRanges(KeyRange range,
|
||||
int rangeLimit,
|
||||
Optional<TenantName> tenantName = {});
|
||||
Optional<Reference<Tenant>> tenant = {});
|
||||
Future<Version> verifyBlobRange(const KeyRange& range,
|
||||
Optional<Version> version,
|
||||
Optional<TenantName> tenantName = {});
|
||||
Optional<Reference<Tenant>> tenant = {});
|
||||
Future<bool> blobRestore(const KeyRange range, Optional<Version> version);
|
||||
|
||||
// private:
|
||||
|
@ -692,9 +692,8 @@ public:
|
|||
|
||||
// Returns the latest commit versions that mutated the specified storage servers
|
||||
/// @note returns the latest commit version for a storage server only if the latest
|
||||
// commit version of that storage server is below the specified "readVersion".
|
||||
// commit version of that storage server is below the transaction's readVersion.
|
||||
void getLatestCommitVersions(const Reference<LocationInfo>& locationInfo,
|
||||
Version readVersion,
|
||||
Reference<TransactionState> info,
|
||||
VersionVector& latestCommitVersions);
|
||||
|
||||
|
@ -715,6 +714,8 @@ public:
|
|||
std::unique_ptr<GlobalConfig> globalConfig;
|
||||
EventCacheHolder connectToDatabaseEventCacheHolder;
|
||||
|
||||
Future<int64_t> lookupTenant(TenantName tenant);
|
||||
|
||||
// Get client-side status information as a JSON string with the following schema:
|
||||
// { "Healthy" : <overall health status: true or false>,
|
||||
// "ClusterID" : <UUID>,
|
||||
|
|
|
@ -47,10 +47,10 @@ public:
|
|||
static ISingleThreadTransaction* allocateOnForeignThread(Type);
|
||||
|
||||
static Reference<ISingleThreadTransaction> create(Type, Database const&);
|
||||
static Reference<ISingleThreadTransaction> create(Type, Database const&, TenantName const&);
|
||||
static Reference<ISingleThreadTransaction> create(Type, Database const&, Reference<Tenant> const&);
|
||||
|
||||
virtual void construct(Database const&) = 0;
|
||||
virtual void construct(Database const&, TenantName const&) {
|
||||
virtual void construct(Database const&, Reference<Tenant> const&) {
|
||||
// By default, a transaction implementation does not support tenants.
|
||||
ASSERT(false);
|
||||
}
|
||||
|
|
|
@ -239,35 +239,20 @@ struct Watch : public ReferenceCounted<Watch>, NonCopyable {
|
|||
|
||||
class Tenant : public ReferenceCounted<Tenant> {
|
||||
public:
|
||||
Tenant(Future<int64_t> id, Optional<TenantName> name) : idFuture(id), name(name) {}
|
||||
Tenant(Database cx, TenantName name);
|
||||
explicit Tenant(int64_t id);
|
||||
Tenant(Future<int64_t> id, Optional<TenantName> name);
|
||||
|
||||
int64_t id() const {
|
||||
ASSERT(idFuture.isReady());
|
||||
return idFuture.get();
|
||||
}
|
||||
Future<Void> ready() const { return success(idFuture); }
|
||||
int64_t id() const;
|
||||
KeyRef prefix() const;
|
||||
std::string description() const;
|
||||
|
||||
StringRef prefix() const {
|
||||
ASSERT(idFuture.isReady());
|
||||
if (bigEndianId == -1) {
|
||||
bigEndianId = bigEndian64(idFuture.get());
|
||||
}
|
||||
return StringRef(reinterpret_cast<const uint8_t*>(&bigEndianId), TenantAPI::PREFIX_SIZE);
|
||||
}
|
||||
|
||||
std::string description() const {
|
||||
StringRef nameStr = name.castTo<TenantNameRef>().orDefault("<unspecified>"_sr);
|
||||
if (idFuture.canGet()) {
|
||||
return format("%*s (%lld)", nameStr.size(), nameStr.begin(), idFuture.get());
|
||||
} else {
|
||||
return format("%*s", nameStr.size(), nameStr.begin());
|
||||
}
|
||||
}
|
||||
|
||||
Future<int64_t> idFuture;
|
||||
Optional<TenantName> name;
|
||||
|
||||
private:
|
||||
mutable int64_t bigEndianId = -1;
|
||||
Future<int64_t> idFuture;
|
||||
};
|
||||
|
||||
template <>
|
||||
|
@ -276,9 +261,12 @@ struct Traceable<Tenant> : std::true_type {
|
|||
};
|
||||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(AllowInvalidTenantID);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(ResolveDefaultTenant);
|
||||
|
||||
struct TransactionState : ReferenceCounted<TransactionState> {
|
||||
Database cx;
|
||||
Future<Version> readVersionFuture;
|
||||
Promise<Optional<Value>> metadataVersion;
|
||||
Optional<Standalone<StringRef>> authToken;
|
||||
Reference<TransactionLogInfo> trLogInfo;
|
||||
TransactionOptions options;
|
||||
|
@ -312,25 +300,35 @@ struct TransactionState : ReferenceCounted<TransactionState> {
|
|||
|
||||
bool automaticIdempotency = false;
|
||||
|
||||
Future<Void> startFuture;
|
||||
|
||||
// Only available so that Transaction can have a default constructor, for use in state variables
|
||||
TransactionState(TaskPriority taskID, SpanContext spanContext)
|
||||
: taskID(taskID), spanContext(spanContext), tenantSet(false) {}
|
||||
|
||||
// VERSION_VECTOR changed default values of readVersionObtainedFromGrvProxy
|
||||
TransactionState(Database cx,
|
||||
Optional<TenantName> tenantName,
|
||||
Optional<Reference<Tenant>> tenant,
|
||||
TaskPriority taskID,
|
||||
SpanContext spanContext,
|
||||
Reference<TransactionLogInfo> trLogInfo);
|
||||
|
||||
Reference<TransactionState> cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo, bool generateNewSpan) const;
|
||||
|
||||
Version readVersion() {
|
||||
ASSERT(readVersionFuture.isValid() && readVersionFuture.isReady());
|
||||
return readVersionFuture.get();
|
||||
}
|
||||
|
||||
TenantInfo getTenantInfo(AllowInvalidTenantID allowInvalidTenantId = AllowInvalidTenantID::False);
|
||||
|
||||
Optional<Reference<Tenant>> const& tenant();
|
||||
bool hasTenant() const;
|
||||
|
||||
bool hasTenant(ResolveDefaultTenant ResolveDefaultTenant = ResolveDefaultTenant::True);
|
||||
int64_t tenantId() const { return tenant_.present() ? tenant_.get()->id() : TenantInfo::INVALID_TENANT; }
|
||||
|
||||
Future<Void> startTransaction(uint32_t readVersionFlags = 0);
|
||||
Future<Version> getReadVersion(uint32_t flags);
|
||||
|
||||
private:
|
||||
Optional<Reference<Tenant>> tenant_;
|
||||
bool tenantSet;
|
||||
|
@ -338,11 +336,16 @@ private:
|
|||
|
||||
class Transaction : NonCopyable {
|
||||
public:
|
||||
explicit Transaction(Database const& cx, Optional<TenantName> const& tenant = Optional<TenantName>());
|
||||
explicit Transaction(Database const& cx, Optional<Reference<Tenant>> const& tenant = Optional<Reference<Tenant>>());
|
||||
~Transaction();
|
||||
|
||||
void setVersion(Version v);
|
||||
Future<Version> getReadVersion() { return getReadVersion(0); }
|
||||
Future<Version> getReadVersion() {
|
||||
if (!trState->readVersionFuture.isValid()) {
|
||||
trState->readVersionFuture = trState->getReadVersion(0);
|
||||
}
|
||||
return trState->readVersionFuture;
|
||||
}
|
||||
Future<Version> getRawReadVersion();
|
||||
Optional<Version> getCachedReadVersion() const;
|
||||
|
||||
|
@ -544,8 +547,6 @@ public:
|
|||
using FutureT = Future<Type>;
|
||||
|
||||
private:
|
||||
Future<Version> getReadVersion(uint32_t flags);
|
||||
|
||||
template <class GetKeyValuesFamilyRequest, class GetKeyValuesFamilyReply>
|
||||
Future<RangeResult> getRangeInternal(const KeySelector& begin,
|
||||
const KeySelector& end,
|
||||
|
@ -558,8 +559,6 @@ private:
|
|||
|
||||
double backoff;
|
||||
CommitTransactionRequest tr;
|
||||
Future<Version> readVersion;
|
||||
Promise<Optional<Value>> metadataVersion;
|
||||
std::vector<Future<std::pair<Key, Key>>> extraConflictRanges;
|
||||
Promise<Void> commitResult;
|
||||
Future<Void> committing;
|
||||
|
|
|
@ -69,11 +69,12 @@ class ReadYourWritesTransaction final : NonCopyable,
|
|||
public ISingleThreadTransaction,
|
||||
public FastAllocated<ReadYourWritesTransaction> {
|
||||
public:
|
||||
explicit ReadYourWritesTransaction(Database const& cx, Optional<TenantName> tenant = Optional<TenantName>());
|
||||
explicit ReadYourWritesTransaction(Database const& cx,
|
||||
Optional<Reference<Tenant>> const& tenant = Optional<Reference<Tenant>>());
|
||||
~ReadYourWritesTransaction();
|
||||
|
||||
void construct(Database const&) override;
|
||||
void construct(Database const&, TenantName const& tenant) override;
|
||||
void construct(Database const&, Reference<Tenant> const& tenant) override;
|
||||
void setVersion(Version v) override { tr.setVersion(v); }
|
||||
Future<Version> getReadVersion() override;
|
||||
Optional<Version> getCachedReadVersion() const override { return tr.getCachedReadVersion(); }
|
||||
|
|
|
@ -145,6 +145,7 @@ Future<std::pair<Optional<TenantMapEntry>, bool>> createTenantTransaction(
|
|||
throw invalid_tenant_group_name();
|
||||
}
|
||||
|
||||
tenantEntry.tenantName = name;
|
||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
||||
state Future<Optional<TenantMapEntry>> existingEntryFuture = tryGetTenantTransaction(tr, name);
|
||||
|
|
|
@ -91,7 +91,7 @@ public: // Internal use only
|
|||
|
||||
class ThreadSafeTenant : public ITenant, ThreadSafeReferenceCounted<ThreadSafeTenant>, NonCopyable {
|
||||
public:
|
||||
ThreadSafeTenant(Reference<ThreadSafeDatabase> db, StringRef name) : db(db), name(name) {}
|
||||
ThreadSafeTenant(Reference<ThreadSafeDatabase> db, TenantName name) : db(db), name(name) {}
|
||||
~ThreadSafeTenant() override;
|
||||
|
||||
Reference<ITransaction> createTransaction() override;
|
||||
|
@ -120,7 +120,7 @@ class ThreadSafeTransaction : public ITransaction, ThreadSafeReferenceCounted<Th
|
|||
public:
|
||||
explicit ThreadSafeTransaction(DatabaseContext* cx,
|
||||
ISingleThreadTransaction::Type type,
|
||||
Optional<TenantName> tenant);
|
||||
Optional<TenantName> tenantName);
|
||||
~ThreadSafeTransaction() override;
|
||||
|
||||
// Note: used while refactoring fdbcli, need to be removed later
|
||||
|
|
|
@ -66,10 +66,10 @@ ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>>
|
|||
KeyRange range,
|
||||
Version beginVersion,
|
||||
Version readVersion,
|
||||
Optional<TenantName> tenantName) {
|
||||
Optional<Reference<Tenant>> tenant) {
|
||||
state RangeResult out;
|
||||
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
|
||||
state Transaction tr(cx, tenantName);
|
||||
state Transaction tr(cx, tenant);
|
||||
|
||||
loop {
|
||||
try {
|
||||
|
@ -83,7 +83,7 @@ ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>>
|
|||
}
|
||||
|
||||
for (const BlobGranuleChunkRef& chunk : chunks) {
|
||||
ASSERT(chunk.tenantPrefix.present() == tenantName.present());
|
||||
ASSERT(chunk.tenantPrefix.present() == tenant.present());
|
||||
RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, bstore));
|
||||
out.arena().dependsOn(chunkRows.arena());
|
||||
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
|
||||
|
@ -217,8 +217,8 @@ ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
|
|||
ACTOR Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> getSummaries(Database cx,
|
||||
KeyRange range,
|
||||
Version summaryVersion,
|
||||
Optional<TenantName> tenantName) {
|
||||
state Transaction tr(cx, tenantName);
|
||||
Optional<Reference<Tenant>> tenant) {
|
||||
state Transaction tr(cx, tenant);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<BlobGranuleSummaryRef>> summaries =
|
||||
|
@ -242,12 +242,12 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> getSummaries(Database
|
|||
|
||||
ACTOR Future<Void> validateGranuleSummaries(Database cx,
|
||||
KeyRange range,
|
||||
Optional<TenantName> tenantName,
|
||||
Optional<Reference<Tenant>> tenant,
|
||||
Promise<Void> testComplete) {
|
||||
state Arena lastSummaryArena;
|
||||
state KeyRangeMap<Optional<BlobGranuleSummaryRef>> lastSummary;
|
||||
state Version lastSummaryVersion = invalidVersion;
|
||||
state Transaction tr(cx, tenantName);
|
||||
state Transaction tr(cx, tenant);
|
||||
state int successCount = 0;
|
||||
try {
|
||||
loop {
|
||||
|
@ -266,7 +266,7 @@ ACTOR Future<Void> validateGranuleSummaries(Database cx,
|
|||
|
||||
state Standalone<VectorRef<BlobGranuleSummaryRef>> nextSummary;
|
||||
try {
|
||||
wait(store(nextSummary, getSummaries(cx, range, nextSummaryVersion, tenantName)));
|
||||
wait(store(nextSummary, getSummaries(cx, range, nextSummaryVersion, tenant)));
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_blob_granule_transaction_too_old) {
|
||||
ASSERT(lastSummaryVersion == invalidVersion);
|
||||
|
|
|
@ -999,9 +999,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
for (int t = 0; t < trs.size(); t++) {
|
||||
TenantInfo const& tenantInfo = trs[t].tenantInfo;
|
||||
int64_t tenantId = tenantInfo.tenantId;
|
||||
Optional<TenantNameRef> const& tenantName = tenantInfo.name;
|
||||
if (tenantId != TenantInfo::INVALID_TENANT) {
|
||||
ASSERT(tenantName.present());
|
||||
encryptDomainIds.emplace(tenantId);
|
||||
} else {
|
||||
// Optimization: avoid enumerating mutations if cluster only serves default encryption domains
|
||||
|
@ -2233,7 +2231,9 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
|
|||
|
||||
wait(commitData->validState.getFuture());
|
||||
|
||||
state Version minVersion = commitData->stats.lastCommitVersionAssigned + 1;
|
||||
state Version minVersion =
|
||||
req.minTenantVersion == latestVersion ? commitData->stats.lastCommitVersionAssigned + 1 : req.minTenantVersion;
|
||||
|
||||
wait(delay(0, TaskPriority::DefaultEndpoint));
|
||||
|
||||
bool validTenant = wait(checkTenant(commitData, req.tenant.tenantId, minVersion, "GetKeyServerLocation"));
|
||||
|
|
|
@ -149,7 +149,8 @@ public:
|
|||
state std::unordered_set<TenantName>::iterator iter = tenants.begin();
|
||||
for (; iter != tenants.end(); iter++) {
|
||||
state TenantName tenant = *iter;
|
||||
state ReadYourWritesTransaction tr(tenantCache->dbcx(), tenant);
|
||||
state ReadYourWritesTransaction tr(tenantCache->dbcx(),
|
||||
makeReference<Tenant>(tenantCache->dbcx(), tenant));
|
||||
loop {
|
||||
try {
|
||||
state int64_t size = wait(tr.getEstimatedRangeSizeBytes(normalKeys));
|
||||
|
|
|
@ -41,7 +41,7 @@ ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>>
|
|||
KeyRange range,
|
||||
Version beginVersion,
|
||||
Version readVersion,
|
||||
Optional<TenantName> tenantName = Optional<TenantName>());
|
||||
Optional<Reference<Tenant>> tenant = Optional<Reference<Tenant>>());
|
||||
|
||||
ACTOR Future<std::pair<RangeResult, Version>> readFromFDB(Database cx, KeyRange range);
|
||||
|
||||
|
@ -57,7 +57,7 @@ ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range);
|
|||
|
||||
ACTOR Future<Void> validateGranuleSummaries(Database cx,
|
||||
KeyRange range,
|
||||
Optional<TenantName> tenantName,
|
||||
Optional<Reference<Tenant>> tenantName,
|
||||
Promise<Void> testComplete);
|
||||
|
||||
ACTOR Future<Void> checkFeedCleanup(Database cx, bool debug);
|
||||
|
|
|
@ -37,7 +37,6 @@
|
|||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/QuietDatabase.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbrpc/TenantName.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
template <class T>
|
||||
|
@ -63,7 +62,7 @@ Future<bool> checkRangeSimpleValueSize(Database cx,
|
|||
T* workload,
|
||||
uint64_t begin,
|
||||
uint64_t end,
|
||||
Optional<TenantName> tenant) {
|
||||
Optional<Reference<Tenant>> tenant) {
|
||||
loop {
|
||||
state Transaction tr(cx, tenant);
|
||||
setAuthToken(*workload, tr);
|
||||
|
@ -83,10 +82,14 @@ Future<bool> checkRangeSimpleValueSize(Database cx,
|
|||
|
||||
// Returns true if the range was added
|
||||
ACTOR template <class T>
|
||||
Future<uint64_t> setupRange(Database cx, T* workload, uint64_t begin, uint64_t end, std::vector<TenantName> tenants) {
|
||||
Future<uint64_t> setupRange(Database cx,
|
||||
T* workload,
|
||||
uint64_t begin,
|
||||
uint64_t end,
|
||||
std::vector<Reference<Tenant>> tenants) {
|
||||
state uint64_t bytesInserted = 0;
|
||||
loop {
|
||||
Optional<TenantName> tenant;
|
||||
Optional<Reference<Tenant>> tenant;
|
||||
if (tenants.size() > 0) {
|
||||
tenant = deterministicRandom()->randomChoice(tenants);
|
||||
}
|
||||
|
@ -138,7 +141,7 @@ Future<uint64_t> setupRangeWorker(Database cx,
|
|||
double maxKeyInsertRate,
|
||||
int keySaveIncrement,
|
||||
int actorId,
|
||||
std::vector<TenantName> tenants) {
|
||||
std::vector<Reference<Tenant>> tenants) {
|
||||
state double nextStart;
|
||||
state uint64_t loadedRanges = 0;
|
||||
state int lastStoredKeysLoaded = 0;
|
||||
|
@ -152,7 +155,7 @@ Future<uint64_t> setupRangeWorker(Database cx,
|
|||
if (numBytes > 0)
|
||||
loadedRanges++;
|
||||
|
||||
Optional<TenantName> tenant;
|
||||
Optional<Reference<Tenant>> tenant;
|
||||
if (tenants.size() > 0) {
|
||||
tenant = deterministicRandom()->randomChoice(tenants);
|
||||
}
|
||||
|
@ -240,7 +243,7 @@ Future<Void> bulkSetup(Database cx,
|
|||
double keyCheckInterval = 0.1,
|
||||
uint64_t startNodeIdx = 0,
|
||||
uint64_t endNodeIdx = 0,
|
||||
std::vector<TenantName> tenants = std::vector<TenantName>()) {
|
||||
std::vector<Reference<Tenant>> tenants = std::vector<Reference<Tenant>>()) {
|
||||
|
||||
state std::vector<std::pair<uint64_t, uint64_t>> jobs;
|
||||
state uint64_t startNode = startNodeIdx ? startNodeIdx : (nodeCount * workload->clientId) / workload->clientCount;
|
||||
|
@ -258,7 +261,7 @@ Future<Void> bulkSetup(Database cx,
|
|||
// For bulk data schemes where the value of the key is not critical to operation, check to
|
||||
// see if the database has already been set up.
|
||||
if (valuesInconsequential) {
|
||||
Optional<TenantName> tenant;
|
||||
Optional<Reference<Tenant>> tenant;
|
||||
if (tenants.size() > 0) {
|
||||
tenant = deterministicRandom()->randomChoice(tenants);
|
||||
}
|
||||
|
|
|
@ -3652,7 +3652,7 @@ ACTOR Future<GetValueReqAndResultRef> quickGetValue(StorageServer* data,
|
|||
|
||||
++data->counters.quickGetValueMiss;
|
||||
if (SERVER_KNOBS->QUICK_GET_VALUE_FALLBACK) {
|
||||
state Transaction tr(data->cx, pOriginalReq->tenantInfo.name.castTo<TenantName>());
|
||||
state Transaction tr(data->cx, makeReference<Tenant>(pOriginalReq->tenantInfo.tenantId));
|
||||
tr.setVersion(version);
|
||||
// TODO: is DefaultPromiseEndpoint the best priority for this?
|
||||
tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint;
|
||||
|
@ -4317,7 +4317,7 @@ ACTOR Future<GetRangeReqAndResultRef> quickGetKeyValues(
|
|||
|
||||
++data->counters.quickGetKeyValuesMiss;
|
||||
if (SERVER_KNOBS->QUICK_GET_KEY_VALUES_FALLBACK) {
|
||||
state Transaction tr(data->cx, pOriginalReq->tenantInfo.name.castTo<TenantName>());
|
||||
state Transaction tr(data->cx, makeReference<Tenant>(pOriginalReq->tenantInfo.tenantId));
|
||||
tr.setVersion(version);
|
||||
if (pOriginalReq->options.present() && pOriginalReq->options.get().debugID.present()) {
|
||||
tr.debugTransaction(pOriginalReq->options.get().debugID.get());
|
||||
|
|
|
@ -45,8 +45,9 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
|
||||
std::vector<Future<Void>> clients;
|
||||
Arena arena;
|
||||
TenantName tenant;
|
||||
TenantName anotherTenant;
|
||||
Reference<Tenant> tenant;
|
||||
TenantName tenantName;
|
||||
TenantName anotherTenantName;
|
||||
Standalone<StringRef> signedToken;
|
||||
Standalone<StringRef> signedTokenAnotherTenant;
|
||||
Standalone<StringRef> tLogConfigKey;
|
||||
|
@ -62,15 +63,15 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
transactionsPerSecond = getOption(options, "transactionsPerSecond"_sr, 500.0) / clientCount;
|
||||
actorCount = getOption(options, "actorsPerClient"_sr, transactionsPerSecond / 5);
|
||||
tenant = getOption(options, "tenantA"_sr, "authzSecurityTestTenant"_sr);
|
||||
anotherTenant = getOption(options, "tenantB"_sr, "authzSecurityTestTenant"_sr);
|
||||
tenantName = getOption(options, "tenantA"_sr, "authzSecurityTestTenant"_sr);
|
||||
anotherTenantName = getOption(options, "tenantB"_sr, "authzSecurityTestTenant"_sr);
|
||||
tLogConfigKey = getOption(options, "tLogConfigKey"_sr, "TLogInterface"_sr);
|
||||
ASSERT(g_network->isSimulated());
|
||||
// make it comfortably longer than the timeout of the workload
|
||||
signedToken = g_simulator->makeToken(
|
||||
tenant, uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
|
||||
tenantName, uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
|
||||
signedTokenAnotherTenant = g_simulator->makeToken(
|
||||
anotherTenant, uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
|
||||
anotherTenantName, uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
|
||||
testFunctions.push_back(
|
||||
[this](Database cx) { return testCrossTenantGetDisallowed(this, cx, PositiveTestcase::True); });
|
||||
testFunctions.push_back(
|
||||
|
@ -84,7 +85,10 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
testFunctions.push_back([this](Database cx) { return testTLogReadDisallowed(this, cx); });
|
||||
}
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
tenant = makeReference<Tenant>(cx, tenantName);
|
||||
return tenant->ready();
|
||||
}
|
||||
|
||||
Future<Void> start(Database const& cx) override {
|
||||
for (int c = 0; c < actorCount; c++)
|
||||
|
@ -119,7 +123,7 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
|
||||
ACTOR static Future<Version> setAndCommitKeyValueAndGetVersion(AuthzSecurityWorkload* self,
|
||||
Database cx,
|
||||
TenantName tenant,
|
||||
Reference<Tenant> tenant,
|
||||
Standalone<StringRef> token,
|
||||
StringRef key,
|
||||
StringRef value) {
|
||||
|
@ -136,22 +140,20 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<std::pair<KeyRangeLocationInfo, int64_t>> refreshAndGetCachedLocation(
|
||||
AuthzSecurityWorkload* self,
|
||||
Database cx,
|
||||
TenantName tenant,
|
||||
Standalone<StringRef> token,
|
||||
StringRef key) {
|
||||
ACTOR static Future<KeyRangeLocationInfo> refreshAndGetCachedLocation(AuthzSecurityWorkload* self,
|
||||
Database cx,
|
||||
Reference<Tenant> tenant,
|
||||
Standalone<StringRef> token,
|
||||
StringRef key) {
|
||||
state Transaction tr(cx, tenant);
|
||||
self->setAuthToken(tr, token);
|
||||
loop {
|
||||
try {
|
||||
// trigger GetKeyServerLocationsRequest and subsequent cache update
|
||||
wait(success(tr.get(key)));
|
||||
int64_t tenantId = wait(tr.getTenant().get()->idFuture);
|
||||
auto loc = cx->getCachedLocation(tr.trState->getTenantInfo(), key);
|
||||
if (loc.present()) {
|
||||
return std::make_pair(loc.get(), tenantId);
|
||||
return loc.get();
|
||||
} else {
|
||||
wait(delay(0.1));
|
||||
}
|
||||
|
@ -168,22 +170,22 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR static Future<Optional<Error>> tryGetValue(AuthzSecurityWorkload* self,
|
||||
TenantName tenant,
|
||||
Reference<Tenant> tenant,
|
||||
Version committedVersion,
|
||||
Standalone<StringRef> key,
|
||||
Optional<Standalone<StringRef>> expectedValue,
|
||||
Standalone<StringRef> token,
|
||||
Database cx,
|
||||
std::pair<KeyRangeLocationInfo, int64_t> locationAndTenant) {
|
||||
KeyRangeLocationInfo loc) {
|
||||
loop {
|
||||
GetValueRequest req;
|
||||
req.key = key;
|
||||
req.version = committedVersion;
|
||||
req.tenantInfo.tenantId = locationAndTenant.second;
|
||||
req.tenantInfo.name = tenant;
|
||||
req.tenantInfo.tenantId = tenant->id();
|
||||
req.tenantInfo.name = tenant->name.get();
|
||||
req.tenantInfo.token = token;
|
||||
try {
|
||||
GetValueReply reply = wait(loadBalance(locationAndTenant.first.locations->locations(),
|
||||
GetValueReply reply = wait(loadBalance(loc.locations->locations(),
|
||||
&StorageServerInterface::getValue,
|
||||
req,
|
||||
TaskPriority::DefaultPromiseEndpoint,
|
||||
|
@ -213,8 +215,7 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
state Version committedVersion =
|
||||
wait(setAndCommitKeyValueAndGetVersion(self, cx, self->tenant, self->signedToken, key, value));
|
||||
// refresh key location cache via get()
|
||||
std::pair<KeyRangeLocationInfo, int64_t> locationAndTenant =
|
||||
wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key));
|
||||
KeyRangeLocationInfo loc = wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key));
|
||||
if (positive) {
|
||||
// Supposed to succeed. Expected to occasionally fail because of buggify, faultInjection, or data
|
||||
// distribution, but should not return permission_denied
|
||||
|
@ -225,7 +226,7 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
value,
|
||||
self->signedToken /* passing correct token */,
|
||||
cx,
|
||||
locationAndTenant));
|
||||
loc));
|
||||
if (!outcome.present()) {
|
||||
++self->crossTenantGetPositive;
|
||||
} else if (outcome.get().code() == error_code_permission_denied) {
|
||||
|
@ -243,7 +244,7 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
value,
|
||||
self->signedTokenAnotherTenant /* deliberately passing bad token */,
|
||||
cx,
|
||||
locationAndTenant));
|
||||
loc));
|
||||
// Should always fail. Expected to return permission_denied, but expected to occasionally fail with
|
||||
// different errors
|
||||
if (!outcome.present()) {
|
||||
|
@ -259,21 +260,20 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR static Future<Optional<Error>> tryCommit(AuthzSecurityWorkload* self,
|
||||
TenantName tenant,
|
||||
Reference<Tenant> tenant,
|
||||
Standalone<StringRef> token,
|
||||
Key key,
|
||||
Value newValue,
|
||||
Version readVersion,
|
||||
Database cx,
|
||||
int64_t tenantId) {
|
||||
Database cx) {
|
||||
loop {
|
||||
state Key prefixedKey = key.withPrefix(TenantAPI::idToPrefix(tenantId));
|
||||
state Key prefixedKey = key.withPrefix(tenant->prefix());
|
||||
CommitTransactionRequest req;
|
||||
req.transaction.mutations.push_back(req.arena, MutationRef(MutationRef::SetValue, prefixedKey, newValue));
|
||||
req.transaction.read_snapshot = readVersion;
|
||||
req.tenantInfo.name = tenant;
|
||||
req.tenantInfo.name = tenant->name.get();
|
||||
req.tenantInfo.token = token;
|
||||
req.tenantInfo.tenantId = tenantId;
|
||||
req.tenantInfo.tenantId = tenant->id();
|
||||
try {
|
||||
CommitID reply = wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
|
||||
&CommitProxyInterface::commit,
|
||||
|
@ -296,13 +296,10 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
state Value newValue = self->randomString();
|
||||
state Version committedVersion =
|
||||
wait(setAndCommitKeyValueAndGetVersion(self, cx, self->tenant, self->signedToken, key, value));
|
||||
// refresh key location cache to extract tenant prefix
|
||||
std::pair<KeyRangeLocationInfo, int64_t> locationAndTenant =
|
||||
wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key));
|
||||
if (positive) {
|
||||
// Expected to succeed, may occasionally fail
|
||||
Optional<Error> outcome = wait(tryCommit(
|
||||
self, self->tenant, self->signedToken, key, newValue, committedVersion, cx, locationAndTenant.second));
|
||||
Optional<Error> outcome =
|
||||
wait(tryCommit(self, self->tenant, self->signedToken, key, newValue, committedVersion, cx));
|
||||
if (!outcome.present()) {
|
||||
++self->crossTenantCommitPositive;
|
||||
} else if (outcome.get().code() == error_code_permission_denied) {
|
||||
|
@ -312,14 +309,8 @@ struct AuthzSecurityWorkload : TestWorkload {
|
|||
.log();
|
||||
}
|
||||
} else {
|
||||
Optional<Error> outcome = wait(tryCommit(self,
|
||||
self->tenant,
|
||||
self->signedTokenAnotherTenant,
|
||||
key,
|
||||
newValue,
|
||||
committedVersion,
|
||||
cx,
|
||||
locationAndTenant.second));
|
||||
Optional<Error> outcome = wait(
|
||||
tryCommit(self, self->tenant, self->signedTokenAnotherTenant, key, newValue, committedVersion, cx));
|
||||
if (!outcome.present()) {
|
||||
TraceEvent(SevError, "AuthzSecurityError")
|
||||
.detail("Case", "CrossTenantGetDisallowed")
|
||||
|
|
|
@ -68,7 +68,8 @@ struct ThreadData : ReferenceCounted<ThreadData>, NonCopyable {
|
|||
int32_t directoryID;
|
||||
KeyRange directoryRange;
|
||||
TenantName tenantName;
|
||||
TenantMapEntry tenant;
|
||||
Reference<Tenant> tenant;
|
||||
TenantMapEntry tenantEntry;
|
||||
Reference<BlobConnectionProvider> bstore;
|
||||
|
||||
// key + value gen data
|
||||
|
@ -127,6 +128,8 @@ struct ThreadData : ReferenceCounted<ThreadData>, NonCopyable {
|
|||
}
|
||||
}
|
||||
|
||||
void openTenant(Database const& cx) { tenant = makeReference<Tenant>(cx, tenantName); }
|
||||
|
||||
// TODO could make keys variable length?
|
||||
Key getKey(uint32_t key, uint32_t id) {
|
||||
std::stringstream ss;
|
||||
|
@ -155,7 +158,7 @@ struct ThreadData : ReferenceCounted<ThreadData>, NonCopyable {
|
|||
if (t2.size() > 0 && t.getInt(0) != t2.getInt(0)) {
|
||||
if (t.size() > BGW_TUPLE_KEY_SIZE - SERVER_KNOBS->BG_KEY_TUPLE_TRUNCATE_OFFSET) {
|
||||
fmt::print("Tenant: {0}, K={1}, E={2}, LK={3}. {4} != {5}\n",
|
||||
tenant.prefix.printable(),
|
||||
tenantEntry.prefix.printable(),
|
||||
k.printable(),
|
||||
e.printable(),
|
||||
lastKey.printable(),
|
||||
|
@ -249,17 +252,18 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<TenantMapEntry> setUpTenant(Database cx, TenantName name) {
|
||||
ACTOR Future<TenantMapEntry> setUpTenant(Database cx, TenantName tenantName) {
|
||||
if (BGW_DEBUG) {
|
||||
fmt::print("Setting up blob granule range for tenant {0}\n", name.printable());
|
||||
fmt::print("Setting up blob granule range for tenant {0}\n", tenantName.printable());
|
||||
}
|
||||
|
||||
Optional<TenantMapEntry> entry = wait(TenantAPI::createTenant(cx.getReference(), name));
|
||||
Optional<TenantMapEntry> entry = wait(TenantAPI::createTenant(cx.getReference(), tenantName));
|
||||
ASSERT(entry.present());
|
||||
|
||||
if (BGW_DEBUG) {
|
||||
fmt::print(
|
||||
"Set up blob granule range for tenant {0}: {1}\n", name.printable(), entry.get().prefix.printable());
|
||||
fmt::print("Set up blob granule range for tenant {0}: {1}\n",
|
||||
tenantName.printable(),
|
||||
entry.get().prefix.printable());
|
||||
}
|
||||
|
||||
return entry.get();
|
||||
|
@ -284,11 +288,11 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
for (; directoryIdx < self->directories.size(); directoryIdx++) {
|
||||
// Set up the blob range first
|
||||
TenantMapEntry tenantEntry = wait(self->setUpTenant(cx, self->directories[directoryIdx]->tenantName));
|
||||
|
||||
self->directories[directoryIdx]->tenant = tenantEntry;
|
||||
self->directories[directoryIdx]->openTenant(cx);
|
||||
self->directories[directoryIdx]->tenantEntry = tenantEntry;
|
||||
self->directories[directoryIdx]->directoryRange =
|
||||
KeyRangeRef(tenantEntry.prefix, tenantEntry.prefix.withSuffix(normalKeys.end));
|
||||
tenants.push_back({ self->directories[directoryIdx]->tenantName, tenantEntry });
|
||||
tenants.push_back({ self->directories[directoryIdx]->tenant->name.get(), tenantEntry });
|
||||
bool _success = wait(cx->blobbifyRange(self->directories[directoryIdx]->directoryRange));
|
||||
ASSERT(_success);
|
||||
}
|
||||
|
@ -326,17 +330,12 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
bool doSetup) {
|
||||
// read entire keyspace at the start until granules for the entire thing are available
|
||||
loop {
|
||||
state Transaction tr(cx, threadData->tenantName);
|
||||
state Transaction tr(cx, threadData->tenant);
|
||||
try {
|
||||
Version rv = wait(self->doGrv(&tr));
|
||||
state Version readVersion = rv;
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
||||
wait(readFromBlob(cx,
|
||||
threadData->bstore,
|
||||
normalKeys /* tenant handles range */,
|
||||
0,
|
||||
readVersion,
|
||||
threadData->tenantName));
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob = wait(readFromBlob(
|
||||
cx, threadData->bstore, normalKeys /* tenant handles range */, 0, readVersion, threadData->tenant));
|
||||
fmt::print("Directory {0} got {1} RV {2}\n",
|
||||
threadData->directoryID,
|
||||
doSetup ? "initial" : "final",
|
||||
|
@ -720,8 +719,8 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
beginVersion = threadData->writeVersions[beginVersionIdx];
|
||||
}
|
||||
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob = wait(
|
||||
readFromBlob(cx, threadData->bstore, range, beginVersion, readVersion, threadData->tenantName));
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
|
||||
wait(readFromBlob(cx, threadData->bstore, range, beginVersion, readVersion, threadData->tenant));
|
||||
self->validateResult(threadData, blob, startKey, endKey, beginVersion, readVersion);
|
||||
|
||||
int resultBytes = blob.first.expectedSize();
|
||||
|
@ -739,7 +738,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
range.begin.printable(),
|
||||
range.end.printable(),
|
||||
readVersion,
|
||||
threadData->tenantName.printable());
|
||||
printable(threadData->tenant->description()));
|
||||
}
|
||||
threadData->timeTravelTooOld++;
|
||||
} else {
|
||||
|
@ -770,7 +769,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
TraceEvent("BlobGranuleCorrectnessWriterReady").log();
|
||||
|
||||
loop {
|
||||
state Transaction tr(cx, threadData->tenantName);
|
||||
state Transaction tr(cx, threadData->tenant);
|
||||
|
||||
// pick rows to write and clear, generate values for writes
|
||||
state std::vector<std::tuple<uint32_t, uint32_t, uint32_t, uint16_t>> keyAndIdToWrite;
|
||||
|
@ -894,7 +893,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
for (auto& it : directories) {
|
||||
// Wait for blob worker to initialize snapshot before starting test for that range
|
||||
Future<Void> start = waitFirstSnapshot(this, cx, it, true);
|
||||
it->summaryClient = validateGranuleSummaries(cx, normalKeys, it->tenantName, it->triggerSummaryComplete);
|
||||
it->summaryClient = validateGranuleSummaries(cx, normalKeys, it->tenant, it->triggerSummaryComplete);
|
||||
clients.push_back(timeout(writeWorker(this, start, cx, it), testDuration, Void()));
|
||||
clients.push_back(timeout(readWorker(this, start, cx, it), testDuration, Void()));
|
||||
}
|
||||
|
@ -908,7 +907,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
// check that reading ranges with tenant name gives valid result of ranges just for tenant, with no tenant
|
||||
// prefix
|
||||
loop {
|
||||
state Transaction tr(cx, threadData->tenantName);
|
||||
state Transaction tr(cx, threadData->tenant);
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(normalKeys, 1000000));
|
||||
ASSERT(ranges.size() >= 1 && ranges.size() < 1000000);
|
||||
|
@ -949,7 +948,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
fmt::print("Directory {0} doing final data check @ {1}\n", threadData->directoryID, readVersion);
|
||||
}
|
||||
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob = wait(readFromBlob(
|
||||
cx, threadData->bstore, normalKeys /*tenant handles range*/, 0, readVersion, threadData->tenantName));
|
||||
cx, threadData->bstore, normalKeys /*tenant handles range*/, 0, readVersion, threadData->tenant));
|
||||
result = self->validateResult(threadData, blob, 0, std::numeric_limits<uint32_t>::max(), 0, readVersion);
|
||||
finalRowsValidated = blob.first.size();
|
||||
|
||||
|
|
|
@ -55,6 +55,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
Future<Void> unitClient;
|
||||
bool stopUnitClient;
|
||||
Optional<TenantName> tenantName;
|
||||
Optional<Reference<Tenant>> tenant;
|
||||
|
||||
int32_t nextKey;
|
||||
|
||||
|
@ -105,7 +106,9 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> registerNewRange(Database cx, BlobGranuleRangesWorkload* self, Optional<TenantName> tenantName) {
|
||||
ACTOR Future<Void> registerNewRange(Database cx,
|
||||
BlobGranuleRangesWorkload* self,
|
||||
Optional<Reference<Tenant>> alternateTenant) {
|
||||
std::string nextRangeKey = "R_" + self->newKey();
|
||||
state KeyRange range(KeyRangeRef(StringRef(nextRangeKey), strinc(StringRef(nextRangeKey))));
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -114,7 +117,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
|
||||
// don't put in active ranges until AFTER set range command succeeds, to avoid checking a range that maybe
|
||||
// wasn't initialized
|
||||
bool success = wait(cx->blobbifyRange(range, tenantName.present() ? tenantName.get() : self->tenantName));
|
||||
bool success = wait(cx->blobbifyRange(range, alternateTenant.present() ? alternateTenant : self->tenant));
|
||||
ASSERT(success);
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -125,9 +128,9 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Key> versionedForcePurge(Database cx, KeyRange range, Optional<TenantName> tenantName) {
|
||||
ACTOR Future<Key> versionedForcePurge(Database cx, KeyRange range, Optional<Reference<Tenant>> tenant) {
|
||||
Version rv = deterministicRandom()->coinflip() ? latestVersion : 1;
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(range, rv, tenantName, true));
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(range, rv, tenant, true));
|
||||
|
||||
return purgeKey;
|
||||
}
|
||||
|
@ -149,10 +152,10 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
range.begin.printable(),
|
||||
range.end.printable());
|
||||
}
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenantName));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenant));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
}
|
||||
bool success = wait(cx->unblobbifyRange(range, self->tenantName));
|
||||
bool success = wait(cx->unblobbifyRange(range, self->tenant));
|
||||
ASSERT(success);
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -182,6 +185,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
ACTOR Future<Void> _setup(Database cx, BlobGranuleRangesWorkload* self) {
|
||||
// create initial target ranges
|
||||
TraceEvent("BlobGranuleRangesSetup").detail("InitialRanges", self->targetRanges).log();
|
||||
|
||||
// set up blob granules
|
||||
wait(success(ManagementAPI::changeConfig(cx.getReference(), "blob_granules_enabled=1", true)));
|
||||
|
||||
|
@ -189,8 +193,10 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
wait(success(ManagementAPI::changeConfig(cx.getReference(), "tenant_mode=optional_experimental", true)));
|
||||
wait(success(self->setupTenant(cx, self->tenantName.get())));
|
||||
|
||||
self->tenant = makeReference<Tenant>(cx, self->tenantName.get());
|
||||
|
||||
try {
|
||||
wait(self->registerNewRange(cx, self, "BogusTenant"_sr));
|
||||
wait(self->registerNewRange(cx, self, makeReference<Tenant>(cx, "BogusTenant"_sr)));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_tenant_not_found) {
|
||||
|
@ -224,19 +230,19 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
return _check(cx, this);
|
||||
}
|
||||
|
||||
ACTOR Future<bool> isRangeActive(Database cx, KeyRange range, Optional<TenantName> tenantName) {
|
||||
ACTOR Future<bool> isRangeActive(Database cx, KeyRange range, Optional<Reference<Tenant>> tenant) {
|
||||
Optional<Version> rv;
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
rv = latestVersion;
|
||||
}
|
||||
state Version v = wait(cx->verifyBlobRange(range, rv, tenantName));
|
||||
state Version v = wait(cx->verifyBlobRange(range, rv, tenant));
|
||||
return v != invalidVersion;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkRange(Database cx, BlobGranuleRangesWorkload* self, KeyRange range, bool isActive) {
|
||||
// Check that a read completes for the range. If not loop around and try again
|
||||
loop {
|
||||
bool completed = wait(self->isRangeActive(cx, range, self->tenantName));
|
||||
bool completed = wait(self->isRangeActive(cx, range, self->tenant));
|
||||
|
||||
if (completed == isActive) {
|
||||
break;
|
||||
|
@ -252,8 +258,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
wait(delay(1.0));
|
||||
}
|
||||
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenantName));
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges = wait(cx->listBlobbifiedRanges(range, 1000000, self->tenant));
|
||||
if (isActive) {
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0].begin <= range.begin);
|
||||
|
@ -262,7 +267,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
ASSERT(blobRanges.empty());
|
||||
}
|
||||
|
||||
state Transaction tr(cx, self->tenantName);
|
||||
state Transaction tr(cx, self->tenant);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(range, 1000000));
|
||||
|
@ -343,9 +348,9 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// tear down range at end
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenantName));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenant));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
bool success = wait(cx->unblobbifyRange(range, self->tenantName));
|
||||
bool success = wait(cx->unblobbifyRange(range, self->tenant));
|
||||
ASSERT(success);
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
|
@ -362,35 +367,35 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
if (BGRW_DEBUG) {
|
||||
fmt::print("VerifyRangeUnit: [{0} - {1})\n", range.begin.printable(), range.end.printable());
|
||||
}
|
||||
bool setSuccess = wait(cx->blobbifyRange(activeRange, self->tenantName));
|
||||
bool setSuccess = wait(cx->blobbifyRange(activeRange, self->tenant));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, activeRange, true));
|
||||
|
||||
bool success1 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, middleKey), self->tenantName));
|
||||
bool success1 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, middleKey), self->tenant));
|
||||
ASSERT(success1);
|
||||
|
||||
bool success2 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, activeRange.end), self->tenantName));
|
||||
bool success2 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, activeRange.end), self->tenant));
|
||||
ASSERT(success2);
|
||||
|
||||
bool fail1 = wait(self->isRangeActive(cx, range, self->tenantName));
|
||||
bool fail1 = wait(self->isRangeActive(cx, range, self->tenant));
|
||||
ASSERT(!fail1);
|
||||
|
||||
bool fail2 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.begin), self->tenantName));
|
||||
bool fail2 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.begin), self->tenant));
|
||||
ASSERT(!fail2);
|
||||
|
||||
bool fail3 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.end, range.end), self->tenantName));
|
||||
bool fail3 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.end, range.end), self->tenant));
|
||||
ASSERT(!fail3);
|
||||
|
||||
bool fail4 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, middleKey), self->tenantName));
|
||||
bool fail4 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, middleKey), self->tenant));
|
||||
ASSERT(!fail4);
|
||||
|
||||
bool fail5 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, range.end), self->tenantName));
|
||||
bool fail5 = wait(self->isRangeActive(cx, KeyRangeRef(middleKey, range.end), self->tenant));
|
||||
ASSERT(!fail5);
|
||||
|
||||
bool fail6 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.end), self->tenantName));
|
||||
bool fail6 = wait(self->isRangeActive(cx, KeyRangeRef(range.begin, activeRange.end), self->tenant));
|
||||
ASSERT(!fail6);
|
||||
|
||||
bool fail7 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, range.end), self->tenantName));
|
||||
bool fail7 = wait(self->isRangeActive(cx, KeyRangeRef(activeRange.begin, range.end), self->tenant));
|
||||
ASSERT(!fail7);
|
||||
|
||||
wait(self->tearDownRangeAfterUnit(cx, self, activeRange));
|
||||
|
@ -415,7 +420,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
for (i = 0; i < rangeCount; i++) {
|
||||
state KeyRange subRange(KeyRangeRef(boundaries[i], boundaries[i + 1]));
|
||||
if (i != rangeToNotBlobbify) {
|
||||
bool setSuccess = wait(cx->blobbifyRange(subRange, self->tenantName));
|
||||
bool setSuccess = wait(cx->blobbifyRange(subRange, self->tenant));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, subRange, true));
|
||||
} else {
|
||||
|
@ -423,7 +428,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
bool success = wait(self->isRangeActive(cx, range, self->tenantName));
|
||||
bool success = wait(self->isRangeActive(cx, range, self->tenant));
|
||||
ASSERT(!success);
|
||||
|
||||
if (rangeToNotBlobbify != 0) {
|
||||
|
@ -442,11 +447,11 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
KeyRange expectedRange,
|
||||
KeyRange queryRange) {
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(queryRange, 1000000, self->tenantName));
|
||||
wait(cx->listBlobbifiedRanges(queryRange, 1000000, self->tenant));
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0] == expectedRange);
|
||||
|
||||
state Transaction tr(cx, self->tenantName);
|
||||
state Transaction tr(cx, self->tenant);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(queryRange, 1000000));
|
||||
|
@ -462,7 +467,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> rangesMisalignedUnit(Database cx, BlobGranuleRangesWorkload* self, KeyRange range) {
|
||||
bool setSuccess = wait(cx->blobbifyRange(range, self->tenantName));
|
||||
bool setSuccess = wait(cx->blobbifyRange(range, self->tenant));
|
||||
ASSERT(setSuccess);
|
||||
state KeyRange subRange(KeyRangeRef(range.begin.withSuffix("A"_sr), range.begin.withSuffix("B"_sr)));
|
||||
|
||||
|
@ -477,7 +482,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
wait(self->checkRangesMisaligned(cx, self, range, KeyRangeRef(subRange.begin, range.end)));
|
||||
|
||||
try {
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, self->tenantName, false)));
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, self->tenant, false)));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
|
@ -487,7 +492,7 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
try {
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, self->tenantName, true)));
|
||||
wait(success(cx->purgeBlobGranules(subRange, 1, self->tenant, true)));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
|
@ -515,51 +520,51 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
|
||||
// unblobbifying range that already doesn't exist should be no-op
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
bool unblobbifyStartSuccess = wait(cx->blobbifyRange(activeRange, self->tenantName));
|
||||
bool unblobbifyStartSuccess = wait(cx->blobbifyRange(activeRange, self->tenant));
|
||||
ASSERT(unblobbifyStartSuccess);
|
||||
}
|
||||
|
||||
bool success = wait(cx->blobbifyRange(activeRange, self->tenantName));
|
||||
bool success = wait(cx->blobbifyRange(activeRange, self->tenant));
|
||||
ASSERT(success);
|
||||
wait(self->checkRange(cx, self, activeRange, true));
|
||||
|
||||
// check that re-blobbifying same range is successful
|
||||
bool retrySuccess = wait(cx->blobbifyRange(activeRange, self->tenantName));
|
||||
bool retrySuccess = wait(cx->blobbifyRange(activeRange, self->tenant));
|
||||
ASSERT(retrySuccess);
|
||||
wait(self->checkRange(cx, self, activeRange, true));
|
||||
|
||||
// check that blobbifying range that overlaps but does not match existing blob range fails
|
||||
bool fail1 = wait(cx->blobbifyRange(range, self->tenantName));
|
||||
bool fail1 = wait(cx->blobbifyRange(range, self->tenant));
|
||||
ASSERT(!fail1);
|
||||
|
||||
bool fail2 = wait(cx->blobbifyRange(KeyRangeRef(range.begin, activeRange.end), self->tenantName));
|
||||
bool fail2 = wait(cx->blobbifyRange(KeyRangeRef(range.begin, activeRange.end), self->tenant));
|
||||
ASSERT(!fail2);
|
||||
|
||||
bool fail3 = wait(cx->blobbifyRange(KeyRangeRef(activeRange.begin, range.end), self->tenantName));
|
||||
bool fail3 = wait(cx->blobbifyRange(KeyRangeRef(activeRange.begin, range.end), self->tenant));
|
||||
ASSERT(!fail3);
|
||||
|
||||
bool fail4 = wait(cx->blobbifyRange(KeyRangeRef(range.begin, middleKey), self->tenantName));
|
||||
bool fail4 = wait(cx->blobbifyRange(KeyRangeRef(range.begin, middleKey), self->tenant));
|
||||
ASSERT(!fail4);
|
||||
|
||||
bool fail5 = wait(cx->blobbifyRange(KeyRangeRef(middleKey, range.end), self->tenantName));
|
||||
bool fail5 = wait(cx->blobbifyRange(KeyRangeRef(middleKey, range.end), self->tenant));
|
||||
ASSERT(!fail5);
|
||||
|
||||
bool fail6 = wait(cx->blobbifyRange(KeyRangeRef(activeRange.begin, middleKey), self->tenantName));
|
||||
bool fail6 = wait(cx->blobbifyRange(KeyRangeRef(activeRange.begin, middleKey), self->tenant));
|
||||
ASSERT(!fail6);
|
||||
|
||||
bool fail7 = wait(cx->blobbifyRange(KeyRangeRef(middleKey, activeRange.end), self->tenantName));
|
||||
bool fail7 = wait(cx->blobbifyRange(KeyRangeRef(middleKey, activeRange.end), self->tenant));
|
||||
ASSERT(!fail7);
|
||||
|
||||
bool fail8 = wait(cx->blobbifyRange(KeyRangeRef(middleKey, middleKey2), self->tenantName));
|
||||
bool fail8 = wait(cx->blobbifyRange(KeyRangeRef(middleKey, middleKey2), self->tenant));
|
||||
ASSERT(!fail8);
|
||||
|
||||
{
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenantName));
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenant));
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0] == activeRange);
|
||||
|
||||
state Transaction tr(cx, self->tenantName);
|
||||
state Transaction tr(cx, self->tenant);
|
||||
loop {
|
||||
try {
|
||||
Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(range, 1000000));
|
||||
|
@ -573,12 +578,12 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
|
||||
state Version purgeVersion = deterministicRandom()->coinflip() ? latestVersion : 1;
|
||||
state KeyRangeRef purgeRange = deterministicRandom()->coinflip() ? activeRange : range;
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(purgeRange, purgeVersion, self->tenantName, true));
|
||||
Key purgeKey = wait(cx->purgeBlobGranules(purgeRange, purgeVersion, self->tenant, true));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
// force purge again and ensure it is idempotent
|
||||
Key purgeKeyAgain = wait(cx->purgeBlobGranules(purgeRange, purgeVersion, self->tenantName, true));
|
||||
Key purgeKeyAgain = wait(cx->purgeBlobGranules(purgeRange, purgeVersion, self->tenant, true));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKeyAgain));
|
||||
}
|
||||
}
|
||||
|
@ -586,42 +591,38 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
// Check that the blob range is still listed
|
||||
{
|
||||
Standalone<VectorRef<KeyRangeRef>> blobRanges =
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenantName));
|
||||
wait(cx->listBlobbifiedRanges(range, 1000000, self->tenant));
|
||||
ASSERT(blobRanges.size() == 1);
|
||||
ASSERT(blobRanges[0] == activeRange);
|
||||
|
||||
bool unblobbifyFail1 = wait(cx->unblobbifyRange(range, self->tenantName));
|
||||
bool unblobbifyFail1 = wait(cx->unblobbifyRange(range, self->tenant));
|
||||
ASSERT(!unblobbifyFail1);
|
||||
|
||||
bool unblobbifyFail2 =
|
||||
wait(cx->unblobbifyRange(KeyRangeRef(range.begin, activeRange.end), self->tenantName));
|
||||
bool unblobbifyFail2 = wait(cx->unblobbifyRange(KeyRangeRef(range.begin, activeRange.end), self->tenant));
|
||||
ASSERT(!unblobbifyFail2);
|
||||
|
||||
bool unblobbifyFail3 =
|
||||
wait(cx->unblobbifyRange(KeyRangeRef(activeRange.begin, range.end), self->tenantName));
|
||||
bool unblobbifyFail3 = wait(cx->unblobbifyRange(KeyRangeRef(activeRange.begin, range.end), self->tenant));
|
||||
ASSERT(!unblobbifyFail3);
|
||||
|
||||
bool unblobbifyFail4 =
|
||||
wait(cx->unblobbifyRange(KeyRangeRef(activeRange.begin, middleKey), self->tenantName));
|
||||
bool unblobbifyFail4 = wait(cx->unblobbifyRange(KeyRangeRef(activeRange.begin, middleKey), self->tenant));
|
||||
ASSERT(!unblobbifyFail4);
|
||||
|
||||
bool unblobbifyFail5 = wait(cx->unblobbifyRange(KeyRangeRef(middleKey, activeRange.end), self->tenantName));
|
||||
bool unblobbifyFail5 = wait(cx->unblobbifyRange(KeyRangeRef(middleKey, activeRange.end), self->tenant));
|
||||
ASSERT(!unblobbifyFail5);
|
||||
|
||||
bool unblobbifyFail6 =
|
||||
wait(cx->unblobbifyRange(KeyRangeRef(activeRange.begin, middleKey), self->tenantName));
|
||||
bool unblobbifyFail6 = wait(cx->unblobbifyRange(KeyRangeRef(activeRange.begin, middleKey), self->tenant));
|
||||
ASSERT(!unblobbifyFail6);
|
||||
|
||||
bool unblobbifyFail7 = wait(cx->unblobbifyRange(KeyRangeRef(middleKey, activeRange.end), self->tenantName));
|
||||
bool unblobbifyFail7 = wait(cx->unblobbifyRange(KeyRangeRef(middleKey, activeRange.end), self->tenant));
|
||||
ASSERT(!unblobbifyFail7);
|
||||
|
||||
bool unblobbifyFail8 = wait(cx->unblobbifyRange(KeyRangeRef(middleKey, middleKey2), self->tenantName));
|
||||
bool unblobbifyFail8 = wait(cx->unblobbifyRange(KeyRangeRef(middleKey, middleKey2), self->tenant));
|
||||
ASSERT(!unblobbifyFail8);
|
||||
|
||||
bool unblobbifySuccess = wait(cx->unblobbifyRange(activeRange, self->tenantName));
|
||||
bool unblobbifySuccess = wait(cx->unblobbifyRange(activeRange, self->tenant));
|
||||
ASSERT(unblobbifySuccess);
|
||||
|
||||
bool unblobbifySuccessAgain = wait(cx->unblobbifyRange(activeRange, self->tenantName));
|
||||
bool unblobbifySuccessAgain = wait(cx->unblobbifyRange(activeRange, self->tenant));
|
||||
ASSERT(unblobbifySuccessAgain);
|
||||
}
|
||||
|
||||
|
@ -629,20 +630,20 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> reBlobbifyUnit(Database cx, BlobGranuleRangesWorkload* self, KeyRange range) {
|
||||
bool setSuccess = wait(cx->blobbifyRange(range, self->tenantName));
|
||||
bool setSuccess = wait(cx->blobbifyRange(range, self->tenant));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, range, true));
|
||||
|
||||
// force purge range
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenantName));
|
||||
Key purgeKey = wait(self->versionedForcePurge(cx, range, self->tenant));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
wait(self->checkRange(cx, self, range, false));
|
||||
|
||||
bool unsetSuccess = wait(cx->unblobbifyRange(range, self->tenantName));
|
||||
bool unsetSuccess = wait(cx->unblobbifyRange(range, self->tenant));
|
||||
ASSERT(unsetSuccess);
|
||||
wait(self->checkRange(cx, self, range, false));
|
||||
|
||||
bool reSetSuccess = wait(cx->blobbifyRange(range, self->tenantName));
|
||||
bool reSetSuccess = wait(cx->blobbifyRange(range, self->tenant));
|
||||
ASSERT(reSetSuccess);
|
||||
wait(self->checkRange(cx, self, range, true));
|
||||
|
||||
|
@ -658,21 +659,21 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
state KeyRange range2(KeyRangeRef(midKey, range.end));
|
||||
|
||||
state bool setSuccess = false;
|
||||
wait(store(setSuccess, cx->blobbifyRange(range1, self->tenantName)));
|
||||
wait(store(setSuccess, cx->blobbifyRange(range1, self->tenant)));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, range1, true));
|
||||
wait(store(setSuccess, cx->blobbifyRange(range2, self->tenantName)));
|
||||
wait(store(setSuccess, cx->blobbifyRange(range2, self->tenant)));
|
||||
ASSERT(setSuccess);
|
||||
wait(self->checkRange(cx, self, range2, true));
|
||||
|
||||
// force purge range
|
||||
state Key purgeKey;
|
||||
wait(store(purgeKey, self->versionedForcePurge(cx, range1, self->tenantName)));
|
||||
wait(store(purgeKey, self->versionedForcePurge(cx, range1, self->tenant)));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
wait(store(purgeKey, self->versionedForcePurge(cx, range2, self->tenantName)));
|
||||
wait(store(purgeKey, self->versionedForcePurge(cx, range2, self->tenant)));
|
||||
wait(cx->waitPurgeGranulesComplete(purgeKey));
|
||||
|
||||
bool unsetSuccess = wait(cx->unblobbifyRange(range, self->tenantName));
|
||||
bool unsetSuccess = wait(cx->unblobbifyRange(range, self->tenant));
|
||||
ASSERT(unsetSuccess);
|
||||
|
||||
wait(self->tearDownRangeAfterUnit(cx, self, range));
|
||||
|
|
|
@ -36,7 +36,7 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
Key keyPrefix;
|
||||
double maxNumTenants;
|
||||
double minNumTenants;
|
||||
std::vector<TenantName> tenantNames;
|
||||
std::vector<Reference<Tenant>> tenants;
|
||||
bool deleteTenants;
|
||||
double testDuration;
|
||||
|
||||
|
@ -65,19 +65,19 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
state int numTenantsToCreate =
|
||||
deterministicRandom()->randomInt(workload->minNumTenants, workload->maxNumTenants + 1);
|
||||
TraceEvent("BulkSetupTenantCreation").detail("NumTenants", numTenantsToCreate);
|
||||
|
||||
if (numTenantsToCreate > 0) {
|
||||
std::vector<Future<Void>> tenantFutures;
|
||||
state std::vector<TenantName> tenantNames;
|
||||
state std::vector<Future<Optional<TenantMapEntry>>> tenantFutures;
|
||||
for (int i = 0; i < numTenantsToCreate; i++) {
|
||||
TenantMapEntry entry;
|
||||
tenantNames.push_back(TenantName(format("BulkSetupTenant_%04d", i)));
|
||||
TraceEvent("CreatingTenant")
|
||||
.detail("Tenant", tenantNames.back())
|
||||
.detail("TenantGroup", entry.tenantGroup);
|
||||
tenantFutures.push_back(success(TenantAPI::createTenant(cx.getReference(), tenantNames.back())));
|
||||
TenantName tenantName = TenantNameRef(format("BulkSetupTenant_%04d", i));
|
||||
TraceEvent("CreatingTenant").detail("Tenant", tenantName);
|
||||
tenantFutures.push_back(TenantAPI::createTenant(cx.getReference(), tenantName));
|
||||
}
|
||||
wait(waitForAll(tenantFutures));
|
||||
workload->tenantNames = tenantNames;
|
||||
for (auto& f : tenantFutures) {
|
||||
ASSERT(f.get().present());
|
||||
workload->tenants.push_back(makeReference<Tenant>(f.get().get().id, f.get().get().tenantName));
|
||||
}
|
||||
}
|
||||
wait(bulkSetup(cx,
|
||||
workload,
|
||||
|
@ -92,24 +92,19 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
0.1,
|
||||
0,
|
||||
0,
|
||||
workload->tenantNames));
|
||||
workload->tenants));
|
||||
|
||||
// We want to ensure that tenant deletion happens before the restore phase starts
|
||||
if (workload->deleteTenants) {
|
||||
state Reference<TenantEntryCache<Void>> tenantCache =
|
||||
makeReference<TenantEntryCache<Void>>(cx, TenantEntryCacheRefreshMode::WATCH);
|
||||
wait(tenantCache->init());
|
||||
state int numTenantsToDelete = deterministicRandom()->randomInt(0, workload->tenantNames.size() + 1);
|
||||
state int numTenantsToDelete = deterministicRandom()->randomInt(0, workload->tenants.size() + 1);
|
||||
if (numTenantsToDelete > 0) {
|
||||
state int i;
|
||||
for (i = 0; i < numTenantsToDelete; i++) {
|
||||
state TenantName tenant = deterministicRandom()->randomChoice(workload->tenantNames);
|
||||
Optional<TenantEntryCachePayload<Void>> payload = wait(tenantCache->getByName(tenant));
|
||||
ASSERT(payload.present());
|
||||
state int64_t tenantId = payload.get().entry.id;
|
||||
state int tenantIndex = deterministicRandom()->randomInt(0, workload->tenants.size());
|
||||
state Reference<Tenant> tenant = workload->tenants[tenantIndex];
|
||||
TraceEvent("BulkSetupTenantDeletionClearing")
|
||||
.detail("TenantName", tenant)
|
||||
.detail("TenantId", tenantId)
|
||||
.detail("TotalNumTenants", workload->tenantNames.size());
|
||||
.detail("Tenant", tenant)
|
||||
.detail("TotalNumTenants", workload->tenants.size());
|
||||
// clear the tenant
|
||||
state ReadYourWritesTransaction tr = ReadYourWritesTransaction(cx, tenant);
|
||||
loop {
|
||||
|
@ -122,17 +117,12 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
// delete the tenant
|
||||
wait(success(TenantAPI::deleteTenant(cx.getReference(), tenant)));
|
||||
for (auto it = workload->tenantNames.begin(); it != workload->tenantNames.end(); it++) {
|
||||
if (*it == tenant) {
|
||||
workload->tenantNames.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
wait(success(TenantAPI::deleteTenant(cx.getReference(), tenant->name.get(), tenant->id())));
|
||||
workload->tenants.erase(workload->tenants.begin() + tenantIndex);
|
||||
|
||||
TraceEvent("BulkSetupTenantDeletionDone")
|
||||
.detail("TenantName", tenant)
|
||||
.detail("TenantId", tenantId)
|
||||
.detail("TotalNumTenants", workload->tenantNames.size());
|
||||
.detail("Tenant", tenant)
|
||||
.detail("TotalNumTenants", workload->tenants.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,8 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload {
|
|||
double testDuration;
|
||||
Key keyPrefix;
|
||||
bool hasTenant;
|
||||
Optional<TenantName> tenant;
|
||||
Optional<TenantName> tenantName;
|
||||
Optional<Reference<Tenant>> tenant;
|
||||
bool checkOnly;
|
||||
|
||||
GetEstimatedRangeSizeWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
|
@ -45,7 +46,7 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload {
|
|||
nodeCount = getOption(options, "nodeCount"_sr, 10000);
|
||||
keyPrefix = unprintable(getOption(options, "keyPrefix"_sr, ""_sr).toString());
|
||||
hasTenant = hasOption(options, "tenant"_sr);
|
||||
tenant = hasTenant ? getOption(options, "tenant"_sr, "DefaultNeverUsed"_sr) : Optional<TenantName>();
|
||||
tenantName = hasTenant ? getOption(options, "tenant"_sr, "DefaultNeverUsed"_sr) : Optional<TenantName>();
|
||||
checkOnly = getOption(options, "checkOnly"_sr, false);
|
||||
}
|
||||
|
||||
|
@ -55,6 +56,7 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload {
|
|||
}
|
||||
// The following call to bulkSetup() assumes that we have a valid tenant.
|
||||
ASSERT(hasTenant);
|
||||
tenant = makeReference<Tenant>(cx, tenantName.get());
|
||||
// Use default values for arguments between (and including) postSetupWarming and endNodeIdx params.
|
||||
return bulkSetup(cx,
|
||||
this,
|
||||
|
@ -105,12 +107,12 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload {
|
|||
ACTOR static Future<int64_t> getSize(GetEstimatedRangeSizeWorkload* self, Database cx) {
|
||||
state ReadYourWritesTransaction tr(cx, self->tenant);
|
||||
state double totalDelay = 0.0;
|
||||
TraceEvent(SevDebug, "GetSizeStart").detail("Tenant", tr.getTenant());
|
||||
TraceEvent(SevDebug, "GetSizeStart").detail("Tenant", self->tenant);
|
||||
|
||||
loop {
|
||||
try {
|
||||
state int64_t size = wait(tr.getEstimatedRangeSizeBytes(normalKeys));
|
||||
TraceEvent(SevDebug, "GetSizeResult").detail("Tenant", tr.getTenant()).detail("Size", size);
|
||||
TraceEvent(SevDebug, "GetSizeResult").detail("Tenant", self->tenant).detail("Size", size);
|
||||
if (!sizeIsAsExpected(self, size) && totalDelay < 300.0) {
|
||||
totalDelay += 5.0;
|
||||
wait(delay(5.0));
|
||||
|
@ -118,7 +120,7 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload {
|
|||
return size;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "GetSizeError").errorUnsuppressed(e).detail("Tenant", tr.getTenant());
|
||||
TraceEvent(SevDebug, "GetSizeError").errorUnsuppressed(e).detail("Tenant", self->tenant);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,17 +28,21 @@
|
|||
|
||||
struct LeakTLogInterfaceWorkload : TestWorkload {
|
||||
static constexpr auto NAME = "LeakTLogInterface";
|
||||
TenantName tenant;
|
||||
TenantName tenantName;
|
||||
Reference<Tenant> tenant;
|
||||
Standalone<StringRef> fieldName;
|
||||
double testDuration;
|
||||
|
||||
LeakTLogInterfaceWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
tenant = getOption(options, "tenant"_sr, "DefaultTenant"_sr);
|
||||
tenantName = getOption(options, "tenant"_sr, "DefaultTenant"_sr);
|
||||
fieldName = getOption(options, "key"_sr, "TLogInterface"_sr);
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
}
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return persistSerializedTLogInterface(this, cx); }
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
tenant = makeReference<Tenant>(cx, tenantName);
|
||||
return persistSerializedTLogInterface(this, cx);
|
||||
}
|
||||
|
||||
Future<Void> start(Database const& cx) override { return timeout(updateLoop(this, cx), testDuration, Void()); }
|
||||
Future<bool> check(Database const& cx) override { return true; }
|
||||
|
|
|
@ -315,14 +315,17 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
|
|||
|
||||
ACTOR Future<Void> testSpecialKeySpaceErrors(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
|
||||
state Database cx = cx_->clone();
|
||||
state int64_t tenantId;
|
||||
try {
|
||||
wait(success(TenantAPI::createTenant(cx.getReference(), TenantName("foo"_sr))));
|
||||
Optional<TenantMapEntry> entry = wait(TenantAPI::createTenant(cx.getReference(), TenantName("foo"_sr)));
|
||||
ASSERT(entry.present());
|
||||
tenantId = entry.get().id;
|
||||
} catch (Error& e) {
|
||||
ASSERT(e.code() == error_code_tenant_already_exists || e.code() == error_code_actor_cancelled);
|
||||
}
|
||||
state Reference<ReadYourWritesTransaction> tx = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state Reference<ReadYourWritesTransaction> tenantTx =
|
||||
makeReference<ReadYourWritesTransaction>(cx, TenantName("foo"_sr));
|
||||
makeReference<ReadYourWritesTransaction>(cx, makeReference<Tenant>(tenantId));
|
||||
// Use new transactions that may use default tenant rather than re-use tx
|
||||
// This is because tx will reject raw access for later tests if default tenant is set
|
||||
state Reference<ReadYourWritesTransaction> defaultTx1 = makeReference<ReadYourWritesTransaction>(cx);
|
||||
|
|
|
@ -33,18 +33,23 @@
|
|||
struct StorageQuotaWorkload : TestWorkload {
|
||||
static constexpr auto NAME = "StorageQuota";
|
||||
TenantGroupName group;
|
||||
TenantName tenant;
|
||||
TenantName tenantName;
|
||||
Reference<Tenant> tenant;
|
||||
int nodeCount;
|
||||
TenantName emptyTenant;
|
||||
TenantName emptyTenantName;
|
||||
Reference<Tenant> emptyTenant;
|
||||
|
||||
StorageQuotaWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
group = getOption(options, "group"_sr, "DefaultGroup"_sr);
|
||||
tenant = getOption(options, "tenant"_sr, "DefaultTenant"_sr);
|
||||
tenantName = getOption(options, "tenant"_sr, "DefaultTenant"_sr);
|
||||
nodeCount = getOption(options, "nodeCount"_sr, 10000);
|
||||
emptyTenant = getOption(options, "emptyTenant"_sr, "DefaultTenant"_sr);
|
||||
emptyTenantName = getOption(options, "emptyTenant"_sr, "DefaultTenant"_sr);
|
||||
}
|
||||
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
tenant = makeReference<Tenant>(cx, tenantName);
|
||||
emptyTenant = makeReference<Tenant>(cx, emptyTenantName);
|
||||
|
||||
// Use default values for arguments between (and including) postSetupWarming and endNodeIdx params.
|
||||
return bulkSetup(cx,
|
||||
this,
|
||||
|
@ -72,8 +77,8 @@ struct StorageQuotaWorkload : TestWorkload {
|
|||
Standalone<KeyValueRef> operator()(int n) { return KeyValueRef(keyForIndex(n), value((n + 1) % nodeCount)); }
|
||||
|
||||
ACTOR Future<Void> _start(StorageQuotaWorkload* self, Database cx) {
|
||||
state TenantMapEntry entry1 = wait(TenantAPI::getTenant(cx.getReference(), self->tenant));
|
||||
state TenantMapEntry entry2 = wait(TenantAPI::getTenant(cx.getReference(), self->emptyTenant));
|
||||
state TenantMapEntry entry1 = wait(TenantAPI::getTenant(cx.getReference(), self->tenantName));
|
||||
state TenantMapEntry entry2 = wait(TenantAPI::getTenant(cx.getReference(), self->emptyTenantName));
|
||||
ASSERT(entry1.tenantGroup.present() && entry1.tenantGroup.get() == self->group &&
|
||||
entry2.tenantGroup.present() && entry2.tenantGroup.get() == self->group);
|
||||
|
||||
|
@ -116,8 +121,8 @@ struct StorageQuotaWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<int64_t> getSize(Database cx, TenantName tenantName) {
|
||||
state ReadYourWritesTransaction tr(cx, tenantName);
|
||||
ACTOR static Future<int64_t> getSize(Database cx, Reference<Tenant> tenant) {
|
||||
state ReadYourWritesTransaction tr(cx, tenant);
|
||||
state double totalDelay = 0.0;
|
||||
state int64_t previousSize = -1;
|
||||
|
||||
|
@ -129,11 +134,11 @@ struct StorageQuotaWorkload : TestWorkload {
|
|||
totalDelay += 5.0;
|
||||
wait(delay(5.0));
|
||||
} else {
|
||||
TraceEvent(SevDebug, "GetSizeResult").detail("Tenant", tr.getTenant().get()).detail("Size", size);
|
||||
TraceEvent(SevDebug, "GetSizeResult").detail("Tenant", tenant).detail("Size", size);
|
||||
return size;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevDebug, "GetSizeError").errorUnsuppressed(e).detail("Tenant", tr.getTenant().get());
|
||||
TraceEvent(SevDebug, "GetSizeError").errorUnsuppressed(e).detail("Tenant", tenant);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
@ -179,7 +184,7 @@ struct StorageQuotaWorkload : TestWorkload {
|
|||
|
||||
ACTOR static Future<bool> tryWrite(StorageQuotaWorkload* self,
|
||||
Database cx,
|
||||
TenantName tenant,
|
||||
Reference<Tenant> tenant,
|
||||
bool bypassQuota,
|
||||
bool expectOk) {
|
||||
state int i;
|
||||
|
|
|
@ -50,13 +50,13 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
static constexpr auto NAME = "TenantManagement";
|
||||
|
||||
struct TenantData {
|
||||
int64_t id;
|
||||
Reference<Tenant> tenant;
|
||||
Optional<TenantGroupName> tenantGroup;
|
||||
bool empty;
|
||||
|
||||
TenantData() : id(-1), empty(true) {}
|
||||
TenantData() : empty(true) {}
|
||||
TenantData(int64_t id, Optional<TenantGroupName> tenantGroup, bool empty)
|
||||
: id(id), tenantGroup(tenantGroup), empty(empty) {}
|
||||
: tenant(makeReference<Tenant>(id)), tenantGroup(tenantGroup), empty(empty) {}
|
||||
};
|
||||
|
||||
struct TenantGroupData {
|
||||
|
@ -571,7 +571,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
// Randomly decide to insert a key into the tenant
|
||||
state bool insertData = deterministicRandom()->random01() < 0.5;
|
||||
if (insertData) {
|
||||
state Transaction insertTr(self->dataDb, tenantItr->first);
|
||||
state Transaction insertTr(self->dataDb, self->createdTenants[tenantItr->first].tenant);
|
||||
loop {
|
||||
try {
|
||||
// The value stored in the key will be the name of the tenant
|
||||
|
@ -656,7 +656,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// set a random watch on a tenant
|
||||
ACTOR static Future<Void> watchTenant(TenantManagementWorkload* self, TenantName tenant) {
|
||||
ACTOR static Future<Void> watchTenant(TenantManagementWorkload* self, Reference<Tenant> tenant) {
|
||||
loop {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->dataDb, tenant));
|
||||
try {
|
||||
|
@ -684,13 +684,13 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> clearTenantData(TenantManagementWorkload* self, TenantName tenant) {
|
||||
state Transaction clearTr(self->dataDb, tenant);
|
||||
ACTOR static Future<Void> clearTenantData(TenantManagementWorkload* self, TenantName tenantName) {
|
||||
state Transaction clearTr(self->dataDb, self->createdTenants[tenantName].tenant);
|
||||
loop {
|
||||
try {
|
||||
clearTr.clear(self->keyName);
|
||||
wait(clearTr.commit());
|
||||
auto itr = self->createdTenants.find(tenant);
|
||||
auto itr = self->createdTenants.find(tenantName);
|
||||
ASSERT(itr != self->createdTenants.end());
|
||||
itr->second.empty = true;
|
||||
return Void();
|
||||
|
@ -806,8 +806,9 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
|
||||
// watch the tenant to be deleted
|
||||
if (watchTenantCheck) {
|
||||
watchFutures.emplace_back(tenants[tenantIndex],
|
||||
errorOr(watchTenant(self, tenants[tenantIndex])));
|
||||
watchFutures.emplace_back(
|
||||
tenants[tenantIndex],
|
||||
errorOr(watchTenant(self, self->createdTenants[tenants[tenantIndex]].tenant)));
|
||||
}
|
||||
}
|
||||
// Otherwise, we will just report the current emptiness of the tenant
|
||||
|
@ -992,9 +993,9 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
|
||||
// Performs some validation on a tenant's contents
|
||||
ACTOR static Future<Void> checkTenantContents(TenantManagementWorkload* self,
|
||||
TenantName tenant,
|
||||
TenantName tenantName,
|
||||
TenantData tenantData) {
|
||||
state Transaction tr(self->dataDb, tenant);
|
||||
state Transaction tr(self->dataDb, self->createdTenants[tenantName].tenant);
|
||||
loop {
|
||||
try {
|
||||
// We only every store a single key in each tenant. Therefore we expect a range read of the entire
|
||||
|
@ -1010,7 +1011,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
else {
|
||||
ASSERT(result.size() == 1);
|
||||
ASSERT(result[0].key == self->keyName);
|
||||
ASSERT(result[0].value == tenant);
|
||||
ASSERT(result[0].value == tenantName);
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
@ -1115,7 +1116,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
// Get the tenant metadata and check that it matches our local state
|
||||
state TenantMapEntry entry = wait(getTenantImpl(tr, tenant, operationType, self));
|
||||
ASSERT(alreadyExists);
|
||||
ASSERT(entry.id == tenantData.id);
|
||||
ASSERT(entry.id == tenantData.tenant->id());
|
||||
ASSERT(entry.tenantGroup == tenantData.tenantGroup);
|
||||
wait(checkTenantContents(self, tenant, tenantData));
|
||||
return Void();
|
||||
|
@ -1258,7 +1259,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
TenantData tData = self->createdTenants[oldTenantName];
|
||||
self->createdTenants[newTenantName] = tData;
|
||||
self->createdTenants.erase(oldTenantName);
|
||||
state Transaction insertTr(self->dataDb, newTenantName);
|
||||
state Transaction insertTr(self->dataDb, tData.tenant);
|
||||
if (!tData.empty) {
|
||||
loop {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue