diff --git a/fdbclient/CommitProxyInterface.h b/fdbclient/CommitProxyInterface.h index 7e670ed08a..5556579992 100644 --- a/fdbclient/CommitProxyInterface.h +++ b/fdbclient/CommitProxyInterface.h @@ -198,8 +198,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply { bool locked; Optional metadataVersion; int64_t midShardSize = 0; - uint32_t queueIterations = 0; - bool rkThrottled = false; + bool rkDefaultThrottled = false; + bool rkBatchThrottled = false; TransactionTagMap tagThrottleInfo; @@ -214,8 +214,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply { metadataVersion, tagThrottleInfo, midShardSize, - queueIterations, - rkThrottled); + rkDefaultThrottled, + rkBatchThrottled); } }; @@ -238,22 +238,21 @@ struct GetReadVersionRequest : TimedRequest { uint32_t transactionCount; uint32_t flags; TransactionPriority priority; - uint32_t queueIterations; TransactionTagMap tags; Optional debugID; ReplyPromise reply; - GetReadVersionRequest() : transactionCount(1), flags(0), queueIterations(0) {} + GetReadVersionRequest() : transactionCount(1), flags(0) {} GetReadVersionRequest(SpanID spanContext, uint32_t transactionCount, TransactionPriority priority, uint32_t flags = 0, TransactionTagMap tags = TransactionTagMap(), Optional debugID = Optional()) - : spanContext(spanContext), transactionCount(transactionCount), flags(flags), priority(priority), - queueIterations(0), tags(tags), debugID(debugID) { + : spanContext(spanContext), transactionCount(transactionCount), flags(flags), priority(priority), tags(tags), + debugID(debugID) { flags = flags & ~FLAG_PRIORITY_MASK; switch (priority) { case TransactionPriority::BATCH: @@ -274,7 +273,7 @@ struct GetReadVersionRequest : TimedRequest { template void serialize(Ar& ar) { - serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext, queueIterations); + serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext); if (ar.isDeserializing) { if ((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) { diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 0ba157a04d..959e7decfe 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -475,7 +475,8 @@ public: void updateCachedReadVersion(double t, Version v); Version getCachedReadVersion(); double getLastGrvTime(); - double lastRkThrottleTime; + double lastRkBatchThrottleTime; + double lastRkDefaultThrottleTime; // Cached RVs can be updated through commits, and using cached RVs avoids the proxies altogether // Because our checks for ratekeeper throttling requires communication with the proxies, // we want to track the last time in order to periodically contact the proxy to check for throttling diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 4d05bf2974..8070c23a66 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1306,10 +1306,10 @@ DatabaseContext::DatabaseContext(ReferenceSHARD_STAT_SMOOTH_AMOUNT), specialKeySpace(std::make_unique(specialKeys.begin, specialKeys.end, /* test */ false)), connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) { @@ -5963,11 +5963,15 @@ ACTOR Future extractReadVersion(Reference trState, Promise> metadataVersion) { state Span span(spanContext, location, { trState->spanID }); GetReadVersionReply rep = wait(f); - double latency = now() - trState->startTime; + double replyTime = now(); + double latency = replyTime - trState->startTime; trState->cx->lastProxyRequestTime = trState->startTime; trState->cx->updateCachedReadVersion(trState->startTime, rep.version); - if (rep.rkThrottled && trState->options.priority != TransactionPriority::IMMEDIATE) { - trState->cx->lastRkThrottleTime = now(); + if (rep.rkBatchThrottled) { + trState->cx->lastRkBatchThrottleTime = replyTime; + } + if (rep.rkDefaultThrottled) { + trState->cx->lastRkDefaultThrottleTime = replyTime; } trState->cx->GRVLatencies.addSample(latency); if (trState->trLogInfo) @@ -6025,11 +6029,21 @@ ACTOR Future extractReadVersion(Reference trState, return rep.version; } -bool rkThrottlingCooledDown(DatabaseContext* cx) { - if (cx->lastRkThrottleTime == 0.0) { +bool rkThrottlingCooledDown(DatabaseContext* cx, TransactionPriority priority) { + if (priority == TransactionPriority::IMMEDIATE) { return true; + } else if (priority == TransactionPriority::BATCH) { + if (cx->lastRkBatchThrottleTime == 0.0) { + return true; + } + return (now() - cx->lastRkBatchThrottleTime > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN); + } else if (priority == TransactionPriority::DEFAULT) { + if (cx->lastRkDefaultThrottleTime == 0.0) { + return true; + } + return (now() - cx->lastRkDefaultThrottleTime > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN); } - return (now() - cx->lastRkThrottleTime > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN); + return false; } Future Transaction::getReadVersion(uint32_t flags) { @@ -6037,7 +6051,7 @@ Future Transaction::getReadVersion(uint32_t flags) { if (!CLIENT_KNOBS->FORCE_GRV_CACHE_OFF && !trState->options.skipGrvCache && (deterministicRandom()->random01() <= CLIENT_KNOBS->DEBUG_USE_GRV_CACHE_CHANCE || trState->options.useGrvCache) && - rkThrottlingCooledDown(getDatabase().getPtr())) { + rkThrottlingCooledDown(getDatabase().getPtr(), trState->options.priority)) { // Upon our first request to use cached RVs, start the background updater if (!trState->cx->grvUpdateHandler.isValid()) { trState->cx->grvUpdateHandler = backgroundGrvUpdater(getDatabase().getPtr()); diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index a3fe28b002..82692728bd 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -47,8 +47,10 @@ struct GrvProxyStats { double percentageOfDefaultGRVQueueProcessed; double percentageOfBatchGRVQueueProcessed; - bool lastTxnThrottled; - double throttleStartTime; + bool lastBatchQueueThrottled; + bool lastDefaultQueueThrottled; + double batchThrottleStartTime; + double defaultThrottleStartTime; LatencySample defaultTxnGRVTimeInQueue; LatencySample batchTxnGRVTimeInQueue; @@ -102,7 +104,8 @@ struct GrvProxyStats { updatesFromRatekeeper("UpdatesFromRatekeeper", cc), leaseTimeouts("LeaseTimeouts", cc), systemGRVQueueSize(0), defaultGRVQueueSize(0), batchGRVQueueSize(0), transactionRateAllowed(0), batchTransactionRateAllowed(0), transactionLimit(0), batchTransactionLimit(0), percentageOfDefaultGRVQueueProcessed(0), - percentageOfBatchGRVQueueProcessed(0), lastTxnThrottled(false), throttleStartTime(0.0), + percentageOfBatchGRVQueueProcessed(0), lastBatchQueueThrottled(false), lastDefaultQueueThrottled(false), + batchThrottleStartTime(0.0), defaultThrottleStartTime(0.0), defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, @@ -653,31 +656,16 @@ ACTOR Future sendGrvReplies(Future replyFuture, } } - reply.queueIterations = request.queueIterations; - TraceEvent(SevDebug, "DebugGrvProxyThrottleCheck") - .detail("QueueIterations", reply.queueIterations) - .detail("ThrottleThreshold", CLIENT_KNOBS->GRV_THROTTLING_THRESHOLD) - .detail("LastTxnThrottled", stats->lastTxnThrottled) - .detail("ThrottleStartTime", format("%.6f", stats->throttleStartTime)) - .detail("Diff", now() - stats->throttleStartTime) - .detail("SustainedThrottlingThreshold", CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD); - if (reply.queueIterations >= CLIENT_KNOBS->GRV_THROTTLING_THRESHOLD) { - if (stats->lastTxnThrottled) { - // Check if this throttling has been sustained for a certain amount of time to avoid false positives - if (now() - stats->throttleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) { - reply.rkThrottled = true; - } - } else { // !stats->lastTxnThrottled - // If not previously throttled, this request/reply is our new starting point - // for judging whether we are being actively throttled by ratekeeper now - stats->lastTxnThrottled = true; - stats->throttleStartTime = now(); + if (stats->lastBatchQueueThrottled) { + // Check if this throttling has been sustained for a certain amount of time to avoid false positives + if (now() - stats->batchThrottleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) { + reply.rkBatchThrottled = true; } - } else { - // If an immediate priority txn comes in, it may get processed on the first time through and - // reset our throttling state prematurely. - if (request.priority != TransactionPriority::IMMEDIATE) { - stats->lastTxnThrottled = false; + } + if (stats->lastDefaultQueueThrottled) { + // Check if this throttling has been sustained for a certain amount of time to avoid false positives + if (now() - stats->defaultThrottleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) { + reply.rkDefaultThrottled = true; } } request.reply.send(reply); @@ -841,7 +829,6 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, // transactionQueue->span.swap(span); auto& req = transactionQueue->front(); - req.queueIterations++; int tc = req.transactionCount; if (req.priority < TransactionPriority::DEFAULT && @@ -877,6 +864,18 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, transactionQueue->pop_front(); requestsToStart++; } + if (!batchQueue.empty()) { + grvProxyData->stats.lastBatchQueueThrottled = true; + grvProxyData->stats.batchThrottleStartTime = now(); + } else { + grvProxyData->stats.lastBatchQueueThrottled = false; + } + if (!defaultQueue.empty()) { + grvProxyData->stats.lastDefaultQueueThrottled = true; + grvProxyData->stats.defaultThrottleStartTime = now(); + } else { + grvProxyData->stats.lastDefaultQueueThrottled = false; + } if (!systemQueue.empty() || !defaultQueue.empty() || !batchQueue.empty()) { forwardPromise( diff --git a/fdbserver/workloads/SidebandSingle.actor.cpp b/fdbserver/workloads/SidebandSingle.actor.cpp index 0ec80a9ef9..7a82b20c12 100644 --- a/fdbserver/workloads/SidebandSingle.actor.cpp +++ b/fdbserver/workloads/SidebandSingle.actor.cpp @@ -74,11 +74,7 @@ struct SidebandSingleWorkload : TestWorkload { } std::string description() const override { return "SidebandSingleWorkload"; } - Future setup(Database const& cx) override { - if (clientId != 0) - return Void(); - return persistInterface(this, cx); - } + Future setup(Database const& cx) override { return Void(); } Future start(Database const& cx) override { if (clientId != 0) return Void(); @@ -108,51 +104,8 @@ struct SidebandSingleWorkload : TestWorkload { m.push_back(keysUnexpectedlyPresent.getMetric()); } - ACTOR Future persistInterface(SidebandSingleWorkload* self, Database cx) { - state Transaction tr(cx); - BinaryWriter wr(IncludeVersion()); - wr << self->interf; - state Standalone serializedInterface = wr.toValue(); - loop { - try { - Optional val = wait(tr.get(StringRef(format("Sideband/Client/%d", self->clientId)))); - if (val.present()) { - if (val.get() != serializedInterface) - throw operation_failed(); - break; - } - tr.set(format("Sideband/Client/%d", self->clientId), serializedInterface); - wait(tr.commit()); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - TraceEvent("SidebandPersisted", self->interf.id()).detail("ClientIdx", self->clientId); - return Void(); - } - - ACTOR Future fetchSideband(SidebandSingleWorkload* self, Database cx) { - state Transaction tr(cx); - loop { - try { - Optional val = wait(tr.get(StringRef(format("Sideband/Client/%d", self->clientId)))); - if (!val.present()) { - throw operation_failed(); - } - SidebandInterface sideband; - BinaryReader br(val.get(), IncludeVersion()); - br >> sideband; - TraceEvent("SidebandFetched", sideband.id()).detail("ClientIdx", self->clientId); - return sideband; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - } - ACTOR Future mutator(SidebandSingleWorkload* self, Database cx) { - state SidebandInterface checker = wait(self->fetchSideband(self, cx)); + state SidebandInterface checker = self->interf; state double lastTime = now(); state Version commitVersion; state bool unknown = false;