Add a tenant lookup interface and use it when starting transactions

This commit is contained in:
A.J. Beamon 2022-12-20 16:30:59 -08:00
parent 05bbb7c840
commit f999623bb1
27 changed files with 476 additions and 526 deletions

View File

@ -82,8 +82,7 @@ extern "C" DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_erro
code == error_code_grv_proxy_memory_limit_exceeded ||
code == error_code_commit_proxy_memory_limit_exceeded ||
code == error_code_batch_transaction_throttled || code == error_code_process_behind ||
code == error_code_tag_throttled || code == error_code_unknown_tenant ||
code == error_code_proxy_tag_throttled;
code == error_code_tag_throttled || code == error_code_proxy_tag_throttled;
}
return false;
}

View File

@ -174,8 +174,6 @@ FoundationDB may return the following error codes from API functions. If you nee
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| tenants_disabled | 2136| Tenants have been disabled in the cluster |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| unknown_tenant | 2137| Tenant is not available from this server |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| api_version_unset | 2200| API version is not set |
+-----------------------------------------------+-----+--------------------------------------------------------------------------------+
| api_version_already_set | 2201| API version may be set only once |

View File

@ -92,8 +92,6 @@ void ClientKnobs::initialize(Randomize randomize) {
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
init( LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD, 60 );
init( LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL, 60 );
init( TENANT_CACHE_EVICTION_SIZE, 100000 );
init( TENANT_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_EVICTION_SIZE_SIM = 3;
init( GET_RANGE_SHARD_LIMIT, 2 );
init( WARM_RANGE_SHARD_LIMIT, 100 );

View File

@ -22,12 +22,12 @@
#include "fdbclient/Knobs.h"
#include "fdbclient/NativeAPI.actor.h"
KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, KeyRef prefix) {
if (prefix.empty()) {
KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, Optional<KeyRef> prefix) {
if (!prefix.present() || prefix.get().empty()) {
return range;
} else {
KeyRef begin = range.begin.startsWith(prefix) ? range.begin.removePrefix(prefix) : allKeys.begin;
KeyRef end = range.end.startsWith(prefix) ? range.end.removePrefix(prefix) : allKeys.end;
KeyRef begin = range.begin.startsWith(prefix.get()) ? range.begin.removePrefix(prefix.get()) : allKeys.begin;
KeyRef end = range.end.startsWith(prefix.get()) ? range.end.removePrefix(prefix.get()) : allKeys.end;
return KeyRangeRef(begin, end);
}
}

File diff suppressed because it is too large Load Diff

View File

@ -488,6 +488,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET, 10.0 );
init( START_TRANSACTION_MAX_QUEUE_SIZE, 1e6 );
init( KEY_LOCATION_MAX_QUEUE_SIZE, 1e6 );
init( TENANT_ID_REQUEST_MAX_QUEUE_SIZE, 1e6 );
init( COMMIT_PROXY_LIVENESS_TIMEOUT, 20.0 );
init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005;

View File

@ -90,8 +90,6 @@ public:
int LOCATION_CACHE_EVICTION_SIZE_SIM;
double LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD;
double LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL;
int TENANT_CACHE_EVICTION_SIZE;
int TENANT_CACHE_EVICTION_SIZE_SIM;
int GET_RANGE_SHARD_LIMIT;
int WARM_RANGE_SHARD_LIMIT;

View File

@ -62,6 +62,7 @@ struct CommitProxyInterface {
RequestStream<struct ExclusionSafetyCheckRequest> exclusionSafetyCheckReq;
RequestStream<struct GetDDMetricsRequest> getDDMetrics;
PublicRequestStream<struct ExpireIdempotencyIdRequest> expireIdempotencyId;
PublicRequestStream<struct GetTenantIdRequest> getTenantId;
UID id() const { return commit.getEndpoint().token; }
std::string toString() const { return id().shortString(); }
@ -90,6 +91,7 @@ struct CommitProxyInterface {
getDDMetrics = RequestStream<struct GetDDMetricsRequest>(commit.getEndpoint().getAdjustedEndpoint(9));
expireIdempotencyId =
PublicRequestStream<struct ExpireIdempotencyIdRequest>(commit.getEndpoint().getAdjustedEndpoint(10));
getTenantId = PublicRequestStream<struct GetTenantIdRequest>(commit.getEndpoint().getAdjustedEndpoint(11));
}
}
@ -107,6 +109,7 @@ struct CommitProxyInterface {
streams.push_back(exclusionSafetyCheckReq.getReceiver());
streams.push_back(getDDMetrics.getReceiver());
streams.push_back(expireIdempotencyId.getReceiver());
streams.push_back(getTenantId.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
};
@ -361,10 +364,46 @@ struct GetReadVersionRequest : TimedRequest {
}
};
struct GetTenantIdReply {
constexpr static FileIdentifier file_identifier = 11441284;
int64_t tenantId = TenantInfo::INVALID_TENANT;
GetTenantIdReply() {}
GetTenantIdReply(int64_t tenantId) : tenantId(tenantId) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, tenantId);
}
};
struct GetTenantIdRequest {
constexpr static FileIdentifier file_identifier = 11299717;
SpanContext spanContext;
TenantName tenantName;
ReplyPromise<GetTenantIdReply> reply;
// This version is used to specify the minimum metadata version a proxy must have in order to declare that
// a tenant is not present. If the metadata version is lower, the proxy must wait in case the tenant gets
// created. If latestVersion is specified, then the proxy will wait until it is sure that it has received
// updates from other proxies before answering.
Version minTenantVersion;
GetTenantIdRequest() : minTenantVersion(latestVersion) {}
GetTenantIdRequest(SpanContext spanContext, TenantNameRef const& tenantName, Version minTenantVersion)
: spanContext(spanContext), tenantName(tenantName), minTenantVersion(minTenantVersion) {}
bool verify() const { return true; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply, spanContext, tenantName, minTenantVersion);
}
};
struct GetKeyServerLocationsReply {
constexpr static FileIdentifier file_identifier = 10636023;
Arena arena;
TenantMapEntry tenantEntry;
std::vector<std::pair<KeyRangeRef, std::vector<StorageServerInterface>>> results;
// if any storage servers in results have a TSS pair, that mapping is in here
@ -379,7 +418,7 @@ struct GetKeyServerLocationsReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, results, resultsTssMapping, tenantEntry, resultsTagMapping, arena);
serializer(ar, results, resultsTssMapping, resultsTagMapping, arena);
}
};

View File

@ -203,13 +203,11 @@ struct EndpointFailureInfo {
};
struct KeyRangeLocationInfo {
TenantMapEntry tenantEntry;
KeyRange range;
Reference<LocationInfo> locations;
KeyRangeLocationInfo() {}
KeyRangeLocationInfo(TenantMapEntry tenantEntry, KeyRange range, Reference<LocationInfo> locations)
: tenantEntry(tenantEntry), range(range), locations(locations) {}
KeyRangeLocationInfo(KeyRange range, Reference<LocationInfo> locations) : range(range), locations(locations) {}
};
struct OverlappingChangeFeedsInfo {
@ -260,22 +258,17 @@ public:
return cx;
}
Optional<KeyRangeLocationInfo> getCachedLocation(const Optional<TenantNameRef>& tenant,
Optional<KeyRangeLocationInfo> getCachedLocation(const TenantInfo& tenant,
const KeyRef&,
Reverse isBackward = Reverse::False);
bool getCachedLocations(const Optional<TenantNameRef>& tenant,
bool getCachedLocations(const TenantInfo& tenant,
const KeyRangeRef&,
std::vector<KeyRangeLocationInfo>&,
int limit,
Reverse reverse);
void cacheTenant(const TenantName& tenant, const TenantMapEntry& tenantEntry);
Reference<LocationInfo> setCachedLocation(const Optional<TenantNameRef>& tenant,
const TenantMapEntry& tenantEntry,
const KeyRangeRef&,
const std::vector<struct StorageServerInterface>&);
void invalidateCachedTenant(const TenantNameRef& tenant);
void invalidateCache(const KeyRef& tenantPrefix, const KeyRef& key, Reverse isBackward = Reverse::False);
void invalidateCache(const KeyRef& tenantPrefix, const KeyRangeRef& keys);
Reference<LocationInfo> setCachedLocation(const KeyRangeRef&, const std::vector<struct StorageServerInterface>&);
void invalidateCache(const Optional<KeyRef>& tenantPrefix, const KeyRef& key, Reverse isBackward = Reverse::False);
void invalidateCache(const Optional<KeyRef>& tenantPrefix, const KeyRangeRef& keys);
// Records that `endpoint` is failed on a healthy server.
void setFailedEndpointOnHealthyServer(const Endpoint& endpoint);
@ -496,10 +489,8 @@ public:
// Cache of location information
int locationCacheSize;
int tenantCacheSize;
CoalescedKeyRangeMap<Reference<LocationInfo>> locationCache;
std::unordered_map<Endpoint, EndpointFailureInfo> failedEndpointsOnHealthyServersInfo;
std::unordered_map<TenantName, TenantMapEntry> tenantCache;
std::map<UID, StorageServerInfo*> server_interf;
std::map<UID, BlobWorkerInterface> blobWorker_interf; // blob workers don't change endpoints for the same ID
@ -562,6 +553,8 @@ public:
Counter transactionKeyServerLocationRequests;
Counter transactionKeyServerLocationRequestsCompleted;
Counter transactionStatusRequests;
Counter transactionTenantLookupRequests;
Counter transactionTenantLookupRequestsCompleted;
Counter transactionsTooOld;
Counter transactionsFutureVersions;
Counter transactionsNotCommitted;

View File

@ -610,7 +610,7 @@ KeyRef keyBetween(const KeyRangeRef& keys);
// Returns a randomKey between keys. If it's impossible, return keys.end.
Key randomKeyBetween(const KeyRangeRef& keys);
KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, KeyRef prefix);
KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, Optional<KeyRef> prefix);
struct KeySelectorRef {
private:

View File

@ -237,6 +237,44 @@ struct Watch : public ReferenceCounted<Watch>, NonCopyable {
void setWatch(Future<Void> watchFuture);
};
class Tenant : public ReferenceCounted<Tenant> {
public:
Tenant(Future<int64_t> id, Optional<TenantName> name) : idFuture(id), name(name) {}
int64_t id() const {
ASSERT(idFuture.isReady());
return idFuture.get();
}
StringRef prefix() const {
ASSERT(idFuture.isReady());
if (bigEndianId == -1) {
bigEndianId = bigEndian64(idFuture.get());
}
return StringRef(reinterpret_cast<const uint8_t*>(&bigEndianId), TenantAPI::PREFIX_SIZE);
}
std::string description() const {
StringRef nameStr = name.castTo<TenantNameRef>().orDefault("<unspecified>"_sr);
if (idFuture.canGet()) {
return format("%*s (%lld)", nameStr.size(), nameStr.begin(), idFuture.get());
} else {
return format("%*s", nameStr.size(), nameStr.begin());
}
}
Future<int64_t> idFuture;
Optional<TenantName> name;
private:
mutable int64_t bigEndianId = -1;
};
template <>
struct Traceable<Tenant> : std::true_type {
static std::string toString(const Tenant& tenant) { return printable(tenant.description()); }
};
FDB_DECLARE_BOOLEAN_PARAM(AllowInvalidTenantID);
struct TransactionState : ReferenceCounted<TransactionState> {
@ -280,37 +318,24 @@ struct TransactionState : ReferenceCounted<TransactionState> {
// VERSION_VECTOR changed default values of readVersionObtainedFromGrvProxy
TransactionState(Database cx,
Optional<TenantName> tenant,
Optional<TenantName> tenantName,
TaskPriority taskID,
SpanContext spanContext,
Reference<TransactionLogInfo> trLogInfo);
Reference<TransactionState> cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo, bool generateNewSpan) const;
TenantInfo getTenantInfo(AllowInvalidTenantID allowInvalidId = AllowInvalidTenantID::False);
TenantInfo getTenantInfo(AllowInvalidTenantID allowInvalidTenantId = AllowInvalidTenantID::False);
Optional<TenantName> const& tenant();
Optional<Reference<Tenant>> const& tenant();
bool hasTenant() const;
int64_t tenantId() const { return tenantId_; }
void trySetTenantId(int64_t tenantId) {
if (tenantId_ == TenantInfo::INVALID_TENANT) {
tenantId_ = tenantId;
}
}
Future<Void> handleUnknownTenant();
int64_t tenantId() const { return tenant_.present() ? tenant_.get()->id() : TenantInfo::INVALID_TENANT; }
private:
Optional<TenantName> tenant_;
int64_t tenantId_ = TenantInfo::INVALID_TENANT;
Optional<Reference<Tenant>> tenant_;
bool tenantSet;
};
class Tenant {
Future<int64_t> id;
TenantName name;
};
class Transaction : NonCopyable {
public:
explicit Transaction(Database const& cx, Optional<TenantName> const& tenant = Optional<TenantName>());
@ -507,7 +532,7 @@ public:
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.write_conflict_ranges, tr.arena);
}
Optional<TenantName> getTenant() { return trState->tenant(); }
Optional<Reference<Tenant>> getTenant() { return trState->tenant(); }
Reference<TransactionState> trState;
std::vector<Reference<Watch>> watches;

View File

@ -212,7 +212,7 @@ public:
}
Transaction& getTransaction() { return tr; }
Optional<TenantName> getTenant() { return tr.getTenant(); }
Optional<Reference<Tenant>> getTenant() { return tr.getTenant(); }
TagSet const& getTags() const { return tr.getTags(); }
// used in template functions as returned Future type

View File

@ -403,6 +403,7 @@ public:
double START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET;
int START_TRANSACTION_MAX_QUEUE_SIZE;
int KEY_LOCATION_MAX_QUEUE_SIZE;
int TENANT_ID_REQUEST_MAX_QUEUE_SIZE;
double COMMIT_PROXY_LIVENESS_TIMEOUT;
double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE;

View File

@ -3575,14 +3575,14 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
ASSERT(tenantEntry.get().id == req.tenantInfo.tenantId);
tenantPrefix = tenantEntry.get().prefix;
} else {
CODE_PROBE(true, "Blob worker unknown tenant");
CODE_PROBE(true, "Blob worker tenant not found");
// FIXME - better way. Wait on retry here, or just have better model for tenant metadata?
// Just throw wrong_shard_server and make the client retry and assume we load it later
TraceEvent(SevDebug, "BlobWorkerRequestUnknownTenant", bwData->id)
TraceEvent(SevDebug, "BlobWorkerRequestTenantNotFound", bwData->id)
.suppressFor(5.0)
.detail("TenantName", req.tenantInfo.name.get())
.detail("TenantId", req.tenantInfo.tenantId);
throw unknown_tenant();
throw tenant_not_found();
}
req.keyRange = KeyRangeRef(req.keyRange.begin.withPrefix(tenantPrefix.get(), req.arena),
req.keyRange.end.withPrefix(tenantPrefix.get(), req.arena));

View File

@ -251,28 +251,15 @@ struct ResolutionRequestBuilder {
}
};
ErrorOr<int64_t> lookupTenant(ProxyCommitData* commitData, TenantNameRef tenant, bool logOnFailure) {
auto itr = commitData->tenantNameIndex.find(tenant);
if (itr == commitData->tenantNameIndex.end()) {
if (logOnFailure) {
TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("TenantName", tenant);
}
return tenant_not_found();
}
return itr->second;
}
bool checkTenant(ProxyCommitData* commitData, int64_t tenant, Optional<TenantNameRef> tenantName) {
bool checkTenantNoWait(ProxyCommitData* commitData, int64_t tenant, const char* context, bool logOnFailure) {
if (tenant != TenantInfo::INVALID_TENANT) {
auto itr = commitData->tenantMap.find(tenant);
if (itr == commitData->tenantMap.end()) {
TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("Tenant", tenant);
return false;
} else if (tenantName.present() && itr->second != tenantName.get()) {
// This is temporary and will be removed when the client stops caching the tenant name -> ID mapping
TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("Tenant", tenant);
if (logOnFailure) {
TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid)
.detail("Tenant", tenant)
.detail("Context", context);
}
return false;
}
@ -282,6 +269,19 @@ bool checkTenant(ProxyCommitData* commitData, int64_t tenant, Optional<TenantNam
return true;
}
ACTOR Future<bool> checkTenant(ProxyCommitData* commitData, int64_t tenant, Version minVersion, const char* context) {
loop {
state Version currentVersion = commitData->version.get();
if (checkTenantNoWait(commitData, tenant, context, currentVersion >= minVersion)) {
return true;
} else if (currentVersion >= minVersion) {
return false;
} else {
wait(commitData->version.whenAtLeast(currentVersion + 1));
}
}
}
bool verifyTenantPrefix(ProxyCommitData* const commitData, const CommitTransactionRequest& req) {
if (req.tenantInfo.hasTenant()) {
KeyRef tenantPrefix = req.tenantInfo.prefix.get();
@ -1164,11 +1164,11 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
int t;
for (t = 0; t < trs.size() && !self->forceRecovery; t++) {
if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) {
bool isValid = checkTenant(pProxyCommitData, trs[t].tenantInfo.tenantId, trs[t].tenantInfo.name);
bool isValid = checkTenantNoWait(pProxyCommitData, trs[t].tenantInfo.tenantId, "Commit", true);
if (!isValid) {
self->committed[t] = ConflictBatch::TransactionTenantFailure;
trs[t].reply.sendError(unknown_tenant());
trs[t].reply.sendError(tenant_not_found());
} else {
self->commitCount++;
applyMetadataMutations(trs[t].spanContext,
@ -2108,10 +2108,8 @@ void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitDat
}
}
ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
ACTOR static Future<Void> doTenantIdRequest(GetTenantIdRequest req, ProxyCommitData* commitData) {
// We can't respond to these requests until we have valid txnStateStore
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
wait(commitData->validState.getFuture());
wait(delay(0, TaskPriority::DefaultEndpoint));
@ -2124,37 +2122,75 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
? delay(SERVER_KNOBS->FUTURE_VERSION_DELAY)
: Never();
while (req.tenant.name.present() && tenantId.isError()) {
bool finalQuery = commitData->version.get() >= minTenantVersion;
tenantId = lookupTenant(commitData, req.tenant.name.get(), finalQuery);
if (tenantId.isError()) {
if (finalQuery) {
req.reply.sendError(tenantId.getError());
return Void();
} else {
choose {
// Wait until we are sure that we've received metadata updates through minTenantVersion
// If latestVersion is specified, this will wait until we have definitely received
// updates through the version at the time we received the request
when(wait(commitData->version.whenAtLeast(minTenantVersion))) {}
when(wait(futureVersionDelay)) {
req.reply.sendError(future_version());
return Void();
}
}
}
choose {
// Wait until we are sure that we've received metadata updates through minTenantVersion
// If latestVersion is specified, this will wait until we have definitely received
// updates through the version at the time we received the request
when(wait(commitData->version.whenAtLeast(minTenantVersion))) {}
when(wait(futureVersionDelay)) {
req.reply.sendError(future_version());
++commitData->stats.tenantIdRequestOut;
++commitData->stats.tenantIdRequestErrors;
return Void();
}
}
auto itr = commitData->tenantNameIndex.find(req.tenantName);
if (itr != commitData->tenantNameIndex.end()) {
req.reply.send(GetTenantIdReply(itr->second));
} else {
TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("TenantName", req.tenantName);
++commitData->stats.tenantIdRequestErrors;
req.reply.sendError(tenant_not_found());
}
++commitData->stats.tenantIdRequestOut;
return Void();
}
ACTOR static Future<Void> tenantIdServer(CommitProxyInterface proxy,
PromiseStream<Future<Void>> addActor,
ProxyCommitData* commitData) {
loop {
GetTenantIdRequest req = waitNext(proxy.getTenantId.getFuture());
// WARNING: this code is run at a high priority, so it needs to do as little work as possible
if (commitData->stats.tenantIdRequestIn.getValue() - commitData->stats.tenantIdRequestOut.getValue() >
SERVER_KNOBS->TENANT_ID_REQUEST_MAX_QUEUE_SIZE) {
++commitData->stats.tenantIdRequestErrors;
req.reply.sendError(commit_proxy_memory_limit_exceeded());
TraceEvent(SevWarnAlways, "ProxyGetTenantRequestThresholdExceeded").suppressFor(60);
} else {
++commitData->stats.tenantIdRequestIn;
addActor.send(doTenantIdRequest(req, commitData));
}
}
}
ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
// We can't respond to these requests until we have valid txnStateStore
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
wait(commitData->validState.getFuture());
state Version minVersion = commitData->stats.lastCommitVersionAssigned + 1;
wait(delay(0, TaskPriority::DefaultEndpoint));
bool validTenant = wait(checkTenant(commitData, req.tenant.tenantId, minVersion, "GetKeyServerLocation"));
if (!validTenant) {
++commitData->stats.keyServerLocationOut;
req.reply.sendError(tenant_not_found());
return Void();
}
std::unordered_set<UID> tssMappingsIncluded;
GetKeyServerLocationsReply rep;
if (req.tenant.name.present()) {
rep.tenantEntry = TenantMapEntry(tenantId.get(), req.tenant.name.get(), TenantState::READY);
req.begin = req.begin.withPrefix(rep.tenantEntry.prefix, req.arena);
if (req.tenant.hasTenant()) {
req.begin = req.begin.withPrefix(req.tenant.prefix.get(), req.arena);
if (req.end.present()) {
req.end = req.end.get().withPrefix(rep.tenantEntry.prefix, req.arena);
req.end = req.end.get().withPrefix(req.tenant.prefix.get(), req.arena);
}
}
@ -2936,6 +2972,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData));
addActor.send(tenantIdServer(proxy, addActor, &commitData));
addActor.send(readRequestServer(proxy, addActor, &commitData));
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(ddMetricsRequestServer(proxy, db));

View File

@ -587,10 +587,8 @@ Future<KeyRangeLocationInfo> MockGlobalState::getKeyLocation(TenantInfo tenant,
ASSERT_EQ(srcTeam.size(), 1);
rep.results.emplace_back(single, extractStorageServerInterfaces(srcTeam.front().servers));
return KeyRangeLocationInfo(
rep.tenantEntry,
KeyRange(toPrefixRelativeRange(rep.results[0].first, rep.tenantEntry.prefix), rep.arena),
buildLocationInfo(rep.results[0].second));
return KeyRangeLocationInfo(KeyRange(toPrefixRelativeRange(rep.results[0].first, tenant.prefix), rep.arena),
buildLocationInfo(rep.results[0].second));
}
Future<std::vector<KeyRangeLocationInfo>> MockGlobalState::getKeyRangeLocations(
@ -622,8 +620,7 @@ Future<std::vector<KeyRangeLocationInfo>> MockGlobalState::getKeyRangeLocations(
std::vector<KeyRangeLocationInfo> results;
for (int shard = 0; shard < rep.results.size(); shard++) {
results.emplace_back(rep.tenantEntry,
(toPrefixRelativeRange(rep.results[shard].first, rep.tenantEntry.prefix) & keys),
results.emplace_back((toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys),
buildLocationInfo(rep.results[shard].second));
}
return results;

View File

@ -64,6 +64,9 @@ struct ProxyStats {
Counter mutations;
Counter conflictRanges;
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
Counter tenantIdRequestIn;
Counter tenantIdRequestOut;
Counter tenantIdRequestErrors;
Counter txnExpensiveClearCostEstCount;
Version lastCommitVersionAssigned;
@ -116,7 +119,8 @@ struct ProxyStats {
commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc),
mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc),
keyServerLocationErrors("KeyServerLocationErrors", cc),
keyServerLocationErrors("KeyServerLocationErrors", cc), tenantIdRequestIn("TenantIdRequestIn", cc),
tenantIdRequestOut("TenantIdRequestOut", cc), tenantIdRequestErrors("TenantIdRequestErrors", cc),
txnExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics",
id,

View File

@ -138,7 +138,7 @@ bool canReplyWith(Error e) {
case error_code_server_overloaded:
case error_code_change_feed_popped:
case error_code_tenant_name_required:
case error_code_unknown_tenant:
case error_code_tenant_not_found:
// getMappedRange related exceptions that are not retriable:
case error_code_mapper_bad_index:
case error_code_mapper_no_such_key:
@ -2015,12 +2015,7 @@ void StorageServer::checkTenantEntry(Version version, TenantInfo tenantInfo) {
TraceEvent(SevWarn, "StorageTenantNotFound", thisServerID)
.detail("Tenant", tenantInfo.tenantId)
.backtrace();
throw unknown_tenant();
}
// TENANT_TODO: this is temporary pending other changes to remove reliance on names
if (*itr != tenantInfo.name.get()) {
throw unknown_tenant();
throw tenant_not_found();
}
}
}

View File

@ -2679,6 +2679,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
DUMPTOKEN(recruited.getStorageServerRejoinInfo);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.txnState);
DUMPTOKEN(recruited.getTenantId);
// printf("Recruited as commitProxyServer\n");
errorForwarders.add(zombie(recruited,

View File

@ -136,21 +136,22 @@ struct AuthzSecurityWorkload : TestWorkload {
}
}
ACTOR static Future<KeyRangeLocationInfo> refreshAndGetCachedLocation(AuthzSecurityWorkload* self,
Database cx,
TenantName tenant,
Standalone<StringRef> token,
StringRef key) {
ACTOR static Future<std::pair<KeyRangeLocationInfo, int64_t>> refreshAndGetCachedLocation(
AuthzSecurityWorkload* self,
Database cx,
TenantName tenant,
Standalone<StringRef> token,
StringRef key) {
state Transaction tr(cx, tenant);
self->setAuthToken(tr, token);
loop {
try {
// trigger GetKeyServerLocationsRequest and subsequent cache update
Optional<Value> value = wait(tr.get(key));
(void)value;
auto loc = cx->getCachedLocation(tenant, key);
wait(success(tr.get(key)));
int64_t tenantId = wait(tr.getTenant().get()->idFuture);
auto loc = cx->getCachedLocation(tr.trState->getTenantInfo(), key);
if (loc.present()) {
return loc.get();
return std::make_pair(loc.get(), tenantId);
} else {
wait(delay(0.1));
}
@ -173,16 +174,16 @@ struct AuthzSecurityWorkload : TestWorkload {
Optional<Standalone<StringRef>> expectedValue,
Standalone<StringRef> token,
Database cx,
KeyRangeLocationInfo loc) {
std::pair<KeyRangeLocationInfo, int64_t> locationAndTenant) {
loop {
GetValueRequest req;
req.key = key;
req.version = committedVersion;
req.tenantInfo.tenantId = loc.tenantEntry.id;
req.tenantInfo.tenantId = locationAndTenant.second;
req.tenantInfo.name = tenant;
req.tenantInfo.token = token;
try {
GetValueReply reply = wait(loadBalance(loc.locations->locations(),
GetValueReply reply = wait(loadBalance(locationAndTenant.first.locations->locations(),
&StorageServerInterface::getValue,
req,
TaskPriority::DefaultPromiseEndpoint,
@ -212,7 +213,8 @@ struct AuthzSecurityWorkload : TestWorkload {
state Version committedVersion =
wait(setAndCommitKeyValueAndGetVersion(self, cx, self->tenant, self->signedToken, key, value));
// refresh key location cache via get()
KeyRangeLocationInfo loc = wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key));
std::pair<KeyRangeLocationInfo, int64_t> locationAndTenant =
wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key));
if (positive) {
// Supposed to succeed. Expected to occasionally fail because of buggify, faultInjection, or data
// distribution, but should not return permission_denied
@ -223,7 +225,7 @@ struct AuthzSecurityWorkload : TestWorkload {
value,
self->signedToken /* passing correct token */,
cx,
loc));
locationAndTenant));
if (!outcome.present()) {
++self->crossTenantGetPositive;
} else if (outcome.get().code() == error_code_permission_denied) {
@ -241,7 +243,7 @@ struct AuthzSecurityWorkload : TestWorkload {
value,
self->signedTokenAnotherTenant /* deliberately passing bad token */,
cx,
loc));
locationAndTenant));
// Should always fail. Expected to return permission_denied, but expected to occasionally fail with
// different errors
if (!outcome.present()) {
@ -263,17 +265,15 @@ struct AuthzSecurityWorkload : TestWorkload {
Value newValue,
Version readVersion,
Database cx,
KeyRangeLocationInfo loc) {
int64_t tenantId) {
loop {
auto const& tenantEntry = loc.tenantEntry;
ASSERT(!tenantEntry.prefix.empty());
state Key prefixedKey = key.withPrefix(tenantEntry.prefix);
state Key prefixedKey = key.withPrefix(TenantAPI::idToPrefix(tenantId));
CommitTransactionRequest req;
req.transaction.mutations.push_back(req.arena, MutationRef(MutationRef::SetValue, prefixedKey, newValue));
req.transaction.read_snapshot = readVersion;
req.tenantInfo.name = tenant;
req.tenantInfo.token = token;
req.tenantInfo.tenantId = tenantEntry.id;
req.tenantInfo.tenantId = tenantId;
try {
CommitID reply = wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
&CommitProxyInterface::commit,
@ -297,11 +297,12 @@ struct AuthzSecurityWorkload : TestWorkload {
state Version committedVersion =
wait(setAndCommitKeyValueAndGetVersion(self, cx, self->tenant, self->signedToken, key, value));
// refresh key location cache to extract tenant prefix
KeyRangeLocationInfo loc = wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key));
std::pair<KeyRangeLocationInfo, int64_t> locationAndTenant =
wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key));
if (positive) {
// Expected to succeed, may occasionally fail
Optional<Error> outcome =
wait(tryCommit(self, self->tenant, self->signedToken, key, newValue, committedVersion, cx, loc));
Optional<Error> outcome = wait(tryCommit(
self, self->tenant, self->signedToken, key, newValue, committedVersion, cx, locationAndTenant.second));
if (!outcome.present()) {
++self->crossTenantCommitPositive;
} else if (outcome.get().code() == error_code_permission_denied) {
@ -311,8 +312,14 @@ struct AuthzSecurityWorkload : TestWorkload {
.log();
}
} else {
Optional<Error> outcome = wait(tryCommit(
self, self->tenant, self->signedTokenAnotherTenant, key, newValue, committedVersion, cx, loc));
Optional<Error> outcome = wait(tryCommit(self,
self->tenant,
self->signedTokenAnotherTenant,
key,
newValue,
committedVersion,
cx,
locationAndTenant.second));
if (!outcome.present()) {
TraceEvent(SevError, "AuthzSecurityError")
.detail("Case", "CrossTenantGetDisallowed")

View File

@ -451,8 +451,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
wait(timeoutError(unsafeThreadFutureToFuture(tr->commit()), 30));
} catch (Error& e) {
if (e.code() == error_code_client_invalid_operation ||
e.code() == error_code_transaction_too_large || e.code() == error_code_unknown_tenant ||
e.code() == error_code_invalid_option) {
e.code() == error_code_transaction_too_large || e.code() == error_code_invalid_option) {
throw not_committed();
}
}

View File

@ -105,15 +105,12 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload {
ACTOR static Future<int64_t> getSize(GetEstimatedRangeSizeWorkload* self, Database cx) {
state ReadYourWritesTransaction tr(cx, self->tenant);
state double totalDelay = 0.0;
TraceEvent(SevDebug, "GetSizeStart")
.detail("Tenant", tr.getTenant().present() ? tr.getTenant().get() : "none"_sr);
TraceEvent(SevDebug, "GetSizeStart").detail("Tenant", tr.getTenant());
loop {
try {
state int64_t size = wait(tr.getEstimatedRangeSizeBytes(normalKeys));
TraceEvent(SevDebug, "GetSizeResult")
.detail("Tenant", tr.getTenant().present() ? tr.getTenant().get() : "none"_sr)
.detail("Size", size);
TraceEvent(SevDebug, "GetSizeResult").detail("Tenant", tr.getTenant()).detail("Size", size);
if (!sizeIsAsExpected(self, size) && totalDelay < 300.0) {
totalDelay += 5.0;
wait(delay(5.0));
@ -121,9 +118,7 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload {
return size;
}
} catch (Error& e) {
TraceEvent(SevDebug, "GetSizeError")
.errorUnsuppressed(e)
.detail("Tenant", tr.getTenant().present() ? tr.getTenant().get() : "none"_sr);
TraceEvent(SevDebug, "GetSizeError").errorUnsuppressed(e).detail("Tenant", tr.getTenant());
wait(tr.onError(e));
}
}

View File

@ -208,7 +208,6 @@ const std::set<int> transactionRetryableErrors = { error_code_not_committed,
error_code_process_behind,
error_code_batch_transaction_throttled,
error_code_tag_throttled,
error_code_unknown_tenant,
error_code_proxy_tag_throttled,
// maybe committed error
error_code_cluster_version_changed,

View File

@ -405,6 +405,13 @@ struct Traceable<std::string> : TraceableStringImpl<std::string> {};
template <>
struct Traceable<std::string_view> : TraceableStringImpl<std::string_view> {};
template <class T>
struct Traceable<Reference<T>> : std::conditional<Traceable<T>::value, std::true_type, std::false_type>::type {
static std::string toString(const Reference<T>& value) {
return value ? Traceable<T>::toString(*value) : "[not set]";
}
};
template <class T>
struct SpecialTraceMetricType
: std::conditional<std::is_integral<T>::value || std::is_enum<T>::value, std::true_type, std::false_type>::type {

View File

@ -249,7 +249,6 @@ ERROR( tenant_not_empty, 2133, "Cannot delete a non-empty tenant" )
ERROR( invalid_tenant_name, 2134, "Tenant name cannot begin with \\xff" )
ERROR( tenant_prefix_allocator_conflict, 2135, "The database already has keys stored at the prefix allocated for the tenant" )
ERROR( tenants_disabled, 2136, "Tenants have been disabled in the cluster" )
ERROR( unknown_tenant, 2137, "Tenant is not available from this server" )
ERROR( illegal_tenant_access, 2138, "Illegal tenant access" )
ERROR( invalid_tenant_group_name, 2139, "Tenant group name cannot begin with \\xff" )
ERROR( invalid_tenant_configuration, 2140, "Tenant configuration is invalid" )

View File

@ -1,3 +1,7 @@
[configuration]
# Tenant lookups fail during the atomic restore because they aren't affected by locking
allowDefaultTenant = false
[[knobs]]
rocksdb_read_value_timeout=300.0
rocksdb_read_value_prefix_timeout=300.0

View File

@ -1,5 +1,7 @@
[configuration]
StderrSeverity = 30
# Tenant lookups fail during the atomic restore because they aren't affected by locking
allowDefaultTenant = false
[[test]]
testTitle = 'WriteDuringReadTest'