From f999623bb18550e49a36d86292a927a8a132214b Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Tue, 20 Dec 2022 16:30:59 -0800 Subject: [PATCH] Add a tenant lookup interface and use it when starting transactions --- bindings/c/fdb_c.cpp | 3 +- .../sphinx/source/api-error-codes.rst | 2 - fdbclient/ClientKnobs.cpp | 2 - fdbclient/FDBTypes.cpp | 8 +- fdbclient/NativeAPI.actor.cpp | 600 +++++++----------- fdbclient/ServerKnobs.cpp | 1 + fdbclient/include/fdbclient/ClientKnobs.h | 2 - .../include/fdbclient/CommitProxyInterface.h | 43 +- fdbclient/include/fdbclient/DatabaseContext.h | 23 +- fdbclient/include/fdbclient/FDBTypes.h | 2 +- fdbclient/include/fdbclient/NativeAPI.actor.h | 63 +- fdbclient/include/fdbclient/ReadYourWrites.h | 2 +- fdbclient/include/fdbclient/ServerKnobs.h | 1 + fdbserver/BlobWorker.actor.cpp | 6 +- fdbserver/CommitProxyServer.actor.cpp | 133 ++-- fdbserver/MockGlobalState.actor.cpp | 9 +- .../include/fdbserver/ProxyCommitData.actor.h | 6 +- fdbserver/storageserver.actor.cpp | 9 +- fdbserver/worker.actor.cpp | 1 + fdbserver/workloads/AuthzSecurity.actor.cpp | 57 +- .../workloads/FuzzApiCorrectness.actor.cpp | 3 +- .../workloads/GetEstimatedRangeSize.actor.cpp | 11 +- flow/Error.cpp | 1 - flow/include/flow/Trace.h | 7 + flow/include/flow/error_definitions.h | 1 - tests/slow/ApiCorrectnessAtomicRestore.toml | 4 + tests/slow/WriteDuringReadAtomicRestore.toml | 2 + 27 files changed, 476 insertions(+), 526 deletions(-) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index 1d4d65ebad..6355e3a5ff 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -82,8 +82,7 @@ extern "C" DLLEXPORT fdb_bool_t fdb_error_predicate(int predicate_test, fdb_erro code == error_code_grv_proxy_memory_limit_exceeded || code == error_code_commit_proxy_memory_limit_exceeded || code == error_code_batch_transaction_throttled || code == error_code_process_behind || - code == error_code_tag_throttled || code == error_code_unknown_tenant || - code == error_code_proxy_tag_throttled; + code == error_code_tag_throttled || code == error_code_proxy_tag_throttled; } return false; } diff --git a/documentation/sphinx/source/api-error-codes.rst b/documentation/sphinx/source/api-error-codes.rst index 7c06dbb6df..b9c72df92e 100644 --- a/documentation/sphinx/source/api-error-codes.rst +++ b/documentation/sphinx/source/api-error-codes.rst @@ -174,8 +174,6 @@ FoundationDB may return the following error codes from API functions. If you nee +-----------------------------------------------+-----+--------------------------------------------------------------------------------+ | tenants_disabled | 2136| Tenants have been disabled in the cluster | +-----------------------------------------------+-----+--------------------------------------------------------------------------------+ -| unknown_tenant | 2137| Tenant is not available from this server | -+-----------------------------------------------+-----+--------------------------------------------------------------------------------+ | api_version_unset | 2200| API version is not set | +-----------------------------------------------+-----+--------------------------------------------------------------------------------+ | api_version_already_set | 2201| API version may be set only once | diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index fb0680fb31..e81805adf7 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -92,8 +92,6 @@ void ClientKnobs::initialize(Randomize randomize) { init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3; init( LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD, 60 ); init( LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL, 60 ); - init( TENANT_CACHE_EVICTION_SIZE, 100000 ); - init( TENANT_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_EVICTION_SIZE_SIM = 3; init( GET_RANGE_SHARD_LIMIT, 2 ); init( WARM_RANGE_SHARD_LIMIT, 100 ); diff --git a/fdbclient/FDBTypes.cpp b/fdbclient/FDBTypes.cpp index 45bce1ed31..376d2a4efc 100644 --- a/fdbclient/FDBTypes.cpp +++ b/fdbclient/FDBTypes.cpp @@ -22,12 +22,12 @@ #include "fdbclient/Knobs.h" #include "fdbclient/NativeAPI.actor.h" -KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, KeyRef prefix) { - if (prefix.empty()) { +KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, Optional prefix) { + if (!prefix.present() || prefix.get().empty()) { return range; } else { - KeyRef begin = range.begin.startsWith(prefix) ? range.begin.removePrefix(prefix) : allKeys.begin; - KeyRef end = range.end.startsWith(prefix) ? range.end.removePrefix(prefix) : allKeys.end; + KeyRef begin = range.begin.startsWith(prefix.get()) ? range.begin.removePrefix(prefix.get()) : allKeys.begin; + KeyRef end = range.end.startsWith(prefix.get()) ? range.end.removePrefix(prefix.get()) : allKeys.end; return KeyRangeRef(begin, end); } } diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index e831704329..b856541645 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1528,7 +1528,8 @@ DatabaseContext::DatabaseContext(ReferenceisSimulated() ? CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE_SIM : CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE; - tenantCacheSize = g_network->isSimulated() ? CLIENT_KNOBS->TENANT_CACHE_EVICTION_SIZE_SIM - : CLIENT_KNOBS->TENANT_CACHE_EVICTION_SIZE; getValueSubmitted.init("NativeAPI.GetValueSubmitted"_sr); getValueCompleted.init("NativeAPI.GetValueCompleted"_sr); @@ -1829,7 +1828,8 @@ DatabaseContext::DatabaseContext(const Error& err) transactionsCommitStarted("CommitStarted", cc), transactionsCommitCompleted("CommitCompleted", cc), transactionKeyServerLocationRequests("KeyServerLocationRequests", cc), transactionKeyServerLocationRequestsCompleted("KeyServerLocationRequestsCompleted", cc), - transactionStatusRequests("StatusRequests", cc), transactionsTooOld("TooOld", cc), + transactionStatusRequests("StatusRequests", cc), transactionTenantLookupRequests("TenantLookupRequests", cc), + transactionTenantLookupRequestsCompleted("TenantLookupRequestsCompleted", cc), transactionsTooOld("TooOld", cc), transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc), transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc), @@ -1896,52 +1896,37 @@ DatabaseContext::~DatabaseContext() { TraceEvent("DatabaseContextDestructed", dbId).backtrace(); } -Optional DatabaseContext::getCachedLocation(const Optional& tenantName, +Optional DatabaseContext::getCachedLocation(const TenantInfo& tenant, const KeyRef& key, Reverse isBackward) { - TenantMapEntry tenantEntry; Arena arena; KeyRef resolvedKey = key; - if (tenantName.present()) { - auto itr = tenantCache.find(tenantName.get()); - if (itr != tenantCache.end()) { - tenantEntry = itr->second; - resolvedKey = resolvedKey.withPrefix(tenantEntry.prefix, arena); - } else { - return Optional(); - } + if (tenant.hasTenant()) { + resolvedKey = resolvedKey.withPrefix(tenant.prefix.get(), arena); } auto range = isBackward ? locationCache.rangeContainingKeyBefore(resolvedKey) : locationCache.rangeContaining(resolvedKey); if (range->value()) { - return KeyRangeLocationInfo( - tenantEntry, toPrefixRelativeRange(range->range(), tenantEntry.prefix), range->value()); + return KeyRangeLocationInfo(toPrefixRelativeRange(range->range(), tenant.prefix), range->value()); } return Optional(); } -bool DatabaseContext::getCachedLocations(const Optional& tenantName, +bool DatabaseContext::getCachedLocations(const TenantInfo& tenant, const KeyRangeRef& range, std::vector& result, int limit, Reverse reverse) { result.clear(); - TenantMapEntry tenantEntry; Arena arena; KeyRangeRef resolvedRange = range; - if (tenantName.present()) { - auto itr = tenantCache.find(tenantName.get()); - if (itr != tenantCache.end()) { - tenantEntry = itr->second; - resolvedRange = resolvedRange.withPrefix(tenantEntry.prefix, arena); - } else { - return false; - } + if (tenant.hasTenant()) { + resolvedRange = resolvedRange.withPrefix(tenant.prefix.get(), arena); } auto begin = locationCache.rangeContaining(resolvedRange.begin); @@ -1954,8 +1939,7 @@ bool DatabaseContext::getCachedLocations(const Optional& tenantNa result.clear(); return false; } - result.emplace_back( - tenantEntry, toPrefixRelativeRange(r->range() & resolvedRange, tenantEntry.prefix), r->value()); + result.emplace_back(toPrefixRelativeRange(r->range() & resolvedRange, tenant.prefix), r->value()); if (result.size() == limit || begin == end) { break; } @@ -1969,26 +1953,8 @@ bool DatabaseContext::getCachedLocations(const Optional& tenantNa return true; } -void DatabaseContext::cacheTenant(const TenantName& tenant, const TenantMapEntry& tenantEntry) { - if (tenantCacheSize > 0) { - // Naive cache eviction just erases the entire cache when it gets full. - // We don't expect a single client to fill the tenant cache typically, so this should work reasonably well. - if (tenantCache.size() > tenantCacheSize) { - tenantCache.clear(); - } - - tenantCache[tenant] = tenantEntry; - } -} - -Reference DatabaseContext::setCachedLocation(const Optional& tenant, - const TenantMapEntry& tenantEntry, - const KeyRangeRef& absoluteKeys, +Reference DatabaseContext::setCachedLocation(const KeyRangeRef& absoluteKeys, const std::vector& servers) { - if (tenant.present()) { - cacheTenant(tenant.get(), tenantEntry); - } - std::vector>> serverRefs; serverRefs.reserve(servers.size()); for (const auto& interf : servers) { @@ -2008,15 +1974,11 @@ Reference DatabaseContext::setCachedLocation(const Optional& tenantPrefix, const KeyRef& key, Reverse isBackward) { Arena arena; KeyRef resolvedKey = key; - if (!tenantPrefix.empty()) { - resolvedKey = resolvedKey.withPrefix(tenantPrefix, arena); + if (tenantPrefix.present() && !tenantPrefix.get().empty()) { + resolvedKey = resolvedKey.withPrefix(tenantPrefix.get(), arena); } if (isBackward) { @@ -2026,11 +1988,11 @@ void DatabaseContext::invalidateCache(const KeyRef& tenantPrefix, const KeyRef& } } -void DatabaseContext::invalidateCache(const KeyRef& tenantPrefix, const KeyRangeRef& keys) { +void DatabaseContext::invalidateCache(const Optional& tenantPrefix, const KeyRangeRef& keys) { Arena arena; KeyRangeRef resolvedKeys = keys; - if (!tenantPrefix.empty()) { - resolvedKeys = resolvedKeys.withPrefix(tenantPrefix, arena); + if (tenantPrefix.present() && !tenantPrefix.get().empty()) { + resolvedKeys = resolvedKeys.withPrefix(tenantPrefix.get(), arena); } auto rs = locationCache.intersectingRanges(resolvedKeys); @@ -2191,8 +2153,7 @@ ACTOR static Future switchConnectionRecordImpl(ReferencecommitProxies.clear(); self->grvProxies.clear(); self->minAcceptableReadVersion = std::numeric_limits::max(); - self->tenantCache.clear(); - self->invalidateCache(Key(), allKeys); + self->invalidateCache({}, allKeys); self->ssVersionVectorCache.clear(); @@ -2928,42 +2889,29 @@ ACTOR Future getKeyLocation_internal(Database cx, if (debugID.present()) g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.Before"); - try { - loop { - ++cx->transactionKeyServerLocationRequests; - choose { - when(wait(cx->onProxiesChanged())) {} - when(GetKeyServerLocationsReply rep = wait(basicLoadBalance( - cx->getCommitProxies(useProvisionalProxies), - &CommitProxyInterface::getKeyServersLocations, - GetKeyServerLocationsRequest( - span.context, tenant, key, Optional(), 100, isBackward, version, key.arena()), - TaskPriority::DefaultPromiseEndpoint))) { - ++cx->transactionKeyServerLocationRequestsCompleted; - if (debugID.present()) - g_traceBatch.addEvent( - "TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.After"); - ASSERT(rep.results.size() == 1); + loop { + ++cx->transactionKeyServerLocationRequests; + choose { + when(wait(cx->onProxiesChanged())) {} + when(GetKeyServerLocationsReply rep = wait(basicLoadBalance( + cx->getCommitProxies(useProvisionalProxies), + &CommitProxyInterface::getKeyServersLocations, + GetKeyServerLocationsRequest( + span.context, tenant, key, Optional(), 100, isBackward, version, key.arena()), + TaskPriority::DefaultPromiseEndpoint))) { + ++cx->transactionKeyServerLocationRequestsCompleted; + if (debugID.present()) + g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.After"); + ASSERT(rep.results.size() == 1); - auto locationInfo = cx->setCachedLocation( - tenant.name, rep.tenantEntry, rep.results[0].first, rep.results[0].second); - updateTssMappings(cx, rep); - updateTagMappings(cx, rep); + auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second); + updateTssMappings(cx, rep); + updateTagMappings(cx, rep); - return KeyRangeLocationInfo( - rep.tenantEntry, - KeyRange(toPrefixRelativeRange(rep.results[0].first, rep.tenantEntry.prefix), rep.arena), - locationInfo); - } + return KeyRangeLocationInfo( + KeyRange(toPrefixRelativeRange(rep.results[0].first, tenant.prefix), rep.arena), locationInfo); } } - } catch (Error& e) { - if (e.code() == error_code_tenant_not_found) { - ASSERT(tenant.name.present()); - cx->invalidateCachedTenant(tenant.name.get()); - } - - throw; } } @@ -3007,7 +2955,7 @@ Future getKeyLocation(Database const& cx, Reverse isBackward, Version version) { // we first check whether this range is cached - Optional locationInfo = cx->getCachedLocation(tenant.name, key, isBackward); + Optional locationInfo = cx->getCachedLocation(tenant, key, isBackward); if (!locationInfo.present()) { return getKeyLocation_internal( cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version); @@ -3021,7 +2969,7 @@ Future getKeyLocation(Database const& cx, } if (onlyEndpointFailedAndNeedRefresh) { - cx->invalidateCache(locationInfo.get().tenantEntry.prefix, key); + cx->invalidateCache(tenant.prefix, key); // Refresh the cache with a new getKeyLocations made to proxies. return getKeyLocation_internal( @@ -3038,24 +2986,23 @@ Future getKeyLocation(Reference trState, Reverse isBackward, UseTenant useTenant, Version version) { - auto f = getKeyLocation(trState->cx, - useTenant ? trState->getTenantInfo(AllowInvalidTenantID::True) : TenantInfo(), - key, - member, - trState->spanContext, - trState->readOptions.present() ? trState->readOptions.get().debugID : Optional(), - trState->useProvisionalProxies, - isBackward, - version); - - if (trState->tenant().present() && useTenant && trState->tenantId() == TenantInfo::INVALID_TENANT) { - return map(f, [trState](const KeyRangeLocationInfo& locationInfo) { - trState->trySetTenantId(locationInfo.tenantEntry.id); - return locationInfo; - }); - } else { - return f; + // This is temporary while we restructure how transactions wait for tenant IDs + if (useTenant && trState->tenant().present() && !trState->tenant().get()->idFuture.isReady()) { + return mapAsync(trState->tenant().get()->idFuture, + [trState, key, member, isBackward, useTenant, version](int64_t tenantId) { + return getKeyLocation(trState, key, member, isBackward, useTenant, version); + }); } + + return getKeyLocation(trState->cx, + useTenant ? trState->getTenantInfo() : TenantInfo(), + key, + member, + trState->spanContext, + trState->readOptions.present() ? trState->readOptions.get().debugID : Optional(), + trState->useProvisionalProxies, + isBackward, + version); } ACTOR Future> getKeyRangeLocations_internal( @@ -3072,50 +3019,37 @@ ACTOR Future> getKeyRangeLocations_internal( if (debugID.present()) g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before"); - try { - loop { - ++cx->transactionKeyServerLocationRequests; - choose { - when(wait(cx->onProxiesChanged())) {} - when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance( - cx->getCommitProxies(useProvisionalProxies), - &CommitProxyInterface::getKeyServersLocations, - GetKeyServerLocationsRequest( - span.context, tenant, keys.begin, keys.end, limit, reverse, version, keys.arena()), - TaskPriority::DefaultPromiseEndpoint))) { - ++cx->transactionKeyServerLocationRequestsCompleted; - state GetKeyServerLocationsReply rep = _rep; - if (debugID.present()) - g_traceBatch.addEvent( - "TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.After"); - ASSERT(rep.results.size()); + loop { + ++cx->transactionKeyServerLocationRequests; + choose { + when(wait(cx->onProxiesChanged())) {} + when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance( + cx->getCommitProxies(useProvisionalProxies), + &CommitProxyInterface::getKeyServersLocations, + GetKeyServerLocationsRequest( + span.context, tenant, keys.begin, keys.end, limit, reverse, version, keys.arena()), + TaskPriority::DefaultPromiseEndpoint))) { + ++cx->transactionKeyServerLocationRequestsCompleted; + state GetKeyServerLocationsReply rep = _rep; + if (debugID.present()) + g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.After"); + ASSERT(rep.results.size()); - state std::vector results; - state int shard = 0; - for (; shard < rep.results.size(); shard++) { - // FIXME: these shards are being inserted into the map sequentially, it would be much more CPU - // efficient to save the map pairs and insert them all at once. - results.emplace_back( - rep.tenantEntry, - (toPrefixRelativeRange(rep.results[shard].first, rep.tenantEntry.prefix) & keys), - cx->setCachedLocation( - tenant.name, rep.tenantEntry, rep.results[shard].first, rep.results[shard].second)); - wait(yield()); - } - updateTssMappings(cx, rep); - updateTagMappings(cx, rep); - - return results; + state std::vector results; + state int shard = 0; + for (; shard < rep.results.size(); shard++) { + // FIXME: these shards are being inserted into the map sequentially, it would be much more CPU + // efficient to save the map pairs and insert them all at once. + results.emplace_back((toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys), + cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second)); + wait(yield()); } + updateTssMappings(cx, rep); + updateTagMappings(cx, rep); + + return results; } } - } catch (Error& e) { - if (e.code() == error_code_tenant_not_found) { - ASSERT(tenant.name.present()); - cx->invalidateCachedTenant(tenant.name.get()); - } - - throw; } } @@ -3140,7 +3074,7 @@ Future> getKeyRangeLocations(Database const& c ASSERT(!keys.empty()); std::vector locations; - if (!cx->getCachedLocations(tenant.name, keys, locations, limit, reverse)) { + if (!cx->getCachedLocations(tenant, keys, locations, limit, reverse)) { return getKeyRangeLocations_internal( cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version); } @@ -3155,7 +3089,7 @@ Future> getKeyRangeLocations(Database const& c } if (onlyEndpointFailedAndNeedRefresh) { - cx->invalidateCache(locationInfo.tenantEntry.prefix, locationInfo.range.begin); + cx->invalidateCache(tenant.prefix, locationInfo.range.begin); foundFailed = true; } } @@ -3177,26 +3111,24 @@ Future> getKeyRangeLocations(Referencecx, - useTenant ? trState->getTenantInfo(AllowInvalidTenantID::True) : TenantInfo(), - keys, - limit, - reverse, - member, - trState->spanContext, - trState->readOptions.present() ? trState->readOptions.get().debugID : Optional(), - trState->useProvisionalProxies, - version); - - if (trState->tenant().present() && useTenant && trState->tenantId() == TenantInfo::INVALID_TENANT) { - return map(f, [trState](const std::vector& locationInfo) { - ASSERT(!locationInfo.empty()); - trState->trySetTenantId(locationInfo[0].tenantEntry.id); - return locationInfo; - }); - } else { - return f; + // This is temporary while we restructure how transactions wait for tenant IDs + if (useTenant && trState->tenant().present() && !trState->tenant().get()->idFuture.isReady()) { + return mapAsync(trState->tenant().get()->idFuture, + [trState, keys, limit, reverse, member, useTenant, version](int64_t tenantId) { + return getKeyRangeLocations(trState, keys, limit, reverse, member, useTenant, version); + }); } + + return getKeyRangeLocations(trState->cx, + useTenant ? trState->getTenantInfo(AllowInvalidTenantID::True) : TenantInfo(), + keys, + limit, + reverse, + member, + trState->spanContext, + trState->readOptions.present() ? trState->readOptions.get().debugID : Optional(), + trState->useProvisionalProxies, + version); } ACTOR Future warmRange_impl(Reference trState, KeyRange keys, Future fVersion) { @@ -3226,7 +3158,7 @@ ACTOR Future warmRange_impl(Reference trState, KeyRange if (totalRequests % 20 == 0) { // To avoid blocking the proxies from starting other transactions, occasionally get a read version. - state Transaction tr(trState->cx, trState->tenant()); + state Transaction tr(trState->cx, trState->tenant().flatMapRef(&Tenant::name)); loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); @@ -3258,22 +3190,43 @@ SpanContext generateSpanID(bool transactionTracingSample, SpanContext parentCont deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64(), TraceFlags::unsampled); } +ACTOR Future lookupTenant(Database cx, TenantName tenant, SpanContext spanContext) { + state Span span("NAPI:lookupTenant"_loc, spanContext); + loop { + ++cx->transactionTenantLookupRequests; + choose { + when(wait(cx->onProxiesChanged())) {} + when(GetTenantIdReply rep = wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), + &CommitProxyInterface::getTenantId, + GetTenantIdRequest(span.context, tenant, latestVersion), + TaskPriority::DefaultPromiseEndpoint))) { + ++cx->transactionTenantLookupRequestsCompleted; + return rep.tenantId; + } + } + } +} + FDB_DEFINE_BOOLEAN_PARAM(AllowInvalidTenantID); TransactionState::TransactionState(Database cx, - Optional tenant, + Optional tenantName, TaskPriority taskID, SpanContext spanContext, Reference trLogInfo) : cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanContext(spanContext), - readVersionObtainedFromGrvProxy(true), tenant_(tenant), tenantSet(tenant.present()) {} + readVersionObtainedFromGrvProxy(true), tenantSet(tenantName.present()) { + if (tenantName.present()) { + tenant_ = makeReference(lookupTenant(cx, tenantName.get(), spanContext), tenantName.get()); + } +} Reference TransactionState::cloneAndReset(Reference newTrLogInfo, bool generateNewSpan) const { SpanContext newSpanContext = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanContext; - Reference newState = - makeReference(cx, tenant_, cx->taskID, newSpanContext, newTrLogInfo); + Reference newState = makeReference( + cx, tenant_.flatMapRef(&Tenant::name), cx->taskID, newSpanContext, newTrLogInfo); if (!cx->apiVersionAtLeast(16)) { newState->options = options; @@ -3289,8 +3242,8 @@ Reference TransactionState::cloneAndReset(Reference const& t = tenant(); +TenantInfo TransactionState::getTenantInfo(AllowInvalidTenantID allowInvalidTenantId /* = false */) { + Optional> const& t = tenant(); if (options.rawAccess) { return TenantInfo(); @@ -3311,8 +3264,11 @@ TenantInfo TransactionState::getTenantInfo(AllowInvalidTenantID allowInvalidId / } } - ASSERT(allowInvalidId || tenantId_ != TenantInfo::INVALID_TENANT); - return TenantInfo(t, authToken, tenantId_); + ASSERT(t.present() && (allowInvalidTenantId || t.get()->id() != TenantInfo::INVALID_TENANT)); + return TenantInfo(t.get()->name, + authToken, + (allowInvalidTenantId && !t.get()->idFuture.isReady()) ? TenantInfo::INVALID_TENANT + : t.get()->id()); } // Returns the tenant used in this transaction. If the tenant is unset and raw access isn't specified, then the default @@ -3321,12 +3277,12 @@ TenantInfo TransactionState::getTenantInfo(AllowInvalidTenantID allowInvalidId / // // This function should not be called in the transaction constructor or in the setOption function to allow a user the // opportunity to set raw access. -Optional const& TransactionState::tenant() { +Optional> const& TransactionState::tenant() { if (tenantSet) { return tenant_; } else { - if (!tenant_.present() && !options.rawAccess) { - tenant_ = cx->defaultTenant; + if (!tenant_.present() && !options.rawAccess && cx->defaultTenant.present()) { + tenant_ = makeReference(lookupTenant(cx, cx->defaultTenant.get(), spanContext), cx->defaultTenant); } tenantSet = true; return tenant_; @@ -3340,13 +3296,6 @@ bool TransactionState::hasTenant() const { return tenantSet && tenant_.present(); } -Future TransactionState::handleUnknownTenant() { - tenantId_ = TenantInfo::INVALID_TENANT; - ASSERT(tenant().present()); - cx->invalidateCachedTenant(tenant().get()); - return delay(CLIENT_KNOBS->UNKNOWN_TENANT_RETRY_DELAY, taskID); -} - Future Transaction::warmRange(KeyRange keys) { return warmRange_impl(trState, keys, getReadVersion()); } @@ -3359,7 +3308,8 @@ ACTOR Future> getValue(Reference trState, state Version ver = wait(version); state Span span("NAPI:getValue"_loc, trState->spanContext); if (useTenant && trState->tenant().present()) { - span.addAttribute("tenant"_sr, trState->tenant().get()); + span.addAttribute("tenant"_sr, + trState->tenant().get()->name.castTo().orDefault(""_sr)); } trState->cx->validateVersion(ver); @@ -3434,8 +3384,12 @@ ACTOR Future> getValue(Reference trState, trState->cx->readLatencies.addSample(latency); if (trState->trLogInfo && recordLogInfo) { int valueSize = reply.value.present() ? reply.value.get().size() : 0; - trState->trLogInfo->addLog(FdbClientLogEvents::EventGet( - startTimeD, trState->cx->clientLocality.dcId(), latency, valueSize, key, trState->tenant())); + trState->trLogInfo->addLog(FdbClientLogEvents::EventGet(startTimeD, + trState->cx->clientLocality.dcId(), + latency, + valueSize, + key, + trState->tenant().flatMapRef(&Tenant::name))); } trState->cx->getValueCompleted->latency = timer_int() - startTime; trState->cx->getValueCompleted->log(); @@ -3469,18 +3423,17 @@ ACTOR Future> getValue(Reference trState, } if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || (e.code() == error_code_transaction_too_old && ver == latestVersion)) { - trState->cx->invalidateCache(locationInfo.tenantEntry.prefix, key); + trState->cx->invalidateCache(useTenant ? trState->tenant().mapRef(&Tenant::prefix) : Optional(), + key); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); - } else if (e.code() == error_code_unknown_tenant) { - ASSERT(useTenant); - wait(trState->handleUnknownTenant()); } else { if (trState->trLogInfo && recordLogInfo) - trState->trLogInfo->addLog(FdbClientLogEvents::EventGetError(startTimeD, - trState->cx->clientLocality.dcId(), - static_cast(e.code()), - key, - trState->tenant())); + trState->trLogInfo->addLog( + FdbClientLogEvents::EventGetError(startTimeD, + trState->cx->clientLocality.dcId(), + static_cast(e.code()), + key, + trState->tenant().flatMapRef(&Tenant::name))); throw e; } } @@ -3584,12 +3537,11 @@ ACTOR Future getKey(Reference trState, if (getKeyID.present()) g_traceBatch.addEvent("GetKeyDebug", getKeyID.get().first(), "NativeAPI.getKey.Error"); if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { - trState->cx->invalidateCache(locationInfo.tenantEntry.prefix, k.getKey(), Reverse{ k.isBackward() }); + trState->cx->invalidateCache(useTenant ? trState->tenant().mapRef(&Tenant::prefix) : Optional(), + k.getKey(), + Reverse{ k.isBackward() }); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); - } else if (e.code() == error_code_unknown_tenant) { - ASSERT(useTenant); - wait(trState->handleUnknownTenant()); } else { TraceEvent(SevInfo, "GetKeyError").error(e).detail("AtKey", k.getKey()).detail("Offset", k.offset); throw e; @@ -3687,10 +3639,6 @@ ACTOR Future watchValue(Database cx, Reference p parameters->useProvisionalProxies, Reverse::False, parameters->version)); - if (parameters->tenant.tenantId != locationInfo.tenantEntry.id) { - throw tenant_not_found(); - } - try { state Optional watchValueID = Optional(); if (parameters->debugID.present()) { @@ -3739,12 +3687,8 @@ ACTOR Future watchValue(Database cx, Reference p ver = v; } catch (Error& e) { if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { - cx->invalidateCache(locationInfo.tenantEntry.prefix, parameters->key); + cx->invalidateCache(parameters->tenant.prefix, parameters->key); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, parameters->taskID)); - } else if (e.code() == error_code_unknown_tenant) { - ASSERT(parameters->tenant.name.present()); - cx->invalidateCachedTenant(parameters->tenant.name.get()); - wait(delay(CLIENT_KNOBS->UNKNOWN_TENANT_RETRY_DELAY, parameters->taskID)); } else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) { // clang-format off CODE_PROBE(e.code() == error_code_watch_cancelled, "Too many watches on the storage server, poll for changes instead"); @@ -4022,7 +3966,8 @@ Future getExactRange(Reference trState, state Span span("NAPI:getExactRange"_loc, trState->spanContext); if (useTenant && trState->tenant().present()) { - span.addAttribute("tenant"_sr, trState->tenant().get()); + span.addAttribute("tenant"_sr, + trState->tenant().get()->name.castTo().orDefault(""_sr)); } // printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str()); @@ -4187,14 +4132,11 @@ Future getExactRange(Reference trState, else keys = KeyRangeRef(range.begin, keys.end); - trState->cx->invalidateCache(locations[0].tenantEntry.prefix, keys); + trState->cx->invalidateCache( + useTenant ? trState->tenant().mapRef(&Tenant::prefix) : Optional(), keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); break; - } else if (e.code() == error_code_unknown_tenant) { - ASSERT(useTenant); - wait(trState->handleUnknownTenant()); - break; } else { TraceEvent(SevInfo, "GetExactRangeError") .error(e) @@ -4330,7 +4272,7 @@ void getRangeFinished(Reference trState, bytes, begin.getKey(), end.getKey(), - trState->tenant())); + trState->tenant().flatMapRef(&Tenant::name))); } if (!snapshot) { @@ -4390,7 +4332,8 @@ Future getRange(Reference trState, state Span span("NAPI:getRange"_loc, trState->spanContext); state Optional getRangeID = Optional(); if (useTenant && trState->tenant().present()) { - span.addAttribute("tenant"_sr, trState->tenant().get()); + span.addAttribute("tenant"_sr, + trState->tenant().get()->name.castTo().orDefault(""_sr)); } try { @@ -4644,7 +4587,8 @@ Future getRange(Reference trState, } if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || (e.code() == error_code_transaction_too_old && readVersion == latestVersion)) { - trState->cx->invalidateCache(beginServer.tenantEntry.prefix, + trState->cx->invalidateCache(useTenant ? trState->tenant().mapRef(&Tenant::prefix) + : Optional(), reverse ? end.getKey() : begin.getKey(), Reverse{ reverse ? (end - 1).isBackward() : begin.isBackward() }); @@ -4666,9 +4610,6 @@ Future getRange(Reference trState, } wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); - } else if (e.code() == error_code_unknown_tenant) { - ASSERT(useTenant); - wait(trState->handleUnknownTenant()); } else { if (trState->trLogInfo) trState->trLogInfo->addLog( @@ -4677,7 +4618,7 @@ Future getRange(Reference trState, static_cast(e.code()), begin.getKey(), end.getKey(), - trState->tenant())); + trState->tenant().flatMapRef(&Tenant::name))); throw e; } @@ -5124,13 +5065,10 @@ ACTOR Future getRangeStreamFragment(Reference trState, else keys = KeyRangeRef(range.begin, keys.end); - trState->cx->invalidateCache(locations[0].tenantEntry.prefix, keys); + trState->cx->invalidateCache(trState->tenant().mapRef(&Tenant::prefix), keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); break; - } else if (e.code() == error_code_unknown_tenant) { - wait(trState->handleUnknownTenant()); - break; } else { results->sendError(e); return Void(); @@ -5403,19 +5341,18 @@ void Watch::setWatch(Future watchFuture) { onSetWatchTrigger.send(Void()); } -ACTOR Future getTenantMetadata(Reference trState, Key key, Version version) { - KeyRangeLocationInfo locationInfo = - wait(getKeyLocation(trState, key, &StorageServerInterface::getValue, Reverse::False, UseTenant::True, version)); +ACTOR Future getTenantMetadata(Reference trState) { + wait(success(trState->tenant().get()->idFuture)); return trState->getTenantInfo(); } Future populateAndGetTenant(Reference trState, Key const& key, Version version) { if (!trState->tenant().present() || key == metadataVersionKey) { return TenantInfo(); - } else if (trState->tenantId() != TenantInfo::INVALID_TENANT) { + } else if (trState->tenant().get()->idFuture.isReady()) { return trState->getTenantInfo(); } else { - return getTenantMetadata(trState, key, version); + return getTenantMetadata(trState); } } @@ -5433,20 +5370,6 @@ ACTOR Future restartWatch(Database cx, // The tenantId should be the old one. cx->deleteWatchMetadata(tenantInfo.tenantId, key, /* removeReferenceCount */ true); - // The ID of the tenant may be different on the cluster that we switched to, so obtain the new ID - if (tenantInfo.name.present()) { - state KeyRangeLocationInfo locationInfo = wait(getKeyLocation(cx, - tenantInfo, - key, - &StorageServerInterface::watchValue, - spanContext, - debugID, - useProvisionalProxies, - Reverse::False, - latestVersion)); - tenantInfo.tenantId = locationInfo.tenantEntry.id; - } - wait(watchValueMap(cx->minAcceptableReadVersion, tenantInfo, key, @@ -5543,7 +5466,7 @@ ACTOR Future>> getAddressesForKeyActor(Referen state Version version = wait(ver); KeyRangeLocationInfo locationInfo = wait(getKeyLocation( trState, ""_sr, &StorageServerInterface::getValue, Reverse::False, UseTenant::True, version)); - resolvedKey = key.withPrefix(locationInfo.tenantEntry.prefix); + resolvedKey = key.withPrefix(trState->tenant().get()->prefix()); } // If key >= allKeys.end, then getRange will return a kv-pair with an empty value. This will result in our @@ -6152,7 +6075,7 @@ FDB_BOOLEAN_PARAM(TenantPrefixPrepended); ACTOR static Future commitDummyTransaction(Reference trState, KeyRange range, TenantPrefixPrepended tenantPrefixPrepended) { - state Transaction tr(trState->cx, trState->tenant()); + state Transaction tr(trState->cx, trState->tenant().flatMapRef(&Tenant::name)); state int retries = 0; state Span span("NAPI:dummyTransaction"_loc, trState->spanContext); tr.span.setParent(span.context); @@ -6162,10 +6085,11 @@ ACTOR static Future commitDummyTransaction(Reference trS tr.trState->options = trState->options; tr.trState->taskID = trState->taskID; tr.trState->authToken = trState->authToken; - tr.trState->trySetTenantId(trState->tenantId()); if (!trState->hasTenant()) { tr.setOption(FDBTransactionOptions::RAW_ACCESS); } else { + int64_t tenantId = wait(tr.trState->tenant().get()->idFuture); + tr.trState->tenant().get()->idFuture = tenantId; tr.trState->skipApplyTenantPrefix = tenantPrefixPrepended; CODE_PROBE(true, "Commit of a dummy transaction in tenant keyspace"); } @@ -6177,7 +6101,7 @@ ACTOR static Future commitDummyTransaction(Reference trS return Void(); } catch (Error& e) { // If the tenant is gone, then our original transaction won't be able to commit - if (e.code() == error_code_unknown_tenant) { + if (e.code() == error_code_tenant_not_found) { return Void(); } TraceEvent("CommitDummyTransactionError") @@ -6448,9 +6372,9 @@ ACTOR static Future tryCommit(Reference trState, Reverse::False, UseTenant::True, req.transaction.read_snapshot)); - applyTenantPrefix(req, locationInfo.tenantEntry.prefix); + applyTenantPrefix(req, trState->tenant().get()->prefix()); tenantPrefixPrepended = TenantPrefixPrepended::True; - tenantPrefix = locationInfo.tenantEntry.prefix; + tenantPrefix = trState->tenant().get()->prefix(); } CODE_PROBE(trState->skipApplyTenantPrefix, "Tenant prefix prepend skipped for dummy transaction"); req.tenantInfo = trState->getTenantInfo(); @@ -6536,7 +6460,7 @@ ACTOR static Future tryCommit(Reference trState, req.transaction.mutations.expectedSize(), ci.version, req, - trState->tenant())); + trState->tenant().flatMapRef(&Tenant::name))); if (trState->automaticIdempotency && alternativeChosen >= 0) { // Automatic idempotency means we're responsible for best effort idempotency id clean up proxiesUsed->getInterface(alternativeChosen) @@ -6618,12 +6542,6 @@ ACTOR static Future tryCommit(Reference trState, // The user needs to be informed that we aren't sure whether the commit happened. Standard retry loops // retry it anyway (relying on transaction idempotence) but a client might do something else. throw commit_unknown_result(); - } else if (e.code() == error_code_unknown_tenant) { - // Rather than reset the tenant and retry just the commit, we need to throw this error to the user and let - // them retry the whole transaction - ASSERT(trState->tenant().present()); - trState->cx->invalidateCachedTenant(trState->tenant().get()); - throw; } else { if (e.code() != error_code_transaction_too_old && e.code() != error_code_not_committed && e.code() != error_code_database_locked && e.code() != error_code_commit_proxy_memory_limit_exceeded && @@ -6635,8 +6553,12 @@ ACTOR static Future tryCommit(Reference trState, TraceEvent(SevError, "TryCommitError").error(e); } if (trState->trLogInfo) - trState->trLogInfo->addLog(FdbClientLogEvents::EventCommitError( - startTime, trState->cx->clientLocality.dcId(), static_cast(e.code()), req, trState->tenant())); + trState->trLogInfo->addLog( + FdbClientLogEvents::EventCommitError(startTime, + trState->cx->clientLocality.dcId(), + static_cast(e.code()), + req, + trState->tenant().flatMapRef(&Tenant::name))); throw; } } @@ -7253,7 +7175,7 @@ ACTOR Future extractReadVersion(Reference trState, latency, trState->options.priority, rep.version, - trState->tenant())); + trState->tenant().flatMapRef(&Tenant::name))); if (rep.locked && !trState->options.lockAware) throw database_locked(); @@ -7603,11 +7525,6 @@ Future Transaction::onError(Error const& e) { reset(); return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), trState->taskID); } - if (e.code() == error_code_unknown_tenant) { - double maxBackoff = trState->options.maxBackoff; - reset(); - return delay(std::min(CLIENT_KNOBS->UNKNOWN_TENANT_RETRY_DELAY, maxBackoff), trState->taskID); - } return e; } @@ -7619,7 +7536,6 @@ ACTOR Future doGetStorageMetrics(Database cx, TenantInfo tenantInfo, KeyRange keys, Reference locationInfo, - TenantMapEntry tenantEntry, Optional> trState) { try { WaitMetricsRequest req(tenantInfo, keys, StorageMetrics(), StorageMetrics()); @@ -7630,15 +7546,13 @@ ACTOR Future doGetStorageMetrics(Database cx, return m; } catch (Error& e) { if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { - cx->invalidateCache(tenantEntry.prefix, keys); + cx->invalidateCache(tenantInfo.prefix, keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); - } else if (e.code() == error_code_unknown_tenant && trState.present() && - tenantInfo.tenantId != TenantInfo::INVALID_TENANT) { - wait(trState.get()->handleUnknownTenant()); } else if (e.code() == error_code_future_version) { wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, TaskPriority::DataDistribution)); } else { - TraceEvent(SevError, "WaitStorageMetricsError").error(e); + bool ok = e.code() == error_code_tenant_not_found; + TraceEvent(ok ? SevInfo : SevError, "WaitStorageMetricsError").error(e); throw; } @@ -7670,8 +7584,7 @@ ACTOR Future getStorageMetricsLargeKeyRange(Database cx, for (int i = 0; i < nLocs; i++) { partBegin = (i == 0) ? keys.begin : locations[i].range.begin; partEnd = (i == nLocs - 1) ? keys.end : locations[i].range.end; - fx[i] = doGetStorageMetrics( - cx, tenantInfo, KeyRangeRef(partBegin, partEnd), locations[i].locations, locations[i].tenantEntry, trState); + fx[i] = doGetStorageMetrics(cx, tenantInfo, KeyRangeRef(partBegin, partEnd), locations[i].locations, trState); } wait(waitForAll(fx)); for (int i = 0; i < nLocs; i++) { @@ -7814,7 +7727,7 @@ ACTOR Future>> getReadHotRanges(Da TraceEvent(SevError, "GetReadHotSubRangesError").error(e); throw; } - cx->invalidateCache(Key(), keys); + cx->invalidateCache({}, keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); } } @@ -7878,7 +7791,7 @@ ACTOR Future, int>> waitStorageMetrics( .detail("JitteredSecondsOfPenitence", CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY); wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskPriority::DataDistribution)); // make sure that the next getKeyRangeLocations() call will actually re-fetch the range - cx->invalidateCache(locations[0].tenantEntry.prefix, keys); + cx->invalidateCache(tenantInfo.prefix, keys); continue; } @@ -7891,15 +7804,13 @@ ACTOR Future, int>> waitStorageMetrics( } catch (Error& e) { TraceEvent(SevDebug, "WaitStorageMetricsError").error(e); if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { - cx->invalidateCache(locations[0].tenantEntry.prefix, keys); + cx->invalidateCache(tenantInfo.prefix, keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); - } else if (e.code() == error_code_unknown_tenant && trState.present() && - tenantInfo.tenantId != TenantInfo::INVALID_TENANT) { - wait(trState.get()->handleUnknownTenant()); } else if (e.code() == error_code_future_version) { wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, TaskPriority::DataDistribution)); } else { - TraceEvent(SevError, "WaitStorageMetricsError").error(e); + bool ok = e.code() == error_code_tenant_not_found; + TraceEvent(ok ? SevInfo : SevError, "WaitStorageMetricsError").error(e); throw; } } @@ -8017,10 +7928,8 @@ ACTOR Future>> getRangeSplitPoints(Referencecx->invalidateCache(locations[0].tenantEntry.prefix, keys); + trState->cx->invalidateCache(trState->tenant().mapRef(&Tenant::prefix), keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); - } else if (e.code() == error_code_unknown_tenant) { - wait(trState->handleUnknownTenant()); } else { TraceEvent(SevError, "GetRangeSplitPoints").error(e); throw; @@ -8029,64 +7938,6 @@ ACTOR Future>> getRangeSplitPoints(Reference blobGranuleGetTenantEntry(Transaction* self, - Key rangeStartKey, - Optional tenantName) { - ASSERT(tenantName.present() || self->getTenant().present()); - TenantName tName = tenantName.present() ? tenantName.get() : self->getTenant().get(); - state TenantMapEntry tme; - - Optional cachedLocationInfo = - self->trState->cx->getCachedLocation(tName, rangeStartKey, Reverse::False); - if (!cachedLocationInfo.present()) { - // If we're passing in a tenant, use that and do not touch the transaction. - TenantInfo tInfo; - if (tenantName.present()) { - tInfo = TenantInfo(tName, {}, TenantInfo::INVALID_TENANT); - } else { - tInfo = self->trState->getTenantInfo(AllowInvalidTenantID::True); - } - KeyRangeLocationInfo l = wait(getKeyLocation_internal( - self->trState->cx, - tInfo, - rangeStartKey, - self->trState->spanContext, - self->trState->readOptions.present() ? self->trState->readOptions.get().debugID : Optional(), - self->trState->useProvisionalProxies, - Reverse::False, - latestVersion)); - tme = l.tenantEntry; - } else { - tme = cachedLocationInfo.get().tenantEntry; - } - - if (tme.id == TenantInfo::INVALID_TENANT) { - throw tenant_not_found(); - } - - // Modify transaction if desired. - if (!tenantName.present()) { - self->trState->trySetTenantId(tme.id); - } - return tme; -} - -// Tenant's are supposed to be unique and therefore can be loaded once. -// There is an assumption that a tenant exists as long as operations are happening against said tenant. -ACTOR Future blobLoadTenantMapEntry(Database* db, Key rangeStartKey, Optional tenantName) { - state Transaction tr(*db); - - loop { - try { - TenantMapEntry tme = wait(blobGranuleGetTenantEntry(&tr, rangeStartKey, tenantName)); - return tme; - } catch (Error& e) { - wait(tr.onError(e)); - } - } -} - Future>> Transaction::getRangeSplitPoints(KeyRange const& keys, int64_t chunkSize) { return ::getRangeSplitPoints( trState, keys, chunkSize, readVersion.isValid() && readVersion.isReady() ? readVersion.get() : latestVersion); @@ -8108,9 +7959,8 @@ ACTOR Future>> getBlobGranuleRangesActor(Trans } if (self->getTenant().present()) { - // have to bypass tenant to read system key space, and add tenant prefix to part of mapping - TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(self, currentRange.begin, {})); - tenantPrefix = tenantEntry.prefix; + int64_t tenantId = wait(self->getTenant().get()->idFuture); + tenantPrefix = TenantAPI::idToPrefix(tenantId); } else { self->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); } @@ -8207,14 +8057,15 @@ ACTOR Future>> readBlobGranulesActor( range.begin.printable(), range.end.printable(), rv, - self->getTenant().present() ? " for tenant " + self->getTenant().get().printable() : ""); + self->getTenant().present() ? " for tenant " + printable(self->getTenant().get()->description()) + : ""); } if (self->getTenant().present()) { // have to bypass tenant to read system key space, and add tenant prefix to part of mapping - TenantMapEntry tenantEntry = wait(blobGranuleGetTenantEntry(self, range.begin, {})); - tenantPrefix = tenantEntry.prefix; - Standalone mappingPrefix = tenantEntry.prefix.withPrefix(blobGranuleMappingKeys.begin); + int64_t tenantId = wait(self->getTenant().get()->idFuture); + tenantPrefix = TenantAPI::idToPrefix(tenantId); + Standalone mappingPrefix = tenantPrefix.get().withPrefix(blobGranuleMappingKeys.begin); // basically krmGetRange, but enable it to not use tenant without RAW_ACCESS by doing manual getRange with // UseTenant::False @@ -8578,8 +8429,8 @@ ACTOR Future verifyBlobRangeActor(Reference cx, } if (tenantName.present()) { - TenantMapEntry tme = wait(blobLoadTenantMapEntry(&db, range.begin, tenantName)); - range = range.withPrefix(tme.prefix); + int64_t tenantId = wait(lookupTenant(db, tenantName.get(), SpanContext())); + range = range.withPrefix(TenantAPI::idToPrefix(tenantId)); curRegion = KeyRangeRef(range.begin, range.begin); } @@ -8762,7 +8613,7 @@ ACTOR Future splitStorageMetricsStream(PromiseStream resultStream, resultStream.sendError(e); throw; } - cx->invalidateCache(Key(), keys); + cx->invalidateCache({}, keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); } } @@ -8851,7 +8702,7 @@ ACTOR Future>> splitStorageMetrics(Database cx, // solution to this. if (locations.size() == CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT) { wait(delay(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskPriority::DataDistribution)); - cx->invalidateCache(Key(), keys); + cx->invalidateCache({}, keys); continue; } @@ -8862,7 +8713,7 @@ ACTOR Future>> splitStorageMetrics(Database cx, return results.get(); } - cx->invalidateCache(Key(), keys); + cx->invalidateCache({}, keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); } } @@ -9087,7 +8938,7 @@ ACTOR static Future> getCheckpointMetaDataForRan choose { when(wait(cx->connectionFileChanged())) { - cx->invalidateCache(KeyRef(), range); + cx->invalidateCache({}, range); } when(wait(waitForAll(futures))) { break; @@ -9100,7 +8951,7 @@ ACTOR static Future> getCheckpointMetaDataForRan TraceEvent(SevWarn, "GetCheckpointError").errorUnsuppressed(e).detail("Range", range); if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || e.code() == error_code_connection_failed || e.code() == error_code_broken_promise) { - cx->invalidateCache(KeyRef(), range); + cx->invalidateCache({}, range); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY)); } else { throw; @@ -10479,7 +10330,7 @@ ACTOR Future getChangeFeedStreamActor(Reference db, e.code() == error_code_storage_too_many_feed_streams) { ++db->feedErrors; db->changeFeedCache.erase(rangeID); - cx->invalidateCache(Key(), keys); + cx->invalidateCache({}, keys); if (begin == lastBeginVersion || e.code() == error_code_storage_too_many_feed_streams) { // We didn't read anything since the last failure before failing again. // Back off quickly and exponentially, up to 1 second @@ -10618,7 +10469,7 @@ ACTOR Future getOverlappingChangeFeedsActor(Referenc } catch (Error& e) { if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || e.code() == error_code_future_version) { - cx->invalidateCache(Key(), range); + cx->invalidateCache({}, range); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY)); } else { throw e; @@ -10741,7 +10592,7 @@ ACTOR Future popChangeFeedMutationsActor(Reference db, Ke throw; } db->changeFeedCache.erase(rangeID); - cx->invalidateCache(Key(), keys); + cx->invalidateCache({}, keys); wait(popChangeFeedBackup(cx, rangeID, version)); } return Void(); @@ -10813,8 +10664,8 @@ ACTOR Future purgeBlobGranulesActor(Reference db, } if (tenant.present()) { - TenantMapEntry tme = wait(blobLoadTenantMapEntry(&cx, range.begin, tenant)); - purgeRange = purgeRange.withPrefix(tme.prefix); + int64_t tenantId = wait(lookupTenant(cx, tenant.get(), SpanContext())); + purgeRange = purgeRange.withPrefix(TenantAPI::idToPrefix(tenantId)); } loop { @@ -10916,8 +10767,8 @@ ACTOR Future setBlobRangeActor(Reference cx, state Reference tr = makeReference(db); if (tenantName.present()) { - TenantMapEntry tme = wait(blobLoadTenantMapEntry(&db, range.begin, tenantName)); - range = range.withPrefix(tme.prefix); + int64_t tenantId = wait(lookupTenant(db, tenantName.get(), SpanContext())); + range = range.withPrefix(TenantAPI::idToPrefix(tenantId)); } state Value value = active ? blobRangeActive : blobRangeInactive; @@ -10977,12 +10828,13 @@ ACTOR Future>> listBlobbifiedRangesActor(Refer state Database db(cx); state Transaction tr(db); - state TenantMapEntry tme; + state Key tenantPrefix; state Standalone> blobRanges; if (tenantName.present()) { - wait(store(tme, blobLoadTenantMapEntry(&db, range.begin, tenantName))); - range = range.withPrefix(tme.prefix); + int64_t tenantId = wait(lookupTenant(db, tenantName.get(), SpanContext())); + tenantPrefix = TenantAPI::idToPrefix(tenantId); + range = range.withPrefix(tenantPrefix); } loop { @@ -11002,14 +10854,14 @@ ACTOR Future>> listBlobbifiedRangesActor(Refer state Standalone> tenantBlobRanges; for (auto& blobRange : blobRanges) { // Filter out blob ranges that span tenants for some reason. - if (!blobRange.begin.startsWith(tme.prefix) || !blobRange.end.startsWith(tme.prefix)) { + if (!blobRange.begin.startsWith(tenantPrefix) || !blobRange.end.startsWith(tenantPrefix)) { TraceEvent("ListBlobbifiedRangeSpansTenants") .suppressFor(/*seconds=*/5) .detail("Tenant", tenantName.get()) .detail("Range", blobRange); continue; } - tenantBlobRanges.push_back_deep(tenantBlobRanges.arena(), blobRange.removePrefix(tme.prefix)); + tenantBlobRanges.push_back_deep(tenantBlobRanges.arena(), blobRange.removePrefix(tenantPrefix)); } return tenantBlobRanges; } diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 388fe45e1e..62bc993a53 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -488,6 +488,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET, 10.0 ); init( START_TRANSACTION_MAX_QUEUE_SIZE, 1e6 ); init( KEY_LOCATION_MAX_QUEUE_SIZE, 1e6 ); + init( TENANT_ID_REQUEST_MAX_QUEUE_SIZE, 1e6 ); init( COMMIT_PROXY_LIVENESS_TIMEOUT, 20.0 ); init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005; diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index 90ac004621..75e54cf644 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -90,8 +90,6 @@ public: int LOCATION_CACHE_EVICTION_SIZE_SIM; double LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD; double LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL; - int TENANT_CACHE_EVICTION_SIZE; - int TENANT_CACHE_EVICTION_SIZE_SIM; int GET_RANGE_SHARD_LIMIT; int WARM_RANGE_SHARD_LIMIT; diff --git a/fdbclient/include/fdbclient/CommitProxyInterface.h b/fdbclient/include/fdbclient/CommitProxyInterface.h index 7b202b7a7f..b6e32da3c9 100644 --- a/fdbclient/include/fdbclient/CommitProxyInterface.h +++ b/fdbclient/include/fdbclient/CommitProxyInterface.h @@ -62,6 +62,7 @@ struct CommitProxyInterface { RequestStream exclusionSafetyCheckReq; RequestStream getDDMetrics; PublicRequestStream expireIdempotencyId; + PublicRequestStream getTenantId; UID id() const { return commit.getEndpoint().token; } std::string toString() const { return id().shortString(); } @@ -90,6 +91,7 @@ struct CommitProxyInterface { getDDMetrics = RequestStream(commit.getEndpoint().getAdjustedEndpoint(9)); expireIdempotencyId = PublicRequestStream(commit.getEndpoint().getAdjustedEndpoint(10)); + getTenantId = PublicRequestStream(commit.getEndpoint().getAdjustedEndpoint(11)); } } @@ -107,6 +109,7 @@ struct CommitProxyInterface { streams.push_back(exclusionSafetyCheckReq.getReceiver()); streams.push_back(getDDMetrics.getReceiver()); streams.push_back(expireIdempotencyId.getReceiver()); + streams.push_back(getTenantId.getReceiver()); FlowTransport::transport().addEndpoints(streams); } }; @@ -361,10 +364,46 @@ struct GetReadVersionRequest : TimedRequest { } }; +struct GetTenantIdReply { + constexpr static FileIdentifier file_identifier = 11441284; + int64_t tenantId = TenantInfo::INVALID_TENANT; + + GetTenantIdReply() {} + GetTenantIdReply(int64_t tenantId) : tenantId(tenantId) {} + + template + void serialize(Ar& ar) { + serializer(ar, tenantId); + } +}; + +struct GetTenantIdRequest { + constexpr static FileIdentifier file_identifier = 11299717; + SpanContext spanContext; + TenantName tenantName; + ReplyPromise reply; + + // This version is used to specify the minimum metadata version a proxy must have in order to declare that + // a tenant is not present. If the metadata version is lower, the proxy must wait in case the tenant gets + // created. If latestVersion is specified, then the proxy will wait until it is sure that it has received + // updates from other proxies before answering. + Version minTenantVersion; + + GetTenantIdRequest() : minTenantVersion(latestVersion) {} + GetTenantIdRequest(SpanContext spanContext, TenantNameRef const& tenantName, Version minTenantVersion) + : spanContext(spanContext), tenantName(tenantName), minTenantVersion(minTenantVersion) {} + + bool verify() const { return true; } + + template + void serialize(Ar& ar) { + serializer(ar, reply, spanContext, tenantName, minTenantVersion); + } +}; + struct GetKeyServerLocationsReply { constexpr static FileIdentifier file_identifier = 10636023; Arena arena; - TenantMapEntry tenantEntry; std::vector>> results; // if any storage servers in results have a TSS pair, that mapping is in here @@ -379,7 +418,7 @@ struct GetKeyServerLocationsReply { template void serialize(Ar& ar) { - serializer(ar, results, resultsTssMapping, tenantEntry, resultsTagMapping, arena); + serializer(ar, results, resultsTssMapping, resultsTagMapping, arena); } }; diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index b5499c904a..d161861273 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -203,13 +203,11 @@ struct EndpointFailureInfo { }; struct KeyRangeLocationInfo { - TenantMapEntry tenantEntry; KeyRange range; Reference locations; KeyRangeLocationInfo() {} - KeyRangeLocationInfo(TenantMapEntry tenantEntry, KeyRange range, Reference locations) - : tenantEntry(tenantEntry), range(range), locations(locations) {} + KeyRangeLocationInfo(KeyRange range, Reference locations) : range(range), locations(locations) {} }; struct OverlappingChangeFeedsInfo { @@ -260,22 +258,17 @@ public: return cx; } - Optional getCachedLocation(const Optional& tenant, + Optional getCachedLocation(const TenantInfo& tenant, const KeyRef&, Reverse isBackward = Reverse::False); - bool getCachedLocations(const Optional& tenant, + bool getCachedLocations(const TenantInfo& tenant, const KeyRangeRef&, std::vector&, int limit, Reverse reverse); - void cacheTenant(const TenantName& tenant, const TenantMapEntry& tenantEntry); - Reference setCachedLocation(const Optional& tenant, - const TenantMapEntry& tenantEntry, - const KeyRangeRef&, - const std::vector&); - void invalidateCachedTenant(const TenantNameRef& tenant); - void invalidateCache(const KeyRef& tenantPrefix, const KeyRef& key, Reverse isBackward = Reverse::False); - void invalidateCache(const KeyRef& tenantPrefix, const KeyRangeRef& keys); + Reference setCachedLocation(const KeyRangeRef&, const std::vector&); + void invalidateCache(const Optional& tenantPrefix, const KeyRef& key, Reverse isBackward = Reverse::False); + void invalidateCache(const Optional& tenantPrefix, const KeyRangeRef& keys); // Records that `endpoint` is failed on a healthy server. void setFailedEndpointOnHealthyServer(const Endpoint& endpoint); @@ -496,10 +489,8 @@ public: // Cache of location information int locationCacheSize; - int tenantCacheSize; CoalescedKeyRangeMap> locationCache; std::unordered_map failedEndpointsOnHealthyServersInfo; - std::unordered_map tenantCache; std::map server_interf; std::map blobWorker_interf; // blob workers don't change endpoints for the same ID @@ -562,6 +553,8 @@ public: Counter transactionKeyServerLocationRequests; Counter transactionKeyServerLocationRequestsCompleted; Counter transactionStatusRequests; + Counter transactionTenantLookupRequests; + Counter transactionTenantLookupRequestsCompleted; Counter transactionsTooOld; Counter transactionsFutureVersions; Counter transactionsNotCommitted; diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index 2fcb303389..09e97b81d0 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -610,7 +610,7 @@ KeyRef keyBetween(const KeyRangeRef& keys); // Returns a randomKey between keys. If it's impossible, return keys.end. Key randomKeyBetween(const KeyRangeRef& keys); -KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, KeyRef prefix); +KeyRangeRef toPrefixRelativeRange(KeyRangeRef range, Optional prefix); struct KeySelectorRef { private: diff --git a/fdbclient/include/fdbclient/NativeAPI.actor.h b/fdbclient/include/fdbclient/NativeAPI.actor.h index 1858fc1565..51277a1be3 100644 --- a/fdbclient/include/fdbclient/NativeAPI.actor.h +++ b/fdbclient/include/fdbclient/NativeAPI.actor.h @@ -237,6 +237,44 @@ struct Watch : public ReferenceCounted, NonCopyable { void setWatch(Future watchFuture); }; +class Tenant : public ReferenceCounted { +public: + Tenant(Future id, Optional name) : idFuture(id), name(name) {} + + int64_t id() const { + ASSERT(idFuture.isReady()); + return idFuture.get(); + } + + StringRef prefix() const { + ASSERT(idFuture.isReady()); + if (bigEndianId == -1) { + bigEndianId = bigEndian64(idFuture.get()); + } + return StringRef(reinterpret_cast(&bigEndianId), TenantAPI::PREFIX_SIZE); + } + + std::string description() const { + StringRef nameStr = name.castTo().orDefault(""_sr); + if (idFuture.canGet()) { + return format("%*s (%lld)", nameStr.size(), nameStr.begin(), idFuture.get()); + } else { + return format("%*s", nameStr.size(), nameStr.begin()); + } + } + + Future idFuture; + Optional name; + +private: + mutable int64_t bigEndianId = -1; +}; + +template <> +struct Traceable : std::true_type { + static std::string toString(const Tenant& tenant) { return printable(tenant.description()); } +}; + FDB_DECLARE_BOOLEAN_PARAM(AllowInvalidTenantID); struct TransactionState : ReferenceCounted { @@ -280,37 +318,24 @@ struct TransactionState : ReferenceCounted { // VERSION_VECTOR changed default values of readVersionObtainedFromGrvProxy TransactionState(Database cx, - Optional tenant, + Optional tenantName, TaskPriority taskID, SpanContext spanContext, Reference trLogInfo); Reference cloneAndReset(Reference newTrLogInfo, bool generateNewSpan) const; - TenantInfo getTenantInfo(AllowInvalidTenantID allowInvalidId = AllowInvalidTenantID::False); + TenantInfo getTenantInfo(AllowInvalidTenantID allowInvalidTenantId = AllowInvalidTenantID::False); - Optional const& tenant(); + Optional> const& tenant(); bool hasTenant() const; - int64_t tenantId() const { return tenantId_; } - void trySetTenantId(int64_t tenantId) { - if (tenantId_ == TenantInfo::INVALID_TENANT) { - tenantId_ = tenantId; - } - } - - Future handleUnknownTenant(); + int64_t tenantId() const { return tenant_.present() ? tenant_.get()->id() : TenantInfo::INVALID_TENANT; } private: - Optional tenant_; - int64_t tenantId_ = TenantInfo::INVALID_TENANT; + Optional> tenant_; bool tenantSet; }; -class Tenant { - Future id; - TenantName name; -}; - class Transaction : NonCopyable { public: explicit Transaction(Database const& cx, Optional const& tenant = Optional()); @@ -507,7 +532,7 @@ public: return Standalone>(tr.transaction.write_conflict_ranges, tr.arena); } - Optional getTenant() { return trState->tenant(); } + Optional> getTenant() { return trState->tenant(); } Reference trState; std::vector> watches; diff --git a/fdbclient/include/fdbclient/ReadYourWrites.h b/fdbclient/include/fdbclient/ReadYourWrites.h index afa7dbc890..c9bca102fa 100644 --- a/fdbclient/include/fdbclient/ReadYourWrites.h +++ b/fdbclient/include/fdbclient/ReadYourWrites.h @@ -212,7 +212,7 @@ public: } Transaction& getTransaction() { return tr; } - Optional getTenant() { return tr.getTenant(); } + Optional> getTenant() { return tr.getTenant(); } TagSet const& getTags() const { return tr.getTags(); } // used in template functions as returned Future type diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index e7b46769f1..2b8726ac2b 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -403,6 +403,7 @@ public: double START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET; int START_TRANSACTION_MAX_QUEUE_SIZE; int KEY_LOCATION_MAX_QUEUE_SIZE; + int TENANT_ID_REQUEST_MAX_QUEUE_SIZE; double COMMIT_PROXY_LIVENESS_TIMEOUT; double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE; diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 791d8d228c..d3ac16ffa4 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -3575,14 +3575,14 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl ASSERT(tenantEntry.get().id == req.tenantInfo.tenantId); tenantPrefix = tenantEntry.get().prefix; } else { - CODE_PROBE(true, "Blob worker unknown tenant"); + CODE_PROBE(true, "Blob worker tenant not found"); // FIXME - better way. Wait on retry here, or just have better model for tenant metadata? // Just throw wrong_shard_server and make the client retry and assume we load it later - TraceEvent(SevDebug, "BlobWorkerRequestUnknownTenant", bwData->id) + TraceEvent(SevDebug, "BlobWorkerRequestTenantNotFound", bwData->id) .suppressFor(5.0) .detail("TenantName", req.tenantInfo.name.get()) .detail("TenantId", req.tenantInfo.tenantId); - throw unknown_tenant(); + throw tenant_not_found(); } req.keyRange = KeyRangeRef(req.keyRange.begin.withPrefix(tenantPrefix.get(), req.arena), req.keyRange.end.withPrefix(tenantPrefix.get(), req.arena)); diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 4b5dad71b6..103855f04c 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -251,28 +251,15 @@ struct ResolutionRequestBuilder { } }; -ErrorOr lookupTenant(ProxyCommitData* commitData, TenantNameRef tenant, bool logOnFailure) { - auto itr = commitData->tenantNameIndex.find(tenant); - if (itr == commitData->tenantNameIndex.end()) { - if (logOnFailure) { - TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("TenantName", tenant); - } - - return tenant_not_found(); - } - - return itr->second; -} - -bool checkTenant(ProxyCommitData* commitData, int64_t tenant, Optional tenantName) { +bool checkTenantNoWait(ProxyCommitData* commitData, int64_t tenant, const char* context, bool logOnFailure) { if (tenant != TenantInfo::INVALID_TENANT) { auto itr = commitData->tenantMap.find(tenant); if (itr == commitData->tenantMap.end()) { - TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("Tenant", tenant); - return false; - } else if (tenantName.present() && itr->second != tenantName.get()) { - // This is temporary and will be removed when the client stops caching the tenant name -> ID mapping - TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("Tenant", tenant); + if (logOnFailure) { + TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid) + .detail("Tenant", tenant) + .detail("Context", context); + } return false; } @@ -282,6 +269,19 @@ bool checkTenant(ProxyCommitData* commitData, int64_t tenant, Optional checkTenant(ProxyCommitData* commitData, int64_t tenant, Version minVersion, const char* context) { + loop { + state Version currentVersion = commitData->version.get(); + if (checkTenantNoWait(commitData, tenant, context, currentVersion >= minVersion)) { + return true; + } else if (currentVersion >= minVersion) { + return false; + } else { + wait(commitData->version.whenAtLeast(currentVersion + 1)); + } + } +} + bool verifyTenantPrefix(ProxyCommitData* const commitData, const CommitTransactionRequest& req) { if (req.tenantInfo.hasTenant()) { KeyRef tenantPrefix = req.tenantInfo.prefix.get(); @@ -1164,11 +1164,11 @@ ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self int t; for (t = 0; t < trs.size() && !self->forceRecovery; t++) { if (self->committed[t] == ConflictBatch::TransactionCommitted && (!self->locked || trs[t].isLockAware())) { - bool isValid = checkTenant(pProxyCommitData, trs[t].tenantInfo.tenantId, trs[t].tenantInfo.name); + bool isValid = checkTenantNoWait(pProxyCommitData, trs[t].tenantInfo.tenantId, "Commit", true); if (!isValid) { self->committed[t] = ConflictBatch::TransactionTenantFailure; - trs[t].reply.sendError(unknown_tenant()); + trs[t].reply.sendError(tenant_not_found()); } else { self->commitCount++; applyMetadataMutations(trs[t].spanContext, @@ -2108,10 +2108,8 @@ void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitDat } } -ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) { +ACTOR static Future doTenantIdRequest(GetTenantIdRequest req, ProxyCommitData* commitData) { // We can't respond to these requests until we have valid txnStateStore - getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations; - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; wait(commitData->validState.getFuture()); wait(delay(0, TaskPriority::DefaultEndpoint)); @@ -2124,37 +2122,75 @@ ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsReques ? delay(SERVER_KNOBS->FUTURE_VERSION_DELAY) : Never(); - while (req.tenant.name.present() && tenantId.isError()) { - bool finalQuery = commitData->version.get() >= minTenantVersion; - tenantId = lookupTenant(commitData, req.tenant.name.get(), finalQuery); - - if (tenantId.isError()) { - if (finalQuery) { - req.reply.sendError(tenantId.getError()); - return Void(); - } else { - choose { - // Wait until we are sure that we've received metadata updates through minTenantVersion - // If latestVersion is specified, this will wait until we have definitely received - // updates through the version at the time we received the request - when(wait(commitData->version.whenAtLeast(minTenantVersion))) {} - when(wait(futureVersionDelay)) { - req.reply.sendError(future_version()); - return Void(); - } - } - } + choose { + // Wait until we are sure that we've received metadata updates through minTenantVersion + // If latestVersion is specified, this will wait until we have definitely received + // updates through the version at the time we received the request + when(wait(commitData->version.whenAtLeast(minTenantVersion))) {} + when(wait(futureVersionDelay)) { + req.reply.sendError(future_version()); + ++commitData->stats.tenantIdRequestOut; + ++commitData->stats.tenantIdRequestErrors; + return Void(); } } + auto itr = commitData->tenantNameIndex.find(req.tenantName); + if (itr != commitData->tenantNameIndex.end()) { + req.reply.send(GetTenantIdReply(itr->second)); + } else { + TraceEvent(SevWarn, "CommitProxyTenantNotFound", commitData->dbgid).detail("TenantName", req.tenantName); + ++commitData->stats.tenantIdRequestErrors; + req.reply.sendError(tenant_not_found()); + } + + ++commitData->stats.tenantIdRequestOut; + return Void(); +} + +ACTOR static Future tenantIdServer(CommitProxyInterface proxy, + PromiseStream> addActor, + ProxyCommitData* commitData) { + loop { + GetTenantIdRequest req = waitNext(proxy.getTenantId.getFuture()); + // WARNING: this code is run at a high priority, so it needs to do as little work as possible + if (commitData->stats.tenantIdRequestIn.getValue() - commitData->stats.tenantIdRequestOut.getValue() > + SERVER_KNOBS->TENANT_ID_REQUEST_MAX_QUEUE_SIZE) { + ++commitData->stats.tenantIdRequestErrors; + req.reply.sendError(commit_proxy_memory_limit_exceeded()); + TraceEvent(SevWarnAlways, "ProxyGetTenantRequestThresholdExceeded").suppressFor(60); + } else { + ++commitData->stats.tenantIdRequestIn; + addActor.send(doTenantIdRequest(req, commitData)); + } + } +} + +ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) { + // We can't respond to these requests until we have valid txnStateStore + getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations; + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; + + wait(commitData->validState.getFuture()); + + state Version minVersion = commitData->stats.lastCommitVersionAssigned + 1; + wait(delay(0, TaskPriority::DefaultEndpoint)); + + bool validTenant = wait(checkTenant(commitData, req.tenant.tenantId, minVersion, "GetKeyServerLocation")); + + if (!validTenant) { + ++commitData->stats.keyServerLocationOut; + req.reply.sendError(tenant_not_found()); + return Void(); + } + std::unordered_set tssMappingsIncluded; GetKeyServerLocationsReply rep; - if (req.tenant.name.present()) { - rep.tenantEntry = TenantMapEntry(tenantId.get(), req.tenant.name.get(), TenantState::READY); - req.begin = req.begin.withPrefix(rep.tenantEntry.prefix, req.arena); + if (req.tenant.hasTenant()) { + req.begin = req.begin.withPrefix(req.tenant.prefix.get(), req.arena); if (req.end.present()) { - req.end = req.end.get().withPrefix(rep.tenantEntry.prefix, req.arena); + req.end = req.end.get().withPrefix(req.tenant.prefix.get(), req.arena); } } @@ -2936,6 +2972,7 @@ ACTOR Future commitProxyServerCore(CommitProxyInterface proxy, TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit); addActor.send(monitorRemoteCommitted(&commitData)); + addActor.send(tenantIdServer(proxy, addActor, &commitData)); addActor.send(readRequestServer(proxy, addActor, &commitData)); addActor.send(rejoinServer(proxy, &commitData)); addActor.send(ddMetricsRequestServer(proxy, db)); diff --git a/fdbserver/MockGlobalState.actor.cpp b/fdbserver/MockGlobalState.actor.cpp index 7efa767e52..c613019507 100644 --- a/fdbserver/MockGlobalState.actor.cpp +++ b/fdbserver/MockGlobalState.actor.cpp @@ -587,10 +587,8 @@ Future MockGlobalState::getKeyLocation(TenantInfo tenant, ASSERT_EQ(srcTeam.size(), 1); rep.results.emplace_back(single, extractStorageServerInterfaces(srcTeam.front().servers)); - return KeyRangeLocationInfo( - rep.tenantEntry, - KeyRange(toPrefixRelativeRange(rep.results[0].first, rep.tenantEntry.prefix), rep.arena), - buildLocationInfo(rep.results[0].second)); + return KeyRangeLocationInfo(KeyRange(toPrefixRelativeRange(rep.results[0].first, tenant.prefix), rep.arena), + buildLocationInfo(rep.results[0].second)); } Future> MockGlobalState::getKeyRangeLocations( @@ -622,8 +620,7 @@ Future> MockGlobalState::getKeyRangeLocations( std::vector results; for (int shard = 0; shard < rep.results.size(); shard++) { - results.emplace_back(rep.tenantEntry, - (toPrefixRelativeRange(rep.results[shard].first, rep.tenantEntry.prefix) & keys), + results.emplace_back((toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys), buildLocationInfo(rep.results[shard].second)); } return results; diff --git a/fdbserver/include/fdbserver/ProxyCommitData.actor.h b/fdbserver/include/fdbserver/ProxyCommitData.actor.h index 8bee246acd..d69334aa94 100644 --- a/fdbserver/include/fdbserver/ProxyCommitData.actor.h +++ b/fdbserver/include/fdbserver/ProxyCommitData.actor.h @@ -64,6 +64,9 @@ struct ProxyStats { Counter mutations; Counter conflictRanges; Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors; + Counter tenantIdRequestIn; + Counter tenantIdRequestOut; + Counter tenantIdRequestErrors; Counter txnExpensiveClearCostEstCount; Version lastCommitVersionAssigned; @@ -116,7 +119,8 @@ struct ProxyStats { commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc), - keyServerLocationErrors("KeyServerLocationErrors", cc), + keyServerLocationErrors("KeyServerLocationErrors", cc), tenantIdRequestIn("TenantIdRequestIn", cc), + tenantIdRequestOut("TenantIdRequestOut", cc), tenantIdRequestErrors("TenantIdRequestErrors", cc), txnExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), lastCommitVersionAssigned(0), commitLatencySample("CommitLatencyMetrics", id, diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index c21e2e28a1..e9b29b6c25 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -138,7 +138,7 @@ bool canReplyWith(Error e) { case error_code_server_overloaded: case error_code_change_feed_popped: case error_code_tenant_name_required: - case error_code_unknown_tenant: + case error_code_tenant_not_found: // getMappedRange related exceptions that are not retriable: case error_code_mapper_bad_index: case error_code_mapper_no_such_key: @@ -2015,12 +2015,7 @@ void StorageServer::checkTenantEntry(Version version, TenantInfo tenantInfo) { TraceEvent(SevWarn, "StorageTenantNotFound", thisServerID) .detail("Tenant", tenantInfo.tenantId) .backtrace(); - throw unknown_tenant(); - } - - // TENANT_TODO: this is temporary pending other changes to remove reliance on names - if (*itr != tenantInfo.name.get()) { - throw unknown_tenant(); + throw tenant_not_found(); } } } diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index e562a60ee1..f947cdabfc 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -2679,6 +2679,7 @@ ACTOR Future workerServer(Reference connRecord, DUMPTOKEN(recruited.getStorageServerRejoinInfo); DUMPTOKEN(recruited.waitFailure); DUMPTOKEN(recruited.txnState); + DUMPTOKEN(recruited.getTenantId); // printf("Recruited as commitProxyServer\n"); errorForwarders.add(zombie(recruited, diff --git a/fdbserver/workloads/AuthzSecurity.actor.cpp b/fdbserver/workloads/AuthzSecurity.actor.cpp index 2443ae309f..f00800edcc 100644 --- a/fdbserver/workloads/AuthzSecurity.actor.cpp +++ b/fdbserver/workloads/AuthzSecurity.actor.cpp @@ -136,21 +136,22 @@ struct AuthzSecurityWorkload : TestWorkload { } } - ACTOR static Future refreshAndGetCachedLocation(AuthzSecurityWorkload* self, - Database cx, - TenantName tenant, - Standalone token, - StringRef key) { + ACTOR static Future> refreshAndGetCachedLocation( + AuthzSecurityWorkload* self, + Database cx, + TenantName tenant, + Standalone token, + StringRef key) { state Transaction tr(cx, tenant); self->setAuthToken(tr, token); loop { try { // trigger GetKeyServerLocationsRequest and subsequent cache update - Optional value = wait(tr.get(key)); - (void)value; - auto loc = cx->getCachedLocation(tenant, key); + wait(success(tr.get(key))); + int64_t tenantId = wait(tr.getTenant().get()->idFuture); + auto loc = cx->getCachedLocation(tr.trState->getTenantInfo(), key); if (loc.present()) { - return loc.get(); + return std::make_pair(loc.get(), tenantId); } else { wait(delay(0.1)); } @@ -173,16 +174,16 @@ struct AuthzSecurityWorkload : TestWorkload { Optional> expectedValue, Standalone token, Database cx, - KeyRangeLocationInfo loc) { + std::pair locationAndTenant) { loop { GetValueRequest req; req.key = key; req.version = committedVersion; - req.tenantInfo.tenantId = loc.tenantEntry.id; + req.tenantInfo.tenantId = locationAndTenant.second; req.tenantInfo.name = tenant; req.tenantInfo.token = token; try { - GetValueReply reply = wait(loadBalance(loc.locations->locations(), + GetValueReply reply = wait(loadBalance(locationAndTenant.first.locations->locations(), &StorageServerInterface::getValue, req, TaskPriority::DefaultPromiseEndpoint, @@ -212,7 +213,8 @@ struct AuthzSecurityWorkload : TestWorkload { state Version committedVersion = wait(setAndCommitKeyValueAndGetVersion(self, cx, self->tenant, self->signedToken, key, value)); // refresh key location cache via get() - KeyRangeLocationInfo loc = wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key)); + std::pair locationAndTenant = + wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key)); if (positive) { // Supposed to succeed. Expected to occasionally fail because of buggify, faultInjection, or data // distribution, but should not return permission_denied @@ -223,7 +225,7 @@ struct AuthzSecurityWorkload : TestWorkload { value, self->signedToken /* passing correct token */, cx, - loc)); + locationAndTenant)); if (!outcome.present()) { ++self->crossTenantGetPositive; } else if (outcome.get().code() == error_code_permission_denied) { @@ -241,7 +243,7 @@ struct AuthzSecurityWorkload : TestWorkload { value, self->signedTokenAnotherTenant /* deliberately passing bad token */, cx, - loc)); + locationAndTenant)); // Should always fail. Expected to return permission_denied, but expected to occasionally fail with // different errors if (!outcome.present()) { @@ -263,17 +265,15 @@ struct AuthzSecurityWorkload : TestWorkload { Value newValue, Version readVersion, Database cx, - KeyRangeLocationInfo loc) { + int64_t tenantId) { loop { - auto const& tenantEntry = loc.tenantEntry; - ASSERT(!tenantEntry.prefix.empty()); - state Key prefixedKey = key.withPrefix(tenantEntry.prefix); + state Key prefixedKey = key.withPrefix(TenantAPI::idToPrefix(tenantId)); CommitTransactionRequest req; req.transaction.mutations.push_back(req.arena, MutationRef(MutationRef::SetValue, prefixedKey, newValue)); req.transaction.read_snapshot = readVersion; req.tenantInfo.name = tenant; req.tenantInfo.token = token; - req.tenantInfo.tenantId = tenantEntry.id; + req.tenantInfo.tenantId = tenantId; try { CommitID reply = wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), &CommitProxyInterface::commit, @@ -297,11 +297,12 @@ struct AuthzSecurityWorkload : TestWorkload { state Version committedVersion = wait(setAndCommitKeyValueAndGetVersion(self, cx, self->tenant, self->signedToken, key, value)); // refresh key location cache to extract tenant prefix - KeyRangeLocationInfo loc = wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key)); + std::pair locationAndTenant = + wait(refreshAndGetCachedLocation(self, cx, self->tenant, self->signedToken, key)); if (positive) { // Expected to succeed, may occasionally fail - Optional outcome = - wait(tryCommit(self, self->tenant, self->signedToken, key, newValue, committedVersion, cx, loc)); + Optional outcome = wait(tryCommit( + self, self->tenant, self->signedToken, key, newValue, committedVersion, cx, locationAndTenant.second)); if (!outcome.present()) { ++self->crossTenantCommitPositive; } else if (outcome.get().code() == error_code_permission_denied) { @@ -311,8 +312,14 @@ struct AuthzSecurityWorkload : TestWorkload { .log(); } } else { - Optional outcome = wait(tryCommit( - self, self->tenant, self->signedTokenAnotherTenant, key, newValue, committedVersion, cx, loc)); + Optional outcome = wait(tryCommit(self, + self->tenant, + self->signedTokenAnotherTenant, + key, + newValue, + committedVersion, + cx, + locationAndTenant.second)); if (!outcome.present()) { TraceEvent(SevError, "AuthzSecurityError") .detail("Case", "CrossTenantGetDisallowed") diff --git a/fdbserver/workloads/FuzzApiCorrectness.actor.cpp b/fdbserver/workloads/FuzzApiCorrectness.actor.cpp index 4caa4b7a49..96ebaf435a 100644 --- a/fdbserver/workloads/FuzzApiCorrectness.actor.cpp +++ b/fdbserver/workloads/FuzzApiCorrectness.actor.cpp @@ -451,8 +451,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload { wait(timeoutError(unsafeThreadFutureToFuture(tr->commit()), 30)); } catch (Error& e) { if (e.code() == error_code_client_invalid_operation || - e.code() == error_code_transaction_too_large || e.code() == error_code_unknown_tenant || - e.code() == error_code_invalid_option) { + e.code() == error_code_transaction_too_large || e.code() == error_code_invalid_option) { throw not_committed(); } } diff --git a/fdbserver/workloads/GetEstimatedRangeSize.actor.cpp b/fdbserver/workloads/GetEstimatedRangeSize.actor.cpp index e8ad94a358..5f80bd04e6 100644 --- a/fdbserver/workloads/GetEstimatedRangeSize.actor.cpp +++ b/fdbserver/workloads/GetEstimatedRangeSize.actor.cpp @@ -105,15 +105,12 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload { ACTOR static Future getSize(GetEstimatedRangeSizeWorkload* self, Database cx) { state ReadYourWritesTransaction tr(cx, self->tenant); state double totalDelay = 0.0; - TraceEvent(SevDebug, "GetSizeStart") - .detail("Tenant", tr.getTenant().present() ? tr.getTenant().get() : "none"_sr); + TraceEvent(SevDebug, "GetSizeStart").detail("Tenant", tr.getTenant()); loop { try { state int64_t size = wait(tr.getEstimatedRangeSizeBytes(normalKeys)); - TraceEvent(SevDebug, "GetSizeResult") - .detail("Tenant", tr.getTenant().present() ? tr.getTenant().get() : "none"_sr) - .detail("Size", size); + TraceEvent(SevDebug, "GetSizeResult").detail("Tenant", tr.getTenant()).detail("Size", size); if (!sizeIsAsExpected(self, size) && totalDelay < 300.0) { totalDelay += 5.0; wait(delay(5.0)); @@ -121,9 +118,7 @@ struct GetEstimatedRangeSizeWorkload : TestWorkload { return size; } } catch (Error& e) { - TraceEvent(SevDebug, "GetSizeError") - .errorUnsuppressed(e) - .detail("Tenant", tr.getTenant().present() ? tr.getTenant().get() : "none"_sr); + TraceEvent(SevDebug, "GetSizeError").errorUnsuppressed(e).detail("Tenant", tr.getTenant()); wait(tr.onError(e)); } } diff --git a/flow/Error.cpp b/flow/Error.cpp index 2506a19fa8..41cbb419a3 100644 --- a/flow/Error.cpp +++ b/flow/Error.cpp @@ -208,7 +208,6 @@ const std::set transactionRetryableErrors = { error_code_not_committed, error_code_process_behind, error_code_batch_transaction_throttled, error_code_tag_throttled, - error_code_unknown_tenant, error_code_proxy_tag_throttled, // maybe committed error error_code_cluster_version_changed, diff --git a/flow/include/flow/Trace.h b/flow/include/flow/Trace.h index 4e109ee854..52e08f1e1d 100644 --- a/flow/include/flow/Trace.h +++ b/flow/include/flow/Trace.h @@ -405,6 +405,13 @@ struct Traceable : TraceableStringImpl {}; template <> struct Traceable : TraceableStringImpl {}; +template +struct Traceable> : std::conditional::value, std::true_type, std::false_type>::type { + static std::string toString(const Reference& value) { + return value ? Traceable::toString(*value) : "[not set]"; + } +}; + template struct SpecialTraceMetricType : std::conditional::value || std::is_enum::value, std::true_type, std::false_type>::type { diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h index 680eb0b9a6..38d764dd35 100755 --- a/flow/include/flow/error_definitions.h +++ b/flow/include/flow/error_definitions.h @@ -249,7 +249,6 @@ ERROR( tenant_not_empty, 2133, "Cannot delete a non-empty tenant" ) ERROR( invalid_tenant_name, 2134, "Tenant name cannot begin with \\xff" ) ERROR( tenant_prefix_allocator_conflict, 2135, "The database already has keys stored at the prefix allocated for the tenant" ) ERROR( tenants_disabled, 2136, "Tenants have been disabled in the cluster" ) -ERROR( unknown_tenant, 2137, "Tenant is not available from this server" ) ERROR( illegal_tenant_access, 2138, "Illegal tenant access" ) ERROR( invalid_tenant_group_name, 2139, "Tenant group name cannot begin with \\xff" ) ERROR( invalid_tenant_configuration, 2140, "Tenant configuration is invalid" ) diff --git a/tests/slow/ApiCorrectnessAtomicRestore.toml b/tests/slow/ApiCorrectnessAtomicRestore.toml index a99d7f9f1b..75863d9c48 100644 --- a/tests/slow/ApiCorrectnessAtomicRestore.toml +++ b/tests/slow/ApiCorrectnessAtomicRestore.toml @@ -1,3 +1,7 @@ +[configuration] +# Tenant lookups fail during the atomic restore because they aren't affected by locking +allowDefaultTenant = false + [[knobs]] rocksdb_read_value_timeout=300.0 rocksdb_read_value_prefix_timeout=300.0 diff --git a/tests/slow/WriteDuringReadAtomicRestore.toml b/tests/slow/WriteDuringReadAtomicRestore.toml index a148f0a1c9..04b6e6457a 100644 --- a/tests/slow/WriteDuringReadAtomicRestore.toml +++ b/tests/slow/WriteDuringReadAtomicRestore.toml @@ -1,5 +1,7 @@ [configuration] StderrSeverity = 30 +# Tenant lookups fail during the atomic restore because they aren't affected by locking +allowDefaultTenant = false [[test]] testTitle = 'WriteDuringReadTest'