diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt
index 3e919258ea..f42ca5e2f8 100644
--- a/fdbserver/CMakeLists.txt
+++ b/fdbserver/CMakeLists.txt
@@ -90,6 +90,7 @@ set(FDBSERVER_SRCS
   QuietDatabase.actor.cpp
   QuietDatabase.h
   RadixTree.h
+  Ratekeeper.h
   Ratekeeper.actor.cpp
   RatekeeperInterface.h
   RecoveryState.h
@@ -130,6 +131,8 @@ set(FDBSERVER_SRCS
   storageserver.actor.cpp
   TagPartitionedLogSystem.actor.cpp
   TagPartitionedLogSystem.actor.h
+  TagThrottler.actor.cpp
+  TagThrottler.h
   template_fdb.h
   TCInfo.actor.cpp
   TCInfo.h
diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp
index 079b70eae4..84fdf39877 100644
--- a/fdbserver/Ratekeeper.actor.cpp
+++ b/fdbserver/Ratekeeper.actor.cpp
@@ -3,7 +3,7 @@
  *
  * This source file is part of the FoundationDB open source project
  *
- * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,38 +18,12 @@
  * limitations under the License.
  */

-#include "fdbserver/WorkerInterface.actor.h"
-#include "flow/IndexedSet.h"
-#include "fdbrpc/FailureMonitor.h"
-#include "fdbrpc/Smoother.h"
-#include "fdbrpc/simulator.h"
-#include "fdbclient/DatabaseContext.h"
-#include "fdbclient/ReadYourWrites.h"
-#include "fdbclient/TagThrottle.actor.h"
-#include "fdbserver/Knobs.h"
 #include "fdbserver/DataDistribution.actor.h"
-#include "fdbserver/RatekeeperInterface.h"
-#include "fdbserver/ServerDBInfo.h"
+#include "fdbserver/Ratekeeper.h"
+#include "fdbserver/TagThrottler.h"
 #include "fdbserver/WaitFailure.h"
-#include "flow/actorcompiler.h" // This must be the last #include.
-
-enum limitReason_t {
-    unlimited, // TODO: rename to workload?
-    storage_server_write_queue_size, // 1
-    storage_server_write_bandwidth_mvcc,
-    storage_server_readable_behind,
-    log_server_mvcc_write_bandwidth,
-    log_server_write_queue, // 5
-    storage_server_min_free_space, // a storage server's normal limits are being reduced by low free space
-    storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
-    log_server_min_free_space,
-    log_server_min_free_space_ratio,
-    storage_server_durability_lag, // 10
-    storage_server_list_fetch_failed,
-    limitReason_t_end
-};
-
-int limitReasonEnd = limitReason_t_end;
+#include "flow/actorcompiler.h" // must be last include

 const char* limitReasonName[] = { "workload",
                                   "storage_server_write_queue_size",
@@ -65,6 +39,8 @@ const char* limitReasonName[] = { "workload",
                                   "storage_server_list_fetch_failed" };
 static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");

+int limitReasonEnd = limitReason_t_end;
+
 // NOTE: This has a corresponding table in Script.cs (see RatekeeperReason graph)
 // IF UPDATING THIS ARRAY, UPDATE SCRIPT.CS!
const char* limitReasonDesc[] = { "Workload or read performance.", @@ -82,635 +58,7 @@ const char* limitReasonDesc[] = { "Workload or read performance.", static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size"); -struct StorageQueueInfo { - bool valid; - UID id; - LocalityData locality; - StorageQueuingMetricsReply lastReply; - StorageQueuingMetricsReply prevReply; - Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; - Smoother smoothDurableVersion, smoothLatestVersion; - Smoother smoothFreeSpace; - Smoother smoothTotalSpace; - limitReason_t limitReason; - - Optional busiestReadTag, busiestWriteTag; - double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0; - double busiestReadTagRate = 0, busiestWriteTagRate = 0; - - Reference busiestWriteTagEventHolder; - - // refresh periodically - TransactionTagMap tagCostEst; - uint64_t totalWriteCosts = 0; - int totalWriteOps = 0; - - StorageQueueInfo(UID id, LocalityData locality) - : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), - limitReason(limitReason_t::unlimited), - busiestWriteTagEventHolder(makeReference(id.toString() + "/BusiestWriteTag")) { - // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo - lastReply.instanceID = -1; - } -}; - -struct TLogQueueInfo { - bool valid; - UID id; - TLogQueuingMetricsReply lastReply; - TLogQueuingMetricsReply prevReply; - Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; - Smoother smoothFreeSpace; - Smoother smoothTotalSpace; - TLogQueueInfo(UID id) - : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) { - // FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from - // storageQueueInfO) - lastReply.instanceID = -1; - } -}; - -class RkTagThrottleCollection : NonCopyable { -private: - struct RkTagData { - Smoother requestRate; - RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} - }; - - struct RkTagThrottleData { - ClientTagThrottleLimits limits; - Smoother clientRate; - - // Only used by auto-throttles - double created = now(); - double lastUpdated = 0; - double lastReduced = now(); - bool rateSet = false; - - RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} - - double getTargetRate(Optional requestRate) { - if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) { - return limits.tpsRate; - } else { - return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal()); - } - } - - Optional updateAndGetClientRate(Optional requestRate) { - if (limits.expiration > now()) { - double targetRate = getTargetRate(requestRate); - if (targetRate == std::numeric_limits::max()) { - rateSet = false; - return targetRate; - } - if (!rateSet) { - rateSet = true; - 
clientRate.reset(targetRate); - } else { - clientRate.setTotal(targetRate); - } - - double rate = clientRate.smoothTotal(); - ASSERT(rate >= 0); - return rate; - } else { - TEST(true); // Get throttle rate for expired throttle - rateSet = false; - return Optional(); - } - } - }; - - void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); } - -public: - RkTagThrottleCollection() {} - - RkTagThrottleCollection(RkTagThrottleCollection&& other) { - autoThrottledTags = std::move(other.autoThrottledTags); - manualThrottledTags = std::move(other.manualThrottledTags); - tagData = std::move(other.tagData); - } - - void operator=(RkTagThrottleCollection&& other) { - autoThrottledTags = std::move(other.autoThrottledTags); - manualThrottledTags = std::move(other.manualThrottledTags); - tagData = std::move(other.tagData); - } - - double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) { - ASSERT(currentBusyness > 0); - - if (targetBusyness < 1) { - double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness); - return requestRate * targetFraction; - } else { - return std::numeric_limits::max(); - } - } - - // Returns the TPS rate if the throttle is updated, otherwise returns an empty optional - Optional autoThrottleTag(UID id, - TransactionTag const& tag, - double fractionalBusyness, - Optional tpsRate = Optional(), - Optional expiration = Optional()) { - ASSERT(!tpsRate.present() || tpsRate.get() >= 0); - ASSERT(!expiration.present() || expiration.get() > now()); - - auto itr = autoThrottledTags.find(tag); - bool present = (itr != autoThrottledTags.end()); - if (!present) { - if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) { - TEST(true); // Reached auto-throttle limit - return Optional(); - } - - itr = autoThrottledTags.try_emplace(tag).first; - initializeTag(tag); - } else if (itr->second.limits.expiration <= now()) { - TEST(true); // Re-throttling expired tag that hasn't been cleaned up - present = false; - itr->second = RkTagThrottleData(); - } - - auto& throttle = itr->second; - - if (!tpsRate.present()) { - if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) { - tpsRate = std::numeric_limits::max(); - if (present) { - return Optional(); - } - } else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) { - TEST(true); // Tag auto-throttled too quickly - return Optional(); - } else { - tpsRate = computeTargetTpsRate(fractionalBusyness, - SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, - tagData[tag].requestRate.smoothRate()); - - if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) { - TEST(true); // Tag auto-throttle rate increase attempt while active - return Optional(); - } - - throttle.lastUpdated = now(); - if (tpsRate.get() < throttle.limits.tpsRate) { - throttle.lastReduced = now(); - } - } - } - if (!expiration.present()) { - expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION; - } - - ASSERT(tpsRate.present() && tpsRate.get() >= 0); - - throttle.limits.tpsRate = tpsRate.get(); - throttle.limits.expiration = expiration.get(); - - Optional clientRate = throttle.updateAndGetClientRate(getRequestRate(tag)); - - TraceEvent("RkSetAutoThrottle", id) - .detail("Tag", tag) - .detail("TargetRate", tpsRate.get()) - .detail("Expiration", expiration.get() - now()) - .detail("ClientRate", clientRate) - .detail("Created", now() - throttle.created) - 
.detail("LastUpdate", now() - throttle.lastUpdated) - .detail("LastReduced", now() - throttle.lastReduced); - - if (tpsRate.get() != std::numeric_limits::max()) { - return tpsRate.get(); - } else { - return Optional(); - } - } - - void manualThrottleTag(UID id, - TransactionTag const& tag, - TransactionPriority priority, - double tpsRate, - double expiration, - Optional const& oldLimits) { - ASSERT(tpsRate >= 0); - ASSERT(expiration > now()); - - auto& priorityThrottleMap = manualThrottledTags[tag]; - auto result = priorityThrottleMap.try_emplace(priority); - initializeTag(tag); - ASSERT(result.second); // Updating to the map is done by copying the whole map - - result.first->second.limits.tpsRate = tpsRate; - result.first->second.limits.expiration = expiration; - - if (!oldLimits.present()) { - TEST(true); // Transaction tag manually throttled - TraceEvent("RatekeeperAddingManualThrottle", id) - .detail("Tag", tag) - .detail("Rate", tpsRate) - .detail("Priority", transactionPriorityToString(priority)) - .detail("SecondsToExpiration", expiration - now()); - } else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) { - TEST(true); // Manual transaction tag throttle updated - TraceEvent("RatekeeperUpdatingManualThrottle", id) - .detail("Tag", tag) - .detail("Rate", tpsRate) - .detail("Priority", transactionPriorityToString(priority)) - .detail("SecondsToExpiration", expiration - now()); - } - - Optional clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag)); - ASSERT(clientRate.present()); - } - - Optional getManualTagThrottleLimits(TransactionTag const& tag, - TransactionPriority priority) { - auto itr = manualThrottledTags.find(tag); - if (itr != manualThrottledTags.end()) { - auto priorityItr = itr->second.find(priority); - if (priorityItr != itr->second.end()) { - return priorityItr->second.limits; - } - } - - return Optional(); - } - - PrioritizedTransactionTagMap getClientRates(bool autoThrottlingEnabled) { - PrioritizedTransactionTagMap clientRates; - - for (auto tagItr = tagData.begin(); tagItr != tagData.end();) { - bool tagPresent = false; - - double requestRate = tagItr->second.requestRate.smoothRate(); - auto manualItr = manualThrottledTags.find(tagItr->first); - if (manualItr != manualThrottledTags.end()) { - Optional manualClientRate; - for (auto priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend()); - ++priority) { - auto priorityItr = manualItr->second.find(*priority); - if (priorityItr != manualItr->second.end()) { - Optional priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate); - if (!priorityClientRate.present()) { - TEST(true); // Manual priority throttle expired - priorityItr = manualItr->second.erase(priorityItr); - } else { - if (!manualClientRate.present() || - manualClientRate.get().tpsRate > priorityClientRate.get()) { - manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), - priorityItr->second.limits.expiration); - } else { - TEST(true); // Manual throttle overriden by higher priority - } - - ++priorityItr; - } - } - - if (manualClientRate.present()) { - tagPresent = true; - TEST(true); // Using manual throttle - clientRates[*priority][tagItr->first] = manualClientRate.get(); - } - } - - if (manualItr->second.empty()) { - TEST(true); // All manual throttles expired - manualThrottledTags.erase(manualItr); - break; - } - } - - auto autoItr = autoThrottledTags.find(tagItr->first); - if (autoItr != autoThrottledTags.end()) { - Optional 
autoClientRate = autoItr->second.updateAndGetClientRate(requestRate); - if (autoClientRate.present()) { - double adjustedRate = autoClientRate.get(); - double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION - - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; - if (now() >= rampStartTime && adjustedRate != std::numeric_limits::max()) { - TEST(true); // Tag auto-throttle ramping up - - double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS; - if (targetBusyness == 0) { - targetBusyness = 0.01; - } - - double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; - adjustedRate = - computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate); - } - - tagPresent = true; - if (autoThrottlingEnabled) { - auto result = clientRates[TransactionPriority::DEFAULT].try_emplace( - tagItr->first, adjustedRate, autoItr->second.limits.expiration); - if (!result.second && result.first->second.tpsRate > adjustedRate) { - result.first->second = - ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration); - } else { - TEST(true); // Auto throttle overriden by manual throttle - } - clientRates[TransactionPriority::BATCH][tagItr->first] = - ClientTagThrottleLimits(0, autoItr->second.limits.expiration); - } - } else { - ASSERT(autoItr->second.limits.expiration <= now()); - TEST(true); // Auto throttle expired - if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup - tagPresent = true; - } else { - autoThrottledTags.erase(autoItr); - } - } - } - - if (!tagPresent) { - TEST(true); // All tag throttles expired - tagItr = tagData.erase(tagItr); - } else { - ++tagItr; - } - } - - return clientRates; - } - - void addRequests(TransactionTag const& tag, int requests) { - if (requests > 0) { - TEST(true); // Requests reported for throttled tag - - auto tagItr = tagData.try_emplace(tag); - tagItr.first->second.requestRate.addDelta(requests); - - double requestRate = tagItr.first->second.requestRate.smoothRate(); - - auto autoItr = autoThrottledTags.find(tag); - if (autoItr != autoThrottledTags.end()) { - autoItr->second.updateAndGetClientRate(requestRate); - } - - auto manualItr = manualThrottledTags.find(tag); - if (manualItr != manualThrottledTags.end()) { - for (auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end(); - ++priorityItr) { - priorityItr->second.updateAndGetClientRate(requestRate); - } - } - } - } - - Optional getRequestRate(TransactionTag const& tag) { - auto itr = tagData.find(tag); - if (itr != tagData.end()) { - return itr->second.requestRate.smoothRate(); - } - return Optional(); - } - - int64_t autoThrottleCount() const { return autoThrottledTags.size(); } - - int64_t manualThrottleCount() const { - int64_t count = 0; - for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) { - count += itr->second.size(); - } - - return count; - } - - TransactionTagMap autoThrottledTags; - TransactionTagMap> manualThrottledTags; - TransactionTagMap tagData; - uint32_t busyReadTagCount = 0, busyWriteTagCount = 0; -}; - -struct RatekeeperLimits { - double tpsLimit; - Int64MetricHandle tpsLimitMetric; - Int64MetricHandle reasonMetric; - - int64_t storageTargetBytes; - int64_t storageSpringBytes; - int64_t logTargetBytes; - int64_t logSpringBytes; - double maxVersionDifference; - - int64_t durabilityLagTargetVersions; - int64_t lastDurabilityLag; - double durabilityLagLimit; - - TransactionPriority priority; 
- std::string context; - - Reference rkUpdateEventCacheHolder; - - RatekeeperLimits(TransactionPriority priority, - std::string context, - int64_t storageTargetBytes, - int64_t storageSpringBytes, - int64_t logTargetBytes, - int64_t logSpringBytes, - double maxVersionDifference, - int64_t durabilityLagTargetVersions) - : tpsLimit(std::numeric_limits::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)), - reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes), - storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes), - maxVersionDifference(maxVersionDifference), - durabilityLagTargetVersions( - durabilityLagTargetVersions + - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not - // be durable on the storage servers - lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits::infinity()), priority(priority), - context(context), rkUpdateEventCacheHolder(makeReference("RkUpdate" + context)) {} -}; - -namespace RatekeeperActorCpp { - -// Differentiate from GrvProxyInfo in DatabaseContext.h -struct GrvProxyInfo { - int64_t totalTransactions; - int64_t batchTransactions; - uint64_t lastThrottledTagChangeId; - - double lastUpdateTime; - double lastTagPushTime; - - GrvProxyInfo() - : totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0), lastTagPushTime(0) { - } -}; - -} // namespace RatekeeperActorCpp - -struct RatekeeperData { - UID id; - Database db; - - Map storageQueueInfo; - Map tlogQueueInfo; - - std::map grvProxyInfo; - Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes; - HealthMetrics healthMetrics; - DatabaseConfiguration configuration; - PromiseStream> addActor; - - Int64MetricHandle actualTpsMetric; - - double lastWarning; - double lastSSListFetchedTimestamp; - double lastBusiestCommitTagPick; - - RkTagThrottleCollection throttledTags; - uint64_t throttledTagChangeId; - - RatekeeperLimits normalLimits; - RatekeeperLimits batchLimits; - - Deque actualTpsHistory; - Optional remoteDC; - - Future expiredTagThrottleCleanup; - - bool autoThrottlingEnabled; - - RatekeeperData(UID id, Database db) - : id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()), - lastBusiestCommitTagPick(0), throttledTagChangeId(0), - normalLimits(TransactionPriority::DEFAULT, - "", - SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, - SERVER_KNOBS->TARGET_BYTES_PER_TLOG, - SERVER_KNOBS->SPRING_BYTES_TLOG, - SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, - SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS), - batchLimits(TransactionPriority::BATCH, - "Batch", - SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, - SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, - SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, - SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, - SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH), - autoThrottlingEnabled(false) { - expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); }, - SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL); - } -}; - -// SOMEDAY: template 
trackStorageServerQueueInfo and trackTLogQueueInfo into one function -ACTOR Future trackStorageServerQueueInfo(RatekeeperData* self, StorageServerInterface ssi) { - self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality))); - state Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); - TraceEvent("RkTracking", self->id).detail("StorageServer", ssi.id()).detail("Locality", ssi.locality.toString()); - try { - loop { - ErrorOr reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor( - StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? - if (reply.present()) { - myQueueInfo->value.valid = true; - myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; - myQueueInfo->value.lastReply = reply.get(); - if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { - myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); - myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion); - myQueueInfo->value.smoothLatestVersion.reset(reply.get().version); - } else { - self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - - myQueueInfo->value.prevReply.bytesDurable); - myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); - myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion); - myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version); - } - - myQueueInfo->value.busiestReadTag = reply.get().busiestTag; - myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness; - myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate; - } else { - if (myQueueInfo->value.valid) { - TraceEvent("RkStorageServerDidNotRespond", self->id).detail("StorageServer", ssi.id()); - } - myQueueInfo->value.valid = false; - } - - wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && - IFailureMonitor::failureMonitor().onStateEqual(ssi.getQueuingMetrics.getEndpoint(), - FailureStatus(false))); - } - } catch (...) { - // including cancellation - self->storageQueueInfo.erase(myQueueInfo); - throw; - } -} - -ACTOR Future trackTLogQueueInfo(RatekeeperData* self, TLogInterface tli) { - self->tlogQueueInfo.insert(mapPair(tli.id(), TLogQueueInfo(tli.id()))); - state Map::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id()); - TraceEvent("RkTracking", self->id).detail("TransactionLog", tli.id()); - try { - loop { - ErrorOr reply = wait(tli.getQueuingMetrics.getReplyUnlessFailedFor( - TLogQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? 
- if (reply.present()) { - myQueueInfo->value.valid = true; - myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; - myQueueInfo->value.lastReply = reply.get(); - if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { - myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); - } else { - self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - - myQueueInfo->value.prevReply.bytesDurable); - myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); - } - } else { - if (myQueueInfo->value.valid) { - TraceEvent("RkTLogDidNotRespond", self->id).detail("TransactionLog", tli.id()); - } - myQueueInfo->value.valid = false; - } - - wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && - IFailureMonitor::failureMonitor().onStateEqual(tli.getQueuingMetrics.getEndpoint(), - FailureStatus(false))); - } - } catch (...) { - // including cancellation - self->tlogQueueInfo.erase(myQueueInfo); - throw; - } -} - -ACTOR Future splitError(Future in, Promise errOut) { +ACTOR static Future splitError(Future in, Promise errOut) { try { wait(in); return Void(); @@ -721,292 +69,434 @@ ACTOR Future splitError(Future in, Promise errOut) { } } -ACTOR Future trackEachStorageServer( - RatekeeperData* self, - FutureStream>> serverChanges) { - state Map> actors; - state Promise err; - loop choose { - when(state std::pair> change = waitNext(serverChanges)) { - wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack - if (change.second.present()) { - if (!change.second.get().isTss()) { - auto& a = actors[change.first]; - a = Future(); - a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err); - } - } else - actors.erase(change.first); - } - when(wait(err.getFuture())) {} - } -} +class RatekeeperImpl { +public: + ACTOR static Future configurationMonitor(Ratekeeper* self) { + loop { + state ReadYourWritesTransaction tr(self->db); -ACTOR Future monitorServerListChange( - RatekeeperData* self, - PromiseStream>> serverChanges) { - state std::map oldServers; - state Transaction tr(self->db); + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY); - loop { - try { - if (now() - self->lastSSListFetchedTimestamp > 2 * SERVER_KNOBS->SERVER_LIST_DELAY) { - TraceEvent(SevWarnAlways, "RatekeeperGetSSListLongLatency", self->id) - .detail("Latency", now() - self->lastSSListFetchedTimestamp); - } - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - std::vector> results = - wait(getServerListAndProcessClasses(&tr)); - self->lastSSListFetchedTimestamp = now(); + self->configuration.fromKeyValues((VectorRef)results); - std::map newServers; - for (const auto& [ssi, _] : 
results) { - const UID serverId = ssi.id(); - newServers[serverId] = ssi; - - if (oldServers.count(serverId)) { - if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) { - serverChanges.send(std::make_pair(serverId, Optional(ssi))); - } - oldServers.erase(serverId); - } else { - serverChanges.send(std::make_pair(serverId, Optional(ssi))); + state Future watchFuture = + tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || + tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) || + tr.watch(failedLocalityVersionKey); + wait(tr.commit()); + wait(watchFuture); + break; + } catch (Error& e) { + wait(tr.onError(e)); } } - - for (const auto& it : oldServers) { - serverChanges.send(std::make_pair(it.first, Optional())); - } - - oldServers.swap(newServers); - tr = Transaction(self->db); - wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY)); - } catch (Error& e) { - TraceEvent("RatekeeperGetSSListError", self->id).error(e).suppressFor(1.0); - wait(tr.onError(e)); } } -} -ACTOR Future monitorThrottlingChanges(RatekeeperData* self) { - state bool committed = false; - loop { - state ReadYourWritesTransaction tr(self->db); + ACTOR static Future monitorServerListChange( + Ratekeeper* self, + PromiseStream>> serverChanges) { + state std::map oldServers; + state Transaction tr(self->db); loop { try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + if (now() - self->lastSSListFetchedTimestamp > 2 * SERVER_KNOBS->SERVER_LIST_DELAY) { + TraceEvent(SevWarnAlways, "RatekeeperGetSSListLongLatency", self->id) + .detail("Latency", now() - self->lastSSListFetchedTimestamp); + } tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + std::vector> results = + wait(getServerListAndProcessClasses(&tr)); + self->lastSSListFetchedTimestamp = now(); - state Future throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY); - state Future> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey); + std::map newServers; + for (const auto& [ssi, _] : results) { + const UID serverId = ssi.id(); + newServers[serverId] = ssi; - if (!committed) { - BinaryWriter limitWriter(Unversioned()); - limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS; - tr.set(tagThrottleLimitKey, limitWriter.toValue()); - } - - wait(success(throttledTagKeys) && success(autoThrottlingEnabled)); - - if (autoThrottlingEnabled.get().present() && - autoThrottlingEnabled.get().get() == LiteralStringRef("0")) { - TEST(true); // Auto-throttling disabled - if (self->autoThrottlingEnabled) { - TraceEvent("AutoTagThrottlingDisabled", self->id).log(); - } - self->autoThrottlingEnabled = false; - } else if (autoThrottlingEnabled.get().present() && - autoThrottlingEnabled.get().get() == LiteralStringRef("1")) { - TEST(true); // Auto-throttling enabled - if (!self->autoThrottlingEnabled) { - TraceEvent("AutoTagThrottlingEnabled", self->id).log(); - } - self->autoThrottlingEnabled = true; - } else { - TEST(true); // Auto-throttling unspecified - if (autoThrottlingEnabled.get().present()) { - TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id) - .detail("Value", autoThrottlingEnabled.get().get()); - } - self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED; - if (!committed) - tr.set(tagThrottleAutoEnabledKey, LiteralStringRef(self->autoThrottlingEnabled ? 
"1" : "0")); - } - - RkTagThrottleCollection updatedTagThrottles; - - TraceEvent("RatekeeperReadThrottledTags", self->id) - .detail("NumThrottledTags", throttledTagKeys.get().size()); - for (auto entry : throttledTagKeys.get()) { - TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key); - TagThrottleValue tagValue = TagThrottleValue::fromValue(entry.value); - - ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported - - if (tagValue.expirationTime == 0 || tagValue.expirationTime > now() + tagValue.initialDuration) { - TEST(true); // Converting tag throttle duration to absolute time - tagValue.expirationTime = now() + tagValue.initialDuration; - BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValueReason())); - wr << tagValue; - state Value value = wr.toValue(); - - tr.set(entry.key, value); - } - - if (tagValue.expirationTime > now()) { - TransactionTag tag = *tagKey.tags.begin(); - Optional oldLimits = - self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority); - - if (tagKey.throttleType == TagThrottleType::AUTO) { - updatedTagThrottles.autoThrottleTag( - self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime); - if (tagValue.reason == TagThrottledReason::BUSY_READ) { - updatedTagThrottles.busyReadTagCount++; - } else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) { - updatedTagThrottles.busyWriteTagCount++; - } - } else { - updatedTagThrottles.manualThrottleTag( - self->id, tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime, oldLimits); + if (oldServers.count(serverId)) { + if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) { + serverChanges.send(std::make_pair(serverId, Optional(ssi))); } + oldServers.erase(serverId); + } else { + serverChanges.send(std::make_pair(serverId, Optional(ssi))); } } - self->throttledTags = std::move(updatedTagThrottles); - ++self->throttledTagChangeId; + for (const auto& it : oldServers) { + serverChanges.send(std::make_pair(it.first, Optional())); + } - state Future watchFuture = tr.watch(tagThrottleSignalKey); - wait(tr.commit()); - committed = true; - - wait(watchFuture); - TraceEvent("RatekeeperThrottleSignaled", self->id).log(); - TEST(true); // Tag throttle changes detected - break; + oldServers.swap(newServers); + tr = Transaction(self->db); + wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY)); } catch (Error& e) { - TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e); + TraceEvent("RatekeeperGetSSListError", self->id).error(e).suppressFor(1.0); wait(tr.onError(e)); } } } -} -Future refreshStorageServerCommitCost(RatekeeperData* self) { - if (self->lastBusiestCommitTagPick == 0) { // the first call should be skipped - self->lastBusiestCommitTagPick = now(); + ACTOR static Future trackStorageServerQueueInfo(Ratekeeper* self, StorageServerInterface ssi) { + self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality))); + state Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); + TraceEvent("RkTracking", self->id) + .detail("StorageServer", ssi.id()) + .detail("Locality", ssi.locality.toString()); + try { + loop { + ErrorOr reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor( + StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? 
+ if (reply.present()) { + myQueueInfo->value.valid = true; + myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; + myQueueInfo->value.lastReply = reply.get(); + if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { + myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); + myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); + myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); + myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); + myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); + myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion); + myQueueInfo->value.smoothLatestVersion.reset(reply.get().version); + } else { + self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - + myQueueInfo->value.prevReply.bytesDurable); + myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); + myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); + myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); + myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); + myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); + myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion); + myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version); + } + + myQueueInfo->value.busiestReadTag = reply.get().busiestTag; + myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness; + myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate; + } else { + if (myQueueInfo->value.valid) { + TraceEvent("RkStorageServerDidNotRespond", self->id).detail("StorageServer", ssi.id()); + } + myQueueInfo->value.valid = false; + } + + wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && + IFailureMonitor::failureMonitor().onStateEqual(ssi.getQueuingMetrics.getEndpoint(), + FailureStatus(false))); + } + } catch (...) { + // including cancellation + self->storageQueueInfo.erase(myQueueInfo); + throw; + } + } + + ACTOR static Future trackTLogQueueInfo(Ratekeeper* self, TLogInterface tli) { + self->tlogQueueInfo.insert(mapPair(tli.id(), TLogQueueInfo(tli.id()))); + state Map::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id()); + TraceEvent("RkTracking", self->id).detail("TransactionLog", tli.id()); + try { + loop { + ErrorOr reply = wait(tli.getQueuingMetrics.getReplyUnlessFailedFor( + TLogQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? 
+ if (reply.present()) { + myQueueInfo->value.valid = true; + myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; + myQueueInfo->value.lastReply = reply.get(); + if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { + myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); + myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); + myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); + myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); + myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); + } else { + self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - + myQueueInfo->value.prevReply.bytesDurable); + myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); + myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); + myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); + myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); + myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); + } + } else { + if (myQueueInfo->value.valid) { + TraceEvent("RkTLogDidNotRespond", self->id).detail("TransactionLog", tli.id()); + } + myQueueInfo->value.valid = false; + } + + wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && + IFailureMonitor::failureMonitor().onStateEqual(tli.getQueuingMetrics.getEndpoint(), + FailureStatus(false))); + } + } catch (...) { + // including cancellation + self->tlogQueueInfo.erase(myQueueInfo); + throw; + } + } + + ACTOR static Future trackEachStorageServer( + Ratekeeper* self, + FutureStream>> serverChanges) { + state Map> actors; + state Promise err; + loop choose { + when(state std::pair> change = waitNext(serverChanges)) { + wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack + if (change.second.present()) { + if (!change.second.get().isTss()) { + auto& a = actors[change.first]; + a = Future(); + a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err); + } + } else + actors.erase(change.first); + } + when(wait(err.getFuture())) {} + } + } + + ACTOR static Future monitorThrottlingChanges(Ratekeeper* self) { + wait(self->tagThrottler->monitorThrottlingChanges()); return Void(); } - double elapsed = now() - self->lastBusiestCommitTagPick; - // for each SS, select the busiest commit tag from ssTrTagCommitCost - for (auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++it) { - it->value.busiestWriteTag.reset(); - TransactionTag busiestTag; - TransactionCommitCostEstimation maxCost; - double maxRate = 0, maxBusyness = 0; - for (const auto& [tag, cost] : it->value.tagCostEst) { - double rate = cost.getCostSum() / elapsed; - if (rate > maxRate) { - busiestTag = tag; - maxRate = rate; - maxCost = cost; + + ACTOR static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo) { + state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); + state Future timeout = Void(); + state std::vector> tlogTrackers; + state std::vector tlogInterfs; + state Promise err; + state Future collection = actorCollection(self.addActor.getFuture()); + + TraceEvent("RatekeeperStarting", rkInterf.id()); + self.addActor.send(waitFailureServer(rkInterf.waitFailure.getFuture())); + self.addActor.send(self.configurationMonitor()); + + PromiseStream>> serverChanges; + 
self.addActor.send(self.monitorServerListChange(serverChanges)); + self.addActor.send(self.trackEachStorageServer(serverChanges.getFuture())); + self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); + + self.addActor.send(self.monitorThrottlingChanges()); + Ratekeeper* selfPtr = &self; // let flow compiler capture self + self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); }, + SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + + TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) + .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) + .detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG) + .detail( + "Rate", + (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / + ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + + 2.0)); + + TraceEvent("RkStorageServerQueueSizeParameters", rkInterf.id()) + .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER) + .detail("Spring", SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) + .detail("EBrake", SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES) + .detail( + "Rate", + (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) / + ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + + 2.0)); + + tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); + tlogTrackers.reserve(tlogInterfs.size()); + for (int i = 0; i < tlogInterfs.size(); i++) { + tlogTrackers.push_back(splitError(self.trackTLogQueueInfo(tlogInterfs[i]), err)); + } + + self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); + + try { + state bool lastLimited = false; + loop choose { + when(wait(timeout)) { + self.updateRate(&self.normalLimits); + self.updateRate(&self.batchLimits); + + lastLimited = self.smoothReleasedTransactions.smoothRate() > + SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit; + double tooOld = now() - 1.0; + for (auto p = self.grvProxyInfo.begin(); p != self.grvProxyInfo.end();) { + if (p->second.lastUpdateTime < tooOld) + p = self.grvProxyInfo.erase(p); + else + ++p; + } + timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE); + } + when(GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) { + GetRateInfoReply reply; + + auto& p = self.grvProxyInfo[req.requesterID]; + //TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.totalTransactions).detail("Delta", req.totalReleasedTransactions - p.totalTransactions); + if (p.totalTransactions > 0) { + self.smoothReleasedTransactions.addDelta(req.totalReleasedTransactions - p.totalTransactions); + + for (auto const& [tag, count] : req.throttledTagCounts) { + self.tagThrottler->addRequests(tag, count); + } + } + if (p.batchTransactions > 0) { + self.smoothBatchReleasedTransactions.addDelta(req.batchReleasedTransactions - + p.batchTransactions); + } + + p.totalTransactions = req.totalReleasedTransactions; + p.batchTransactions = req.batchReleasedTransactions; + p.lastUpdateTime = now(); + + reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size(); + reply.batchTransactionRate = self.batchLimits.tpsLimit / self.grvProxyInfo.size(); + reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE; + + if (p.lastThrottledTagChangeId != self.tagThrottler->getThrottledTagChangeId() || + now() > p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) { + p.lastThrottledTagChangeId = self.tagThrottler->getThrottledTagChangeId(); + p.lastTagPushTime = 
now(); + + reply.throttledTags = self.tagThrottler->getClientRates(); + bool returningTagsToProxy = + reply.throttledTags.present() && reply.throttledTags.get().size() > 0; + TEST(returningTagsToProxy); // Returning tag throttles to a proxy + } + + reply.healthMetrics.update(self.healthMetrics, true, req.detailed); + reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit; + reply.healthMetrics.batchLimited = lastLimited; + + req.reply.send(reply); + } + when(HaltRatekeeperRequest req = waitNext(rkInterf.haltRatekeeper.getFuture())) { + req.reply.send(Void()); + TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID); + break; + } + when(ReportCommitCostEstimationRequest req = + waitNext(rkInterf.reportCommitCostEstimation.getFuture())) { + self.updateCommitCostEstimation(req.ssTrTagCommitCost); + req.reply.send(Void()); + } + when(wait(err.getFuture())) {} + when(wait(dbInfo->onChange())) { + if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) { + tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); + tlogTrackers = std::vector>(); + for (int i = 0; i < tlogInterfs.size(); i++) + tlogTrackers.push_back(splitError(self.trackTLogQueueInfo(tlogInterfs[i]), err)); + } + self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); + } + when(wait(collection)) { + ASSERT(false); + throw internal_error(); + } } + } catch (Error& err) { + TraceEvent("RatekeeperDied", rkInterf.id()).error(err, true); } - if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) { - it->value.busiestWriteTag = busiestTag; - // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps); - ASSERT(it->value.totalWriteCosts > 0); - maxBusyness = double(maxCost.getCostSum()) / it->value.totalWriteCosts; - it->value.busiestWriteTagFractionalBusyness = maxBusyness; - it->value.busiestWriteTagRate = maxRate; - } - - TraceEvent("BusiestWriteTag", it->key) - .detail("Elapsed", elapsed) - .detail("Tag", printable(busiestTag)) - .detail("TagOps", maxCost.getOpsSum()) - .detail("TagCost", maxCost.getCostSum()) - .detail("TotalCost", it->value.totalWriteCosts) - .detail("Reported", it->value.busiestWriteTag.present()) - .trackLatest(it->value.busiestWriteTagEventHolder->trackingKey); - - // reset statistics - it->value.tagCostEst.clear(); - it->value.totalWriteOps = 0; - it->value.totalWriteCosts = 0; + return Void(); } - self->lastBusiestCommitTagPick = now(); - return Void(); + +}; // class RatekeeperImpl + +Future Ratekeeper::configurationMonitor() { + return RatekeeperImpl::configurationMonitor(this); } -void tryAutoThrottleTag(RatekeeperData* self, - TransactionTag tag, - double rate, - double busyness, - TagThrottledReason reason) { - // NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE - // currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs. 
- if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) { - TEST(true); // Transaction tag auto-throttled - Optional clientRate = self->throttledTags.autoThrottleTag(self->id, tag, busyness); - if (clientRate.present()) { - TagSet tags; - tags.addTag(tag); +Future Ratekeeper::monitorServerListChange( + PromiseStream>> serverChanges) { + return RatekeeperImpl::monitorServerListChange(this, serverChanges); +} - Reference db = Reference::addRef(self->db.getPtr()); - self->addActor.send(ThrottleApi::throttleTags(db, - tags, - clientRate.get(), - SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, - TagThrottleType::AUTO, - TransactionPriority::DEFAULT, - now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, - reason)); +Future Ratekeeper::trackEachStorageServer( + FutureStream>> serverChanges) { + return RatekeeperImpl::trackEachStorageServer(this, serverChanges); +} + +Future Ratekeeper::trackStorageServerQueueInfo(StorageServerInterface ssi) { + return RatekeeperImpl::trackStorageServerQueueInfo(this, ssi); +} + +Future Ratekeeper::trackTLogQueueInfo(TLogInterface tli) { + return RatekeeperImpl::trackTLogQueueInfo(this, tli); +} + +Future Ratekeeper::monitorThrottlingChanges() { + return RatekeeperImpl::monitorThrottlingChanges(this); +} + +Future Ratekeeper::run(RatekeeperInterface rkInterf, Reference const> dbInfo) { + return RatekeeperImpl::run(rkInterf, dbInfo); +} + +Ratekeeper::Ratekeeper(UID id, Database db) + : id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), + actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()), + normalLimits(TransactionPriority::DEFAULT, + "", + SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, + SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, + SERVER_KNOBS->TARGET_BYTES_PER_TLOG, + SERVER_KNOBS->SPRING_BYTES_TLOG, + SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, + SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS), + batchLimits(TransactionPriority::BATCH, + "Batch", + SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, + SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, + SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, + SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, + SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, + SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH), + lastBusiestCommitTagPick(0.0) { + tagThrottler = std::make_unique(db, id); + expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); }, + SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL); +} + +void Ratekeeper::updateCommitCostEstimation( + UIDTransactionTagMap const& costEstimation) { + for (auto it = storageQueueInfo.begin(); it != storageQueueInfo.end(); ++it) { + auto tagCostIt = costEstimation.find(it->key); + if (tagCostIt == costEstimation.end()) + continue; + for (const auto& [tagName, cost] : tagCostIt->second) { + it->value.tagCostEst[tagName] += cost; + it->value.totalWriteCosts += cost.getCostSum(); + it->value.totalWriteOps += cost.getOpsSum(); } } } -void tryAutoThrottleTag(RatekeeperData* self, - StorageQueueInfo& ss, - int64_t storageQueue, - int64_t storageDurabilityLag) { - // NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In most - // of situation, this works. 
More indicators besides queue size and durability lag could be investigated in the - // future - if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || - storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) { - if (ss.busiestWriteTag.present()) { - tryAutoThrottleTag(self, - ss.busiestWriteTag.get(), - ss.busiestWriteTagRate, - ss.busiestWriteTagFractionalBusyness, - TagThrottledReason::BUSY_WRITE); - } - if (ss.busiestReadTag.present()) { - tryAutoThrottleTag(self, - ss.busiestReadTag.get(), - ss.busiestReadTagRate, - ss.busiestReadTagFractionalBusyness, - TagThrottledReason::BUSY_READ); - } - } -} - -void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { +void Ratekeeper::updateRate(RatekeeperLimits* limits) { // double controlFactor = ; // dt / eFoldingTime - double actualTps = self->smoothReleasedTransactions.smoothRate(); - self->actualTpsMetric = (int64_t)actualTps; + double actualTps = smoothReleasedTransactions.smoothRate(); + actualTpsMetric = (int64_t)actualTps; // SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this // value - actualTps = std::max(std::max(1.0, actualTps), - self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT); + actualTps = + std::max(std::max(1.0, actualTps), smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT); - if (self->actualTpsHistory.size() > SERVER_KNOBS->MAX_TPS_HISTORY_SAMPLES) { - self->actualTpsHistory.pop_front(); + if (actualTpsHistory.size() > SERVER_KNOBS->MAX_TPS_HISTORY_SAMPLES) { + actualTpsHistory.pop_front(); } - self->actualTpsHistory.push_back(actualTps); + actualTpsHistory.push_back(actualTps); limits->tpsLimit = std::numeric_limits::infinity(); UID reasonID = UID(); @@ -1029,9 +519,9 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { (deterministicRandom()->random01() < SERVER_KNOBS->RATEKEEPER_LIMIT_REASON_SAMPLE_RATE); // Look at each storage server's write queue and local rate, compute and store the desired rate ratio - for (auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) { + for (auto i = storageQueueInfo.begin(); i != storageQueueInfo.end(); ++i) { auto& ss = i->value; - if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) + if (!ss.valid || (remoteDC.present() && ss.locality.dcId() == remoteDC)) continue; ++sscount; @@ -1074,7 +564,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { storageDurabilityLagReverseIndex.insert(std::make_pair(-1 * storageDurabilityLag, &ss)); - auto& ssMetrics = self->healthMetrics.storageStats[ss.id]; + auto& ssMetrics = healthMetrics.storageStats[ss.id]; ssMetrics.storageQueue = storageQueue; ssMetrics.storageDurabilityLag = storageDurabilityLag; ssMetrics.cpuUsage = ss.lastReply.cpuUsage; @@ -1083,29 +573,29 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { double targetRateRatio = std::min((storageQueue - targetBytes + springBytes) / (double)springBytes, 2.0); if (limits->priority == TransactionPriority::DEFAULT) { - tryAutoThrottleTag(self, ss, storageQueue, storageDurabilityLag); + addActor.send(tagThrottler->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag)); } double inputRate = ss.smoothInputBytes.smoothRate(); // inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE ); /*if( deterministicRandom()->random01() < 0.1 ) { - std::string name = "RatekeeperUpdateRate" + 
limits.context; - TraceEvent(name, ss.id) - .detail("MinFreeSpace", minFreeSpace) - .detail("SpringBytes", springBytes) - .detail("TargetBytes", targetBytes) - .detail("SmoothTotalSpaceTotal", ss.smoothTotalSpace.smoothTotal()) - .detail("SmoothFreeSpaceTotal", ss.smoothFreeSpace.smoothTotal()) - .detail("LastReplyBytesInput", ss.lastReply.bytesInput) - .detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal()) - .detail("TargetRateRatio", targetRateRatio) - .detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate()) - .detail("ActualTPS", actualTps) - .detail("InputRate", inputRate) - .detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate()) - .detail("B", b); - }*/ + std::string name = "RatekeeperUpdateRate" + limits.context; + TraceEvent(name, ss.id) + .detail("MinFreeSpace", minFreeSpace) + .detail("SpringBytes", springBytes) + .detail("TargetBytes", targetBytes) + .detail("SmoothTotalSpaceTotal", ss.smoothTotalSpace.smoothTotal()) + .detail("SmoothFreeSpaceTotal", ss.smoothFreeSpace.smoothTotal()) + .detail("LastReplyBytesInput", ss.lastReply.bytesInput) + .detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal()) + .detail("TargetRateRatio", targetRateRatio) + .detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate()) + .detail("ActualTPS", actualTps) + .detail("InputRate", inputRate) + .detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate()) + .detail("B", b); + }*/ // Don't let any storage server use up its target bytes faster than its MVCC window! double maxBytesPerSecond = @@ -1173,7 +663,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; ++ss) { if (ignoredMachines.size() < - std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { + std::min(configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { ignoredMachines.insert(ss->second->locality.zoneId()); continue; } @@ -1185,7 +675,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal(); limits->tpsLimit = ss->first; reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst - // SS, we still report it as the limiting process + // SS, we still report it as the limiting process limitReason = ssReasons[reasonID]; break; } @@ -1196,7 +686,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { std::set>> ignoredDurabilityLagMachines; for (auto ss = storageDurabilityLagReverseIndex.begin(); ss != storageDurabilityLagReverseIndex.end(); ++ss) { if (ignoredDurabilityLagMachines.size() < - std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { + std::min(configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { ignoredDurabilityLagMachines.insert(ss->second->locality.zoneId()); continue; } @@ -1206,11 +696,11 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { limitingDurabilityLag = -1 * ss->first; if (limitingDurabilityLag > limits->durabilityLagTargetVersions && - self->actualTpsHistory.size() > SERVER_KNOBS->NEEDED_TPS_HISTORY_SAMPLES) { + actualTpsHistory.size() > SERVER_KNOBS->NEEDED_TPS_HISTORY_SAMPLES) { if (limits->durabilityLagLimit == std::numeric_limits::infinity()) { double maxTps = 0; - for (int i = 0; i < self->actualTpsHistory.size(); i++) { - maxTps 
= std::max(maxTps, self->actualTpsHistory[i]); + for (int i = 0; i < actualTpsHistory.size(); i++) { + maxTps = std::max(maxTps, actualTpsHistory[i]); } limits->durabilityLagLimit = SERVER_KNOBS->INITIAL_DURABILITY_LAG_MULTIPLIER * maxTps; } @@ -1241,10 +731,10 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { break; } - self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer; - self->healthMetrics.limitingStorageQueue = limitingStorageQueueStorageServer; - self->healthMetrics.worstStorageDurabilityLag = worstDurabilityLag; - self->healthMetrics.limitingStorageDurabilityLag = limitingDurabilityLag; + healthMetrics.worstStorageQueue = worstStorageQueueStorageServer; + healthMetrics.limitingStorageQueue = limitingStorageQueueStorageServer; + healthMetrics.worstStorageDurabilityLag = worstDurabilityLag; + healthMetrics.limitingStorageDurabilityLag = limitingDurabilityLag; double writeToReadLatencyLimit = 0; Version worstVersionLag = 0; @@ -1253,9 +743,9 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { { Version minSSVer = std::numeric_limits::max(); Version minLimitingSSVer = std::numeric_limits::max(); - for (const auto& it : self->storageQueueInfo) { + for (const auto& it : storageQueueInfo) { auto& ss = it.value; - if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) + if (!ss.valid || (remoteDC.present() && ss.locality.dcId() == remoteDC)) continue; minSSVer = std::min(minSSVer, ss.lastReply.version); @@ -1267,7 +757,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { } Version maxTLVer = std::numeric_limits::min(); - for (const auto& it : self->tlogQueueInfo) { + for (const auto& it : tlogQueueInfo) { auto& tl = it.value; if (!tl.valid) continue; @@ -1286,7 +776,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { int64_t worstFreeSpaceTLog = std::numeric_limits::max(); int64_t worstStorageQueueTLog = 0; int tlcount = 0; - for (auto& it : self->tlogQueueInfo) { + for (auto& it : tlogQueueInfo) { auto& tl = it.value; if (!tl.valid) continue; @@ -1323,14 +813,14 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { } int64_t queue = tl.lastReply.bytesInput - tl.smoothDurableBytes.smoothTotal(); - self->healthMetrics.tLogQueue[tl.id] = queue; + healthMetrics.tLogQueue[tl.id] = queue; int64_t b = queue - targetBytes; worstStorageQueueTLog = std::max(worstStorageQueueTLog, queue); if (tl.lastReply.bytesInput - tl.lastReply.bytesDurable > tl.lastReply.storageBytes.free - minFreeSpace / 2) { - if (now() - self->lastWarning > 5.0) { - self->lastWarning = now(); - TraceEvent(SevWarnAlways, "RkTlogMinFreeSpaceZero", self->id).detail("ReasonId", tl.id); + if (now() - lastWarning > 5.0) { + lastWarning = now(); + TraceEvent(SevWarnAlways, "RkTlogMinFreeSpaceZero", id).detail("ReasonId", tl.id); } reasonID = tl.id; limitReason = limitReason_t::log_server_min_free_space; @@ -1411,7 +901,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { } } - self->healthMetrics.worstTLogQueue = worstStorageQueueTLog; + healthMetrics.worstTLogQueue = worstStorageQueueTLog; limits->tpsLimit = std::max(limits->tpsLimit, 0.0); @@ -1420,22 +910,22 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { } int64_t totalDiskUsageBytes = 0; - for (auto& t : self->tlogQueueInfo) { + for (auto& t : tlogQueueInfo) { if (t.value.valid) { totalDiskUsageBytes += t.value.lastReply.storageBytes.used; } } - for (auto& s : self->storageQueueInfo) { + for (auto& s : 
storageQueueInfo) { if (s.value.valid) { totalDiskUsageBytes += s.value.lastReply.storageBytes.used; } } - if (now() - self->lastSSListFetchedTimestamp > SERVER_KNOBS->STORAGE_SERVER_LIST_FETCH_TIMEOUT) { + if (now() - lastSSListFetchedTimestamp > SERVER_KNOBS->STORAGE_SERVER_LIST_FETCH_TIMEOUT) { limits->tpsLimit = 0.0; limitReason = limitReason_t::storage_server_list_fetch_failed; reasonID = UID(); - TraceEvent(SevWarnAlways, "RkSSListFetchTimeout", self->id).suppressFor(1.0); + TraceEvent(SevWarnAlways, "RkSSListFetchTimeout", id).suppressFor(1.0); } else if (limits->tpsLimit == std::numeric_limits::infinity()) { limits->tpsLimit = SERVER_KNOBS->RATEKEEPER_DEFAULT_LIMIT; } @@ -1445,15 +935,15 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { if (deterministicRandom()->random01() < 0.1) { const std::string& name = limits->rkUpdateEventCacheHolder.getPtr()->trackingKey; - TraceEvent(name.c_str(), self->id) + TraceEvent(name.c_str(), id) .detail("TPSLimit", limits->tpsLimit) .detail("Reason", limitReason) .detail("ReasonServerID", reasonID == UID() ? std::string() : Traceable::toString(reasonID)) - .detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate()) - .detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate()) + .detail("ReleasedTPS", smoothReleasedTransactions.smoothRate()) + .detail("ReleasedBatchTPS", smoothBatchReleasedTransactions.smoothRate()) .detail("TPSBasis", actualTps) .detail("StorageServers", sscount) - .detail("GrvProxies", self->grvProxyInfo.size()) + .detail("GrvProxies", grvProxyInfo.size()) .detail("TLogs", tlcount) .detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer) .detail("WorstFreeSpaceTLog", worstFreeSpaceTLog) @@ -1465,187 +955,63 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { .detail("LimitingStorageServerVersionLag", limitingVersionLag) .detail("WorstStorageServerDurabilityLag", worstDurabilityLag) .detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag) - .detail("TagsAutoThrottled", self->throttledTags.autoThrottleCount()) - .detail("TagsAutoThrottledBusyRead", self->throttledTags.busyReadTagCount) - .detail("TagsAutoThrottledBusyWrite", self->throttledTags.busyWriteTagCount) - .detail("TagsManuallyThrottled", self->throttledTags.manualThrottleCount()) - .detail("AutoThrottlingEnabled", self->autoThrottlingEnabled) + .detail("TagsAutoThrottled", tagThrottler->autoThrottleCount()) + .detail("TagsAutoThrottledBusyRead", tagThrottler->busyReadTagCount()) + .detail("TagsAutoThrottledBusyWrite", tagThrottler->busyWriteTagCount()) + .detail("TagsManuallyThrottled", tagThrottler->manualThrottleCount()) + .detail("AutoThrottlingEnabled", tagThrottler->isAutoThrottlingEnabled()) .trackLatest(name); } } -static void updateCommitCostEstimation(RatekeeperData* self, - UIDTransactionTagMap const& costEstimation) { - for (auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++it) { - auto tagCostIt = costEstimation.find(it->key); - if (tagCostIt == costEstimation.end()) - continue; - for (const auto& [tagName, cost] : tagCostIt->second) { - it->value.tagCostEst[tagName] += cost; - it->value.totalWriteCosts += cost.getCostSum(); - it->value.totalWriteOps += cost.getOpsSum(); - } +Future Ratekeeper::refreshStorageServerCommitCost() { + if (lastBusiestCommitTagPick == 0) { // the first call should be skipped + lastBusiestCommitTagPick = now(); + return Void(); } -} - -ACTOR Future configurationMonitor(RatekeeperData* self) { - loop { - state 
ReadYourWritesTransaction tr(self->db); - - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY)); - ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY); - - self->configuration.fromKeyValues((VectorRef)results); - - state Future watchFuture = - tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || - tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) || - tr.watch(failedLocalityVersionKey); - wait(tr.commit()); - wait(watchFuture); - break; - } catch (Error& e) { - wait(tr.onError(e)); + double elapsed = now() - lastBusiestCommitTagPick; + // for each SS, select the busiest commit tag from ssTrTagCommitCost + for (auto it = storageQueueInfo.begin(); it != storageQueueInfo.end(); ++it) { + it->value.busiestWriteTag.reset(); + TransactionTag busiestTag; + TransactionCommitCostEstimation maxCost; + double maxRate = 0, maxBusyness = 0; + for (const auto& [tag, cost] : it->value.tagCostEst) { + double rate = cost.getCostSum() / elapsed; + if (rate > maxRate) { + busiestTag = tag; + maxRate = rate; + maxCost = cost; } } + if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) { + it->value.busiestWriteTag = busiestTag; + // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps); + ASSERT(it->value.totalWriteCosts > 0); + maxBusyness = double(maxCost.getCostSum()) / it->value.totalWriteCosts; + it->value.busiestWriteTagFractionalBusyness = maxBusyness; + it->value.busiestWriteTagRate = maxRate; + } + + TraceEvent("BusiestWriteTag", it->key) + .detail("Elapsed", elapsed) + .detail("Tag", printable(busiestTag)) + .detail("TagOps", maxCost.getOpsSum()) + .detail("TagCost", maxCost.getCostSum()) + .detail("TotalCost", it->value.totalWriteCosts) + .detail("Reported", it->value.busiestWriteTag.present()) + .trackLatest(it->value.busiestWriteTagEventHolder->trackingKey); + + // reset statistics + it->value.tagCostEst.clear(); + it->value.totalWriteOps = 0; + it->value.totalWriteCosts = 0; } + lastBusiestCommitTagPick = now(); + return Void(); } ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference const> dbInfo) { - state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); - state Future timeout = Void(); - state std::vector> tlogTrackers; - state std::vector tlogInterfs; - state Promise err; - state Future collection = actorCollection(self.addActor.getFuture()); - - TraceEvent("RatekeeperStarting", rkInterf.id()); - self.addActor.send(waitFailureServer(rkInterf.waitFailure.getFuture())); - self.addActor.send(configurationMonitor(&self)); - - PromiseStream>> serverChanges; - self.addActor.send(monitorServerListChange(&self, serverChanges)); - self.addActor.send(trackEachStorageServer(&self, serverChanges.getFuture())); - self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); - - self.addActor.send(monitorThrottlingChanges(&self)); - RatekeeperData* selfPtr = &self; // let flow compiler capture self - self.addActor.send( - recurring([selfPtr]() { refreshStorageServerCommitCost(selfPtr); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); - - TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) - .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) - .detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG) - .detail("Rate", - 
(SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / - ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + - 2.0)); - - TraceEvent("RkStorageServerQueueSizeParameters", rkInterf.id()) - .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER) - .detail("Spring", SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) - .detail("EBrake", SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES) - .detail("Rate", - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) / - ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + - 2.0)); - - tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); - tlogTrackers.reserve(tlogInterfs.size()); - for (int i = 0; i < tlogInterfs.size(); i++) { - tlogTrackers.push_back(splitError(trackTLogQueueInfo(&self, tlogInterfs[i]), err)); - } - - self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); - - try { - state bool lastLimited = false; - loop choose { - when(wait(timeout)) { - updateRate(&self, &self.normalLimits); - updateRate(&self, &self.batchLimits); - - lastLimited = self.smoothReleasedTransactions.smoothRate() > - SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit; - double tooOld = now() - 1.0; - for (auto p = self.grvProxyInfo.begin(); p != self.grvProxyInfo.end();) { - if (p->second.lastUpdateTime < tooOld) - p = self.grvProxyInfo.erase(p); - else - ++p; - } - timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE); - } - when(GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) { - GetRateInfoReply reply; - - auto& p = self.grvProxyInfo[req.requesterID]; - //TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.totalTransactions).detail("Delta", req.totalReleasedTransactions - p.totalTransactions); - if (p.totalTransactions > 0) { - self.smoothReleasedTransactions.addDelta(req.totalReleasedTransactions - p.totalTransactions); - - for (auto tag : req.throttledTagCounts) { - self.throttledTags.addRequests(tag.first, tag.second); - } - } - if (p.batchTransactions > 0) { - self.smoothBatchReleasedTransactions.addDelta(req.batchReleasedTransactions - p.batchTransactions); - } - - p.totalTransactions = req.totalReleasedTransactions; - p.batchTransactions = req.batchReleasedTransactions; - p.lastUpdateTime = now(); - - reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size(); - reply.batchTransactionRate = self.batchLimits.tpsLimit / self.grvProxyInfo.size(); - reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE; - - if (p.lastThrottledTagChangeId != self.throttledTagChangeId || - now() > p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) { - p.lastThrottledTagChangeId = self.throttledTagChangeId; - p.lastTagPushTime = now(); - - reply.throttledTags = self.throttledTags.getClientRates(self.autoThrottlingEnabled); - bool returningTagsToProxy = reply.throttledTags.present() && reply.throttledTags.get().size() > 0; - TEST(returningTagsToProxy); // Returning tag throttles to a proxy - } - - reply.healthMetrics.update(self.healthMetrics, true, req.detailed); - reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit; - reply.healthMetrics.batchLimited = lastLimited; - - req.reply.send(reply); - } - when(HaltRatekeeperRequest req = waitNext(rkInterf.haltRatekeeper.getFuture())) { - req.reply.send(Void()); - TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID); - break; - } - 
when(ReportCommitCostEstimationRequest req = waitNext(rkInterf.reportCommitCostEstimation.getFuture())) { - updateCommitCostEstimation(&self, req.ssTrTagCommitCost); - req.reply.send(Void()); - } - when(wait(err.getFuture())) {} - when(wait(dbInfo->onChange())) { - if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) { - tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); - tlogTrackers = std::vector>(); - for (int i = 0; i < tlogInterfs.size(); i++) - tlogTrackers.push_back(splitError(trackTLogQueueInfo(&self, tlogInterfs[i]), err)); - } - self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); - } - when(wait(collection)) { - ASSERT(false); - throw internal_error(); - } - } - } catch (Error& err) { - TraceEvent("RatekeeperDied", rkInterf.id()).error(err, true); - } + wait(Ratekeeper::run(rkInterf, dbInfo)); return Void(); } diff --git a/fdbserver/Ratekeeper.h b/fdbserver/Ratekeeper.h new file mode 100644 index 0000000000..8552eeb521 --- /dev/null +++ b/fdbserver/Ratekeeper.h @@ -0,0 +1,207 @@ +/* + * Ratekeeper.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "fdbclient/DatabaseConfiguration.h" +#include "fdbclient/DatabaseContext.h" +#include "fdbclient/StorageServerInterface.h" +#include "fdbclient/TagThrottle.actor.h" +#include "fdbrpc/Smoother.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/RatekeeperInterface.h" +#include "fdbserver/ServerDBInfo.h" +#include "fdbserver/TLogInterface.h" + +enum limitReason_t { + unlimited, // TODO: rename to workload? 
+ storage_server_write_queue_size, // 1 + storage_server_write_bandwidth_mvcc, + storage_server_readable_behind, + log_server_mvcc_write_bandwidth, + log_server_write_queue, // 5 + storage_server_min_free_space, // a storage server's normal limits are being reduced by low free space + storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio + log_server_min_free_space, + log_server_min_free_space_ratio, + storage_server_durability_lag, // 10 + storage_server_list_fetch_failed, + limitReason_t_end +}; + +struct StorageQueueInfo { + bool valid; + UID id; + LocalityData locality; + StorageQueuingMetricsReply lastReply; + StorageQueuingMetricsReply prevReply; + Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; + Smoother smoothDurableVersion, smoothLatestVersion; + Smoother smoothFreeSpace; + Smoother smoothTotalSpace; + limitReason_t limitReason; + + Optional busiestReadTag, busiestWriteTag; + double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0; + double busiestReadTagRate = 0, busiestWriteTagRate = 0; + + Reference busiestWriteTagEventHolder; + + // refresh periodically + TransactionTagMap tagCostEst; + uint64_t totalWriteCosts = 0; + int totalWriteOps = 0; + + StorageQueueInfo(UID id, LocalityData locality) + : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), + smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), + limitReason(limitReason_t::unlimited), + busiestWriteTagEventHolder(makeReference(id.toString() + "/BusiestWriteTag")) { + // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo + lastReply.instanceID = -1; + } +}; + +struct TLogQueueInfo { + bool valid; + UID id; + TLogQueuingMetricsReply lastReply; + TLogQueuingMetricsReply prevReply; + Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; + Smoother smoothFreeSpace; + Smoother smoothTotalSpace; + TLogQueueInfo(UID id) + : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), + smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), + smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) { + // FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from + // StorageQueueInfo) + lastReply.instanceID = -1; + } +}; + +struct RatekeeperLimits { + double tpsLimit; + Int64MetricHandle tpsLimitMetric; + Int64MetricHandle reasonMetric; + + int64_t storageTargetBytes; + int64_t storageSpringBytes; + int64_t logTargetBytes; + int64_t logSpringBytes; + double maxVersionDifference; + + int64_t durabilityLagTargetVersions; + int64_t lastDurabilityLag; + double durabilityLagLimit; + + TransactionPriority priority; + std::string context; + + Reference rkUpdateEventCacheHolder; + + RatekeeperLimits(TransactionPriority priority, + std::string context, + int64_t storageTargetBytes, + int64_t storageSpringBytes, + int64_t logTargetBytes, + int64_t logSpringBytes, + double maxVersionDifference, + int64_t durabilityLagTargetVersions) + : tpsLimit(std::numeric_limits::infinity()),
tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)), + reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes), + storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes), + maxVersionDifference(maxVersionDifference), + durabilityLagTargetVersions( + durabilityLagTargetVersions + + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not + // be durable on the storage servers + lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits::infinity()), priority(priority), + context(context), rkUpdateEventCacheHolder(makeReference("RkUpdate" + context)) {} +}; + +class Ratekeeper { + friend class RatekeeperImpl; + + // Differentiate from GrvProxyInfo in DatabaseContext.h + struct GrvProxyInfo { + int64_t totalTransactions; + int64_t batchTransactions; + uint64_t lastThrottledTagChangeId; + + double lastUpdateTime; + double lastTagPushTime; + + GrvProxyInfo() + : totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0), + lastTagPushTime(0) {} + }; + + UID id; + Database db; + + Map storageQueueInfo; + Map tlogQueueInfo; + + std::map grvProxyInfo; + Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes; + HealthMetrics healthMetrics; + DatabaseConfiguration configuration; + PromiseStream> addActor; + + Int64MetricHandle actualTpsMetric; + + double lastWarning; + double lastSSListFetchedTimestamp; + + std::unique_ptr tagThrottler; + + RatekeeperLimits normalLimits; + RatekeeperLimits batchLimits; + + Deque actualTpsHistory; + Optional remoteDC; + + Future expiredTagThrottleCleanup; + + double lastBusiestCommitTagPick; + + Ratekeeper(UID id, Database db); + + Future configurationMonitor(); + void updateCommitCostEstimation(UIDTransactionTagMap const& costEstimation); + void updateRate(RatekeeperLimits* limits); + Future refreshStorageServerCommitCost(); + Future monitorServerListChange(PromiseStream>> serverChanges); + Future trackEachStorageServer(FutureStream>> serverChanges); + + // SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function + Future trackStorageServerQueueInfo(StorageServerInterface); + Future trackTLogQueueInfo(TLogInterface); + + void tryAutoThrottleTag(TransactionTag, double rate, double busyness, TagThrottledReason); + void tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag); + Future monitorThrottlingChanges(); + +public: + static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo); +}; diff --git a/fdbserver/TagThrottler.actor.cpp b/fdbserver/TagThrottler.actor.cpp new file mode 100644 index 0000000000..2e215a7c9b --- /dev/null +++ b/fdbserver/TagThrottler.actor.cpp @@ -0,0 +1,598 @@ +/* + * TagThrottler.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/TagThrottler.h" + +class RkTagThrottleCollection : NonCopyable { + struct RkTagData { + Smoother requestRate; + RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} + }; + + struct RkTagThrottleData { + ClientTagThrottleLimits limits; + Smoother clientRate; + + // Only used by auto-throttles + double created = now(); + double lastUpdated = 0; + double lastReduced = now(); + bool rateSet = false; + + RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} + + double getTargetRate(Optional requestRate) { + if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) { + return limits.tpsRate; + } else { + return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal()); + } + } + + Optional updateAndGetClientRate(Optional requestRate) { + if (limits.expiration > now()) { + double targetRate = getTargetRate(requestRate); + if (targetRate == std::numeric_limits::max()) { + rateSet = false; + return targetRate; + } + if (!rateSet) { + rateSet = true; + clientRate.reset(targetRate); + } else { + clientRate.setTotal(targetRate); + } + + double rate = clientRate.smoothTotal(); + ASSERT(rate >= 0); + return rate; + } else { + TEST(true); // Get throttle rate for expired throttle + rateSet = false; + return Optional(); + } + } + }; + + void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); } + +public: + RkTagThrottleCollection() {} + + RkTagThrottleCollection(RkTagThrottleCollection&& other) { + autoThrottledTags = std::move(other.autoThrottledTags); + manualThrottledTags = std::move(other.manualThrottledTags); + tagData = std::move(other.tagData); + } + + void operator=(RkTagThrottleCollection&& other) { + autoThrottledTags = std::move(other.autoThrottledTags); + manualThrottledTags = std::move(other.manualThrottledTags); + tagData = std::move(other.tagData); + } + + double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) { + ASSERT(currentBusyness > 0); + + if (targetBusyness < 1) { + double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness); + return requestRate * targetFraction; + } else { + return std::numeric_limits::max(); + } + } + + // Returns the TPS rate if the throttle is updated, otherwise returns an empty optional + Optional autoThrottleTag(UID id, + TransactionTag const& tag, + double fractionalBusyness, + Optional tpsRate = Optional(), + Optional expiration = Optional()) { + ASSERT(!tpsRate.present() || tpsRate.get() >= 0); + ASSERT(!expiration.present() || expiration.get() > now()); + + auto itr = autoThrottledTags.find(tag); + bool present = (itr != autoThrottledTags.end()); + if (!present) { + if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) { + TEST(true); // Reached auto-throttle limit + return Optional(); + } + + itr = autoThrottledTags.try_emplace(tag).first; + initializeTag(tag); + } else if (itr->second.limits.expiration <= now()) { + TEST(true); // Re-throttling expired tag that hasn't been cleaned up + present = false; + itr->second = RkTagThrottleData(); + } + + auto& throttle = itr->second; + + if (!tpsRate.present()) { + if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) { + tpsRate = std::numeric_limits::max(); + if (present) { + return Optional(); + } + 
} else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) { + TEST(true); // Tag auto-throttled too quickly + return Optional(); + } else { + tpsRate = computeTargetTpsRate(fractionalBusyness, + SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, + tagData[tag].requestRate.smoothRate()); + + if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) { + TEST(true); // Tag auto-throttle rate increase attempt while active + return Optional(); + } + + throttle.lastUpdated = now(); + if (tpsRate.get() < throttle.limits.tpsRate) { + throttle.lastReduced = now(); + } + } + } + if (!expiration.present()) { + expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION; + } + + ASSERT(tpsRate.present() && tpsRate.get() >= 0); + + throttle.limits.tpsRate = tpsRate.get(); + throttle.limits.expiration = expiration.get(); + + Optional clientRate = throttle.updateAndGetClientRate(getRequestRate(tag)); + + TraceEvent("RkSetAutoThrottle", id) + .detail("Tag", tag) + .detail("TargetRate", tpsRate.get()) + .detail("Expiration", expiration.get() - now()) + .detail("ClientRate", clientRate) + .detail("Created", now() - throttle.created) + .detail("LastUpdate", now() - throttle.lastUpdated) + .detail("LastReduced", now() - throttle.lastReduced); + + if (tpsRate.get() != std::numeric_limits::max()) { + return tpsRate.get(); + } else { + return Optional(); + } + } + + void manualThrottleTag(UID id, + TransactionTag const& tag, + TransactionPriority priority, + double tpsRate, + double expiration, + Optional const& oldLimits) { + ASSERT(tpsRate >= 0); + ASSERT(expiration > now()); + + auto& priorityThrottleMap = manualThrottledTags[tag]; + auto result = priorityThrottleMap.try_emplace(priority); + initializeTag(tag); + ASSERT(result.second); // Updating to the map is done by copying the whole map + + result.first->second.limits.tpsRate = tpsRate; + result.first->second.limits.expiration = expiration; + + if (!oldLimits.present()) { + TEST(true); // Transaction tag manually throttled + TraceEvent("RatekeeperAddingManualThrottle", id) + .detail("Tag", tag) + .detail("Rate", tpsRate) + .detail("Priority", transactionPriorityToString(priority)) + .detail("SecondsToExpiration", expiration - now()); + } else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) { + TEST(true); // Manual transaction tag throttle updated + TraceEvent("RatekeeperUpdatingManualThrottle", id) + .detail("Tag", tag) + .detail("Rate", tpsRate) + .detail("Priority", transactionPriorityToString(priority)) + .detail("SecondsToExpiration", expiration - now()); + } + + Optional clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag)); + ASSERT(clientRate.present()); + } + + Optional getManualTagThrottleLimits(TransactionTag const& tag, + TransactionPriority priority) { + auto itr = manualThrottledTags.find(tag); + if (itr != manualThrottledTags.end()) { + auto priorityItr = itr->second.find(priority); + if (priorityItr != itr->second.end()) { + return priorityItr->second.limits; + } + } + + return Optional(); + } + + PrioritizedTransactionTagMap getClientRates(bool autoThrottlingEnabled) { + PrioritizedTransactionTagMap clientRates; + + for (auto tagItr = tagData.begin(); tagItr != tagData.end();) { + bool tagPresent = false; + + double requestRate = tagItr->second.requestRate.smoothRate(); + auto manualItr = manualThrottledTags.find(tagItr->first); + if (manualItr != manualThrottledTags.end()) { + Optional manualClientRate; + for (auto 
priority = allTransactionPriorities.rbegin(); !(priority == allTransactionPriorities.rend()); + ++priority) { + auto priorityItr = manualItr->second.find(*priority); + if (priorityItr != manualItr->second.end()) { + Optional priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate); + if (!priorityClientRate.present()) { + TEST(true); // Manual priority throttle expired + priorityItr = manualItr->second.erase(priorityItr); + } else { + if (!manualClientRate.present() || + manualClientRate.get().tpsRate > priorityClientRate.get()) { + manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), + priorityItr->second.limits.expiration); + } else { + TEST(true); // Manual throttle overriden by higher priority + } + + ++priorityItr; + } + } + + if (manualClientRate.present()) { + tagPresent = true; + TEST(true); // Using manual throttle + clientRates[*priority][tagItr->first] = manualClientRate.get(); + } + } + + if (manualItr->second.empty()) { + TEST(true); // All manual throttles expired + manualThrottledTags.erase(manualItr); + break; + } + } + + auto autoItr = autoThrottledTags.find(tagItr->first); + if (autoItr != autoThrottledTags.end()) { + Optional autoClientRate = autoItr->second.updateAndGetClientRate(requestRate); + if (autoClientRate.present()) { + double adjustedRate = autoClientRate.get(); + double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION - + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; + if (now() >= rampStartTime && adjustedRate != std::numeric_limits::max()) { + TEST(true); // Tag auto-throttle ramping up + + double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS; + if (targetBusyness == 0) { + targetBusyness = 0.01; + } + + double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; + adjustedRate = + computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate); + } + + tagPresent = true; + if (autoThrottlingEnabled) { + auto result = clientRates[TransactionPriority::DEFAULT].try_emplace( + tagItr->first, adjustedRate, autoItr->second.limits.expiration); + if (!result.second && result.first->second.tpsRate > adjustedRate) { + result.first->second = + ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration); + } else { + TEST(true); // Auto throttle overriden by manual throttle + } + clientRates[TransactionPriority::BATCH][tagItr->first] = + ClientTagThrottleLimits(0, autoItr->second.limits.expiration); + } + } else { + ASSERT(autoItr->second.limits.expiration <= now()); + TEST(true); // Auto throttle expired + if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup + tagPresent = true; + } else { + autoThrottledTags.erase(autoItr); + } + } + } + + if (!tagPresent) { + TEST(true); // All tag throttles expired + tagItr = tagData.erase(tagItr); + } else { + ++tagItr; + } + } + + return clientRates; + } + + void addRequests(TransactionTag const& tag, int requests) { + if (requests > 0) { + TEST(true); // Requests reported for throttled tag + + auto tagItr = tagData.try_emplace(tag); + tagItr.first->second.requestRate.addDelta(requests); + + double requestRate = tagItr.first->second.requestRate.smoothRate(); + + auto autoItr = autoThrottledTags.find(tag); + if (autoItr != autoThrottledTags.end()) { + autoItr->second.updateAndGetClientRate(requestRate); + } + + auto manualItr = manualThrottledTags.find(tag); + if (manualItr != manualThrottledTags.end()) { + for (auto priorityItr = 
manualItr->second.begin(); priorityItr != manualItr->second.end(); + ++priorityItr) { + priorityItr->second.updateAndGetClientRate(requestRate); + } + } + } + } + + Optional getRequestRate(TransactionTag const& tag) { + auto itr = tagData.find(tag); + if (itr != tagData.end()) { + return itr->second.requestRate.smoothRate(); + } + return Optional(); + } + + int64_t autoThrottleCount() const { return autoThrottledTags.size(); } + + int64_t manualThrottleCount() const { + int64_t count = 0; + for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) { + count += itr->second.size(); + } + + return count; + } + + TransactionTagMap autoThrottledTags; + TransactionTagMap> manualThrottledTags; + TransactionTagMap tagData; + uint32_t busyReadTagCount = 0, busyWriteTagCount = 0; +}; + +class TagThrottlerImpl { + Database db; + UID id; + RkTagThrottleCollection throttledTags; + uint64_t throttledTagChangeId{ 0 }; + bool autoThrottlingEnabled{ false }; + + ACTOR static Future monitorThrottlingChanges(TagThrottlerImpl* self) { + state bool committed = false; + loop { + state ReadYourWritesTransaction tr(self->db); + + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + state Future throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY); + state Future> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey); + + if (!committed) { + BinaryWriter limitWriter(Unversioned()); + limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS; + tr.set(tagThrottleLimitKey, limitWriter.toValue()); + } + + wait(success(throttledTagKeys) && success(autoThrottlingEnabled)); + + if (autoThrottlingEnabled.get().present() && + autoThrottlingEnabled.get().get() == LiteralStringRef("0")) { + TEST(true); // Auto-throttling disabled + if (self->autoThrottlingEnabled) { + TraceEvent("AutoTagThrottlingDisabled", self->id).log(); + } + self->autoThrottlingEnabled = false; + } else if (autoThrottlingEnabled.get().present() && + autoThrottlingEnabled.get().get() == LiteralStringRef("1")) { + TEST(true); // Auto-throttling enabled + if (!self->autoThrottlingEnabled) { + TraceEvent("AutoTagThrottlingEnabled", self->id).log(); + } + self->autoThrottlingEnabled = true; + } else { + TEST(true); // Auto-throttling unspecified + if (autoThrottlingEnabled.get().present()) { + TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id) + .detail("Value", autoThrottlingEnabled.get().get()); + } + self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED; + if (!committed) + tr.set(tagThrottleAutoEnabledKey, + LiteralStringRef(self->autoThrottlingEnabled ? 
"1" : "0")); + } + + RkTagThrottleCollection updatedTagThrottles; + + TraceEvent("RatekeeperReadThrottledTags", self->id) + .detail("NumThrottledTags", throttledTagKeys.get().size()); + for (auto entry : throttledTagKeys.get()) { + TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key); + TagThrottleValue tagValue = TagThrottleValue::fromValue(entry.value); + + ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported + + if (tagValue.expirationTime == 0 || + tagValue.expirationTime > now() + tagValue.initialDuration) { + TEST(true); // Converting tag throttle duration to absolute time + tagValue.expirationTime = now() + tagValue.initialDuration; + BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValueReason())); + wr << tagValue; + state Value value = wr.toValue(); + + tr.set(entry.key, value); + } + + if (tagValue.expirationTime > now()) { + TransactionTag tag = *tagKey.tags.begin(); + Optional oldLimits = + self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority); + + if (tagKey.throttleType == TagThrottleType::AUTO) { + updatedTagThrottles.autoThrottleTag( + self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime); + if (tagValue.reason == TagThrottledReason::BUSY_READ) { + updatedTagThrottles.busyReadTagCount++; + } else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) { + updatedTagThrottles.busyWriteTagCount++; + } + } else { + updatedTagThrottles.manualThrottleTag(self->id, + tag, + tagKey.priority, + tagValue.tpsRate, + tagValue.expirationTime, + oldLimits); + } + } + } + + self->throttledTags = std::move(updatedTagThrottles); + ++self->throttledTagChangeId; + + state Future watchFuture = tr.watch(tagThrottleSignalKey); + wait(tr.commit()); + committed = true; + + wait(watchFuture); + TraceEvent("RatekeeperThrottleSignaled", self->id).log(); + TEST(true); // Tag throttle changes detected + break; + } catch (Error& e) { + TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e); + wait(tr.onError(e)); + } + } + } + } + + Optional autoThrottleTag(UID id, TransactionTag tag, double busyness) { + return throttledTags.autoThrottleTag(id, tag, busyness); + } + + Future tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) { + // NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE + // currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs. 
+ if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) { + TEST(true); // Transaction tag auto-throttled + Optional clientRate = autoThrottleTag(id, tag, busyness); + if (clientRate.present()) { + TagSet tags; + tags.addTag(tag); + + Reference dbRef = Reference::addRef(db.getPtr()); + return ThrottleApi::throttleTags(dbRef, + tags, + clientRate.get(), + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, + TagThrottleType::AUTO, + TransactionPriority::DEFAULT, + now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, + reason); + } + } + return Void(); + } + +public: + TagThrottlerImpl(Database db, UID id) : db(db), id(id) {} + Future monitorThrottlingChanges() { return monitorThrottlingChanges(this); } + + void addRequests(TransactionTag tag, int count) { throttledTags.addRequests(tag, count); } + uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; } + PrioritizedTransactionTagMap getClientRates() { + return throttledTags.getClientRates(autoThrottlingEnabled); + } + int64_t autoThrottleCount() const { return throttledTags.autoThrottleCount(); } + uint32_t busyReadTagCount() const { return throttledTags.busyReadTagCount; } + uint32_t busyWriteTagCount() const { return throttledTags.busyWriteTagCount; } + int64_t manualThrottleCount() const { return throttledTags.manualThrottleCount(); } + bool isAutoThrottlingEnabled() const { return autoThrottlingEnabled; } + + Future tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQueue, int64_t storageDurabilityLag) { + // NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In + // most situations, this works. More indicators besides queue size and durability lag could be investigated in + // the future + if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || + storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) { + if (ss.busiestWriteTag.present()) { + return tryAutoThrottleTag(ss.busiestWriteTag.get(), + ss.busiestWriteTagRate, + ss.busiestWriteTagFractionalBusyness, + TagThrottledReason::BUSY_WRITE); + } + if (ss.busiestReadTag.present()) { + return tryAutoThrottleTag(ss.busiestReadTag.get(), + ss.busiestReadTagRate, + ss.busiestReadTagFractionalBusyness, + TagThrottledReason::BUSY_READ); + } + } + return Void(); + } + +}; // class TagThrottlerImpl + +TagThrottler::TagThrottler(Database db, UID id) : impl(PImpl::create(db, id)) {} +TagThrottler::~TagThrottler() = default; +Future TagThrottler::monitorThrottlingChanges() { + return impl->monitorThrottlingChanges(); +} +void TagThrottler::addRequests(TransactionTag tag, int count) { + impl->addRequests(tag, count); +} +uint64_t TagThrottler::getThrottledTagChangeId() const { + return impl->getThrottledTagChangeId(); +} +PrioritizedTransactionTagMap TagThrottler::getClientRates() { + return impl->getClientRates(); +} +int64_t TagThrottler::autoThrottleCount() const { + return impl->autoThrottleCount(); +} +uint32_t TagThrottler::busyReadTagCount() const { + return impl->busyReadTagCount(); +} +uint32_t TagThrottler::busyWriteTagCount() const { + return impl->busyWriteTagCount(); +} +int64_t TagThrottler::manualThrottleCount() const { + return impl->manualThrottleCount(); +} +bool TagThrottler::isAutoThrottlingEnabled() const { + return impl->isAutoThrottlingEnabled(); +} +Future TagThrottler::tryAutoThrottleTag(StorageQueueInfo& ss, + int64_t storageQueue, + int64_t storageDurabilityLag) { + return impl->tryAutoThrottleTag(ss, storageQueue,
storageDurabilityLag); +} diff --git a/fdbserver/TagThrottler.h b/fdbserver/TagThrottler.h new file mode 100644 index 0000000000..4070162545 --- /dev/null +++ b/fdbserver/TagThrottler.h @@ -0,0 +1,42 @@ +/* + * TagThrottler.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "fdbclient/PImpl.h" +#include "fdbserver/Ratekeeper.h" + +class TagThrottler { + PImpl impl; + +public: + TagThrottler(Database db, UID id); + ~TagThrottler(); + Future monitorThrottlingChanges(); + void addRequests(TransactionTag tag, int count); + uint64_t getThrottledTagChangeId() const; + PrioritizedTransactionTagMap getClientRates(); + int64_t autoThrottleCount() const; + uint32_t busyReadTagCount() const; + uint32_t busyWriteTagCount() const; + int64_t manualThrottleCount() const; + bool isAutoThrottlingEnabled() const; + Future tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag); +};
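Note on the throttling math: the snippet below is a minimal, standalone C++ sketch of the formula that RkTagThrottleCollection::computeTargetTpsRate implements in TagThrottler.actor.cpp above. It is not part of the patch; the inputs in main() are hypothetical, and the 0.5 target merely stands in for SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, whose actual default is not shown in this diff.

// Standalone sketch of the busyness-to-rate formula used by RkTagThrottleCollection::computeTargetTpsRate.
#include <cassert>
#include <cstdio>
#include <limits>

double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) {
	assert(currentBusyness > 0);
	if (targetBusyness < 1) {
		// Scale the observed request rate by the ratio of target-busyness odds to current-busyness odds.
		double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness);
		return requestRate * targetFraction;
	}
	// A target busyness of 1 or more means the tag is never limited.
	return std::numeric_limits<double>::max();
}

int main() {
	// Hypothetical tag: 80% busy while issuing 1000 tx/s, with a 50% busyness target.
	// Odds ratio = (0.5/0.5) / (0.8/0.2) = 0.25, so the tag is asked to slow to 250 tx/s.
	double tpsLimit = computeTargetTpsRate(0.8, 0.5, 1000.0);
	std::printf("throttle tag to %.0f tx/s\n", tpsLimit); // prints 250
	return 0;
}

In other words, the throttler is a simple proportional controller: the further a tag's observed busyness sits above the target, the smaller the fraction of its current request rate it is allowed to keep.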
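Similarly, here is a small self-contained sketch (again, not part of the patch) of the per-storage-server selection performed by Ratekeeper::refreshStorageServerCommitCost above: each tag's accumulated commit cost is divided by the elapsed interval, the tag with the highest cost rate wins, and it is only reported when that rate clears a minimum threshold. The minRate parameter and the numbers in main() are placeholders; in the real code the threshold is SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE, whose value is not shown in this diff.

// Standalone sketch of busiest-write-tag selection for one storage server.
#include <cstdio>
#include <map>
#include <optional>
#include <string>

struct BusiestTag {
	std::string tag;
	double costRate;           // accumulated cost divided by the measurement interval
	double fractionalBusyness; // this tag's share of the server's total write cost
};

std::optional<BusiestTag> pickBusiestWriteTag(const std::map<std::string, double>& tagCosts,
                                              double totalWriteCosts,
                                              double elapsed,
                                              double minRate) {
	std::optional<BusiestTag> busiest;
	for (const auto& [tag, cost] : tagCosts) {
		double rate = cost / elapsed;
		if (!busiest || rate > busiest->costRate) {
			busiest = BusiestTag{ tag, rate, cost / totalWriteCosts };
		}
	}
	if (busiest && busiest->costRate <= minRate) {
		return std::nullopt; // too little write traffic on this server to be worth throttling
	}
	return busiest;
}

int main() {
	// Hypothetical 10-second interval with two tags contributing write cost on one server.
	std::map<std::string, double> costs = { { "backup", 900.0 }, { "web", 300.0 } };
	auto busiest = pickBusiestWriteTag(costs, /*totalWriteCosts=*/1200.0, /*elapsed=*/10.0, /*minRate=*/5.0);
	if (busiest) {
		std::printf("%s: %.0f cost/s, busyness %.2f\n", busiest->tag.c_str(), busiest->costRate, busiest->fractionalBusyness);
	}
	return 0;
}

The reported tag and its fractional busyness are what TagThrottlerImpl::tryAutoThrottleTag later compares against the busyness and cost-rate knobs before asking ThrottleApi::throttleTags to install an automatic throttle.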