diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index 5268aab702..8e1cd2de8c 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -786,7 +786,7 @@ struct CopyLogRangeTaskFunc : TaskFuncBase { loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); - tr.options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; + tr.trState->options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; wait(checkDatabaseLock(&tr, BinaryReader::fromStringRef( task->params[BackupAgentBase::keyConfigLogUid], Unversioned()))); @@ -1531,7 +1531,7 @@ struct OldCopyLogRangeTaskFunc : TaskFuncBase { loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); - tr.options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; + tr.trState->options.sizeLimit = 2 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; wait(checkDatabaseLock(&tr, BinaryReader::fromStringRef( task->params[BackupAgentBase::keyConfigLogUid], Unversioned()))); diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 859e4d9212..30e97a3dcc 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -132,19 +132,39 @@ public: } }; +struct WatchParameters : public ReferenceCounted { + const Key key; + const Optional value; + + const Version version; + const TagSet tags; + const SpanID spanID; + const TaskPriority taskID; + const Optional debugID; + const UseProvisionalProxies useProvisionalProxies; + + WatchParameters(Key key, + Optional value, + Version version, + TagSet tags, + SpanID spanID, + TaskPriority taskID, + Optional debugID, + UseProvisionalProxies useProvisionalProxies) + : key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID), debugID(debugID), + useProvisionalProxies(useProvisionalProxies) {} +}; + class WatchMetadata : public ReferenceCounted { public: - Key key; - Optional value; - Version version; Promise watchPromise; Future watchFuture; Future watchFutureSS; - TransactionInfo info; - TagSet tags; + Reference parameters; - WatchMetadata(Key key, Optional value, Version version, TransactionInfo info, TagSet tags); + WatchMetadata(Reference parameters) + : watchFuture(watchPromise.getFuture()), parameters(parameters) {} }; struct MutationAndVersionStream { @@ -225,12 +245,25 @@ public: bool sampleOnCost(uint64_t cost) const; void updateProxies(); - Reference getCommitProxies(bool useProvisionalProxies); - Future> getCommitProxiesFuture(bool useProvisionalProxies); - Reference getGrvProxies(bool useProvisionalProxies); + Reference getCommitProxies(UseProvisionalProxies useProvisionalProxies); + Future> getCommitProxiesFuture(UseProvisionalProxies useProvisionalProxies); + Reference getGrvProxies(UseProvisionalProxies useProvisionalProxies); Future onProxiesChanged() const; Future onClientLibStatusChanged() const; Future getHealthMetrics(bool detailed); + // Pass a negative value for `shardLimit` to indicate no limit on the shard number. + Future getStorageMetrics(KeyRange const& keys, int shardLimit); + Future, int>> waitStorageMetrics(KeyRange const& keys, + StorageMetrics const& min, + StorageMetrics const& max, + StorageMetrics const& permittedError, + int shardLimit, + int expectedShardCount); + Future>> splitStorageMetrics(KeyRange const& keys, + StorageMetrics const& limit, + StorageMetrics const& estimated); + + Future>> getReadHotRanges(KeyRange const& keys); // Returns the protocol version reported by the coordinator this client is connected to // If an expected version is given, the future won't return until the protocol version is different than expected @@ -498,7 +531,7 @@ public: EventCacheHolder connectToDatabaseEventCacheHolder; private: - std::unordered_map> watchMap; + std::unordered_map> watchMap; }; #endif diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 302bcc7929..fd1d1fca35 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -124,6 +124,9 @@ Future loadBalance( } } // namespace +FDB_BOOLEAN_PARAM(TransactionRecordLogInfo); +FDB_DEFINE_BOOLEAN_PARAM(UseProvisionalProxies); + NetworkOptions networkOptions; TLSConfig tlsConfig(TLSEndpointType::CLIENT); @@ -1016,7 +1019,7 @@ ACTOR static Future getHealthMetricsActor(DatabaseContext* cx, bo loop { choose { when(wait(cx->onProxiesChanged())) {} - when(GetHealthMetricsReply rep = wait(basicLoadBalance(cx->getGrvProxies(false), + when(GetHealthMetricsReply rep = wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies::False), &GrvProxyInterface::getHealthMetrics, GetHealthMetricsRequest(sendDetailedRequest)))) { cx->healthMetrics.update(rep.healthMetrics, detailed, true); @@ -1913,9 +1916,8 @@ Reference DatabaseContext::getWatchMetadata(KeyRef key) const { } KeyRef DatabaseContext::setWatchMetadata(Reference metadata) { - KeyRef keyRef = metadata->key.contents(); - watchMap[keyRef] = metadata; - return keyRef; + watchMap[metadata->parameters->key] = metadata; + return metadata->parameters->key; } void DatabaseContext::deleteWatchMetadata(KeyRef key) { @@ -1926,12 +1928,6 @@ void DatabaseContext::clearWatchMetadata() { watchMap.clear(); } -WatchMetadata::WatchMetadata(Key key, Optional value, Version version, TransactionInfo info, TagSet tags) - : key(key), value(value), version(version), info(info), tags(tags) { - // create dummy future - watchFuture = watchPromise.getFuture(); -} - const UniqueOrderedOptionList& Database::getTransactionDefaults() const { ASSERT(db); return db->transactionDefaults; @@ -2245,7 +2241,7 @@ void DatabaseContext::updateProxies() { } } -Reference DatabaseContext::getCommitProxies(bool useProvisionalProxies) { +Reference DatabaseContext::getCommitProxies(UseProvisionalProxies useProvisionalProxies) { updateProxies(); if (proxyProvisional && !useProvisionalProxies) { return Reference(); @@ -2253,7 +2249,7 @@ Reference DatabaseContext::getCommitProxies(bool useProvisional return commitProxies; } -Reference DatabaseContext::getGrvProxies(bool useProvisionalProxies) { +Reference DatabaseContext::getGrvProxies(UseProvisionalProxies useProvisionalProxies) { updateProxies(); if (proxyProvisional && !useProvisionalProxies) { return Reference(); @@ -2263,7 +2259,8 @@ Reference DatabaseContext::getGrvProxies(bool useProvisionalProxie // Actor which will wait until the MultiInterface returned by the DatabaseContext cx is not // nullptr -ACTOR Future> getCommitProxiesFuture(DatabaseContext* cx, bool useProvisionalProxies) { +ACTOR Future> getCommitProxiesFuture(DatabaseContext* cx, + UseProvisionalProxies useProvisionalProxies) { loop { Reference commitProxies = cx->getCommitProxies(useProvisionalProxies); if (commitProxies) @@ -2273,7 +2270,8 @@ ACTOR Future> getCommitProxiesFuture(DatabaseContext* } // Returns a future which will not be set until the CommitProxyInfo of this DatabaseContext is not nullptr -Future> DatabaseContext::getCommitProxiesFuture(bool useProvisionalProxies) { +Future> DatabaseContext::getCommitProxiesFuture( + UseProvisionalProxies useProvisionalProxies) { return ::getCommitProxiesFuture(this, useProvisionalProxies); } @@ -2344,28 +2342,22 @@ AddressExclusion AddressExclusion::parse(StringRef const& key) { } } -Future getRange(Database const& cx, +Future> getValue(Reference const& trState, + Key const& key, + Future const& version, + TransactionRecordLogInfo const& recordLogInfo); + +Future getRange(Reference const& trState, Future const& fVersion, KeySelector const& begin, KeySelector const& end, GetRangeLimits const& limits, - Reverse const& reverse, - TransactionInfo const& info, - TagSet const& tags); + Reverse const& reverse); -ACTOR Future> getValue(Future version, - Key key, - Database cx, - TransactionInfo info, - Reference trLogInfo, - TagSet tags); - -ACTOR Future> fetchServerInterface(Database cx, - TransactionInfo info, - UID id, - TagSet tags, - Future ver = latestVersion) { - Optional val = wait(getValue(ver, serverListKeyFor(id), cx, info, Reference(), tags)); +ACTOR Future> fetchServerInterface(Reference trState, + Future ver, + UID id) { + Optional val = wait(getValue(trState, serverListKeyFor(id), ver, TransactionRecordLogInfo::False)); if (!val.present()) { // A storage server has been removed from serverList since we read keyServers return Optional(); @@ -2374,15 +2366,12 @@ ACTOR Future> fetchServerInterface(Database cx, return decodeServerListValue(val.get()); } -ACTOR Future>> transactionalGetServerInterfaces(Future ver, - Database cx, - TransactionInfo info, - std::vector ids, - TagSet tags) { +ACTOR Future>> +transactionalGetServerInterfaces(Reference trState, Future ver, std::vector ids) { state std::vector>> serverListEntries; serverListEntries.reserve(ids.size()); for (int s = 0; s < ids.size(); s++) { - serverListEntries.push_back(fetchServerInterface(cx, info, ids[s], tags, ver)); + serverListEntries.push_back(fetchServerInterface(trState, ver, ids[s])); } std::vector> serverListValues = wait(getAll(serverListEntries)); @@ -2422,31 +2411,36 @@ void updateTssMappings(Database cx, const GetKeyServerLocationsReply& reply) { // If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). // Otherwise returns the shard containing key -ACTOR Future>> -getKeyLocation_internal(Database cx, Key key, TransactionInfo info, Reverse isBackward = Reverse::False) { - state Span span("NAPI:getKeyLocation"_loc, info.spanID); +ACTOR Future>> getKeyLocation_internal( + Database cx, + Key key, + SpanID spanID, + Optional debugID, + UseProvisionalProxies useProvisionalProxies, + Reverse isBackward) { + + state Span span("NAPI:getKeyLocation"_loc, spanID); if (isBackward) { ASSERT(key != allKeys.begin && key <= allKeys.end); } else { ASSERT(key < allKeys.end); } - if (info.debugID.present()) - g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before"); + if (debugID.present()) + g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.Before"); loop { ++cx->transactionKeyServerLocationRequests; choose { when(wait(cx->onProxiesChanged())) {} when(GetKeyServerLocationsReply rep = wait(basicLoadBalance( - cx->getCommitProxies(info.useProvisionalProxies), + cx->getCommitProxies(useProvisionalProxies), &CommitProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(span.context, key, Optional(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint))) { ++cx->transactionKeyServerLocationRequestsCompleted; - if (info.debugID.present()) - g_traceBatch.addEvent( - "TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After"); + if (debugID.present()) + g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.After"); ASSERT(rep.results.size() == 1); auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second); @@ -2461,45 +2455,61 @@ template Future>> getKeyLocation(Database const& cx, Key const& key, F StorageServerInterface::*member, - TransactionInfo const& info, + SpanID spanID, + Optional debugID, + UseProvisionalProxies useProvisionalProxies, Reverse isBackward = Reverse::False) { // we first check whether this range is cached auto ssi = cx->getCachedLocation(key, isBackward); if (!ssi.second) { - return getKeyLocation_internal(cx, key, info, isBackward); + return getKeyLocation_internal(cx, key, spanID, debugID, useProvisionalProxies, isBackward); } for (int i = 0; i < ssi.second->size(); i++) { if (IFailureMonitor::failureMonitor().onlyEndpointFailed(ssi.second->get(i, member).getEndpoint())) { cx->invalidateCache(key); ssi.second.clear(); - return getKeyLocation_internal(cx, key, info, isBackward); + return getKeyLocation_internal(cx, key, spanID, debugID, useProvisionalProxies, isBackward); } } return ssi; } -ACTOR Future>>> -getKeyRangeLocations_internal(Database cx, KeyRange keys, int limit, Reverse reverse, TransactionInfo info) { - state Span span("NAPI:getKeyRangeLocations"_loc, info.spanID); - if (info.debugID.present()) - g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before"); +template +Future>> getKeyLocation(Reference trState, + Key const& key, + F StorageServerInterface::*member, + Reverse isBackward = Reverse::False) { + return getKeyLocation( + trState->cx, key, member, trState->spanID, trState->debugID, trState->useProvisionalProxies, isBackward); +} + +ACTOR Future>>> getKeyRangeLocations_internal( + Database cx, + KeyRange keys, + int limit, + Reverse reverse, + SpanID spanID, + Optional debugID, + UseProvisionalProxies useProvisionalProxies) { + state Span span("NAPI:getKeyRangeLocations"_loc, spanID); + if (debugID.present()) + g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before"); loop { ++cx->transactionKeyServerLocationRequests; choose { when(wait(cx->onProxiesChanged())) {} when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance( - cx->getCommitProxies(info.useProvisionalProxies), + cx->getCommitProxies(useProvisionalProxies), &CommitProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(span.context, keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint))) { ++cx->transactionKeyServerLocationRequestsCompleted; state GetKeyServerLocationsReply rep = _rep; - if (info.debugID.present()) - g_traceBatch.addEvent( - "TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After"); + if (debugID.present()) + g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.After"); ASSERT(rep.results.size()); state std::vector>> results; @@ -2532,12 +2542,15 @@ Future>>> getKeyRangeLoc int limit, Reverse reverse, F StorageServerInterface::*member, - TransactionInfo const& info) { + SpanID const& spanID, + Optional const& debugID, + UseProvisionalProxies useProvisionalProxies) { + ASSERT(!keys.empty()); std::vector>> locations; if (!cx->getCachedLocations(keys, locations, limit, reverse)) { - return getKeyRangeLocations_internal(cx, keys, limit, reverse, info); + return getKeyRangeLocations_internal(cx, keys, limit, reverse, spanID, debugID, useProvisionalProxies); } bool foundFailed = false; @@ -2557,21 +2570,38 @@ Future>>> getKeyRangeLoc } if (foundFailed) { - return getKeyRangeLocations_internal(cx, keys, limit, reverse, info); + return getKeyRangeLocations_internal(cx, keys, limit, reverse, spanID, debugID, useProvisionalProxies); } return locations; } -ACTOR Future warmRange_impl(Transaction* self, Database cx, KeyRange keys) { +template +Future>>> getKeyRangeLocations( + Reference trState, + KeyRange const& keys, + int limit, + Reverse reverse, + F StorageServerInterface::*member) { + return getKeyRangeLocations( + trState->cx, keys, limit, reverse, member, trState->spanID, trState->debugID, trState->useProvisionalProxies); +} + +ACTOR Future warmRange_impl(Reference trState, KeyRange keys) { state int totalRanges = 0; state int totalRequests = 0; loop { - std::vector>> locations = wait( - getKeyRangeLocations_internal(cx, keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, Reverse::False, self->info)); + std::vector>> locations = + wait(getKeyRangeLocations_internal(trState->cx, + keys, + CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, + Reverse::False, + trState->spanID, + trState->debugID, + trState->useProvisionalProxies)); totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT; totalRequests++; - if (locations.size() == 0 || totalRanges >= cx->locationCacheSize || + if (locations.size() == 0 || totalRanges >= trState->cx->locationCacheSize || locations[locations.size() - 1].first.end >= keys.end) break; @@ -2579,7 +2609,7 @@ ACTOR Future warmRange_impl(Transaction* self, Database cx, KeyRange keys) if (totalRequests % 20 == 0) { // To avoid blocking the proxies from starting other transactions, occasionally get a read version. - state Transaction tr(cx); + state Transaction tr(trState->cx); loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); @@ -2596,32 +2626,30 @@ ACTOR Future warmRange_impl(Transaction* self, Database cx, KeyRange keys) return Void(); } -Future Transaction::warmRange(Database cx, KeyRange keys) { - return warmRange_impl(this, cx, keys); +Future Transaction::warmRange(KeyRange keys) { + return warmRange_impl(trState, keys); } -ACTOR Future> getValue(Future version, +ACTOR Future> getValue(Reference trState, Key key, - Database cx, - TransactionInfo info, - Reference trLogInfo, - TagSet tags) { + Future version, + TransactionRecordLogInfo recordLogInfo = TransactionRecordLogInfo::True) { state Version ver = wait(version); - state Span span("NAPI:getValue"_loc, info.spanID); + state Span span("NAPI:getValue"_loc, trState->spanID); span.addTag("key"_sr, key); - cx->validateVersion(ver); + trState->cx->validateVersion(ver); loop { state std::pair> ssi = - wait(getKeyLocation(cx, key, &StorageServerInterface::getValue, info)); + wait(getKeyLocation(trState, key, &StorageServerInterface::getValue)); state Optional getValueID = Optional(); state uint64_t startTime; state double startTimeD; try { - if (info.debugID.present()) { + if (trState->debugID.present()) { getValueID = nondeterministicRandom()->randomUniqueID(); - g_traceBatch.addAttach("GetValueAttachID", info.debugID.get().first(), getValueID.get().first()); + g_traceBatch.addAttach("GetValueAttachID", trState->debugID.get().first(), getValueID.get().first()); g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask()); @@ -2631,10 +2659,10 @@ ACTOR Future> getValue(Future version, .detail("Servers", describe(ssi.second->get()));*/ } - ++cx->getValueSubmitted; + ++trState->cx->getValueSubmitted; startTime = timer_int(); startTimeD = now(); - ++cx->transactionPhysicalReads; + ++trState->cx->transactionPhysicalReads; state GetValueReply reply; try { @@ -2643,36 +2671,40 @@ ACTOR Future> getValue(Future version, std::vector{ transaction_too_old(), future_version() }); } choose { - when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } + when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); } when(GetValueReply _reply = wait(loadBalance( - cx.getPtr(), + trState->cx.getPtr(), ssi.second, &StorageServerInterface::getValue, - GetValueRequest( - span.context, key, ver, cx->sampleReadTags() ? tags : Optional(), getValueID), + GetValueRequest(span.context, + key, + ver, + trState->cx->sampleReadTags() ? trState->options.readTags + : Optional(), + getValueID), TaskPriority::DefaultPromiseEndpoint, AtMostOnce::False, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + trState->cx->enableLocalityLoadBalance ? &trState->cx->queueModel : nullptr))) { reply = _reply; } } - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; } catch (Error&) { - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; throw; } double latency = now() - startTimeD; - cx->readLatencies.addSample(latency); - if (trLogInfo) { + trState->cx->readLatencies.addSample(latency); + if (trState->trLogInfo && recordLogInfo) { int valueSize = reply.value.present() ? reply.value.get().size() : 0; - trLogInfo->addLog( - FdbClientLogEvents::EventGet(startTimeD, cx->clientLocality.dcId(), latency, valueSize, key)); + trState->trLogInfo->addLog(FdbClientLogEvents::EventGet( + startTimeD, trState->cx->clientLocality.dcId(), latency, valueSize, key)); } - cx->getValueCompleted->latency = timer_int() - startTime; - cx->getValueCompleted->log(); + trState->cx->getValueCompleted->latency = timer_int() - startTime; + trState->cx->getValueCompleted->log(); - if (info.debugID.present()) { + if (getValueID.present()) { g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.After"); //.detail("TaskID", g_network->getCurrentTask()); @@ -2682,13 +2714,13 @@ ACTOR Future> getValue(Future version, .detail("ReplySize", reply.value.present() ? reply.value.get().size() : -1);*/ } - cx->transactionBytesRead += reply.value.present() ? reply.value.get().size() : 0; - ++cx->transactionKeysRead; + trState->cx->transactionBytesRead += reply.value.present() ? reply.value.get().size() : 0; + ++trState->cx->transactionKeysRead; return reply.value; } catch (Error& e) { - cx->getValueCompleted->latency = timer_int() - startTime; - cx->getValueCompleted->log(); - if (info.debugID.present()) { + trState->cx->getValueCompleted->latency = timer_int() - startTime; + trState->cx->getValueCompleted->log(); + if (getValueID.present()) { g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Error"); //.detail("TaskID", g_network->getCurrentTask()); @@ -2699,27 +2731,27 @@ ACTOR Future> getValue(Future version, } if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || (e.code() == error_code_transaction_too_old && ver == latestVersion)) { - cx->invalidateCache(key); - wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); + trState->cx->invalidateCache(key); + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); } else { - if (trLogInfo) - trLogInfo->addLog(FdbClientLogEvents::EventGetError( - startTimeD, cx->clientLocality.dcId(), static_cast(e.code()), key)); + if (trState->trLogInfo && recordLogInfo) + trState->trLogInfo->addLog(FdbClientLogEvents::EventGetError( + startTimeD, trState->cx->clientLocality.dcId(), static_cast(e.code()), key)); throw e; } } } } -ACTOR Future getKey(Database cx, KeySelector k, Future version, TransactionInfo info, TagSet tags) { +ACTOR Future getKey(Reference trState, KeySelector k, Future version) { wait(success(version)); state Optional getKeyID = Optional(); - state Span span("NAPI:getKey"_loc, info.spanID); - if (info.debugID.present()) { + state Span span("NAPI:getKey"_loc, trState->spanID); + if (trState->debugID.present()) { getKeyID = nondeterministicRandom()->randomUniqueID(); - g_traceBatch.addAttach("GetKeyAttachID", info.debugID.get().first(), getKeyID.get().first()); + g_traceBatch.addAttach("GetKeyAttachID", trState->debugID.get().first(), getKeyID.get().first()); g_traceBatch.addEvent( "GetKeyDebug", getKeyID.get().first(), @@ -2738,42 +2770,45 @@ ACTOR Future getKey(Database cx, KeySelector k, Future version, Tr Key locationKey(k.getKey(), k.arena()); state std::pair> ssi = - wait(getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, Reverse{ k.isBackward() })); + wait(getKeyLocation(trState, locationKey, &StorageServerInterface::getKey, Reverse{ k.isBackward() })); try { - if (info.debugID.present()) + if (getKeyID.present()) g_traceBatch.addEvent( "GetKeyDebug", getKeyID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", // k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual); - ++cx->transactionPhysicalReads; + ++trState->cx->transactionPhysicalReads; - GetKeyRequest req( - span.context, k, version.get(), cx->sampleReadTags() ? tags : Optional(), getKeyID); + GetKeyRequest req(span.context, + k, + version.get(), + trState->cx->sampleReadTags() ? trState->options.readTags : Optional(), + getKeyID); req.arena.dependsOn(k.arena()); state GetKeyReply reply; try { choose { - when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } - when(GetKeyReply _reply = - wait(loadBalance(cx.getPtr(), - ssi.second, - &StorageServerInterface::getKey, - req, - TaskPriority::DefaultPromiseEndpoint, - AtMostOnce::False, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); } + when(GetKeyReply _reply = wait(loadBalance( + trState->cx.getPtr(), + ssi.second, + &StorageServerInterface::getKey, + req, + TaskPriority::DefaultPromiseEndpoint, + AtMostOnce::False, + trState->cx->enableLocalityLoadBalance ? &trState->cx->queueModel : nullptr))) { reply = _reply; } } - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; } catch (Error&) { - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; throw; } - if (info.debugID.present()) + if (getKeyID.present()) g_traceBatch.addEvent("GetKeyDebug", getKeyID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", @@ -2783,12 +2818,12 @@ ACTOR Future getKey(Database cx, KeySelector k, Future version, Tr return k.getKey(); } } catch (Error& e) { - if (info.debugID.present()) + if (getKeyID.present()) g_traceBatch.addEvent("GetKeyDebug", getKeyID.get().first(), "NativeAPI.getKey.Error"); if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { - cx->invalidateCache(k.getKey(), Reverse{ k.isBackward() }); + trState->cx->invalidateCache(k.getKey(), Reverse{ k.isBackward() }); - wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); } else { TraceEvent(SevInfo, "GetKeyError").error(e).detail("AtKey", k.getKey()).detail("Offset", k.offset); throw e; @@ -2804,7 +2839,7 @@ ACTOR Future waitForCommittedVersion(Database cx, Version version, Span choose { when(wait(cx->onProxiesChanged())) {} when(GetReadVersionReply v = - wait(basicLoadBalance(cx->getGrvProxies(false), + wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies::False), &GrvProxyInterface::getConsistentReadVersion, GetReadVersionRequest(span.context, 0, TransactionPriority::IMMEDIATE), cx->taskID))) { @@ -2825,16 +2860,16 @@ ACTOR Future waitForCommittedVersion(Database cx, Version version, Span } } -ACTOR Future getRawVersion(Database cx, SpanID spanContext) { - state Span span("NAPI:getRawVersion"_loc, { spanContext }); +ACTOR Future getRawVersion(Reference trState) { + state Span span("NAPI:getRawVersion"_loc, { trState->spanID }); loop { choose { - when(wait(cx->onProxiesChanged())) {} + when(wait(trState->cx->onProxiesChanged())) {} when(GetReadVersionReply v = - wait(basicLoadBalance(cx->getGrvProxies(false), + wait(basicLoadBalance(trState->cx->getGrvProxies(UseProvisionalProxies::False), &GrvProxyInterface::getConsistentReadVersion, - GetReadVersionRequest(spanContext, 0, TransactionPriority::IMMEDIATE), - cx->taskID))) { + GetReadVersionRequest(trState->spanID, 0, TransactionPriority::IMMEDIATE), + trState->cx->taskID))) { return v.version; } } @@ -2846,52 +2881,51 @@ ACTOR Future readVersionBatcher( FutureStream, Optional>> versionStream, uint32_t flags); -ACTOR Future watchValue(Future version, - Key key, - Optional value, - Database cx, - TransactionInfo info, - TagSet tags) { - state Version ver = wait(version); - state Span span("NAPI:watchValue"_loc, info.spanID); - cx->validateVersion(ver); - ASSERT(ver != latestVersion); +ACTOR Future watchValue(Database cx, Reference parameters) { + state Span span("NAPI:watchValue"_loc, parameters->spanID); + state Version ver = parameters->version; + cx->validateVersion(parameters->version); + ASSERT(parameters->version != latestVersion); loop { state std::pair> ssi = - wait(getKeyLocation(cx, key, &StorageServerInterface::watchValue, info)); + wait(getKeyLocation(cx, + parameters->key, + &StorageServerInterface::watchValue, + parameters->spanID, + parameters->debugID, + parameters->useProvisionalProxies)); try { state Optional watchValueID = Optional(); - if (info.debugID.present()) { + if (parameters->debugID.present()) { watchValueID = nondeterministicRandom()->randomUniqueID(); - g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first()); + g_traceBatch.addAttach( + "WatchValueAttachID", parameters->debugID.get().first(), watchValueID.get().first()); g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask()); } state WatchValueReply resp; choose { - when(WatchValueReply r = - wait(loadBalance(cx.getPtr(), - ssi.second, - &StorageServerInterface::watchValue, - WatchValueRequest(span.context, - key, - value, - ver, - cx->sampleReadTags() ? tags : Optional(), - watchValueID), - TaskPriority::DefaultPromiseEndpoint))) { + when(WatchValueReply r = wait( + loadBalance(cx.getPtr(), + ssi.second, + &StorageServerInterface::watchValue, + WatchValueRequest(span.context, + parameters->key, + parameters->value, + ver, + cx->sampleReadTags() ? parameters->tags : Optional(), + watchValueID), + TaskPriority::DefaultPromiseEndpoint))) { resp = r; } when(wait(cx->connectionRecord ? cx->connectionRecord->onChange() : Never())) { wait(Never()); } } - if (info.debugID.present()) { - g_traceBatch.addEvent("WatchValueDebug", - watchValueID.get().first(), - "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask()); + if (watchValueID.present()) { + g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); } // FIXME: wait for known committed version on the storage server before replying, @@ -2899,8 +2933,6 @@ ACTOR Future watchValue(Future version, // than the current update loop) Version v = wait(waitForCommittedVersion(cx, resp.version, span.context)); - //TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp.version).detail("Key", key ).detail("Value", value); - // False if there is a master failure between getting the response and getting the committed version, // Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT if (v - resp.version < 50000000) { @@ -2909,21 +2941,21 @@ ACTOR Future watchValue(Future version, ver = v; } catch (Error& e) { if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) { - cx->invalidateCache(key); - wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); + cx->invalidateCache(parameters->key); + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, parameters->taskID)); } else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) { // clang-format off TEST(e.code() == error_code_watch_cancelled); // Too many watches on the storage server, poll for changes instead TEST(e.code() == error_code_process_behind); // The storage servers are all behind // clang-format on - wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID)); + wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, parameters->taskID)); } else if (e.code() == error_code_timed_out) { // The storage server occasionally times out watches in case // it was cancelled TEST(true); // A watch timed out - wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, info.taskID)); + wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, parameters->taskID)); } else { state Error err = e; - wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, info.taskID)); + wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, parameters->taskID)); throw err; } } @@ -2937,25 +2969,24 @@ ACTOR Future watchStorageServerResp(KeyRef key, Database cx) { if (!metadata.isValid()) return Void(); - Version watchVersion = wait(watchValue(Future(metadata->version), - metadata->key, - metadata->value, - cx, - metadata->info, - metadata->tags)); + Version watchVersion = wait(watchValue(cx, metadata->parameters)); metadata = cx->getWatchMetadata(key); if (!metadata.isValid()) return Void(); - if (watchVersion >= metadata->version) { // case 1: version_1 (SS) >= version_2 (map) + // case 1: version_1 (SS) >= version_2 (map) + if (watchVersion >= metadata->parameters->version) { cx->deleteWatchMetadata(key); if (metadata->watchPromise.canBeSet()) metadata->watchPromise.send(watchVersion); - } else { // ABA happens + } + // ABA happens + else { TEST(true); // ABA issue where the version returned from the server is less than the version in the map - if (metadata->watchPromise.getFutureReferenceCount() == - 1) { // case 2: version_1 < version_2 and future_count == 1 + + // case 2: version_1 < version_2 and future_count == 1 + if (metadata->watchPromise.getFutureReferenceCount() == 1) { cx->deleteWatchMetadata(key); } } @@ -2980,39 +3011,36 @@ ACTOR Future watchStorageServerResp(KeyRef key, Database cx) { } } -ACTOR Future sameVersionDiffValue(Version ver, - Key key, - Optional value, - Database cx, - TransactionInfo info, - TagSet tags) { +ACTOR Future sameVersionDiffValue(Database cx, Reference parameters) { state ReadYourWritesTransaction tr(cx); loop { try { tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - state Optional valSS = wait(tr.get(key)); - Reference metadata = cx->getWatchMetadata(key.contents()); + state Optional valSS = wait(tr.get(parameters->key)); + Reference metadata = cx->getWatchMetadata(parameters->key.contents()); - if (metadata.isValid() && - valSS != metadata->value) { // val_3 != val_1 (storage server value doesnt match value in map) - cx->deleteWatchMetadata(key.contents()); + // val_3 != val_1 (storage server value doesnt match value in map) + if (metadata.isValid() && valSS != metadata->parameters->value) { + cx->deleteWatchMetadata(parameters->key.contents()); - metadata->watchPromise.send(ver); + metadata->watchPromise.send(parameters->version); metadata->watchFutureSS.cancel(); } - if (valSS == - value) { // val_3 == val_2 (storage server value matches value passed into the function -> new watch) - metadata = makeReference(key, value, ver, info, tags); + // val_3 == val_2 (storage server value matches value passed into the function -> new watch) + if (valSS == parameters->value) { + metadata = makeReference(parameters); KeyRef keyRef = cx->setWatchMetadata(metadata); metadata->watchFutureSS = watchStorageServerResp(keyRef, cx); } - if (valSS != value) - return Void(); // if val_3 != val_2 + // if val_3 != val_2 + if (valSS != parameters->value) + return Void(); - wait(success(metadata->watchPromise.getFuture())); // val_3 == val_2 + // val_3 == val_2 + wait(success(metadata->watchPromise.getFuture())); return Void(); } catch (Error& e) { @@ -3021,49 +3049,48 @@ ACTOR Future sameVersionDiffValue(Version ver, } } -Future getWatchFuture(Version ver, - Key key, - Optional value, - Database cx, - TransactionInfo info, - TagSet tags) { - Reference metadata = cx->getWatchMetadata(key.contents()); +Future getWatchFuture(Database cx, Reference parameters) { + Reference metadata = cx->getWatchMetadata(parameters->key.contents()); - if (!metadata.isValid()) { // case 1: key not in map - metadata = makeReference(key, value, ver, info, tags); + // case 1: key not in map + if (!metadata.isValid()) { + metadata = makeReference(parameters); KeyRef keyRef = cx->setWatchMetadata(metadata); metadata->watchFutureSS = watchStorageServerResp(keyRef, cx); return success(metadata->watchPromise.getFuture()); - } else if (metadata->value == value) { // case 2: val_1 == val_2 (received watch with same value as key already in - // the map so just update) - if (ver > metadata->version) { - metadata->version = ver; - metadata->info = info; - metadata->tags = tags; + } + // case 2: val_1 == val_2 (received watch with same value as key already in the map so just update) + else if (metadata->parameters->value == parameters->value) { + if (parameters->version > metadata->parameters->version) { + metadata->parameters = parameters; } return success(metadata->watchPromise.getFuture()); - } else if (ver > metadata->version) { // case 3: val_1 != val_2 && version_2 > version_1 (recived watch with - // different value and a higher version so recreate in SS) + } + // case 3: val_1 != val_2 && version_2 > version_1 (received watch with different value and a higher version so + // recreate in SS) + else if (parameters->version > metadata->parameters->version) { TEST(true); // Setting a watch that has a different value than the one in the map but a higher version (newer) - cx->deleteWatchMetadata(key.contents()); + cx->deleteWatchMetadata(parameters->key); - metadata->watchPromise.send(ver); + metadata->watchPromise.send(parameters->version); metadata->watchFutureSS.cancel(); - metadata = makeReference(key, value, ver, info, tags); + metadata = makeReference(parameters); KeyRef keyRef = cx->setWatchMetadata(metadata); metadata->watchFutureSS = watchStorageServerResp(keyRef, cx); return success(metadata->watchPromise.getFuture()); - } else if (metadata->version == ver) { // case 5: val_1 != val_2 && version_1 == version_2 (recived watch with - // different value but same version) + } + // case 5: val_1 != val_2 && version_1 == version_2 (received watch with different value but same version) + else if (metadata->parameters->version == parameters->version) { TEST(true); // Setting a watch which has a different value than the one in the map but the same version - return sameVersionDiffValue(ver, key, value, cx, info, tags); + return sameVersionDiffValue(cx, parameters); } TEST(true); // Setting a watch which has a different value than the one in the map but a lower version (older) + // case 4: val_1 != val_2 && version_2 < version_1 return Void(); } @@ -3072,10 +3099,14 @@ ACTOR Future watchValueMap(Future version, Key key, Optional value, Database cx, - TransactionInfo info, - TagSet tags) { + TagSet tags, + SpanID spanID, + TaskPriority taskID, + Optional debugID, + UseProvisionalProxies useProvisionalProxies) { state Version ver = wait(version); - wait(getWatchFuture(ver, key, value, cx, info, tags)); + wait(getWatchFuture( + cx, makeReference(key, value, ver, tags, spanID, taskID, debugID, useProvisionalProxies))); return Void(); } @@ -3112,26 +3143,23 @@ RequestStream StorageServerInterface::*getRangeReques } ACTOR template -Future getExactRange(Database cx, +Future getExactRange(Reference trState, Version version, KeyRange keys, Key mapper, GetRangeLimits limits, - Reverse reverse, - TransactionInfo info, - TagSet tags) { + Reverse reverse) { state RangeResult output; - state Span span("NAPI:getExactRange"_loc, info.spanID); + state Span span("NAPI:getExactRange"_loc, trState->spanID); // printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str()); loop { state std::vector>> locations = - wait(getKeyRangeLocations(cx, + wait(getKeyRangeLocations(trState, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, - getRangeRequestStream(), - info)); + getRangeRequestStream())); ASSERT(locations.size()); state int shard = 0; loop { @@ -3153,14 +3181,14 @@ Future getExactRange(Database cx, ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse); // FIXME: buggify byte limits on internal functions that use them, instead of globally - req.tags = cx->sampleReadTags() ? tags : Optional(); - req.debugID = info.debugID; + req.tags = trState->cx->sampleReadTags() ? trState->options.readTags : Optional(); + req.debugID = trState->debugID; try { - if (info.debugID.present()) { + if (trState->debugID.present()) { g_traceBatch.addEvent( - "TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.Before"); - /*TraceEvent("TransactionDebugGetExactRangeInfo", info.debugID.get()) + "TransactionDebug", trState->debugID.get().first(), "NativeAPI.getExactRange.Before"); + /*TraceEvent("TransactionDebugGetExactRangeInfo", trState->debugID.get()) .detail("ReqBeginKey", req.begin.getKey()) .detail("ReqEndKey", req.end.getKey()) .detail("ReqLimit", req.limit) @@ -3169,30 +3197,30 @@ Future getExactRange(Database cx, .detail("Reverse", reverse) .detail("Servers", locations[shard].second->description());*/ } - ++cx->transactionPhysicalReads; + ++trState->cx->transactionPhysicalReads; state GetKeyValuesFamilyReply rep; try { choose { - when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); } - when(GetKeyValuesFamilyReply _rep = - wait(loadBalance(cx.getPtr(), - locations[shard].second, - getRangeRequestStream(), - req, - TaskPriority::DefaultPromiseEndpoint, - AtMostOnce::False, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) { + when(wait(trState->cx->connectionFileChanged())) { throw transaction_too_old(); } + when(GetKeyValuesFamilyReply _rep = wait(loadBalance( + trState->cx.getPtr(), + locations[shard].second, + getRangeRequestStream(), + req, + TaskPriority::DefaultPromiseEndpoint, + AtMostOnce::False, + trState->cx->enableLocalityLoadBalance ? &trState->cx->queueModel : nullptr))) { rep = _rep; } } - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; } catch (Error&) { - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; throw; } - if (info.debugID.present()) + if (trState->debugID.present()) g_traceBatch.addEvent( - "TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After"); + "TransactionDebug", trState->debugID.get().first(), "NativeAPI.getExactRange.After"); output.arena().dependsOn(rep.arena); output.append(output.arena(), rep.data.begin(), rep.data.size()); @@ -3274,8 +3302,8 @@ Future getExactRange(Database cx, else keys = KeyRangeRef(range.begin, keys.end); - cx->invalidateCache(keys); - wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); + trState->cx->invalidateCache(keys); + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); break; } else { TraceEvent(SevInfo, "GetExactRangeError") @@ -3289,32 +3317,26 @@ Future getExactRange(Database cx, } } -Future resolveKey(Database const& cx, - KeySelector const& key, - Version const& version, - TransactionInfo const& info, - TagSet const& tags) { +Future resolveKey(Reference trState, KeySelector const& key, Version const& version) { if (key.isFirstGreaterOrEqual()) return Future(key.getKey()); if (key.isFirstGreaterThan()) return Future(keyAfter(key.getKey())); - return getKey(cx, key, version, info, tags); + return getKey(trState, key, version); } ACTOR template -Future getRangeFallback(Database cx, +Future getRangeFallback(Reference trState, Version version, KeySelector begin, KeySelector end, Key mapper, GetRangeLimits limits, - Reverse reverse, - TransactionInfo info, - TagSet tags) { + Reverse reverse) { if (version == latestVersion) { - state Transaction transaction(cx); + state Transaction transaction(trState->cx); transaction.setOption(FDBTransactionOptions::CAUSAL_READ_RISKY); transaction.setOption(FDBTransactionOptions::LOCK_AWARE); transaction.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); @@ -3322,8 +3344,8 @@ Future getRangeFallback(Database cx, version = ver; } - Future fb = resolveKey(cx, begin, version, info, tags); - state Future fe = resolveKey(cx, end, version, info, tags); + Future fb = resolveKey(trState, begin, version); + state Future fe = resolveKey(trState, end, version); state Key b = wait(fb); state Key e = wait(fe); @@ -3336,7 +3358,7 @@ Future getRangeFallback(Database cx, // or allKeys.begin exists in the database and will be part of the conflict range anyways RangeResult _r = wait(getExactRange( - cx, version, KeyRangeRef(b, e), mapper, limits, reverse, info, tags)); + trState, version, KeyRangeRef(b, e), mapper, limits, reverse)); RangeResult r = _r; if (b == allKeys.begin && ((reverse && !r.more) || !reverse)) @@ -3362,8 +3384,7 @@ Future getRangeFallback(Database cx, } // TODO: Client should add mapped keys to conflict ranges. -void getRangeFinished(Database cx, - Reference trLogInfo, +void getRangeFinished(Reference trState, double startTime, KeySelector begin, KeySelector end, @@ -3376,12 +3397,12 @@ void getRangeFinished(Database cx, bytes += kv.key.size() + kv.value.size(); } - cx->transactionBytesRead += bytes; - cx->transactionKeysRead += result.size(); + trState->cx->transactionBytesRead += bytes; + trState->cx->transactionKeysRead += result.size(); - if (trLogInfo) { - trLogInfo->addLog(FdbClientLogEvents::EventGetRange( - startTime, cx->clientLocality.dcId(), now() - startTime, bytes, begin.getKey(), end.getKey())); + if (trState->trLogInfo) { + trState->trLogInfo->addLog(FdbClientLogEvents::EventGetRange( + startTime, trState->cx->clientLocality.dcId(), now() - startTime, bytes, begin.getKey(), end.getKey())); } if (!snapshot) { @@ -3421,8 +3442,7 @@ void getRangeFinished(Database cx, // Sadly we need GetKeyValuesFamilyReply because cannot do something like: state // REPLY_TYPE(GetKeyValuesFamilyRequest) rep; ACTOR template -Future getRange(Database cx, - Reference trLogInfo, +Future getRange(Reference trState, Future fVersion, KeySelector begin, KeySelector end, @@ -3430,18 +3450,16 @@ Future getRange(Database cx, GetRangeLimits limits, Promise> conflictRange, Snapshot snapshot, - Reverse reverse, - TransactionInfo info, - TagSet tags) { + Reverse reverse) { state GetRangeLimits originalLimits(limits); state KeySelector originalBegin = begin; state KeySelector originalEnd = end; state RangeResult output; - state Span span("NAPI:getRange"_loc, info.spanID); + state Span span("NAPI:getRange"_loc, trState->spanID); try { state Version version = wait(fVersion); - cx->validateVersion(version); + trState->cx->validateVersion(version); state double startTime = now(); state Version readVersion = version; // Needed for latestVersion requests; if more, make future requests at the @@ -3460,21 +3478,21 @@ Future getRange(Database cx, loop { if (end.getKey() == allKeys.begin && (end.offset < 1 || end.isFirstGreaterOrEqual())) { getRangeFinished( - cx, trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); return output; } Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena()); Reverse locationBackward{ reverse ? (end - 1).isBackward() : begin.isBackward() }; state std::pair> beginServer = wait(getKeyLocation( - cx, locationKey, getRangeRequestStream(), info, locationBackward)); + trState, locationKey, getRangeRequestStream(), locationBackward)); state KeyRange shard = beginServer.first; state bool modifiedSelectors = false; state GetKeyValuesFamilyRequest req; req.mapper = mapper; req.arena.dependsOn(mapper.arena()); - req.isFetchKeys = (info.taskID == TaskPriority::FetchKeys); + req.isFetchKeys = (trState->taskID == TaskPriority::FetchKeys); req.version = readVersion; // In case of async tss comparison, also make req arena depend on begin, end, and/or shard's arena depending @@ -3508,13 +3526,14 @@ Future getRange(Database cx, transformRangeLimits(limits, reverse, req); ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse); - req.tags = cx->sampleReadTags() ? tags : Optional(); - req.debugID = info.debugID; + req.tags = trState->cx->sampleReadTags() ? trState->options.readTags : Optional(); + req.debugID = trState->debugID; req.spanContext = span.context; try { - if (info.debugID.present()) { - g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before"); - /*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get()) + if (trState->debugID.present()) { + g_traceBatch.addEvent( + "TransactionDebug", trState->debugID.get().first(), "NativeAPI.getRange.Before"); + /*TraceEvent("TransactionDebugGetRangeInfo", trState->debugID.get()) .detail("ReqBeginKey", req.begin.getKey()) .detail("ReqEndKey", req.end.getKey()) .detail("OriginalBegin", originalBegin.toString()) @@ -3530,7 +3549,7 @@ Future getRange(Database cx, .detail("Servers", beginServer.second->description());*/ } - ++cx->transactionPhysicalReads; + ++trState->cx->transactionPhysicalReads; state GetKeyValuesFamilyReply rep; try { if (CLIENT_BUGGIFY_WITH_PROB(.01)) { @@ -3539,25 +3558,25 @@ Future getRange(Database cx, } // state AnnotateActor annotation(currentLineage); GetKeyValuesFamilyReply _rep = - wait(loadBalance(cx.getPtr(), + wait(loadBalance(trState->cx.getPtr(), beginServer.second, getRangeRequestStream(), req, TaskPriority::DefaultPromiseEndpoint, AtMostOnce::False, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr)); + trState->cx->enableLocalityLoadBalance ? &trState->cx->queueModel : nullptr)); rep = _rep; - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; } catch (Error&) { - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; throw; } - if (info.debugID.present()) { + if (trState->debugID.present()) { g_traceBatch.addEvent("TransactionDebug", - info.debugID.get().first(), + trState->debugID.get().first(), "NativeAPI.getRange.After"); //.detail("SizeOf", rep.data.size()); - /*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get()) + /*TraceEvent("TransactionDebugGetRangeDone", trState->debugID.get()) .detail("ReqBeginKey", req.begin.getKey()) .detail("ReqEndKey", req.end.getKey()) .detail("RepIsMore", rep.more) @@ -3600,15 +3619,8 @@ Future getRange(Database cx, output = copy; output.more = true; - getRangeFinished(cx, - trLogInfo, - startTime, - originalBegin, - originalEnd, - snapshot, - conflictRange, - reverse, - output); + getRangeFinished( + trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); return output; } @@ -3618,7 +3630,7 @@ Future getRange(Database cx, } getRangeFinished( - cx, trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); return output; } @@ -3633,7 +3645,7 @@ Future getRange(Database cx, output.more = modifiedSelectors || limits.isReached() || rep.more; getRangeFinished( - cx, trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); + trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output); return output; } @@ -3645,16 +3657,9 @@ Future getRange(Database cx, if (!rep.data.size()) { RangeResult result = wait(getRangeFallback( - cx, version, originalBegin, originalEnd, mapper, originalLimits, reverse, info, tags)); - getRangeFinished(cx, - trLogInfo, - startTime, - originalBegin, - originalEnd, - snapshot, - conflictRange, - reverse, - result); + trState, version, originalBegin, originalEnd, mapper, originalLimits, reverse)); + getRangeFinished( + trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result); return result; } @@ -3671,38 +3676,33 @@ Future getRange(Database cx, } } catch (Error& e) { - if (info.debugID.present()) { - g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error"); - TraceEvent("TransactionDebugError", info.debugID.get()).error(e); + if (trState->debugID.present()) { + g_traceBatch.addEvent( + "TransactionDebug", trState->debugID.get().first(), "NativeAPI.getRange.Error"); + TraceEvent("TransactionDebugError", trState->debugID.get()).error(e); } if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || (e.code() == error_code_transaction_too_old && readVersion == latestVersion)) { - cx->invalidateCache(reverse ? end.getKey() : begin.getKey(), - Reverse{ reverse ? (end - 1).isBackward() : begin.isBackward() }); + trState->cx->invalidateCache(reverse ? end.getKey() : begin.getKey(), + Reverse{ reverse ? (end - 1).isBackward() : begin.isBackward() }); if (e.code() == error_code_wrong_shard_server) { RangeResult result = wait(getRangeFallback( - cx, version, originalBegin, originalEnd, mapper, originalLimits, reverse, info, tags)); - getRangeFinished(cx, - trLogInfo, - startTime, - originalBegin, - originalEnd, - snapshot, - conflictRange, - reverse, - result); + trState, version, originalBegin, originalEnd, mapper, originalLimits, reverse)); + getRangeFinished( + trState, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result); return result; } - wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); } else { - if (trLogInfo) - trLogInfo->addLog(FdbClientLogEvents::EventGetRangeError(startTime, - cx->clientLocality.dcId(), - static_cast(e.code()), - begin.getKey(), - end.getKey())); + if (trState->trLogInfo) + trState->trLogInfo->addLog( + FdbClientLogEvents::EventGetRangeError(startTime, + trState->cx->clientLocality.dcId(), + static_cast(e.code()), + begin.getKey(), + end.getKey())); throw e; } @@ -3886,20 +3886,17 @@ maybeDuplicateTSSStreamFragment(Request& req, QueueModel* model, RequestStream getRangeStreamFragment(ParallelStream::Fragment* results, - Database cx, - Reference trLogInfo, +ACTOR Future getRangeStreamFragment(Reference trState, + ParallelStream::Fragment* results, Version version, KeyRange keys, GetRangeLimits limits, Snapshot snapshot, Reverse reverse, - TransactionInfo info, - TagSet tags, SpanID spanContext) { loop { state std::vector>> locations = wait(getKeyRangeLocations( - cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValuesStream, info)); + trState, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValuesStream)); ASSERT(locations.size()); state int shard = 0; loop { @@ -3920,19 +3917,19 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse); // FIXME: buggify byte limits on internal functions that use them, instead of globally - req.tags = cx->sampleReadTags() ? tags : Optional(); - req.debugID = info.debugID; + req.tags = trState->cx->sampleReadTags() ? trState->options.readTags : Optional(); + req.debugID = trState->debugID; try { - if (info.debugID.present()) { + if (trState->debugID.present()) { g_traceBatch.addEvent( - "TransactionDebug", info.debugID.get().first(), "NativeAPI.RangeStream.Before"); + "TransactionDebug", trState->debugID.get().first(), "NativeAPI.RangeStream.Before"); } - ++cx->transactionPhysicalReads; + ++trState->cx->transactionPhysicalReads; state GetKeyValuesStreamReply rep; if (locations[shard].second->size() == 0) { - wait(cx->connectionFileChanged()); + wait(trState->cx->connectionFileChanged()); results->sendError(transaction_too_old()); return Void(); } @@ -3983,7 +3980,7 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* tssDuplicateStream = maybeDuplicateTSSStreamFragment( req, - cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr, + trState->cx->enableLocalityLoadBalance ? &trState->cx->queueModel : nullptr, &locations[shard].second->get(useIdx, &StorageServerInterface::getKeyValuesStream)); state bool breakAgain = false; @@ -3991,7 +3988,7 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* wait(results->onEmpty()); try { choose { - when(wait(cx->connectionFileChanged())) { + when(wait(trState->cx->connectionFileChanged())) { results->sendError(transaction_too_old()); if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) { tssDuplicateStream.get().stream.sendError(transaction_too_old()); @@ -4001,9 +3998,9 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* when(GetKeyValuesStreamReply _rep = waitNext(replyStream.getFuture())) { rep = _rep; } } - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; } catch (Error& e) { - ++cx->transactionPhysicalReadsCompleted; + ++trState->cx->transactionPhysicalReadsCompleted; if (e.code() == error_code_broken_promise) { if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) { tssDuplicateStream.get().stream.sendError(connection_failed()); @@ -4018,9 +4015,9 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* } rep = GetKeyValuesStreamReply(); } - if (info.debugID.present()) + if (trState->debugID.present()) g_traceBatch.addEvent( - "TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After"); + "TransactionDebug", trState->debugID.get().first(), "NativeAPI.getExactRange.After"); RangeResult output(RangeResultRef(rep.data, rep.more), rep.arena); if (tssDuplicateStream.present() && !tssDuplicateStream.get().done()) { @@ -4039,8 +4036,8 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* bytes += kv.key.size() + kv.value.size(); } - cx->transactionBytesRead += bytes; - cx->transactionKeysRead += output.size(); + trState->cx->transactionBytesRead += bytes; + trState->cx->transactionKeysRead += output.size(); // If the reply says there is more but we know that we finished the shard, then fix rep.more if (reverse && output.more && rep.data.size() > 0 && @@ -4136,8 +4133,8 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* else keys = KeyRangeRef(range.begin, keys.end); - cx->invalidateCache(keys); - wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); + trState->cx->invalidateCache(keys); + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, trState->taskID)); break; } else { results->sendError(e); @@ -4148,7 +4145,9 @@ ACTOR Future getRangeStreamFragment(ParallelStream::Fragment* } } -ACTOR Future>> getRangeSplitPoints(Database cx, KeyRange keys, int64_t chunkSize); +ACTOR Future>> getRangeSplitPoints(Reference trState, + KeyRange keys, + int64_t chunkSize); static KeyRange intersect(KeyRangeRef lhs, KeyRangeRef rhs) { return KeyRange(KeyRangeRef(std::max(lhs.begin, rhs.begin), std::min(lhs.end, rhs.end))); @@ -4156,29 +4155,26 @@ static KeyRange intersect(KeyRangeRef lhs, KeyRangeRef rhs) { // Divides the requested key range into 1MB fragments, create range streams for each fragment, and merges the results so // the client get them in order -ACTOR Future getRangeStream(PromiseStream _results, - Database cx, - Reference trLogInfo, +ACTOR Future getRangeStream(Reference trState, + PromiseStream _results, Future fVersion, KeySelector begin, KeySelector end, GetRangeLimits limits, Promise> conflictRange, Snapshot snapshot, - Reverse reverse, - TransactionInfo info, - TagSet tags) { + Reverse reverse) { state ParallelStream results(_results, CLIENT_KNOBS->RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT); // FIXME: better handling to disable row limits ASSERT(!limits.hasRowLimit()); - state Span span("NAPI:getRangeStream"_loc, info.spanID); + state Span span("NAPI:getRangeStream"_loc, trState->spanID); state Version version = wait(fVersion); - cx->validateVersion(version); + trState->cx->validateVersion(version); - Future fb = resolveKey(cx, begin, version, info, tags); - state Future fe = resolveKey(cx, end, version, info, tags); + Future fb = resolveKey(trState, begin, version); + state Future fe = resolveKey(trState, end, version); state Key b = wait(fb); state Key e = wait(fe); @@ -4201,10 +4197,10 @@ ACTOR Future getRangeStream(PromiseStream _results, state std::vector> outstandingRequests; while (b < e) { state std::pair> ssi = - wait(getKeyLocation(cx, reverse ? e : b, &StorageServerInterface::getKeyValuesStream, info, reverse)); + wait(getKeyLocation(trState, reverse ? e : b, &StorageServerInterface::getKeyValuesStream, reverse)); state KeyRange shardIntersection = intersect(ssi.first, KeyRangeRef(b, e)); state Standalone> splitPoints = - wait(getRangeSplitPoints(cx, shardIntersection, CLIENT_KNOBS->RANGESTREAM_FRAGMENT_SIZE)); + wait(getRangeSplitPoints(trState, shardIntersection, CLIENT_KNOBS->RANGESTREAM_FRAGMENT_SIZE)); state std::vector toSend; // state std::vector::iterator>> outstandingRequests; @@ -4227,7 +4223,7 @@ ACTOR Future getRangeStream(PromiseStream _results, } ParallelStream::Fragment* fragment = wait(results.createFragment()); outstandingRequests.push_back(getRangeStreamFragment( - fragment, cx, trLogInfo, version, toSend[useIdx], limits, snapshot, reverse, info, tags, span.context)); + trState, fragment, version, toSend[useIdx], limits, snapshot, reverse, span.context)); } if (reverse) { e = shardIntersection.begin; @@ -4239,26 +4235,14 @@ ACTOR Future getRangeStream(PromiseStream _results, return Void(); } -Future getRange(Database const& cx, +Future getRange(Reference const& trState, Future const& fVersion, KeySelector const& begin, KeySelector const& end, GetRangeLimits const& limits, - Reverse const& reverse, - TransactionInfo const& info, - TagSet const& tags) { - return getRange(cx, - Reference(), - fVersion, - begin, - end, - ""_sr, - limits, - Promise>(), - Snapshot::True, - reverse, - info, - tags); + Reverse const& reverse) { + return getRange( + trState, fVersion, begin, end, ""_sr, limits, Promise>(), Snapshot::True, reverse); } bool DatabaseContext::debugUseTags = false; @@ -4266,7 +4250,7 @@ const std::vector DatabaseContext::debugTransactionTagChoices = { " "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t" }; -void debugAddTags(Transaction* tr) { +void debugAddTags(Reference trState) { int numTags = deterministicRandom()->randomInt(0, CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION + 1); for (int i = 0; i < numTags; ++i) { TransactionTag tag; @@ -4283,9 +4267,9 @@ void debugAddTags(Transaction* tr) { } if (deterministicRandom()->coinflip()) { - tr->options.readTags.addTag(tag); + trState->options.readTags.addTag(tag); } - tr->options.tags.addTag(tag); + trState->options.tags.addTag(tag); } } @@ -4307,14 +4291,17 @@ SpanID generateSpanID(bool transactionTracingSample, SpanID parentContext = Span } } -Transaction::Transaction() : info(TaskPriority::DefaultEndpoint, generateSpanID(false)) {} +Transaction::Transaction() + : trState(makeReference(TaskPriority::DefaultEndpoint, generateSpanID(false))) {} Transaction::Transaction(Database const& cx) - : info(cx->taskID, generateSpanID(cx->transactionTracingSample)), numErrors(0), options(cx), - span(info.spanID, "Transaction"_loc), trLogInfo(createTrLogInfoProbabilistically(cx)), cx(cx), - backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), committedVersion(invalidVersion), tr(info.spanID) { + : trState(makeReference(cx, + cx->taskID, + generateSpanID(cx->transactionTracingSample), + createTrLogInfoProbabilistically(cx))), + span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) { if (DatabaseContext::debugUseTags) { - debugAddTags(this); + debugAddTags(trState); } } @@ -4325,33 +4312,28 @@ Transaction::~Transaction() { void Transaction::operator=(Transaction&& r) noexcept { flushTrLogsIfEnabled(); - cx = std::move(r.cx); tr = std::move(r.tr); readVersion = std::move(r.readVersion); + trState = std::move(r.trState); metadataVersion = std::move(r.metadataVersion); extraConflictRanges = std::move(r.extraConflictRanges); commitResult = std::move(r.commitResult); committing = std::move(r.committing); - options = std::move(r.options); - info = r.info; backoff = r.backoff; - numErrors = r.numErrors; - committedVersion = r.committedVersion; - versionstampPromise = std::move(r.versionstampPromise); watches = r.watches; - trLogInfo = std::move(r.trLogInfo); } void Transaction::flushTrLogsIfEnabled() { - if (trLogInfo && trLogInfo->logsAdded && trLogInfo->trLogWriter.getData()) { - ASSERT(trLogInfo->flushed == false); - cx->clientStatusUpdater.inStatusQ.push_back({ trLogInfo->identifier, std::move(trLogInfo->trLogWriter) }); - trLogInfo->flushed = true; + if (trState && trState->trLogInfo && trState->trLogInfo->logsAdded && trState->trLogInfo->trLogWriter.getData()) { + ASSERT(trState->trLogInfo->flushed == false); + trState->cx->clientStatusUpdater.inStatusQ.push_back( + { trState->trLogInfo->identifier, std::move(trState->trLogInfo->trLogWriter) }); + trState->trLogInfo->flushed = true; } } void Transaction::setVersion(Version v) { - startTime = now(); + trState->startTime = now(); if (readVersion.isValid()) throw read_version_already_set(); if (v <= 0) @@ -4360,8 +4342,8 @@ void Transaction::setVersion(Version v) { } Future> Transaction::get(const Key& key, Snapshot snapshot) { - ++cx->transactionLogicalReads; - ++cx->transactionGetValueRequests; + ++trState->cx->transactionLogicalReads; + ++trState->cx->transactionGetValueRequests; // ASSERT (key < allKeys.end); // There are no keys in the database with size greater than KEY_SIZE_LIMIT @@ -4378,39 +4360,40 @@ Future> Transaction::get(const Key& key, Snapshot snapshot) { tr.transaction.read_conflict_ranges.push_back(tr.arena, singleKeyRange(key, tr.arena)); if (key == metadataVersionKey) { - ++cx->transactionMetadataVersionReads; + ++trState->cx->transactionMetadataVersionReads; if (!ver.isReady() || metadataVersion.isSet()) { return metadataVersion.getFuture(); } else { if (ver.isError()) return ver.getError(); - if (ver.get() == cx->metadataVersionCache[cx->mvCacheInsertLocation].first) { - return cx->metadataVersionCache[cx->mvCacheInsertLocation].second; + if (ver.get() == trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].first) { + return trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].second; } Version v = ver.get(); - int hi = cx->mvCacheInsertLocation; - int lo = (cx->mvCacheInsertLocation + 1) % cx->metadataVersionCache.size(); + int hi = trState->cx->mvCacheInsertLocation; + int lo = (trState->cx->mvCacheInsertLocation + 1) % trState->cx->metadataVersionCache.size(); while (hi != lo) { int cu = hi > lo ? (hi + lo) / 2 - : ((hi + cx->metadataVersionCache.size() + lo) / 2) % cx->metadataVersionCache.size(); - if (v == cx->metadataVersionCache[cu].first) { - return cx->metadataVersionCache[cu].second; + : ((hi + trState->cx->metadataVersionCache.size() + lo) / 2) % + trState->cx->metadataVersionCache.size(); + if (v == trState->cx->metadataVersionCache[cu].first) { + return trState->cx->metadataVersionCache[cu].second; } if (cu == lo) { break; } - if (v < cx->metadataVersionCache[cu].first) { + if (v < trState->cx->metadataVersionCache[cu].first) { hi = cu; } else { - lo = (cu + 1) % cx->metadataVersionCache.size(); + lo = (cu + 1) % trState->cx->metadataVersionCache.size(); } } } } - return getValue(ver, key, cx, info, trLogInfo, options.readTags); + return getValue(trState, key, ver); } void Watch::setWatch(Future watchFuture) { @@ -4421,7 +4404,13 @@ void Watch::setWatch(Future watchFuture) { } // FIXME: This seems pretty horrible. Now a Database can't die until all of its watches do... -ACTOR Future watch(Reference watch, Database cx, TagSet tags, TransactionInfo info) { +ACTOR Future watch(Reference watch, + Database cx, + TagSet tags, + SpanID spanID, + TaskPriority taskID, + Optional debugID, + UseProvisionalProxies useProvisionalProxies) { try { choose { // RYOW write to value that is being watched (if applicable) @@ -4439,8 +4428,15 @@ ACTOR Future watch(Reference watch, Database cx, TagSet tags, Trans when(wait(cx->connectionFileChanged())) { TEST(true); // Recreated a watch after switch cx->clearWatchMetadata(); - watch->watchFuture = - watchValueMap(cx->minAcceptableReadVersion, watch->key, watch->value, cx, info, tags); + watch->watchFuture = watchValueMap(cx->minAcceptableReadVersion, + watch->key, + watch->value, + cx, + tags, + spanID, + taskID, + debugID, + useProvisionalProxies); } } } @@ -4456,44 +4452,39 @@ ACTOR Future watch(Reference watch, Database cx, TagSet tags, Trans } Future Transaction::getRawReadVersion() { - return ::getRawVersion(cx, info.spanID); + return ::getRawVersion(trState); } Future Transaction::watch(Reference watch) { - ++cx->transactionWatchRequests; - cx->addWatch(); + ++trState->cx->transactionWatchRequests; + trState->cx->addWatch(); watches.push_back(watch); - return ::watch(watch, cx, options.readTags, info); + return ::watch(watch, + trState->cx, + trState->options.readTags, + trState->spanID, + trState->taskID, + trState->debugID, + trState->useProvisionalProxies); } -ACTOR Future>> getAddressesForKeyActor(Key key, +ACTOR Future>> getAddressesForKeyActor(Reference trState, Future ver, - Database cx, - TransactionInfo info, - TransactionOptions options) { + Key key) { state std::vector ssi; // If key >= allKeys.end, then getRange will return a kv-pair with an empty value. This will result in our // serverInterfaces vector being empty, which will cause us to return an empty addresses list. - state Key ksKey = keyServersKey(key); - state RangeResult serverTagResult = wait(getRange(cx, + state RangeResult serverTagResult = wait(getRange(trState, ver, lastLessOrEqual(serverTagKeys.begin), firstGreaterThan(serverTagKeys.end), GetRangeLimits(CLIENT_KNOBS->TOO_MANY), - Reverse::False, - info, - options.readTags)); + Reverse::False)); ASSERT(!serverTagResult.more && serverTagResult.size() < CLIENT_KNOBS->TOO_MANY); - Future futureServerUids = getRange(cx, - ver, - lastLessOrEqual(ksKey), - firstGreaterThan(ksKey), - GetRangeLimits(1), - Reverse::False, - info, - options.readTags); + Future futureServerUids = + getRange(trState, ver, lastLessOrEqual(ksKey), firstGreaterThan(ksKey), GetRangeLimits(1), Reverse::False); RangeResult serverUids = wait(futureServerUids); ASSERT(serverUids.size()); // every shard needs to have a team @@ -4504,7 +4495,7 @@ ACTOR Future>> getAddressesForKeyActor(Key key // the move is finished, because it could be cancelled at any time. decodeKeyServersValue(serverTagResult, serverUids[0].value, src, ignore); Optional> serverInterfaces = - wait(transactionalGetServerInterfaces(ver, cx, info, src, options.readTags)); + wait(transactionalGetServerInterfaces(trState, ver, src)); ASSERT(serverInterfaces.present()); // since this is happening transactionally, /FF/keyServers and /FF/serverList // need to be consistent with one another @@ -4512,7 +4503,7 @@ ACTOR Future>> getAddressesForKeyActor(Key key Standalone> addresses; for (auto i : ssi) { - std::string ipString = options.includePort ? i.address().toString() : i.address().ip.toString(); + std::string ipString = trState->options.includePort ? i.address().toString() : i.address().ip.toString(); char* c_string = new (addresses.arena()) char[ipString.length() + 1]; strcpy(c_string, ipString.c_str()); addresses.push_back(addresses.arena(), c_string); @@ -4521,21 +4512,19 @@ ACTOR Future>> getAddressesForKeyActor(Key key } Future>> Transaction::getAddressesForKey(const Key& key) { - ++cx->transactionLogicalReads; - ++cx->transactionGetAddressesForKeyRequests; + ++trState->cx->transactionLogicalReads; + ++trState->cx->transactionGetAddressesForKeyRequests; auto ver = getReadVersion(); - return getAddressesForKeyActor(key, ver, cx, info, options); + return getAddressesForKeyActor(trState, ver, key); } -ACTOR Future getKeyAndConflictRange(Database cx, +ACTOR Future getKeyAndConflictRange(Reference trState, KeySelector k, Future version, - Promise> conflictRange, - TransactionInfo info, - TagSet tags) { + Promise> conflictRange) { try { - Key rep = wait(getKey(cx, k, version, info, tags)); + Key rep = wait(getKey(trState, k, version)); if (k.offset <= 0) conflictRange.send(std::make_pair(rep, k.orEqual ? keyAfter(k.getKey()) : Key(k.getKey(), k.arena()))); else @@ -4549,14 +4538,14 @@ ACTOR Future getKeyAndConflictRange(Database cx, } Future Transaction::getKey(const KeySelector& key, Snapshot snapshot) { - ++cx->transactionLogicalReads; - ++cx->transactionGetKeyRequests; + ++trState->cx->transactionLogicalReads; + ++trState->cx->transactionGetKeyRequests; if (snapshot) - return ::getKey(cx, key, getReadVersion(), info, options.readTags); + return ::getKey(trState, key, getReadVersion()); Promise> conflictRange; extraConflictRanges.push_back(conflictRange.getFuture()); - return getKeyAndConflictRange(cx, key, getReadVersion(), conflictRange, info, options.readTags); + return getKeyAndConflictRange(trState, key, getReadVersion(), conflictRange); } template @@ -4577,8 +4566,8 @@ Future Transaction::getRangeInternal(const KeySelector& begin, GetRangeLimits limits, Snapshot snapshot, Reverse reverse) { - ++cx->transactionLogicalReads; - increaseCounterForRequest(cx); + ++trState->cx->transactionLogicalReads; + increaseCounterForRequest(trState->cx); if (limits.isReached()) return RangeResult(); @@ -4610,18 +4599,8 @@ Future Transaction::getRangeInternal(const KeySelector& begin, extraConflictRanges.push_back(conflictRange.getFuture()); } - return ::getRange(cx, - trLogInfo, - getReadVersion(), - b, - e, - mapper, - limits, - conflictRange, - snapshot, - reverse, - info, - options.readTags); + return ::getRange( + trState, getReadVersion(), b, e, mapper, limits, conflictRange, snapshot, reverse); } Future Transaction::getRange(const KeySelector& begin, @@ -4658,8 +4637,8 @@ Future Transaction::getRangeStream(const PromiseStream& resul GetRangeLimits limits, Snapshot snapshot, Reverse reverse) { - ++cx->transactionLogicalReads; - ++cx->transactionGetRangeStreamRequests; + ++trState->cx->transactionLogicalReads; + ++trState->cx->transactionGetRangeStreamRequests; // FIXME: limits are not implemented yet, and this code has not be tested with reverse=true ASSERT(!limits.hasByteLimit() && !limits.hasRowLimit() && !reverse); @@ -4687,19 +4666,8 @@ Future Transaction::getRangeStream(const PromiseStream& resul extraConflictRanges.push_back(conflictRange.getFuture()); } - return forwardErrors(::getRangeStream(results, - cx, - trLogInfo, - getReadVersion(), - b, - e, - limits, - conflictRange, - snapshot, - reverse, - info, - options.readTags), - results); + return forwardErrors( + ::getRangeStream(trState, results, getReadVersion(), b, e, limits, conflictRange, snapshot, reverse), results); } Future Transaction::getRangeStream(const PromiseStream& results, @@ -4751,7 +4719,7 @@ void Transaction::makeSelfConflicting() { } void Transaction::set(const KeyRef& key, const ValueRef& value, AddConflictRange addConflictRange) { - ++cx->transactionSetMutations; + ++trState->cx->transactionSetMutations; if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) throw key_too_large(); @@ -4773,7 +4741,7 @@ void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationRef::Type operationType, AddConflictRange addConflictRange) { - ++cx->transactionAtomicMutations; + ++trState->cx->transactionAtomicMutations; if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) throw key_too_large(); @@ -4801,7 +4769,7 @@ void Transaction::atomicOp(const KeyRef& key, } void Transaction::clear(const KeyRangeRef& range, AddConflictRange addConflictRange) { - ++cx->transactionClearMutations; + ++trState->cx->transactionClearMutations; auto& req = tr; auto& t = req.transaction; @@ -4833,7 +4801,7 @@ void Transaction::clear(const KeyRangeRef& range, AddConflictRange addConflictRa t.write_conflict_ranges.push_back(req.arena, r); } void Transaction::clear(const KeyRef& key, AddConflictRange addConflictRange) { - ++cx->transactionClearMutations; + ++trState->cx->transactionClearMutations; // There aren't any keys in the database with size larger than KEY_SIZE_LIMIT if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) @@ -4887,9 +4855,9 @@ double Transaction::getBackoff(int errCode) { double returnedBackoff = backoff; if (errCode == error_code_tag_throttled) { - auto priorityItr = cx->throttledTags.find(options.priority); - for (auto& tag : options.tags) { - if (priorityItr != cx->throttledTags.end()) { + auto priorityItr = trState->cx->throttledTags.find(trState->options.priority); + for (auto& tag : trState->options.tags) { + if (priorityItr != trState->cx->throttledTags.end()) { auto tagItr = priorityItr->second.find(tag); if (tagItr != priorityItr->second.end()) { TEST(true); // Returning throttle backoff @@ -4910,7 +4878,7 @@ double Transaction::getBackoff(int errCode) { if (errCode == error_code_proxy_memory_limit_exceeded) { backoff = std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, CLIENT_KNOBS->RESOURCE_CONSTRAINED_MAX_BACKOFF); } else { - backoff = std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, options.maxBackoff); + backoff = std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, trState->options.maxBackoff); } return returnedBackoff; @@ -4956,33 +4924,33 @@ void TransactionOptions::reset(Database const& cx) { } void Transaction::reset() { - tr = CommitTransactionRequest(); + tr = CommitTransactionRequest(trState->spanID); readVersion = Future(); metadataVersion = Promise>(); extraConflictRanges.clear(); - versionstampPromise = Promise>(); commitResult = Promise(); committing = Future(); - info.taskID = cx->taskID; - info.debugID = Optional(); flushTrLogsIfEnabled(); - trLogInfo = Reference(createTrLogInfoProbabilistically(cx)); + trState->versionstampPromise = Promise>(); + trState->taskID = trState->cx->taskID; + trState->debugID = Optional(); + trState->trLogInfo = Reference(createTrLogInfoProbabilistically(trState->cx)); cancelWatches(); if (apiVersionAtLeast(16)) { - options.reset(cx); + trState->options.reset(trState->cx); } } void Transaction::fullReset() { + trState->spanID = generateSpanID(trState->cx->transactionTracingSample); reset(); - info.spanID = generateSpanID(cx->transactionTracingSample); - span = Span(info.spanID, "Transaction"_loc); + span = Span(trState->spanID, "Transaction"_loc); backoff = CLIENT_KNOBS->DEFAULT_BACKOFF; } int Transaction::apiVersionAtLeast(int minVersion) const { - return cx->apiVersionAtLeast(minVersion); + return trState->cx->apiVersionAtLeast(minVersion); } class MutationBlock { @@ -5021,17 +4989,16 @@ Optional intersects(VectorRef lhs, VectorRef(); } -ACTOR void checkWrites(Database cx, +ACTOR void checkWrites(Reference trState, Future committed, Promise outCommitted, - CommitTransactionRequest req, - Transaction* checkTr) { + CommitTransactionRequest req) { state Version version; try { wait(committed); // If the commit is successful, by definition the transaction still exists for now. Grab the version, and don't // use it again. - version = checkTr->getCommittedVersion(); + version = trState->committedVersion; outCommitted.send(Void()); } catch (Error& e) { outCommitted.sendError(e); @@ -5054,7 +5021,7 @@ ACTOR void checkWrites(Database cx, } try { - state Transaction tr(cx); + state Transaction tr(trState->cx); tr.setVersion(version); state int checkedRanges = 0; state KeyRangeMap::Ranges ranges = expectedValues.ranges(); @@ -5097,19 +5064,16 @@ ACTOR void checkWrites(Database cx, } } -ACTOR static Future commitDummyTransaction(Database cx, - KeyRange range, - TransactionInfo info, - TransactionOptions options) { - state Transaction tr(cx); +ACTOR static Future commitDummyTransaction(Reference trState, KeyRange range) { + state Transaction tr(trState->cx); state int retries = 0; - state Span span("NAPI:dummyTransaction"_loc, info.spanID); + state Span span("NAPI:dummyTransaction"_loc, trState->spanID); tr.span.addParent(span.context); loop { try { TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries); - tr.options = options; - tr.info.taskID = info.taskID; + tr.trState->options = trState->options; + tr.trState->taskID = trState->taskID; tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY); tr.setOption(FDBTransactionOptions::LOCK_AWARE); @@ -5141,8 +5105,15 @@ void Transaction::setupWatches() { Future watchVersion = getCommittedVersion() > 0 ? getCommittedVersion() : getReadVersion(); for (int i = 0; i < watches.size(); ++i) - watches[i]->setWatch( - watchValueMap(watchVersion, watches[i]->key, watches[i]->value, cx, info, options.readTags)); + watches[i]->setWatch(watchValueMap(watchVersion, + watches[i]->key, + watches[i]->value, + trState->cx, + trState->options.readTags, + trState->spanID, + trState->taskID, + trState->debugID, + trState->useProvisionalProxies)); watches.clear(); } catch (Error&) { @@ -5151,7 +5122,7 @@ void Transaction::setupWatches() { } } -ACTOR Future> estimateCommitCosts(Transaction* self, +ACTOR Future> estimateCommitCosts(Reference trState, CommitTransactionRef const* transaction) { state ClientTrCommitCostEstimation trCommitCosts; state KeyRangeRef keyRange; @@ -5166,20 +5137,15 @@ ACTOR Future> estimateCommitCosts(Transac } else if (it->type == MutationRef::Type::ClearRange) { trCommitCosts.opsCount++; keyRange = KeyRangeRef(it->param1, it->param2); - if (self->options.expensiveClearCostEstimation) { - StorageMetrics m = wait(self->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY)); + if (trState->options.expensiveClearCostEstimation) { + StorageMetrics m = wait(trState->cx->getStorageMetrics(keyRange, CLIENT_KNOBS->TOO_MANY)); trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(m.bytes)); trCommitCosts.writeCosts += getWriteOperationCost(m.bytes); ++trCommitCosts.expensiveCostEstCount; - ++self->getDatabase()->transactionsExpensiveClearCostEstCount; + ++trState->cx->transactionsExpensiveClearCostEstCount; } else { - std::vector>> locations = - wait(getKeyRangeLocations(self->getDatabase(), - keyRange, - CLIENT_KNOBS->TOO_MANY, - Reverse::False, - &StorageServerInterface::getShardState, - self->info)); + std::vector>> locations = wait(getKeyRangeLocations( + trState, keyRange, CLIENT_KNOBS->TOO_MANY, Reverse::False, &StorageServerInterface::getShardState)); if (locations.empty()) { continue; } @@ -5189,7 +5155,7 @@ ACTOR Future> estimateCommitCosts(Transac bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS; } else { // small clear on the boundary will hit two shards but be much smaller than the shard size bytes = CLIENT_KNOBS->INCOMPLETE_SHARD_PLUS * 2 + - (locations.size() - 2) * (int64_t)self->getDatabase()->smoothMidShardSize.smoothTotal(); + (locations.size() - 2) * (int64_t)trState->cx->smoothMidShardSize.smoothTotal(); } trCommitCosts.clearIdxCosts.emplace_back(i, getWriteOperationCost(bytes)); @@ -5199,7 +5165,7 @@ ACTOR Future> estimateCommitCosts(Transac } // sample on written bytes - if (!self->getDatabase()->sampleOnCost(trCommitCosts.writeCosts)) + if (!trState->cx->sampleOnCost(trCommitCosts.writeCosts)) return Optional(); // sample clear op: the expectation of #sampledOp is every COMMIT_SAMPLE_COST sample once @@ -5227,54 +5193,51 @@ ACTOR Future> estimateCommitCosts(Transac return trCommitCosts; } -ACTOR static Future tryCommit(Database cx, - Reference trLogInfo, +ACTOR static Future tryCommit(Reference trState, CommitTransactionRequest req, - Future readVersion, - TransactionInfo info, - Version* pCommittedVersion, - Transaction* tr, - TransactionOptions options) { + Future readVersion) { state TraceInterval interval("TransactionCommit"); state double startTime = now(); - state Span span("NAPI:tryCommit"_loc, info.spanID); - req.spanContext = span.context; - if (info.debugID.present()) - TraceEvent(interval.begin()).detail("Parent", info.debugID.get()); + state Span span("NAPI:tryCommit"_loc, trState->spanID); + state Optional debugID = trState->debugID; + if (debugID.present()) { + TraceEvent(interval.begin()).detail("Parent", debugID.get()); + } try { if (CLIENT_BUGGIFY) { throw deterministicRandom()->randomChoice(std::vector{ not_committed(), transaction_too_old(), proxy_memory_limit_exceeded(), commit_unknown_result() }); } - if (req.tagSet.present() && tr->options.priority < TransactionPriority::IMMEDIATE) { + if (req.tagSet.present() && trState->options.priority < TransactionPriority::IMMEDIATE) { wait(store(req.transaction.read_snapshot, readVersion) && - store(req.commitCostEstimation, estimateCommitCosts(tr, &req.transaction))); + store(req.commitCostEstimation, estimateCommitCosts(trState, &req.transaction))); } else { wait(store(req.transaction.read_snapshot, readVersion)); } startTime = now(); state Optional commitID = Optional(); - if (info.debugID.present()) { + + if (debugID.present()) { commitID = nondeterministicRandom()->randomUniqueID(); - g_traceBatch.addAttach("CommitAttachID", info.debugID.get().first(), commitID.get().first()); + g_traceBatch.addAttach("CommitAttachID", debugID.get().first(), commitID.get().first()); g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.Before"); } req.debugID = commitID; state Future reply; - if (options.commitOnFirstProxy) { - if (cx->clientInfo->get().firstCommitProxy.present()) { + if (trState->options.commitOnFirstProxy) { + if (trState->cx->clientInfo->get().firstCommitProxy.present()) { reply = throwErrorOr(brokenPromiseToMaybeDelivered( - cx->clientInfo->get().firstCommitProxy.get().commit.tryGetReply(req))); + trState->cx->clientInfo->get().firstCommitProxy.get().commit.tryGetReply(req))); } else { - const std::vector& proxies = cx->clientInfo->get().commitProxies; + const std::vector& proxies = trState->cx->clientInfo->get().commitProxies; reply = proxies.size() ? throwErrorOr(brokenPromiseToMaybeDelivered(proxies[0].commit.tryGetReply(req))) : Never(); } } else { - reply = basicLoadBalance(cx->getCommitProxies(info.useProvisionalProxies), + reply = basicLoadBalance(trState->cx->getCommitProxies(trState->useProvisionalProxies), &CommitProxyInterface::commit, req, TaskPriority::DefaultPromiseEndpoint, @@ -5282,7 +5245,7 @@ ACTOR static Future tryCommit(Database cx, } choose { - when(wait(cx->onProxiesChanged())) { + when(wait(trState->cx->onProxiesChanged())) { reply.cancel(); throw request_maybe_delivered(); } @@ -5292,43 +5255,46 @@ ACTOR static Future tryCommit(Database cx, if (CLIENT_BUGGIFY) { throw commit_unknown_result(); } - if (info.debugID.present()) + if (debugID.present()) TraceEvent(interval.end()).detail("CommittedVersion", v); - *pCommittedVersion = v; - if (v > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) { - cx->mvCacheInsertLocation = (cx->mvCacheInsertLocation + 1) % cx->metadataVersionCache.size(); - cx->metadataVersionCache[cx->mvCacheInsertLocation] = std::make_pair(v, ci.metadataVersion); + trState->committedVersion = v; + if (v > trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].first) { + trState->cx->mvCacheInsertLocation = + (trState->cx->mvCacheInsertLocation + 1) % trState->cx->metadataVersionCache.size(); + trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation] = + std::make_pair(v, ci.metadataVersion); } Standalone ret = makeString(10); placeVersionstamp(mutateString(ret), v, ci.txnBatchId); - tr->versionstampPromise.send(ret); + trState->versionstampPromise.send(ret); - tr->numErrors = 0; - ++cx->transactionsCommitCompleted; - cx->transactionCommittedMutations += req.transaction.mutations.size(); - cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize(); + trState->numErrors = 0; + ++trState->cx->transactionsCommitCompleted; + trState->cx->transactionCommittedMutations += req.transaction.mutations.size(); + trState->cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize(); - if (info.debugID.present()) + if (commitID.present()) g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After"); double latency = now() - startTime; - cx->commitLatencies.addSample(latency); - cx->latencies.addSample(now() - tr->startTime); - if (trLogInfo) - trLogInfo->addLog(FdbClientLogEvents::EventCommit_V2(startTime, - cx->clientLocality.dcId(), - latency, - req.transaction.mutations.size(), - req.transaction.mutations.expectedSize(), - ci.version, - req)); + trState->cx->commitLatencies.addSample(latency); + trState->cx->latencies.addSample(now() - trState->startTime); + if (trState->trLogInfo) + trState->trLogInfo->addLog( + FdbClientLogEvents::EventCommit_V2(startTime, + trState->cx->clientLocality.dcId(), + latency, + req.transaction.mutations.size(), + req.transaction.mutations.expectedSize(), + ci.version, + req)); return Void(); } else { // clear the RYW transaction which contains previous conflicting keys - tr->info.conflictingKeys.reset(); + trState->conflictingKeys.reset(); if (ci.conflictingKRIndices.present()) { - tr->info.conflictingKeys = + trState->conflictingKeys = std::make_shared>(conflictingKeysFalse, specialKeys.end); state Standalone> conflictingKRIndices = ci.conflictingKRIndices.get(); // drop duplicate indices and merge overlapped ranges @@ -5339,14 +5305,14 @@ ACTOR static Future tryCommit(Database cx, const KeyRangeRef kr = req.transaction.read_conflict_ranges[rCRIndex]; const KeyRange krWithPrefix = KeyRangeRef(kr.begin.withPrefix(conflictingKeysRange.begin), kr.end.withPrefix(conflictingKeysRange.begin)); - tr->info.conflictingKeys->insert(krWithPrefix, conflictingKeysTrue); + trState->conflictingKeys->insert(krWithPrefix, conflictingKeysTrue); } } - if (info.debugID.present()) + if (debugID.present()) TraceEvent(interval.end()).detail("Conflict", 1); - if (info.debugID.present()) + if (commitID.present()) g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After"); throw not_committed(); @@ -5357,7 +5323,7 @@ ACTOR static Future tryCommit(Database cx, if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) { // We don't know if the commit happened, and it might even still be in flight. - if (!options.causalWriteRisky) { + if (!trState->options.causalWriteRisky) { // Make sure it's not still in flight, either by ensuring the master we submitted to is dead, or the // version we submitted with is dead, or by committing a conflicting transaction successfully // if ( cx->getCommitProxies()->masterGeneration <= originalMasterGeneration ) @@ -5372,7 +5338,7 @@ ACTOR static Future tryCommit(Database cx, TEST(true); // Waiting for dummy transaction to report commit_unknown_result - wait(commitDummyTransaction(cx, singleKeyRange(selfConflictingRange.begin), info, tr->options)); + wait(commitDummyTransaction(trState, singleKeyRange(selfConflictingRange.begin))); } // The user needs to be informed that we aren't sure whether the commit happened. Standard retry loops @@ -5384,9 +5350,9 @@ ACTOR static Future tryCommit(Database cx, e.code() != error_code_batch_transaction_throttled && e.code() != error_code_tag_throttled) { TraceEvent(SevError, "TryCommitError").error(e); } - if (trLogInfo) - trLogInfo->addLog(FdbClientLogEvents::EventCommitError( - startTime, cx->clientLocality.dcId(), static_cast(e.code()), req)); + if (trState->trLogInfo) + trState->trLogInfo->addLog(FdbClientLogEvents::EventCommitError( + startTime, trState->cx->clientLocality.dcId(), static_cast(e.code()), req)); throw; } } @@ -5396,22 +5362,22 @@ Future Transaction::commitMutations() { try { // if this is a read-only transaction return immediately if (!tr.transaction.write_conflict_ranges.size() && !tr.transaction.mutations.size()) { - numErrors = 0; + trState->numErrors = 0; - committedVersion = invalidVersion; - versionstampPromise.sendError(no_commit_version()); + trState->committedVersion = invalidVersion; + trState->versionstampPromise.sendError(no_commit_version()); return Void(); } - ++cx->transactionsCommitStarted; + ++trState->cx->transactionsCommitStarted; - if (options.readOnly) + if (trState->options.readOnly) return transaction_read_only(); - cx->mutationsPerCommit.addSample(tr.transaction.mutations.size()); - cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize()); - if (options.tags.size()) - tr.tagSet = options.tags; + trState->cx->mutationsPerCommit.addSample(tr.transaction.mutations.size()); + trState->cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize()); + if (trState->options.tags.size()) + tr.tagSet = trState->options.tags; size_t transactionSize = getSize(); if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) { @@ -5421,7 +5387,7 @@ Future Transaction::commitMutations() { .detail("NumMutations", tr.transaction.mutations.size()) .detail("ReadConflictSize", tr.transaction.read_conflict_ranges.expectedSize()) .detail("WriteConflictSize", tr.transaction.write_conflict_ranges.expectedSize()) - .detail("DebugIdentifier", trLogInfo ? trLogInfo->identifier : ""); + .detail("DebugIdentifier", trState->trLogInfo ? trState->trLogInfo->identifier : ""); } if (!apiVersionAtLeast(300)) { @@ -5430,7 +5396,7 @@ Future Transaction::commitMutations() { // determining whether to throw transaction_too_large } - if (transactionSize > options.sizeLimit) { + if (transactionSize > trState->options.sizeLimit) { return transaction_too_large(); } @@ -5439,14 +5405,14 @@ Future Transaction::commitMutations() { GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY); // sets up readVersion field. We had no reads, so no // need for (expensive) full causal consistency. - bool isCheckingWrites = options.checkWritesEnabled && deterministicRandom()->random01() < 0.01; + bool isCheckingWrites = trState->options.checkWritesEnabled && deterministicRandom()->random01() < 0.01; for (int i = 0; i < extraConflictRanges.size(); i++) if (extraConflictRanges[i].isReady() && extraConflictRanges[i].get().first < extraConflictRanges[i].get().second) tr.transaction.read_conflict_ranges.emplace_back( tr.arena, extraConflictRanges[i].get().first, extraConflictRanges[i].get().second); - if (!options.causalWriteRisky && + if (!trState->options.causalWriteRisky && !intersects(tr.transaction.write_conflict_ranges, tr.transaction.read_conflict_ranges).present()) makeSelfConflicting(); @@ -5456,7 +5422,7 @@ Future Transaction::commitMutations() { tr.arena, tr.transaction.write_conflict_ranges.begin(), tr.transaction.write_conflict_ranges.size()); } - if (options.debugDump) { + if (trState->options.debugDump) { UID u = nondeterministicRandom()->randomUniqueID(); TraceEvent("TransactionDump", u).log(); for (auto i = tr.transaction.mutations.begin(); i != tr.transaction.mutations.end(); ++i) @@ -5466,22 +5432,21 @@ Future Transaction::commitMutations() { .detail("P2", i->param2); } - if (options.lockAware) { + if (trState->options.lockAware) { tr.flags = tr.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE; } - if (options.firstInBatch) { + if (trState->options.firstInBatch) { tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH; } - if (options.reportConflictingKeys) { + if (trState->options.reportConflictingKeys) { tr.transaction.report_conflicting_keys = true; } - Future commitResult = - tryCommit(cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options); + Future commitResult = tryCommit(trState, tr, readVersion); if (isCheckingWrites) { Promise committed; - checkWrites(cx, commitResult, committed, tr, this); + checkWrites(trState, commitResult, committed, tr); return committed.getFuture(); } return commitResult; @@ -5517,7 +5482,7 @@ ACTOR Future commitAndWatch(Transaction* self) { self->cancelWatches(e); } - self->versionstampPromise.sendError(transaction_invalid_version()); + self->trState->versionstampPromise.sendError(transaction_invalid_version()); if (!self->apiVersionAtLeast(700)) { self->reset(); @@ -5541,42 +5506,42 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optionaloptions.causalWriteRisky = true; break; case FDBTransactionOptions::CAUSAL_READ_RISKY: validateOptionValueNotPresent(value); - options.getReadVersionFlags |= GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY; + trState->options.getReadVersionFlags |= GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY; break; case FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE: validateOptionValueNotPresent(value); - options.priority = TransactionPriority::IMMEDIATE; + trState->options.priority = TransactionPriority::IMMEDIATE; break; case FDBTransactionOptions::PRIORITY_BATCH: validateOptionValueNotPresent(value); - options.priority = TransactionPriority::BATCH; + trState->options.priority = TransactionPriority::BATCH; break; case FDBTransactionOptions::CAUSAL_WRITE_RISKY: validateOptionValueNotPresent(value); - options.causalWriteRisky = true; + trState->options.causalWriteRisky = true; break; case FDBTransactionOptions::COMMIT_ON_FIRST_PROXY: validateOptionValueNotPresent(value); - options.commitOnFirstProxy = true; + trState->options.commitOnFirstProxy = true; break; case FDBTransactionOptions::CHECK_WRITES_ENABLE: validateOptionValueNotPresent(value); - options.checkWritesEnabled = true; + trState->options.checkWritesEnabled = true; break; case FDBTransactionOptions::DEBUG_DUMP: validateOptionValueNotPresent(value); - options.debugDump = true; + trState->options.debugDump = true; break; case FDBTransactionOptions::TRANSACTION_LOGGING_ENABLE: @@ -5591,30 +5556,31 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optionalidentifier.empty()) { - trLogInfo->identifier = value.get().printable(); - } else if (trLogInfo->identifier != value.get().printable()) { + if (trState->trLogInfo) { + if (trState->trLogInfo->identifier.empty()) { + trState->trLogInfo->identifier = value.get().printable(); + } else if (trState->trLogInfo->identifier != value.get().printable()) { TraceEvent(SevWarn, "CannotChangeDebugTransactionIdentifier") - .detail("PreviousIdentifier", trLogInfo->identifier) + .detail("PreviousIdentifier", trState->trLogInfo->identifier) .detail("NewIdentifier", value.get()); throw client_invalid_operation(); } } else { - trLogInfo = makeReference(value.get().printable(), TransactionLogInfo::DONT_LOG); - trLogInfo->maxFieldLength = options.maxTransactionLoggingFieldLength; + trState->trLogInfo = + makeReference(value.get().printable(), TransactionLogInfo::DONT_LOG); + trState->trLogInfo->maxFieldLength = trState->options.maxTransactionLoggingFieldLength; } - if (info.debugID.present()) { + if (trState->debugID.present()) { TraceEvent(SevInfo, "TransactionBeingTraced") - .detail("DebugTransactionID", trLogInfo->identifier) - .detail("ServerTraceID", info.debugID.get()); + .detail("DebugTransactionID", trState->trLogInfo->identifier) + .detail("ServerTraceID", trState->debugID.get()); } break; case FDBTransactionOptions::LOG_TRANSACTION: validateOptionValueNotPresent(value); - if (trLogInfo && !trLogInfo->identifier.empty()) { - trLogInfo->logTo(TransactionLogInfo::TRACE_LOG); + if (trState->trLogInfo && !trState->trLogInfo->identifier.empty()) { + trState->trLogInfo->logTo(TransactionLogInfo::TRACE_LOG); } else { TraceEvent(SevWarn, "DebugTransactionIdentifierNotSet") .detail("Error", "Debug Transaction Identifier option must be set before logging the transaction"); @@ -5629,72 +5595,72 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optionaloptions.maxTransactionLoggingFieldLength = maxFieldLength; } - if (trLogInfo) { - trLogInfo->maxFieldLength = options.maxTransactionLoggingFieldLength; + if (trState->trLogInfo) { + trState->trLogInfo->maxFieldLength = trState->options.maxTransactionLoggingFieldLength; } break; case FDBTransactionOptions::SERVER_REQUEST_TRACING: validateOptionValueNotPresent(value); debugTransaction(deterministicRandom()->randomUniqueID()); - if (trLogInfo && !trLogInfo->identifier.empty()) { + if (trState->trLogInfo && !trState->trLogInfo->identifier.empty()) { TraceEvent(SevInfo, "TransactionBeingTraced") - .detail("DebugTransactionID", trLogInfo->identifier) - .detail("ServerTraceID", info.debugID.get()); + .detail("DebugTransactionID", trState->trLogInfo->identifier) + .detail("ServerTraceID", trState->debugID.get()); } break; case FDBTransactionOptions::MAX_RETRY_DELAY: validateOptionValuePresent(value); - options.maxBackoff = extractIntOption(value, 0, std::numeric_limits::max()) / 1000.0; + trState->options.maxBackoff = extractIntOption(value, 0, std::numeric_limits::max()) / 1000.0; break; case FDBTransactionOptions::SIZE_LIMIT: validateOptionValuePresent(value); - options.sizeLimit = extractIntOption(value, 32, CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT); + trState->options.sizeLimit = extractIntOption(value, 32, CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT); break; case FDBTransactionOptions::LOCK_AWARE: validateOptionValueNotPresent(value); - options.lockAware = true; - options.readOnly = false; + trState->options.lockAware = true; + trState->options.readOnly = false; break; case FDBTransactionOptions::READ_LOCK_AWARE: validateOptionValueNotPresent(value); - if (!options.lockAware) { - options.lockAware = true; - options.readOnly = true; + if (!trState->options.lockAware) { + trState->options.lockAware = true; + trState->options.readOnly = true; } break; case FDBTransactionOptions::FIRST_IN_BATCH: validateOptionValueNotPresent(value); - options.firstInBatch = true; + trState->options.firstInBatch = true; break; case FDBTransactionOptions::USE_PROVISIONAL_PROXIES: validateOptionValueNotPresent(value); - options.getReadVersionFlags |= GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES; - info.useProvisionalProxies = true; + trState->options.getReadVersionFlags |= GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES; + trState->useProvisionalProxies = UseProvisionalProxies::True; break; case FDBTransactionOptions::INCLUDE_PORT_IN_ADDRESS: validateOptionValueNotPresent(value); - options.includePort = true; + trState->options.includePort = true; break; case FDBTransactionOptions::TAG: validateOptionValuePresent(value); - options.tags.addTag(value.get()); + trState->options.tags.addTag(value.get()); break; case FDBTransactionOptions::AUTO_THROTTLE_TAG: validateOptionValuePresent(value); - options.tags.addTag(value.get()); - options.readTags.addTag(value.get()); + trState->options.tags.addTag(value.get()); + trState->options.readTags.addTag(value.get()); break; case FDBTransactionOptions::SPAN_PARENT: @@ -5707,12 +5673,12 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optionaloptions.reportConflictingKeys = true; break; case FDBTransactionOptions::EXPENSIVE_CLEAR_COST_ESTIMATION_ENABLE: validateOptionValueNotPresent(value); - options.expensiveClearCostEstimation = true; + trState->options.expensiveClearCostEstimation = true; break; default: @@ -5738,11 +5704,12 @@ ACTOR Future getConsistentReadVersion(SpanID parentSpan, choose { when(wait(cx->onProxiesChanged())) {} - when(GetReadVersionReply v = wait(basicLoadBalance( - cx->getGrvProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), - &GrvProxyInterface::getConsistentReadVersion, - req, - cx->taskID))) { + when(GetReadVersionReply v = + wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies( + flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES)), + &GrvProxyInterface::getConsistentReadVersion, + req, + cx->taskID))) { if (tags.size() != 0) { auto& priorityThrottledTags = cx->throttledTags[priority]; for (auto& tag : tags) { @@ -5876,58 +5843,52 @@ ACTOR Future readVersionBatcher(DatabaseContext* cx, } } -ACTOR Future extractReadVersion(Location location, +ACTOR Future extractReadVersion(Reference trState, + Location location, SpanID spanContext, - SpanID parent, - DatabaseContext* cx, - TransactionPriority priority, - Reference trLogInfo, Future f, - LockAware lockAware, - double startTime, - Promise> metadataVersion, - TagSet tags) { - state Span span(spanContext, location, { parent }); + Promise> metadataVersion) { + state Span span(spanContext, location, { trState->spanID }); GetReadVersionReply rep = wait(f); - double latency = now() - startTime; - cx->GRVLatencies.addSample(latency); - if (trLogInfo) - trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V3( - startTime, cx->clientLocality.dcId(), latency, priority, rep.version)); - if (rep.locked && !lockAware) + double latency = now() - trState->startTime; + trState->cx->GRVLatencies.addSample(latency); + if (trState->trLogInfo) + trState->trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V3( + trState->startTime, trState->cx->clientLocality.dcId(), latency, trState->options.priority, rep.version)); + if (rep.locked && !trState->options.lockAware) throw database_locked(); - ++cx->transactionReadVersionsCompleted; - switch (priority) { + ++trState->cx->transactionReadVersionsCompleted; + switch (trState->options.priority) { case TransactionPriority::IMMEDIATE: - ++cx->transactionImmediateReadVersionsCompleted; + ++trState->cx->transactionImmediateReadVersionsCompleted; break; case TransactionPriority::DEFAULT: - ++cx->transactionDefaultReadVersionsCompleted; + ++trState->cx->transactionDefaultReadVersionsCompleted; break; case TransactionPriority::BATCH: - ++cx->transactionBatchReadVersionsCompleted; + ++trState->cx->transactionBatchReadVersionsCompleted; break; default: ASSERT(false); } - if (tags.size() != 0) { - auto& priorityThrottledTags = cx->throttledTags[priority]; - for (auto& tag : tags) { + if (trState->options.tags.size() != 0) { + auto& priorityThrottledTags = trState->cx->throttledTags[trState->options.priority]; + for (auto& tag : trState->options.tags) { auto itr = priorityThrottledTags.find(tag); if (itr != priorityThrottledTags.end()) { if (itr->second.expired()) { priorityThrottledTags.erase(itr); } else if (itr->second.throttleDuration() > 0) { TEST(true); // throttling transaction after getting read version - ++cx->transactionReadVersionsThrottled; + ++trState->cx->transactionReadVersionsThrottled; throw tag_throttled(); } } } - for (auto& tag : tags) { + for (auto& tag : trState->options.tags) { auto itr = priorityThrottledTags.find(tag); if (itr != priorityThrottledTags.end()) { itr->second.addReleased(1); @@ -5935,9 +5896,11 @@ ACTOR Future extractReadVersion(Location location, } } - if (rep.version > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) { - cx->mvCacheInsertLocation = (cx->mvCacheInsertLocation + 1) % cx->metadataVersionCache.size(); - cx->metadataVersionCache[cx->mvCacheInsertLocation] = std::make_pair(rep.version, rep.metadataVersion); + if (rep.version > trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].first) { + trState->cx->mvCacheInsertLocation = + (trState->cx->mvCacheInsertLocation + 1) % trState->cx->metadataVersionCache.size(); + trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation] = + std::make_pair(rep.version, rep.metadataVersion); } metadataVersion.send(rep.metadataVersion); @@ -5946,31 +5909,31 @@ ACTOR Future extractReadVersion(Location location, Future Transaction::getReadVersion(uint32_t flags) { if (!readVersion.isValid()) { - ++cx->transactionReadVersions; - flags |= options.getReadVersionFlags; - switch (options.priority) { + ++trState->cx->transactionReadVersions; + flags |= trState->options.getReadVersionFlags; + switch (trState->options.priority) { case TransactionPriority::IMMEDIATE: flags |= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE; - ++cx->transactionImmediateReadVersions; + ++trState->cx->transactionImmediateReadVersions; break; case TransactionPriority::DEFAULT: flags |= GetReadVersionRequest::PRIORITY_DEFAULT; - ++cx->transactionDefaultReadVersions; + ++trState->cx->transactionDefaultReadVersions; break; case TransactionPriority::BATCH: flags |= GetReadVersionRequest::PRIORITY_BATCH; - ++cx->transactionBatchReadVersions; + ++trState->cx->transactionBatchReadVersions; break; default: ASSERT(false); } - if (options.tags.size() != 0) { + if (trState->options.tags.size() != 0) { double maxThrottleDelay = 0.0; bool canRecheck = false; - auto& priorityThrottledTags = cx->throttledTags[options.priority]; - for (auto& tag : options.tags) { + auto& priorityThrottledTags = trState->cx->throttledTags[trState->options.priority]; + for (auto& tag : trState->options.tags) { auto itr = priorityThrottledTags.find(tag); if (itr != priorityThrottledTags.end()) { if (!itr->second.expired()) { @@ -5984,14 +5947,14 @@ Future Transaction::getReadVersion(uint32_t flags) { if (maxThrottleDelay > 0.0 && !canRecheck) { // TODO: allow delaying? TEST(true); // Throttling tag before GRV request - ++cx->transactionReadVersionsThrottled; + ++trState->cx->transactionReadVersionsThrottled; readVersion = tag_throttled(); return readVersion; } else { TEST(maxThrottleDelay > 0.0); // Rechecking throttle } - for (auto& tag : options.tags) { + for (auto& tag : trState->options.tags) { auto itr = priorityThrottledTags.find(tag); if (itr != priorityThrottledTags.end()) { itr->second.updateChecked(); @@ -5999,27 +5962,18 @@ Future Transaction::getReadVersion(uint32_t flags) { } } - auto& batcher = cx->versionBatcher[flags]; + auto& batcher = trState->cx->versionBatcher[flags]; if (!batcher.actor.isValid()) { - batcher.actor = readVersionBatcher(cx.getPtr(), batcher.stream.getFuture(), options.priority, flags); + batcher.actor = + readVersionBatcher(trState->cx.getPtr(), batcher.stream.getFuture(), trState->options.priority, flags); } Location location = "NAPI:getReadVersion"_loc; - UID spanContext = generateSpanID(cx->transactionTracingSample, info.spanID); - auto const req = DatabaseContext::VersionRequest(spanContext, options.tags, info.debugID); + UID spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanID); + auto const req = DatabaseContext::VersionRequest(spanContext, trState->options.tags, trState->debugID); batcher.stream.send(req); - startTime = now(); - readVersion = extractReadVersion(location, - spanContext, - info.spanID, - cx.getPtr(), - options.priority, - trLogInfo, - req.reply.getFuture(), - LockAware{ options.lockAware }, - startTime, - metadataVersion, - options.tags); + trState->startTime = now(); + readVersion = extractReadVersion(trState, location, spanContext, req.reply.getFuture(), metadataVersion); } return readVersion; } @@ -6036,7 +5990,7 @@ Future> Transaction::getVersionstamp() { if (committing.isValid()) { return transaction_invalid_version(); } - return versionstampPromise.getFuture(); + return trState->versionstampPromise.getFuture(); } // Gets the protocol version reported by a coordinator via the protocol info interface @@ -6144,34 +6098,34 @@ Future Transaction::onError(Error const& e) { e.code() == error_code_process_behind || e.code() == error_code_batch_transaction_throttled || e.code() == error_code_tag_throttled) { if (e.code() == error_code_not_committed) - ++cx->transactionsNotCommitted; + ++trState->cx->transactionsNotCommitted; else if (e.code() == error_code_commit_unknown_result) - ++cx->transactionsMaybeCommitted; + ++trState->cx->transactionsMaybeCommitted; else if (e.code() == error_code_proxy_memory_limit_exceeded) - ++cx->transactionsResourceConstrained; + ++trState->cx->transactionsResourceConstrained; else if (e.code() == error_code_process_behind) - ++cx->transactionsProcessBehind; + ++trState->cx->transactionsProcessBehind; else if (e.code() == error_code_batch_transaction_throttled || e.code() == error_code_tag_throttled) { - ++cx->transactionsThrottled; + ++trState->cx->transactionsThrottled; } double backoff = getBackoff(e.code()); reset(); - return delay(backoff, info.taskID); + return delay(backoff, trState->taskID); } if (e.code() == error_code_transaction_too_old || e.code() == error_code_future_version) { if (e.code() == error_code_transaction_too_old) - ++cx->transactionsTooOld; + ++trState->cx->transactionsTooOld; else if (e.code() == error_code_future_version) - ++cx->transactionsFutureVersions; + ++trState->cx->transactionsFutureVersions; - double maxBackoff = options.maxBackoff; + double maxBackoff = trState->options.maxBackoff; reset(); - return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), info.taskID); + return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), trState->taskID); } - if (g_network->isSimulated() && ++numErrors % 10 == 0) - TraceEvent(SevWarnAlways, "TransactionTooManyRetries").detail("NumRetries", numErrors); + if (g_network->isSimulated() && ++trState->numErrors % 10 == 0) + TraceEvent(SevWarnAlways, "TransactionTooManyRetries").detail("NumRetries", trState->numErrors); return e; } @@ -6207,7 +6161,9 @@ ACTOR Future getStorageMetricsLargeKeyRange(Database cx, KeyRang std::numeric_limits::max(), Reverse::False, &StorageServerInterface::waitMetrics, - TransactionInfo(TaskPriority::DataDistribution, span.context))); + span.context, + Optional(), + UseProvisionalProxies::False)); state int nLocs = locations.size(); state std::vector> fx(nLocs); state StorageMetrics total; @@ -6306,7 +6262,9 @@ ACTOR Future>> getReadHotRanges(Da shardLimit, Reverse::False, &StorageServerInterface::getReadHotRanges, - TransactionInfo(TaskPriority::DataDistribution, span.context))); + span.context, + Optional(), + UseProvisionalProxies::False)); try { // TODO: how to handle this? // This function is called whenever a shard becomes read-hot. But somehow the shard was splitted across more @@ -6374,7 +6332,9 @@ ACTOR Future, int>> waitStorageMetrics(Databa shardLimit, Reverse::False, &StorageServerInterface::waitMetrics, - TransactionInfo(TaskPriority::DataDistribution, span.context))); + span.context, + Optional(), + UseProvisionalProxies::False)); if (expectedShardCount >= 0 && locations.size() != expectedShardCount) { return std::make_pair(Optional(), locations.size()); } @@ -6415,22 +6375,35 @@ ACTOR Future, int>> waitStorageMetrics(Databa } } -Future, int>> Transaction::waitStorageMetrics(KeyRange const& keys, - StorageMetrics const& min, - StorageMetrics const& max, - StorageMetrics const& permittedError, - int shardLimit, - int expectedShardCount) { - return ::waitStorageMetrics(cx, keys, min, max, permittedError, shardLimit, expectedShardCount); +Future, int>> DatabaseContext::waitStorageMetrics( + KeyRange const& keys, + StorageMetrics const& min, + StorageMetrics const& max, + StorageMetrics const& permittedError, + int shardLimit, + int expectedShardCount) { + return ::waitStorageMetrics(Database(Reference::addRef(this)), + keys, + min, + max, + permittedError, + shardLimit, + expectedShardCount); } -Future Transaction::getStorageMetrics(KeyRange const& keys, int shardLimit) { +Future DatabaseContext::getStorageMetrics(KeyRange const& keys, int shardLimit) { if (shardLimit > 0) { StorageMetrics m; m.bytes = -1; - return extractMetrics(::waitStorageMetrics(cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit, -1)); + return extractMetrics(::waitStorageMetrics(Database(Reference::addRef(this)), + keys, + StorageMetrics(), + m, + StorageMetrics(), + shardLimit, + -1)); } else { - return ::getStorageMetricsLargeKeyRange(cx, keys); + return ::getStorageMetricsLargeKeyRange(Database(Reference::addRef(this)), keys); } } @@ -6441,7 +6414,7 @@ ACTOR Future>> waitDataDistributionMetricsLis choose { when(wait(cx->onProxiesChanged())) {} when(ErrorOr rep = - wait(errorOr(basicLoadBalance(cx->getCommitProxies(false), + wait(errorOr(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), &CommitProxyInterface::getDDMetrics, GetDDMetricsRequest(keys, shardLimit))))) { if (rep.isError()) { @@ -6453,20 +6426,17 @@ ACTOR Future>> waitDataDistributionMetricsLis } } -Future>> Transaction::getReadHotRanges(KeyRange const& keys) { - return ::getReadHotRanges(cx, keys); +Future>> DatabaseContext::getReadHotRanges(KeyRange const& keys) { + return ::getReadHotRanges(Database(Reference::addRef(this)), keys); } -ACTOR Future>> getRangeSplitPoints(Database cx, KeyRange keys, int64_t chunkSize) { - state Span span("NAPI:GetRangeSplitPoints"_loc); +ACTOR Future>> getRangeSplitPoints(Reference trState, + KeyRange keys, + int64_t chunkSize) { + state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanID); loop { - state std::vector>> locations = - wait(getKeyRangeLocations(cx, - keys, - CLIENT_KNOBS->TOO_MANY, - Reverse::False, - &StorageServerInterface::getRangeSplitPoints, - TransactionInfo(TaskPriority::DataDistribution, span.context))); + state std::vector>> locations = wait(getKeyRangeLocations( + trState, keys, CLIENT_KNOBS->TOO_MANY, Reverse::False, &StorageServerInterface::getRangeSplitPoints)); try { state int nLocs = locations.size(); state std::vector> fReplies(nLocs); @@ -6505,14 +6475,14 @@ ACTOR Future>> getRangeSplitPoints(Database cx, Key TraceEvent(SevError, "GetRangeSplitPoints").error(e); throw; } - cx->invalidateCache(keys); + trState->cx->invalidateCache(keys); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution)); } } } Future>> Transaction::getRangeSplitPoints(KeyRange const& keys, int64_t chunkSize) { - return ::getRangeSplitPoints(cx, keys, chunkSize); + return ::getRangeSplitPoints(trState, keys, chunkSize); } #define BG_REQUEST_DEBUG false @@ -6558,7 +6528,6 @@ struct BWLocationInfo : MultiInterface> }; ACTOR Future>> readBlobGranulesActor( - Database cx, Transaction* self, KeyRange range, Version begin, @@ -6631,13 +6600,13 @@ ACTOR Future>> readBlobGranulesActor( workerId.toString().c_str()); } - if (!cx->blobWorker_interf.count(workerId)) { + if (!self->trState->cx->blobWorker_interf.count(workerId)) { Optional workerInterface = wait(self->get(blobWorkerListKeyFor(workerId))); if (!workerInterface.present()) { throw wrong_shard_server(); } // FIXME: maybe just want to insert here if there are racing queries for the same worker or something? - cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get()); + self->trState->cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get()); if (BG_REQUEST_DEBUG) { printf(" decoded worker interface for %s\n", workerId.toString().c_str()); } @@ -6667,7 +6636,8 @@ ACTOR Future>> readBlobGranulesActor( req.readVersion = *readVersionOut; std::vector>> v; - v.push_back(makeReference>(cx->blobWorker_interf[workerId])); + v.push_back( + makeReference>(self->trState->cx->blobWorker_interf[workerId])); state Reference>> location = makeReference(v); // use load balance with one option for now for retry and error handling @@ -6716,7 +6686,7 @@ Future>> Transaction::readBlobGranules Version begin, Optional readVersion, Version* readVersionOut) { - return readBlobGranulesActor(cx, this, range, begin, readVersion, readVersionOut); + return readBlobGranulesActor(this, range, begin, readVersion, readVersionOut); } ACTOR Future setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) { @@ -6750,7 +6720,9 @@ ACTOR Future>> splitStorageMetrics(Database cx, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, Reverse::False, &StorageServerInterface::splitMetrics, - TransactionInfo(TaskPriority::DataDistribution, span.context))); + span.context, + Optional(), + UseProvisionalProxies::False)); state StorageMetrics used; state Standalone> results; @@ -6805,14 +6777,14 @@ ACTOR Future>> splitStorageMetrics(Database cx, } } -Future>> Transaction::splitStorageMetrics(KeyRange const& keys, - StorageMetrics const& limit, - StorageMetrics const& estimated) { - return ::splitStorageMetrics(cx, keys, limit, estimated); +Future>> DatabaseContext::splitStorageMetrics(KeyRange const& keys, + StorageMetrics const& limit, + StorageMetrics const& estimated) { + return ::splitStorageMetrics(Database(Reference::addRef(this)), keys, limit, estimated); } void Transaction::checkDeferredError() const { - cx->checkDeferredError(); + trState->cx->checkDeferredError(); } Reference Transaction::createTrLogInfoProbabilistically(const Database& cx) { @@ -6831,12 +6803,12 @@ Reference Transaction::createTrLogInfoProbabilistically(cons void Transaction::setTransactionID(uint64_t id) { ASSERT(getSize() == 0); - info.spanID = SpanID(id, info.spanID.second()); + trState->spanID = SpanID(id, trState->spanID.second()); } void Transaction::setToken(uint64_t token) { ASSERT(getSize() == 0); - info.spanID = SpanID(info.spanID.first(), token); + trState->spanID = SpanID(trState->spanID.first(), token); } void enableClientInfoLogging() { @@ -6851,7 +6823,7 @@ ACTOR Future snapCreate(Database cx, Standalone snapCmd, UID sn loop { choose { when(wait(cx->onProxiesChanged())) {} - when(wait(basicLoadBalance(cx->getCommitProxies(false), + when(wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), &CommitProxyInterface::proxySnapReq, ProxySnapRequest(snapCmd, snapUID, snapUID), cx->taskID, @@ -6878,7 +6850,7 @@ ACTOR Future checkSafeExclusions(Database cx, std::vectoronProxiesChanged())) {} when(ExclusionSafetyCheckReply _ddCheck = - wait(basicLoadBalance(cx->getCommitProxies(false), + wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), &CommitProxyInterface::exclusionSafetyCheckReq, req, cx->taskID))) { @@ -7330,7 +7302,9 @@ ACTOR Future getChangeFeedStreamActor(Reference db, CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT, Reverse::False, &StorageServerInterface::changeFeedStream, - TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); + span.context, + Optional(), + UseProvisionalProxies::False)); if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) { ASSERT_WE_THINK(false); @@ -7511,7 +7485,9 @@ ACTOR Future> getOverlappingChangeFeedsA CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT, Reverse::False, &StorageServerInterface::overlappingChangeFeeds, - TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); + span.context, + Optional(), + UseProvisionalProxies::False)); if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) { TraceEvent(SevError, "OverlappingRangeTooLarge") @@ -7589,7 +7565,9 @@ ACTOR Future popChangeFeedMutationsActor(Reference db, Ke 3, Reverse::False, &StorageServerInterface::changeFeedPop, - TransactionInfo(TaskPriority::DefaultEndpoint, span.context))); + span.context, + Optional(), + UseProvisionalProxies::False)); if (locations.size() > 2) { wait(popChangeFeedBackup(cx, rangeID, version)); diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index c713d5744b..2c2b6e6ed7 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -46,6 +46,8 @@ (getSBVar(__FILE__, __LINE__, BuggifyType::Client) && deterministicRandom()->random01() < (x)) #define CLIENT_BUGGIFY CLIENT_BUGGIFY_WITH_PROB(P_BUGGIFIED_SECTION_FIRES[int(BuggifyType::Client)]) +FDB_DECLARE_BOOLEAN_PARAM(UseProvisionalProxies); + // Incomplete types that are reference counted class DatabaseContext; template <> @@ -173,22 +175,6 @@ private: }; class ReadYourWritesTransaction; // workaround cyclic dependency -struct TransactionInfo { - Optional debugID; - TaskPriority taskID; - SpanID spanID; - bool useProvisionalProxies; - // Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled - // prefix/ : '1' - any keys equal or larger than this key are (probably) conflicting keys - // prefix/ : '0' - any keys equal or larger than this key are (definitely) not conflicting keys - std::shared_ptr> conflictingKeys; - - // Only available so that Transaction can have a default constructor, for use in state variables - TransactionInfo() : taskID(), spanID(), useProvisionalProxies() {} - - explicit TransactionInfo(TaskPriority taskID, SpanID spanID) - : taskID(taskID), spanID(spanID), useProvisionalProxies(false) {} -}; struct TransactionLogInfo : public ReferenceCounted, NonCopyable { enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 }; @@ -246,6 +232,34 @@ struct Watch : public ReferenceCounted, NonCopyable { void setWatch(Future watchFuture); }; +struct TransactionState : ReferenceCounted { + Database cx; + Reference trLogInfo; + TransactionOptions options; + + Optional debugID; + TaskPriority taskID; + SpanID spanID; + UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False; + + int numErrors = 0; + double startTime = 0; + Promise> versionstampPromise; + + Version committedVersion{ invalidVersion }; + + // Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled + // prefix/ : '1' - any keys equal or larger than this key are (probably) conflicting keys + // prefix/ : '0' - any keys equal or larger than this key are (definitely) not conflicting keys + std::shared_ptr> conflictingKeys; + + // Only available so that Transaction can have a default constructor, for use in state variables + TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID) {} + + TransactionState(Database cx, TaskPriority taskID, SpanID spanID, Reference trLogInfo) + : cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID) {} +}; + class Transaction : NonCopyable { public: explicit Transaction(Database const& cx); @@ -298,16 +312,6 @@ public: Snapshot = Snapshot::False, Reverse = Reverse::False); -private: - template - Future getRangeInternal(const KeySelector& begin, - const KeySelector& end, - const Key& mapper, - GetRangeLimits limits, - Snapshot snapshot, - Reverse reverse); - -public: // A method for streaming data from the storage server that is more efficient than getRange when reading large // amounts of data [[nodiscard]] Future getRangeStream(const PromiseStream>& results, @@ -354,20 +358,7 @@ public: void addWriteConflictRange(KeyRangeRef const& keys); void makeSelfConflicting(); - Future warmRange(Database cx, KeyRange keys); - - Future, int>> waitStorageMetrics(KeyRange const& keys, - StorageMetrics const& min, - StorageMetrics const& max, - StorageMetrics const& permittedError, - int shardLimit, - int expectedShardCount); - // Pass a negative value for `shardLimit` to indicate no limit on the shard number. - Future getStorageMetrics(KeyRange const& keys, int shardLimit); - Future>> splitStorageMetrics(KeyRange const& keys, - StorageMetrics const& limit, - StorageMetrics const& estimated); - Future>> getReadHotRanges(KeyRange const& keys); + Future warmRange(KeyRange keys); // Try to split the given range into equally sized chunks based on estimated size. // The returned list would still be in form of [keys.begin, splitPoint1, splitPoint2, ... , keys.end] @@ -387,20 +378,20 @@ public: AddConflictRange = AddConflictRange::True); void clear(const KeyRangeRef& range, AddConflictRange = AddConflictRange::True); void clear(const KeyRef& key, AddConflictRange = AddConflictRange::True); - [[nodiscard]] Future commit(); // Throws not_committed or commit_unknown_result errors in normal operation + + // Throws not_committed or commit_unknown_result errors in normal operation + [[nodiscard]] Future commit(); void setOption(FDBTransactionOptions::Option option, Optional value = Optional()); - Version getCommittedVersion() const { - return committedVersion; - } // May be called only after commit() returns success - [[nodiscard]] Future> - getVersionstamp(); // Will be fulfilled only after commit() returns success + // May be called only after commit() returns success + Version getCommittedVersion() const { return trState->committedVersion; } + + // Will be fulfilled only after commit() returns success + [[nodiscard]] Future> getVersionstamp(); Future getProtocolVersion(); - Promise> versionstampPromise; - uint32_t getSize(); [[nodiscard]] Future onError(Error const& e); void flushTrLogsIfEnabled(); @@ -412,27 +403,17 @@ public: void reset(); void fullReset(); double getBackoff(int errCode); - void debugTransaction(UID dID) { info.debugID = dID; } + void debugTransaction(UID dID) { trState->debugID = dID; } Future commitMutations(); void setupWatches(); void cancelWatches(Error const& e = transaction_cancelled()); - TransactionInfo info; - int numErrors; - - std::vector> watches; - int apiVersionAtLeast(int minVersion) const; - void checkDeferredError() const; - Database getDatabase() const { return cx; } + Database getDatabase() const { return trState->cx; } static Reference createTrLogInfoProbabilistically(const Database& cx); - TransactionOptions options; - Span span; - double startTime; - Reference trLogInfo; void setTransactionID(uint64_t id); void setToken(uint64_t token); @@ -445,12 +426,22 @@ public: return Standalone>(tr.transaction.write_conflict_ranges, tr.arena); } + Reference trState; + std::vector> watches; + Span span; + private: Future getReadVersion(uint32_t flags); - Database cx; + + template + Future getRangeInternal(const KeySelector& begin, + const KeySelector& end, + const Key& mapper, + GetRangeLimits limits, + Snapshot snapshot, + Reverse reverse); double backoff; - Version committedVersion{ invalidVersion }; CommitTransactionRequest tr; Future readVersion; Promise> metadataVersion; diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 7b37bb9fa0..fdc1d5c771 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1653,7 +1653,7 @@ Future ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyR if (resetPromise.isSet()) return resetPromise.getFuture().getError(); - return map(waitOrError(tr.getStorageMetrics(keys, -1), resetPromise.getFuture()), + return map(waitOrError(tr.getDatabase()->getStorageMetrics(keys, -1), resetPromise.getFuture()), [](const StorageMetrics& m) { return m.bytes; }); } @@ -1911,7 +1911,7 @@ void ReadYourWritesTransaction::setToken(uint64_t token) { RangeResult ReadYourWritesTransaction::getReadConflictRangeIntersecting(KeyRangeRef kr) { TEST(true); // Special keys read conflict range ASSERT(readConflictRangeKeysRange.contains(kr)); - ASSERT(!tr.options.checkWritesEnabled); + ASSERT(!tr.trState->options.checkWritesEnabled); RangeResult result; if (!options.readYourWritesDisabled) { kr = kr.removePrefix(readConflictRangeKeysRange.begin); diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index e422ba5e0b..b8ccd23e54 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -168,7 +168,7 @@ public: Database getDatabase() const { return tr.getDatabase(); } - const TransactionInfo& getTransactionInfo() const { return tr.info; } + Reference getTransactionState() const { return tr.trState; } void setTransactionID(uint64_t id); void setToken(uint64_t token); diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index cd10cd9304..d6cd61f5b8 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -653,8 +653,8 @@ ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeReadIm Future ConflictingKeysImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { RangeResult result; - if (ryw->getTransactionInfo().conflictingKeys) { - auto krMapPtr = ryw->getTransactionInfo().conflictingKeys.get(); + if (ryw->getTransactionState()->conflictingKeys) { + auto krMapPtr = ryw->getTransactionState()->conflictingKeys.get(); auto beginIter = krMapPtr->rangeContaining(kr.begin); if (beginIter->begin() != kr.begin) ++beginIter; @@ -1539,10 +1539,10 @@ Future TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, if (key.endsWith(kTracingTransactionIdKey)) { result.push_back_deep(result.arena(), - KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.first()))); + KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.first()))); } else if (key.endsWith(kTracingTokenKey)) { result.push_back_deep(result.arena(), - KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.second()))); + KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.second()))); } } return result; diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index c8045bb0b5..5e311be449 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -434,10 +434,11 @@ struct BackupData { GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION); choose { when(wait(self->cx->onProxiesChanged())) {} - when(GetReadVersionReply reply = wait(basicLoadBalance(self->cx->getGrvProxies(false), - &GrvProxyInterface::getConsistentReadVersion, - request, - self->cx->taskID))) { + when(GetReadVersionReply reply = + wait(basicLoadBalance(self->cx->getGrvProxies(UseProvisionalProxies::False), + &GrvProxyInterface::getConsistentReadVersion, + request, + self->cx->taskID))) { return reply.version; } } diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 20aabeb6c1..f03b23e502 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -25,6 +25,7 @@ #include "fdbclient/BlobGranuleCommon.h" #include "fdbclient/BlobWorkerInterface.h" #include "fdbclient/KeyRangeMap.h" +#include "fdbclient/DatabaseContext.h" #include "fdbclient/ReadYourWrites.h" #include "fdbclient/SystemData.h" #include "fdbserver/BlobManagerInterface.h" @@ -233,7 +234,8 @@ ACTOR Future>> splitRange(ReferencegetTransaction().getStorageMetrics(range, CLIENT_KNOBS->TOO_MANY)); + StorageMetrics estimated = + wait(tr->getTransaction().getDatabase()->getStorageMetrics(range, CLIENT_KNOBS->TOO_MANY)); if (BM_DEBUG) { fmt::print("Estimated bytes for [{0} - {1}): {2}\n", @@ -252,7 +254,7 @@ ACTOR Future>> splitRange(Reference> keys = - wait(tr->getTransaction().splitStorageMetrics(range, splitMetrics, estimated)); + wait(tr->getTransaction().getDatabase()->splitStorageMetrics(range, splitMetrics, estimated)); return keys; } else { // printf(" Not splitting range\n"); diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index 08e388f6f3..11b8cdbf6f 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -274,12 +274,12 @@ ACTOR Future trackShardMetrics(DataDistributionTracker::SafeAccessor self, Transaction tr(self()->cx); // metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range std::pair, int> metrics = - wait(tr.waitStorageMetrics(keys, - bounds.min, - bounds.max, - bounds.permittedError, - CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, - shardCount)); + wait(self()->cx->waitStorageMetrics(keys, + bounds.min, + bounds.max, + bounds.permittedError, + CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, + shardCount)); if (metrics.first.present()) { BandwidthStatus newBandwidthStatus = getBandwidthStatus(metrics.first.get()); if (newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) { @@ -336,7 +336,8 @@ ACTOR Future readHotDetector(DataDistributionTracker* self) { state Transaction tr(self->cx); loop { try { - Standalone> readHotRanges = wait(tr.getReadHotRanges(keys)); + Standalone> readHotRanges = + wait(self->cx->getReadHotRanges(keys)); for (const auto& keyRange : readHotRanges) { TraceEvent("ReadHotRangeLog") .detail("ReadDensity", keyRange.density) @@ -378,7 +379,8 @@ ACTOR Future>> getSplitKeys(DataDistributionTracker loop { state Transaction tr(self->cx); try { - Standalone> keys = wait(tr.splitStorageMetrics(splitRange, splitMetrics, estimated)); + Standalone> keys = + wait(self->cx->splitStorageMetrics(splitRange, splitMetrics, estimated)); return keys; } catch (Error& e) { wait(tr.onError(e)); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index a7b1899152..006e61d581 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -400,7 +400,7 @@ ACTOR static Future startMoveKeys(Database occ, // Keep track of shards for all src servers so that we can preserve their values in serverKeys state Map> shardMap; - tr->getTransaction().info.taskID = TaskPriority::MoveKeys; + tr->getTransaction().trState->taskID = TaskPriority::MoveKeys; tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); @@ -603,7 +603,7 @@ ACTOR Future checkFetchingState(Database cx, if (BUGGIFY) wait(delay(5)); - tr.info.taskID = TaskPriority::MoveKeys; + tr.trState->taskID = TaskPriority::MoveKeys; tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); std::vector>> serverListEntries; @@ -696,7 +696,7 @@ ACTOR static Future finishMoveKeys(Database occ, loop { try { - tr.info.taskID = TaskPriority::MoveKeys; + tr.trState->taskID = TaskPriority::MoveKeys; tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); releaser.release(); @@ -1317,7 +1317,7 @@ ACTOR Future removeKeysFromFailedServer(Database cx, state Transaction tr(cx); loop { try { - tr.info.taskID = TaskPriority::MoveKeys; + tr.trState->taskID = TaskPriority::MoveKeys; tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); wait(checkMoveKeysLock(&tr, lock, ddEnabledState)); TraceEvent("RemoveKeysFromFailedServerLocked") diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 3b255f39ba..12dfb9ec25 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -1189,7 +1189,7 @@ ACTOR Future tryFetchRange(Database cx, ASSERT(!cx->switchable); tr.setVersion(version); - tr.info.taskID = TaskPriority::FetchKeys; + tr.trState->taskID = TaskPriority::FetchKeys; limits.minRows = 0; try { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 3dc7e90920..00680f2ed1 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -2104,7 +2104,7 @@ ACTOR Future> quickGetValue(StorageServer* data, state Transaction tr(data->cx); tr.setVersion(version); // TODO: is DefaultPromiseEndpoint the best priority for this? - tr.info.taskID = TaskPriority::DefaultPromiseEndpoint; + tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint; Future> valueFuture = tr.get(key, Snapshot::True); // TODO: async in case it needs to read from other servers. state Optional valueOption = wait(valueFuture); @@ -2635,7 +2635,7 @@ ACTOR Future quickGetKeyValues(StorageServer* data, state Transaction tr(data->cx); tr.setVersion(version); // TODO: is DefaultPromiseEndpoint the best priority for this? - tr.info.taskID = TaskPriority::DefaultPromiseEndpoint; + tr.trState->taskID = TaskPriority::DefaultPromiseEndpoint; Future rangeResultFuture = tr.getRange(prefixRange(prefix), Snapshot::True); // TODO: async in case it needs to read from other servers. RangeResult rangeResult = wait(rangeResultFuture); @@ -4143,7 +4143,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front(); tr.setVersion(fetchVersion); - tr.info.taskID = TaskPriority::FetchKeys; + tr.trState->taskID = TaskPriority::FetchKeys; state PromiseStream results; state Future hold = SERVER_KNOBS->FETCH_USING_STREAMING ? tr.getRangeStream(results, keys, GetRangeLimits(), Snapshot::True) diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index af1cc183f5..a9961f5582 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -877,7 +877,8 @@ struct ConsistencyCheckWorkload : TestWorkload { state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc); while (begin < end) { - state Reference commitProxyInfo = wait(cx->getCommitProxiesFuture(false)); + state Reference commitProxyInfo = + wait(cx->getCommitProxiesFuture(UseProvisionalProxies::False)); keyServerLocationFutures.clear(); for (int i = 0; i < commitProxyInfo->size(); i++) keyServerLocationFutures.push_back( @@ -1124,7 +1125,7 @@ struct ConsistencyCheckWorkload : TestWorkload { loop { try { StorageMetrics metrics = - wait(tr.getStorageMetrics(KeyRangeRef(allKeys.begin, keyServersPrefix), 100000)); + wait(tr.getDatabase()->getStorageMetrics(KeyRangeRef(allKeys.begin, keyServersPrefix), 100000)); return metrics.bytes; } catch (Error& e) { wait(tr.onError(e)); diff --git a/fdbserver/workloads/IndexScan.actor.cpp b/fdbserver/workloads/IndexScan.actor.cpp index 96579e2eb0..774dc79c3f 100644 --- a/fdbserver/workloads/IndexScan.actor.cpp +++ b/fdbserver/workloads/IndexScan.actor.cpp @@ -76,7 +76,7 @@ struct IndexScanWorkload : KVWorkload { loop { state Transaction tr(cx); try { - wait(tr.warmRange(cx, allKeys)); + wait(tr.warmRange(allKeys)); break; } catch (Error& e) { wait(tr.onError(e)); diff --git a/fdbserver/workloads/ReadHotDetection.actor.cpp b/fdbserver/workloads/ReadHotDetection.actor.cpp index 82a5326def..2bba376132 100644 --- a/fdbserver/workloads/ReadHotDetection.actor.cpp +++ b/fdbserver/workloads/ReadHotDetection.actor.cpp @@ -98,11 +98,11 @@ struct ReadHotDetectionWorkload : TestWorkload { loop { state Transaction tr(cx); try { - StorageMetrics sm = wait(tr.getStorageMetrics(self->wholeRange, 100)); + StorageMetrics sm = wait(cx->getStorageMetrics(self->wholeRange, 100)); // TraceEvent("RHDCheckPhaseLog") // .detail("KeyRangeSize", sm.bytes) // .detail("KeyRangeReadBandwith", sm.bytesReadPerKSecond); - Standalone> keyRanges = wait(tr.getReadHotRanges(self->wholeRange)); + Standalone> keyRanges = wait(cx->getReadHotRanges(self->wholeRange)); // TraceEvent("RHDCheckPhaseLog") // .detail("KeyRangesSize", keyRanges.size()) // .detail("ReadKey", self->readKey.printable().c_str()) diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp index 7913c3b58b..4eae887c77 100644 --- a/fdbserver/workloads/ReadWrite.actor.cpp +++ b/fdbserver/workloads/ReadWrite.actor.cpp @@ -592,7 +592,7 @@ struct ReadWriteWorkload : KVWorkload { try { self->setupTransaction(&tr); wait(self->readOp(&tr, keys, self, false)); - wait(tr.warmRange(cx, allKeys)); + wait(tr.warmRange(allKeys)); break; } catch (Error& e) { wait(tr.onError(e)); diff --git a/fdbserver/workloads/TargetedKill.actor.cpp b/fdbserver/workloads/TargetedKill.actor.cpp index 3d58f82a93..e1ce5bc1e9 100644 --- a/fdbserver/workloads/TargetedKill.actor.cpp +++ b/fdbserver/workloads/TargetedKill.actor.cpp @@ -105,7 +105,7 @@ struct TargetedKillWorkload : TestWorkload { if (self->machineToKill == "master") { machine = self->dbInfo->get().master.address(); } else if (self->machineToKill == "commitproxy") { - auto commitProxies = cx->getCommitProxies(false); + auto commitProxies = cx->getCommitProxies(UseProvisionalProxies::False); int o = deterministicRandom()->randomInt(0, commitProxies->size()); for (int i = 0; i < commitProxies->size(); i++) { CommitProxyInterface mpi = commitProxies->getInterface(o); @@ -115,7 +115,7 @@ struct TargetedKillWorkload : TestWorkload { o = ++o % commitProxies->size(); } } else if (self->machineToKill == "grvproxy") { - auto grvProxies = cx->getGrvProxies(false); + auto grvProxies = cx->getGrvProxies(UseProvisionalProxies::False); int o = deterministicRandom()->randomInt(0, grvProxies->size()); for (int i = 0; i < grvProxies->size(); i++) { GrvProxyInterface gpi = grvProxies->getInterface(o); diff --git a/fdbserver/workloads/WriteTagThrottling.actor.cpp b/fdbserver/workloads/WriteTagThrottling.actor.cpp index 59decaf050..1675634790 100644 --- a/fdbserver/workloads/WriteTagThrottling.actor.cpp +++ b/fdbserver/workloads/WriteTagThrottling.actor.cpp @@ -236,8 +236,8 @@ struct WriteTagThrottlingWorkload : KVWorkload { // give tag to client if (self->writeThrottle) { ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION >= MIN_TAGS_PER_TRANSACTION); - tr.options.tags.clear(); - tr.options.readTags.clear(); + tr.trState->options.tags.clear(); + tr.trState->options.readTags.clear(); if (isBadActor) { tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, self->badTag); } else if (deterministicRandom()->coinflip()) {