diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 079b70eae4..76d8939a11 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -28,1624 +28,8 @@ #include "fdbclient/TagThrottle.actor.h" #include "fdbserver/Knobs.h" #include "fdbserver/DataDistribution.actor.h" +#include "fdbserver/RatekeeperData.h" #include "fdbserver/RatekeeperInterface.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/WaitFailure.h" #include "flow/actorcompiler.h" // This must be the last #include. - -enum limitReason_t { - unlimited, // TODO: rename to workload? - storage_server_write_queue_size, // 1 - storage_server_write_bandwidth_mvcc, - storage_server_readable_behind, - log_server_mvcc_write_bandwidth, - log_server_write_queue, // 5 - storage_server_min_free_space, // a storage server's normal limits are being reduced by low free space - storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio - log_server_min_free_space, - log_server_min_free_space_ratio, - storage_server_durability_lag, // 10 - storage_server_list_fetch_failed, - limitReason_t_end -}; - -int limitReasonEnd = limitReason_t_end; - -const char* limitReasonName[] = { "workload", - "storage_server_write_queue_size", - "storage_server_write_bandwidth_mvcc", - "storage_server_readable_behind", - "log_server_mvcc_write_bandwidth", - "log_server_write_queue", - "storage_server_min_free_space", - "storage_server_min_free_space_ratio", - "log_server_min_free_space", - "log_server_min_free_space_ratio", - "storage_server_durability_lag", - "storage_server_list_fetch_failed" }; -static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size"); - -// NOTE: This has a corresponding table in Script.cs (see RatekeeperReason graph) -// IF UPDATING THIS ARRAY, UPDATE SCRIPT.CS! 
-const char* limitReasonDesc[] = { "Workload or read performance.", - "Storage server performance (storage queue).", - "Storage server MVCC memory.", - "Storage server version falling behind.", - "Log server MVCC memory.", - "Storage server performance (log queue).", - "Storage server running out of space (approaching 100MB limit).", - "Storage server running out of space (approaching 5% limit).", - "Log server running out of space (approaching 100MB limit).", - "Log server running out of space (approaching 5% limit).", - "Storage server durable version falling behind.", - "Unable to fetch storage server list." }; - -static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size"); - -struct StorageQueueInfo { - bool valid; - UID id; - LocalityData locality; - StorageQueuingMetricsReply lastReply; - StorageQueuingMetricsReply prevReply; - Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; - Smoother smoothDurableVersion, smoothLatestVersion; - Smoother smoothFreeSpace; - Smoother smoothTotalSpace; - limitReason_t limitReason; - - Optional busiestReadTag, busiestWriteTag; - double busiestReadTagFractionalBusyness = 0, busiestWriteTagFractionalBusyness = 0; - double busiestReadTagRate = 0, busiestWriteTagRate = 0; - - Reference busiestWriteTagEventHolder; - - // refresh periodically - TransactionTagMap tagCostEst; - uint64_t totalWriteCosts = 0; - int totalWriteOps = 0; - - StorageQueueInfo(UID id, LocalityData locality) - : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), - limitReason(limitReason_t::unlimited), - 
busiestWriteTagEventHolder(makeReference(id.toString() + "/BusiestWriteTag")) { - // FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo - lastReply.instanceID = -1; - } -}; - -struct TLogQueueInfo { - bool valid; - UID id; - TLogQueuingMetricsReply lastReply; - TLogQueuingMetricsReply prevReply; - Smoother smoothDurableBytes, smoothInputBytes, verySmoothDurableBytes; - Smoother smoothFreeSpace; - Smoother smoothTotalSpace; - TLogQueueInfo(UID id) - : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) { - // FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from - // storageQueueInfO) - lastReply.instanceID = -1; - } -}; - -class RkTagThrottleCollection : NonCopyable { -private: - struct RkTagData { - Smoother requestRate; - RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} - }; - - struct RkTagThrottleData { - ClientTagThrottleLimits limits; - Smoother clientRate; - - // Only used by auto-throttles - double created = now(); - double lastUpdated = 0; - double lastReduced = now(); - bool rateSet = false; - - RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} - - double getTargetRate(Optional requestRate) { - if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) { - return limits.tpsRate; - } else { - return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal()); - } - } - - Optional updateAndGetClientRate(Optional requestRate) { - if (limits.expiration > now()) { - double targetRate = getTargetRate(requestRate); - if (targetRate == std::numeric_limits::max()) { - rateSet = false; - return targetRate; - } - if 
(!rateSet) { - rateSet = true; - clientRate.reset(targetRate); - } else { - clientRate.setTotal(targetRate); - } - - double rate = clientRate.smoothTotal(); - ASSERT(rate >= 0); - return rate; - } else { - TEST(true); // Get throttle rate for expired throttle - rateSet = false; - return Optional(); - } - } - }; - - void initializeTag(TransactionTag const& tag) { tagData.try_emplace(tag); } - -public: - RkTagThrottleCollection() {} - - RkTagThrottleCollection(RkTagThrottleCollection&& other) { - autoThrottledTags = std::move(other.autoThrottledTags); - manualThrottledTags = std::move(other.manualThrottledTags); - tagData = std::move(other.tagData); - } - - void operator=(RkTagThrottleCollection&& other) { - autoThrottledTags = std::move(other.autoThrottledTags); - manualThrottledTags = std::move(other.manualThrottledTags); - tagData = std::move(other.tagData); - } - - double computeTargetTpsRate(double currentBusyness, double targetBusyness, double requestRate) { - ASSERT(currentBusyness > 0); - - if (targetBusyness < 1) { - double targetFraction = targetBusyness * (1 - currentBusyness) / ((1 - targetBusyness) * currentBusyness); - return requestRate * targetFraction; - } else { - return std::numeric_limits::max(); - } - } - - // Returns the TPS rate if the throttle is updated, otherwise returns an empty optional - Optional autoThrottleTag(UID id, - TransactionTag const& tag, - double fractionalBusyness, - Optional tpsRate = Optional(), - Optional expiration = Optional()) { - ASSERT(!tpsRate.present() || tpsRate.get() >= 0); - ASSERT(!expiration.present() || expiration.get() > now()); - - auto itr = autoThrottledTags.find(tag); - bool present = (itr != autoThrottledTags.end()); - if (!present) { - if (autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) { - TEST(true); // Reached auto-throttle limit - return Optional(); - } - - itr = autoThrottledTags.try_emplace(tag).first; - initializeTag(tag); - } else if (itr->second.limits.expiration 
<= now()) { - TEST(true); // Re-throttling expired tag that hasn't been cleaned up - present = false; - itr->second = RkTagThrottleData(); - } - - auto& throttle = itr->second; - - if (!tpsRate.present()) { - if (now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) { - tpsRate = std::numeric_limits::max(); - if (present) { - return Optional(); - } - } else if (now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) { - TEST(true); // Tag auto-throttled too quickly - return Optional(); - } else { - tpsRate = computeTargetTpsRate(fractionalBusyness, - SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS, - tagData[tag].requestRate.smoothRate()); - - if (throttle.limits.expiration > now() && tpsRate.get() >= throttle.limits.tpsRate) { - TEST(true); // Tag auto-throttle rate increase attempt while active - return Optional(); - } - - throttle.lastUpdated = now(); - if (tpsRate.get() < throttle.limits.tpsRate) { - throttle.lastReduced = now(); - } - } - } - if (!expiration.present()) { - expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION; - } - - ASSERT(tpsRate.present() && tpsRate.get() >= 0); - - throttle.limits.tpsRate = tpsRate.get(); - throttle.limits.expiration = expiration.get(); - - Optional clientRate = throttle.updateAndGetClientRate(getRequestRate(tag)); - - TraceEvent("RkSetAutoThrottle", id) - .detail("Tag", tag) - .detail("TargetRate", tpsRate.get()) - .detail("Expiration", expiration.get() - now()) - .detail("ClientRate", clientRate) - .detail("Created", now() - throttle.created) - .detail("LastUpdate", now() - throttle.lastUpdated) - .detail("LastReduced", now() - throttle.lastReduced); - - if (tpsRate.get() != std::numeric_limits::max()) { - return tpsRate.get(); - } else { - return Optional(); - } - } - - void manualThrottleTag(UID id, - TransactionTag const& tag, - TransactionPriority priority, - double tpsRate, - double expiration, - Optional const& oldLimits) { - ASSERT(tpsRate >= 0); - 
ASSERT(expiration > now()); - - auto& priorityThrottleMap = manualThrottledTags[tag]; - auto result = priorityThrottleMap.try_emplace(priority); - initializeTag(tag); - ASSERT(result.second); // Updating to the map is done by copying the whole map - - result.first->second.limits.tpsRate = tpsRate; - result.first->second.limits.expiration = expiration; - - if (!oldLimits.present()) { - TEST(true); // Transaction tag manually throttled - TraceEvent("RatekeeperAddingManualThrottle", id) - .detail("Tag", tag) - .detail("Rate", tpsRate) - .detail("Priority", transactionPriorityToString(priority)) - .detail("SecondsToExpiration", expiration - now()); - } else if (oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) { - TEST(true); // Manual transaction tag throttle updated - TraceEvent("RatekeeperUpdatingManualThrottle", id) - .detail("Tag", tag) - .detail("Rate", tpsRate) - .detail("Priority", transactionPriorityToString(priority)) - .detail("SecondsToExpiration", expiration - now()); - } - - Optional clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag)); - ASSERT(clientRate.present()); - } - - Optional getManualTagThrottleLimits(TransactionTag const& tag, - TransactionPriority priority) { - auto itr = manualThrottledTags.find(tag); - if (itr != manualThrottledTags.end()) { - auto priorityItr = itr->second.find(priority); - if (priorityItr != itr->second.end()) { - return priorityItr->second.limits; - } - } - - return Optional(); - } - - PrioritizedTransactionTagMap getClientRates(bool autoThrottlingEnabled) { - PrioritizedTransactionTagMap clientRates; - - for (auto tagItr = tagData.begin(); tagItr != tagData.end();) { - bool tagPresent = false; - - double requestRate = tagItr->second.requestRate.smoothRate(); - auto manualItr = manualThrottledTags.find(tagItr->first); - if (manualItr != manualThrottledTags.end()) { - Optional manualClientRate; - for (auto priority = allTransactionPriorities.rbegin(); !(priority == 
allTransactionPriorities.rend()); - ++priority) { - auto priorityItr = manualItr->second.find(*priority); - if (priorityItr != manualItr->second.end()) { - Optional priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate); - if (!priorityClientRate.present()) { - TEST(true); // Manual priority throttle expired - priorityItr = manualItr->second.erase(priorityItr); - } else { - if (!manualClientRate.present() || - manualClientRate.get().tpsRate > priorityClientRate.get()) { - manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), - priorityItr->second.limits.expiration); - } else { - TEST(true); // Manual throttle overriden by higher priority - } - - ++priorityItr; - } - } - - if (manualClientRate.present()) { - tagPresent = true; - TEST(true); // Using manual throttle - clientRates[*priority][tagItr->first] = manualClientRate.get(); - } - } - - if (manualItr->second.empty()) { - TEST(true); // All manual throttles expired - manualThrottledTags.erase(manualItr); - break; - } - } - - auto autoItr = autoThrottledTags.find(tagItr->first); - if (autoItr != autoThrottledTags.end()) { - Optional autoClientRate = autoItr->second.updateAndGetClientRate(requestRate); - if (autoClientRate.present()) { - double adjustedRate = autoClientRate.get(); - double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION - - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; - if (now() >= rampStartTime && adjustedRate != std::numeric_limits::max()) { - TEST(true); // Tag auto-throttle ramping up - - double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS; - if (targetBusyness == 0) { - targetBusyness = 0.01; - } - - double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME; - adjustedRate = - computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate); - } - - tagPresent = true; - if (autoThrottlingEnabled) { - auto result = 
clientRates[TransactionPriority::DEFAULT].try_emplace( - tagItr->first, adjustedRate, autoItr->second.limits.expiration); - if (!result.second && result.first->second.tpsRate > adjustedRate) { - result.first->second = - ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration); - } else { - TEST(true); // Auto throttle overriden by manual throttle - } - clientRates[TransactionPriority::BATCH][tagItr->first] = - ClientTagThrottleLimits(0, autoItr->second.limits.expiration); - } - } else { - ASSERT(autoItr->second.limits.expiration <= now()); - TEST(true); // Auto throttle expired - if (BUGGIFY) { // Temporarily extend the window between expiration and cleanup - tagPresent = true; - } else { - autoThrottledTags.erase(autoItr); - } - } - } - - if (!tagPresent) { - TEST(true); // All tag throttles expired - tagItr = tagData.erase(tagItr); - } else { - ++tagItr; - } - } - - return clientRates; - } - - void addRequests(TransactionTag const& tag, int requests) { - if (requests > 0) { - TEST(true); // Requests reported for throttled tag - - auto tagItr = tagData.try_emplace(tag); - tagItr.first->second.requestRate.addDelta(requests); - - double requestRate = tagItr.first->second.requestRate.smoothRate(); - - auto autoItr = autoThrottledTags.find(tag); - if (autoItr != autoThrottledTags.end()) { - autoItr->second.updateAndGetClientRate(requestRate); - } - - auto manualItr = manualThrottledTags.find(tag); - if (manualItr != manualThrottledTags.end()) { - for (auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end(); - ++priorityItr) { - priorityItr->second.updateAndGetClientRate(requestRate); - } - } - } - } - - Optional getRequestRate(TransactionTag const& tag) { - auto itr = tagData.find(tag); - if (itr != tagData.end()) { - return itr->second.requestRate.smoothRate(); - } - return Optional(); - } - - int64_t autoThrottleCount() const { return autoThrottledTags.size(); } - - int64_t manualThrottleCount() const { - int64_t count = 
0; - for (auto itr = manualThrottledTags.begin(); itr != manualThrottledTags.end(); ++itr) { - count += itr->second.size(); - } - - return count; - } - - TransactionTagMap autoThrottledTags; - TransactionTagMap> manualThrottledTags; - TransactionTagMap tagData; - uint32_t busyReadTagCount = 0, busyWriteTagCount = 0; -}; - -struct RatekeeperLimits { - double tpsLimit; - Int64MetricHandle tpsLimitMetric; - Int64MetricHandle reasonMetric; - - int64_t storageTargetBytes; - int64_t storageSpringBytes; - int64_t logTargetBytes; - int64_t logSpringBytes; - double maxVersionDifference; - - int64_t durabilityLagTargetVersions; - int64_t lastDurabilityLag; - double durabilityLagLimit; - - TransactionPriority priority; - std::string context; - - Reference rkUpdateEventCacheHolder; - - RatekeeperLimits(TransactionPriority priority, - std::string context, - int64_t storageTargetBytes, - int64_t storageSpringBytes, - int64_t logTargetBytes, - int64_t logSpringBytes, - double maxVersionDifference, - int64_t durabilityLagTargetVersions) - : tpsLimit(std::numeric_limits::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)), - reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes), - storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes), - maxVersionDifference(maxVersionDifference), - durabilityLagTargetVersions( - durabilityLagTargetVersions + - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not - // be durable on the storage servers - lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits::infinity()), priority(priority), - context(context), rkUpdateEventCacheHolder(makeReference("RkUpdate" + context)) {} -}; - -namespace RatekeeperActorCpp { - -// Differentiate from GrvProxyInfo in DatabaseContext.h -struct GrvProxyInfo { - int64_t totalTransactions; - int64_t batchTransactions; - uint64_t 
lastThrottledTagChangeId; - - double lastUpdateTime; - double lastTagPushTime; - - GrvProxyInfo() - : totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0), lastTagPushTime(0) { - } -}; - -} // namespace RatekeeperActorCpp - -struct RatekeeperData { - UID id; - Database db; - - Map storageQueueInfo; - Map tlogQueueInfo; - - std::map grvProxyInfo; - Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes; - HealthMetrics healthMetrics; - DatabaseConfiguration configuration; - PromiseStream> addActor; - - Int64MetricHandle actualTpsMetric; - - double lastWarning; - double lastSSListFetchedTimestamp; - double lastBusiestCommitTagPick; - - RkTagThrottleCollection throttledTags; - uint64_t throttledTagChangeId; - - RatekeeperLimits normalLimits; - RatekeeperLimits batchLimits; - - Deque actualTpsHistory; - Optional remoteDC; - - Future expiredTagThrottleCleanup; - - bool autoThrottlingEnabled; - - RatekeeperData(UID id, Database db) - : id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), - smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), - actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()), - lastBusiestCommitTagPick(0), throttledTagChangeId(0), - normalLimits(TransactionPriority::DEFAULT, - "", - SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, - SERVER_KNOBS->TARGET_BYTES_PER_TLOG, - SERVER_KNOBS->SPRING_BYTES_TLOG, - SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, - SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS), - batchLimits(TransactionPriority::BATCH, - "Batch", - SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, - SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, - SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, - 
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, - SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH), - autoThrottlingEnabled(false) { - expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); }, - SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL); - } -}; - -// SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function -ACTOR Future trackStorageServerQueueInfo(RatekeeperData* self, StorageServerInterface ssi) { - self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality))); - state Map::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id()); - TraceEvent("RkTracking", self->id).detail("StorageServer", ssi.id()).detail("Locality", ssi.locality.toString()); - try { - loop { - ErrorOr reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor( - StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? - if (reply.present()) { - myQueueInfo->value.valid = true; - myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; - myQueueInfo->value.lastReply = reply.get(); - if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { - myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); - myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion); - myQueueInfo->value.smoothLatestVersion.reset(reply.get().version); - } else { - self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - - myQueueInfo->value.prevReply.bytesDurable); - myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); - 
myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); - myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion); - myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version); - } - - myQueueInfo->value.busiestReadTag = reply.get().busiestTag; - myQueueInfo->value.busiestReadTagFractionalBusyness = reply.get().busiestTagFractionalBusyness; - myQueueInfo->value.busiestReadTagRate = reply.get().busiestTagRate; - } else { - if (myQueueInfo->value.valid) { - TraceEvent("RkStorageServerDidNotRespond", self->id).detail("StorageServer", ssi.id()); - } - myQueueInfo->value.valid = false; - } - - wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && - IFailureMonitor::failureMonitor().onStateEqual(ssi.getQueuingMetrics.getEndpoint(), - FailureStatus(false))); - } - } catch (...) { - // including cancellation - self->storageQueueInfo.erase(myQueueInfo); - throw; - } -} - -ACTOR Future trackTLogQueueInfo(RatekeeperData* self, TLogInterface tli) { - self->tlogQueueInfo.insert(mapPair(tli.id(), TLogQueueInfo(tli.id()))); - state Map::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id()); - TraceEvent("RkTracking", self->id).detail("TransactionLog", tli.id()); - try { - loop { - ErrorOr reply = wait(tli.getQueuingMetrics.getReplyUnlessFailedFor( - TLogQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply? 
- if (reply.present()) { - myQueueInfo->value.valid = true; - myQueueInfo->value.prevReply = myQueueInfo->value.lastReply; - myQueueInfo->value.lastReply = reply.get(); - if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) { - myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total); - } else { - self->smoothTotalDurableBytes.addDelta(reply.get().bytesDurable - - myQueueInfo->value.prevReply.bytesDurable); - myQueueInfo->value.smoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.verySmoothDurableBytes.setTotal(reply.get().bytesDurable); - myQueueInfo->value.smoothInputBytes.setTotal(reply.get().bytesInput); - myQueueInfo->value.smoothFreeSpace.setTotal(reply.get().storageBytes.available); - myQueueInfo->value.smoothTotalSpace.setTotal(reply.get().storageBytes.total); - } - } else { - if (myQueueInfo->value.valid) { - TraceEvent("RkTLogDidNotRespond", self->id).detail("TransactionLog", tli.id()); - } - myQueueInfo->value.valid = false; - } - - wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && - IFailureMonitor::failureMonitor().onStateEqual(tli.getQueuingMetrics.getEndpoint(), - FailureStatus(false))); - } - } catch (...) 
{ - // including cancellation - self->tlogQueueInfo.erase(myQueueInfo); - throw; - } -} - -ACTOR Future splitError(Future in, Promise errOut) { - try { - wait(in); - return Void(); - } catch (Error& e) { - if (e.code() != error_code_actor_cancelled && !errOut.isSet()) - errOut.sendError(e); - throw; - } -} - -ACTOR Future trackEachStorageServer( - RatekeeperData* self, - FutureStream>> serverChanges) { - state Map> actors; - state Promise err; - loop choose { - when(state std::pair> change = waitNext(serverChanges)) { - wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack - if (change.second.present()) { - if (!change.second.get().isTss()) { - auto& a = actors[change.first]; - a = Future(); - a = splitError(trackStorageServerQueueInfo(self, change.second.get()), err); - } - } else - actors.erase(change.first); - } - when(wait(err.getFuture())) {} - } -} - -ACTOR Future monitorServerListChange( - RatekeeperData* self, - PromiseStream>> serverChanges) { - state std::map oldServers; - state Transaction tr(self->db); - - loop { - try { - if (now() - self->lastSSListFetchedTimestamp > 2 * SERVER_KNOBS->SERVER_LIST_DELAY) { - TraceEvent(SevWarnAlways, "RatekeeperGetSSListLongLatency", self->id) - .detail("Latency", now() - self->lastSSListFetchedTimestamp); - } - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - std::vector> results = - wait(getServerListAndProcessClasses(&tr)); - self->lastSSListFetchedTimestamp = now(); - - std::map newServers; - for (const auto& [ssi, _] : results) { - const UID serverId = ssi.id(); - newServers[serverId] = ssi; - - if (oldServers.count(serverId)) { - if (ssi.getValue.getEndpoint() != oldServers[serverId].getValue.getEndpoint()) { - serverChanges.send(std::make_pair(serverId, Optional(ssi))); - } - oldServers.erase(serverId); - } else { - serverChanges.send(std::make_pair(serverId, Optional(ssi))); - } - } - - for (const auto& it : oldServers) { - 
serverChanges.send(std::make_pair(it.first, Optional())); - } - - oldServers.swap(newServers); - tr = Transaction(self->db); - wait(delay(SERVER_KNOBS->SERVER_LIST_DELAY)); - } catch (Error& e) { - TraceEvent("RatekeeperGetSSListError", self->id).error(e).suppressFor(1.0); - wait(tr.onError(e)); - } - } -} - -ACTOR Future monitorThrottlingChanges(RatekeeperData* self) { - state bool committed = false; - loop { - state ReadYourWritesTransaction tr(self->db); - - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - - state Future throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY); - state Future> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey); - - if (!committed) { - BinaryWriter limitWriter(Unversioned()); - limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS; - tr.set(tagThrottleLimitKey, limitWriter.toValue()); - } - - wait(success(throttledTagKeys) && success(autoThrottlingEnabled)); - - if (autoThrottlingEnabled.get().present() && - autoThrottlingEnabled.get().get() == LiteralStringRef("0")) { - TEST(true); // Auto-throttling disabled - if (self->autoThrottlingEnabled) { - TraceEvent("AutoTagThrottlingDisabled", self->id).log(); - } - self->autoThrottlingEnabled = false; - } else if (autoThrottlingEnabled.get().present() && - autoThrottlingEnabled.get().get() == LiteralStringRef("1")) { - TEST(true); // Auto-throttling enabled - if (!self->autoThrottlingEnabled) { - TraceEvent("AutoTagThrottlingEnabled", self->id).log(); - } - self->autoThrottlingEnabled = true; - } else { - TEST(true); // Auto-throttling unspecified - if (autoThrottlingEnabled.get().present()) { - TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id) - .detail("Value", autoThrottlingEnabled.get().get()); - } - self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED; - if (!committed) - tr.set(tagThrottleAutoEnabledKey, 
LiteralStringRef(self->autoThrottlingEnabled ? "1" : "0")); - } - - RkTagThrottleCollection updatedTagThrottles; - - TraceEvent("RatekeeperReadThrottledTags", self->id) - .detail("NumThrottledTags", throttledTagKeys.get().size()); - for (auto entry : throttledTagKeys.get()) { - TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key); - TagThrottleValue tagValue = TagThrottleValue::fromValue(entry.value); - - ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported - - if (tagValue.expirationTime == 0 || tagValue.expirationTime > now() + tagValue.initialDuration) { - TEST(true); // Converting tag throttle duration to absolute time - tagValue.expirationTime = now() + tagValue.initialDuration; - BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValueReason())); - wr << tagValue; - state Value value = wr.toValue(); - - tr.set(entry.key, value); - } - - if (tagValue.expirationTime > now()) { - TransactionTag tag = *tagKey.tags.begin(); - Optional oldLimits = - self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority); - - if (tagKey.throttleType == TagThrottleType::AUTO) { - updatedTagThrottles.autoThrottleTag( - self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime); - if (tagValue.reason == TagThrottledReason::BUSY_READ) { - updatedTagThrottles.busyReadTagCount++; - } else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) { - updatedTagThrottles.busyWriteTagCount++; - } - } else { - updatedTagThrottles.manualThrottleTag( - self->id, tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime, oldLimits); - } - } - } - - self->throttledTags = std::move(updatedTagThrottles); - ++self->throttledTagChangeId; - - state Future watchFuture = tr.watch(tagThrottleSignalKey); - wait(tr.commit()); - committed = true; - - wait(watchFuture); - TraceEvent("RatekeeperThrottleSignaled", self->id).log(); - TEST(true); // Tag throttle changes detected - break; - } catch (Error& e) { - 
TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e); - wait(tr.onError(e)); - } - } - } -} - -Future refreshStorageServerCommitCost(RatekeeperData* self) { - if (self->lastBusiestCommitTagPick == 0) { // the first call should be skipped - self->lastBusiestCommitTagPick = now(); - return Void(); - } - double elapsed = now() - self->lastBusiestCommitTagPick; - // for each SS, select the busiest commit tag from ssTrTagCommitCost - for (auto it = self->storageQueueInfo.begin(); it != self->storageQueueInfo.end(); ++it) { - it->value.busiestWriteTag.reset(); - TransactionTag busiestTag; - TransactionCommitCostEstimation maxCost; - double maxRate = 0, maxBusyness = 0; - for (const auto& [tag, cost] : it->value.tagCostEst) { - double rate = cost.getCostSum() / elapsed; - if (rate > maxRate) { - busiestTag = tag; - maxRate = rate; - maxCost = cost; - } - } - if (maxRate > SERVER_KNOBS->MIN_TAG_WRITE_PAGES_RATE) { - it->value.busiestWriteTag = busiestTag; - // TraceEvent("RefreshSSCommitCost").detail("TotalWriteCost", it->value.totalWriteCost).detail("TotalWriteOps",it->value.totalWriteOps); - ASSERT(it->value.totalWriteCosts > 0); - maxBusyness = double(maxCost.getCostSum()) / it->value.totalWriteCosts; - it->value.busiestWriteTagFractionalBusyness = maxBusyness; - it->value.busiestWriteTagRate = maxRate; - } - - TraceEvent("BusiestWriteTag", it->key) - .detail("Elapsed", elapsed) - .detail("Tag", printable(busiestTag)) - .detail("TagOps", maxCost.getOpsSum()) - .detail("TagCost", maxCost.getCostSum()) - .detail("TotalCost", it->value.totalWriteCosts) - .detail("Reported", it->value.busiestWriteTag.present()) - .trackLatest(it->value.busiestWriteTagEventHolder->trackingKey); - - // reset statistics - it->value.tagCostEst.clear(); - it->value.totalWriteOps = 0; - it->value.totalWriteCosts = 0; - } - self->lastBusiestCommitTagPick = now(); - return Void(); -} - -void tryAutoThrottleTag(RatekeeperData* self, - TransactionTag tag, - double rate, - double 
busyness, - TagThrottledReason reason) { - // NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE - // currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs. - if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) { - TEST(true); // Transaction tag auto-throttled - Optional clientRate = self->throttledTags.autoThrottleTag(self->id, tag, busyness); - if (clientRate.present()) { - TagSet tags; - tags.addTag(tag); - - Reference db = Reference::addRef(self->db.getPtr()); - self->addActor.send(ThrottleApi::throttleTags(db, - tags, - clientRate.get(), - SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, - TagThrottleType::AUTO, - TransactionPriority::DEFAULT, - now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, - reason)); - } - } -} - -void tryAutoThrottleTag(RatekeeperData* self, - StorageQueueInfo& ss, - int64_t storageQueue, - int64_t storageDurabilityLag) { - // NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In most - // of situation, this works. 
More indicators besides queue size and durability lag could be investigated in the - // future - if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || - storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) { - if (ss.busiestWriteTag.present()) { - tryAutoThrottleTag(self, - ss.busiestWriteTag.get(), - ss.busiestWriteTagRate, - ss.busiestWriteTagFractionalBusyness, - TagThrottledReason::BUSY_WRITE); - } - if (ss.busiestReadTag.present()) { - tryAutoThrottleTag(self, - ss.busiestReadTag.get(), - ss.busiestReadTagRate, - ss.busiestReadTagFractionalBusyness, - TagThrottledReason::BUSY_READ); - } - } -} - -void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { - // double controlFactor = ; // dt / eFoldingTime - - double actualTps = self->smoothReleasedTransactions.smoothRate(); - self->actualTpsMetric = (int64_t)actualTps; - // SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this - // value - actualTps = std::max(std::max(1.0, actualTps), - self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT); - - if (self->actualTpsHistory.size() > SERVER_KNOBS->MAX_TPS_HISTORY_SAMPLES) { - self->actualTpsHistory.pop_front(); - } - self->actualTpsHistory.push_back(actualTps); - - limits->tpsLimit = std::numeric_limits::infinity(); - UID reasonID = UID(); - limitReason_t limitReason = limitReason_t::unlimited; - - int sscount = 0; - - int64_t worstFreeSpaceStorageServer = std::numeric_limits::max(); - int64_t worstStorageQueueStorageServer = 0; - int64_t limitingStorageQueueStorageServer = 0; - int64_t worstDurabilityLag = 0; - - std::multimap storageTpsLimitReverseIndex; - std::multimap storageDurabilityLagReverseIndex; - - std::map ssReasons; - - bool printRateKeepLimitReasonDetails = - SERVER_KNOBS->RATEKEEPER_PRINT_LIMIT_REASON && - (deterministicRandom()->random01() < SERVER_KNOBS->RATEKEEPER_LIMIT_REASON_SAMPLE_RATE); - - // Look at 
each storage server's write queue and local rate, compute and store the desired rate ratio - for (auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) { - auto& ss = i->value; - if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) - continue; - ++sscount; - - limitReason_t ssLimitReason = limitReason_t::unlimited; - - int64_t minFreeSpace = - std::max(SERVER_KNOBS->MIN_AVAILABLE_SPACE, - (int64_t)(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO * ss.smoothTotalSpace.smoothTotal())); - - worstFreeSpaceStorageServer = - std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace); - - int64_t springBytes = std::max( - 1, std::min(limits->storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2)); - int64_t targetBytes = std::max( - 1, std::min(limits->storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace)); - if (targetBytes != limits->storageTargetBytes) { - if (minFreeSpace == SERVER_KNOBS->MIN_AVAILABLE_SPACE) { - ssLimitReason = limitReason_t::storage_server_min_free_space; - } else { - ssLimitReason = limitReason_t::storage_server_min_free_space_ratio; - } - if (printRateKeepLimitReasonDetails) { - TraceEvent("RatekeeperLimitReasonDetails") - .detail("Reason", ssLimitReason) - .detail("SSID", ss.id) - .detail("SSSmoothTotalSpace", ss.smoothTotalSpace.smoothTotal()) - .detail("SSSmoothFreeSpace", ss.smoothFreeSpace.smoothTotal()) - .detail("TargetBytes", targetBytes) - .detail("LimitsStorageTargetBytes", limits->storageTargetBytes) - .detail("MinFreeSpace", minFreeSpace); - } - } - - int64_t storageQueue = ss.lastReply.bytesInput - ss.smoothDurableBytes.smoothTotal(); - worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue); - - int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal(); - worstDurabilityLag = std::max(worstDurabilityLag, 
storageDurabilityLag); - - storageDurabilityLagReverseIndex.insert(std::make_pair(-1 * storageDurabilityLag, &ss)); - - auto& ssMetrics = self->healthMetrics.storageStats[ss.id]; - ssMetrics.storageQueue = storageQueue; - ssMetrics.storageDurabilityLag = storageDurabilityLag; - ssMetrics.cpuUsage = ss.lastReply.cpuUsage; - ssMetrics.diskUsage = ss.lastReply.diskUsage; - - double targetRateRatio = std::min((storageQueue - targetBytes + springBytes) / (double)springBytes, 2.0); - - if (limits->priority == TransactionPriority::DEFAULT) { - tryAutoThrottleTag(self, ss, storageQueue, storageDurabilityLag); - } - - double inputRate = ss.smoothInputBytes.smoothRate(); - // inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE ); - - /*if( deterministicRandom()->random01() < 0.1 ) { - std::string name = "RatekeeperUpdateRate" + limits.context; - TraceEvent(name, ss.id) - .detail("MinFreeSpace", minFreeSpace) - .detail("SpringBytes", springBytes) - .detail("TargetBytes", targetBytes) - .detail("SmoothTotalSpaceTotal", ss.smoothTotalSpace.smoothTotal()) - .detail("SmoothFreeSpaceTotal", ss.smoothFreeSpace.smoothTotal()) - .detail("LastReplyBytesInput", ss.lastReply.bytesInput) - .detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal()) - .detail("TargetRateRatio", targetRateRatio) - .detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate()) - .detail("ActualTPS", actualTps) - .detail("InputRate", inputRate) - .detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate()) - .detail("B", b); - }*/ - - // Don't let any storage server use up its target bytes faster than its MVCC window! 
- double maxBytesPerSecond = - (targetBytes - springBytes) / - ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0); - double limitTps = std::min(actualTps * maxBytesPerSecond / std::max(1.0e-8, inputRate), - maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE); - if (ssLimitReason == limitReason_t::unlimited) - ssLimitReason = limitReason_t::storage_server_write_bandwidth_mvcc; - - if (targetRateRatio > 0 && inputRate > 0) { - ASSERT(inputRate != 0); - double smoothedRate = - std::max(ss.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE); - double x = smoothedRate / (inputRate * targetRateRatio); - double lim = actualTps * x; - if (lim < limitTps) { - double oldLimitTps = limitTps; - limitTps = lim; - if (ssLimitReason == limitReason_t::unlimited || - ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc) { - if (printRateKeepLimitReasonDetails) { - TraceEvent("RatekeeperLimitReasonDetails") - .detail("Reason", limitReason_t::storage_server_write_queue_size) - .detail("FromReason", ssLimitReason) - .detail("SSID", ss.id) - .detail("SSSmoothTotalSpace", ss.smoothTotalSpace.smoothTotal()) - .detail("LimitsStorageTargetBytes", limits->storageTargetBytes) - .detail("LimitsStorageSpringBytes", limits->storageSpringBytes) - .detail("SSSmoothFreeSpace", ss.smoothFreeSpace.smoothTotal()) - .detail("MinFreeSpace", minFreeSpace) - .detail("SSLastReplyBytesInput", ss.lastReply.bytesInput) - .detail("SSSmoothDurableBytes", ss.smoothDurableBytes.smoothTotal()) - .detail("StorageQueue", storageQueue) - .detail("TargetBytes", targetBytes) - .detail("SpringBytes", springBytes) - .detail("SSVerySmoothDurableBytesSmoothRate", ss.verySmoothDurableBytes.smoothRate()) - .detail("SmoothedRate", smoothedRate) - .detail("X", x) - .detail("ActualTps", actualTps) - .detail("Lim", lim) - .detail("LimitTps", oldLimitTps) - .detail("InputRate", inputRate) - 
.detail("TargetRateRatio", targetRateRatio); - } - ssLimitReason = limitReason_t::storage_server_write_queue_size; - } - } - } - - storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss)); - - if (limitTps < limits->tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || - ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) { - reasonID = ss.id; - limits->tpsLimit = limitTps; - limitReason = ssLimitReason; - } - - ssReasons[ss.id] = ssLimitReason; - } - - std::set>> ignoredMachines; - for (auto ss = storageTpsLimitReverseIndex.begin(); - ss != storageTpsLimitReverseIndex.end() && ss->first < limits->tpsLimit; - ++ss) { - if (ignoredMachines.size() < - std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { - ignoredMachines.insert(ss->second->locality.zoneId()); - continue; - } - if (ignoredMachines.count(ss->second->locality.zoneId()) > 0) { - continue; - } - - limitingStorageQueueStorageServer = - ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal(); - limits->tpsLimit = ss->first; - reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst - // SS, we still report it as the limiting process - limitReason = ssReasons[reasonID]; - break; - } - - // Calculate limited durability lag - int64_t limitingDurabilityLag = 0; - - std::set>> ignoredDurabilityLagMachines; - for (auto ss = storageDurabilityLagReverseIndex.begin(); ss != storageDurabilityLagReverseIndex.end(); ++ss) { - if (ignoredDurabilityLagMachines.size() < - std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) { - ignoredDurabilityLagMachines.insert(ss->second->locality.zoneId()); - continue; - } - if (ignoredDurabilityLagMachines.count(ss->second->locality.zoneId()) > 0) { - continue; - } - - limitingDurabilityLag = -1 * ss->first; - if (limitingDurabilityLag > limits->durabilityLagTargetVersions 
&& - self->actualTpsHistory.size() > SERVER_KNOBS->NEEDED_TPS_HISTORY_SAMPLES) { - if (limits->durabilityLagLimit == std::numeric_limits::infinity()) { - double maxTps = 0; - for (int i = 0; i < self->actualTpsHistory.size(); i++) { - maxTps = std::max(maxTps, self->actualTpsHistory[i]); - } - limits->durabilityLagLimit = SERVER_KNOBS->INITIAL_DURABILITY_LAG_MULTIPLIER * maxTps; - } - if (limitingDurabilityLag > limits->lastDurabilityLag) { - limits->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_REDUCTION_RATE * limits->durabilityLagLimit; - } - if (limits->durabilityLagLimit < limits->tpsLimit) { - if (printRateKeepLimitReasonDetails) { - TraceEvent("RatekeeperLimitReasonDetails") - .detail("SSID", ss->second->id) - .detail("Reason", limitReason_t::storage_server_durability_lag) - .detail("LimitsDurabilityLagLimit", limits->durabilityLagLimit) - .detail("LimitsTpsLimit", limits->tpsLimit) - .detail("LimitingDurabilityLag", limitingDurabilityLag) - .detail("LimitsLastDurabilityLag", limits->lastDurabilityLag); - } - limits->tpsLimit = limits->durabilityLagLimit; - limitReason = limitReason_t::storage_server_durability_lag; - } - } else if (limits->durabilityLagLimit != std::numeric_limits::infinity() && - limitingDurabilityLag > - limits->durabilityLagTargetVersions - SERVER_KNOBS->DURABILITY_LAG_UNLIMITED_THRESHOLD) { - limits->durabilityLagLimit = SERVER_KNOBS->DURABILITY_LAG_INCREASE_RATE * limits->durabilityLagLimit; - } else { - limits->durabilityLagLimit = std::numeric_limits::infinity(); - } - limits->lastDurabilityLag = limitingDurabilityLag; - break; - } - - self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer; - self->healthMetrics.limitingStorageQueue = limitingStorageQueueStorageServer; - self->healthMetrics.worstStorageDurabilityLag = worstDurabilityLag; - self->healthMetrics.limitingStorageDurabilityLag = limitingDurabilityLag; - - double writeToReadLatencyLimit = 0; - Version worstVersionLag = 0; - Version limitingVersionLag = 
0; - - { - Version minSSVer = std::numeric_limits::max(); - Version minLimitingSSVer = std::numeric_limits::max(); - for (const auto& it : self->storageQueueInfo) { - auto& ss = it.value; - if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) - continue; - - minSSVer = std::min(minSSVer, ss.lastReply.version); - - // Machines that ratekeeper isn't controlling can fall arbitrarily far behind - if (ignoredMachines.count(it.value.locality.zoneId()) == 0) { - minLimitingSSVer = std::min(minLimitingSSVer, ss.lastReply.version); - } - } - - Version maxTLVer = std::numeric_limits::min(); - for (const auto& it : self->tlogQueueInfo) { - auto& tl = it.value; - if (!tl.valid) - continue; - maxTLVer = std::max(maxTLVer, tl.lastReply.v); - } - - if (minSSVer != std::numeric_limits::max() && maxTLVer != std::numeric_limits::min()) { - // writeToReadLatencyLimit: 0 = infinte speed; 1 = TL durable speed ; 2 = half TL durable speed - writeToReadLatencyLimit = - ((maxTLVer - minLimitingSSVer) - limits->maxVersionDifference / 2) / (limits->maxVersionDifference / 4); - worstVersionLag = std::max((Version)0, maxTLVer - minSSVer); - limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer); - } - } - - int64_t worstFreeSpaceTLog = std::numeric_limits::max(); - int64_t worstStorageQueueTLog = 0; - int tlcount = 0; - for (auto& it : self->tlogQueueInfo) { - auto& tl = it.value; - if (!tl.valid) - continue; - ++tlcount; - - limitReason_t tlogLimitReason = limitReason_t::log_server_write_queue; - - int64_t minFreeSpace = - std::max(SERVER_KNOBS->MIN_AVAILABLE_SPACE, - (int64_t)(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO * tl.smoothTotalSpace.smoothTotal())); - - worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace); - - int64_t springBytes = std::max( - 1, std::min(limits->logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2)); - int64_t targetBytes = std::max( - 1, 
std::min(limits->logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace)); - if (targetBytes != limits->logTargetBytes) { - if (minFreeSpace == SERVER_KNOBS->MIN_AVAILABLE_SPACE) { - tlogLimitReason = limitReason_t::log_server_min_free_space; - } else { - tlogLimitReason = limitReason_t::log_server_min_free_space_ratio; - } - if (printRateKeepLimitReasonDetails) { - TraceEvent("RatekeeperLimitReasonDetails") - .detail("TLogID", tl.id) - .detail("Reason", tlogLimitReason) - .detail("TLSmoothFreeSpace", tl.smoothFreeSpace.smoothTotal()) - .detail("TLSmoothTotalSpace", tl.smoothTotalSpace.smoothTotal()) - .detail("LimitsLogTargetBytes", limits->logTargetBytes) - .detail("TargetBytes", targetBytes) - .detail("MinFreeSpace", minFreeSpace); - } - } - - int64_t queue = tl.lastReply.bytesInput - tl.smoothDurableBytes.smoothTotal(); - self->healthMetrics.tLogQueue[tl.id] = queue; - int64_t b = queue - targetBytes; - worstStorageQueueTLog = std::max(worstStorageQueueTLog, queue); - - if (tl.lastReply.bytesInput - tl.lastReply.bytesDurable > tl.lastReply.storageBytes.free - minFreeSpace / 2) { - if (now() - self->lastWarning > 5.0) { - self->lastWarning = now(); - TraceEvent(SevWarnAlways, "RkTlogMinFreeSpaceZero", self->id).detail("ReasonId", tl.id); - } - reasonID = tl.id; - limitReason = limitReason_t::log_server_min_free_space; - limits->tpsLimit = 0.0; - } - - double targetRateRatio = std::min((b + springBytes) / (double)springBytes, 2.0); - - if (writeToReadLatencyLimit > targetRateRatio) { - if (printRateKeepLimitReasonDetails) { - TraceEvent("RatekeeperLimitReasonDetails") - .detail("TLogID", tl.id) - .detail("Reason", limitReason_t::storage_server_readable_behind) - .detail("TLSmoothFreeSpace", tl.smoothFreeSpace.smoothTotal()) - .detail("TLSmoothTotalSpace", tl.smoothTotalSpace.smoothTotal()) - .detail("LimitsLogSpringBytes", limits->logSpringBytes) - .detail("LimitsLogTargetBytes", limits->logTargetBytes) - .detail("SpringBytes", springBytes) - 
.detail("TargetBytes", targetBytes) - .detail("TLLastReplyBytesInput", tl.lastReply.bytesInput) - .detail("TLSmoothDurableBytes", tl.smoothDurableBytes.smoothTotal()) - .detail("Queue", queue) - .detail("B", b) - .detail("TargetRateRatio", targetRateRatio) - .detail("WriteToReadLatencyLimit", writeToReadLatencyLimit) - .detail("MinFreeSpace", minFreeSpace) - .detail("LimitsMaxVersionDifference", limits->maxVersionDifference); - } - targetRateRatio = writeToReadLatencyLimit; - tlogLimitReason = limitReason_t::storage_server_readable_behind; - } - - double inputRate = tl.smoothInputBytes.smoothRate(); - - if (targetRateRatio > 0) { - double smoothedRate = - std::max(tl.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE); - double x = smoothedRate / (inputRate * targetRateRatio); - if (targetRateRatio < .75) //< FIXME: KNOB for 2.0 - x = std::max(x, 0.95); - double lim = actualTps * x; - if (lim < limits->tpsLimit) { - limits->tpsLimit = lim; - reasonID = tl.id; - limitReason = tlogLimitReason; - } - } - if (inputRate > 0) { - // Don't let any tlogs use up its target bytes faster than its MVCC window! 
- double x = - ((targetBytes - springBytes) / - ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + - 2.0)) / - inputRate; - double lim = actualTps * x; - if (lim < limits->tpsLimit) { - if (printRateKeepLimitReasonDetails) { - TraceEvent("RatekeeperLimitReasonDetails") - .detail("Reason", limitReason_t::log_server_mvcc_write_bandwidth) - .detail("TLogID", tl.id) - .detail("MinFreeSpace", minFreeSpace) - .detail("TLSmoothFreeSpace", tl.smoothFreeSpace.smoothTotal()) - .detail("TLSmoothTotalSpace", tl.smoothTotalSpace.smoothTotal()) - .detail("LimitsLogSpringBytes", limits->logSpringBytes) - .detail("LimitsLogTargetBytes", limits->logTargetBytes) - .detail("SpringBytes", springBytes) - .detail("TargetBytes", targetBytes) - .detail("InputRate", inputRate) - .detail("X", x) - .detail("ActualTps", actualTps) - .detail("Lim", lim) - .detail("LimitsTpsLimit", limits->tpsLimit); - } - limits->tpsLimit = lim; - reasonID = tl.id; - limitReason = limitReason_t::log_server_mvcc_write_bandwidth; - } - } - } - - self->healthMetrics.worstTLogQueue = worstStorageQueueTLog; - - limits->tpsLimit = std::max(limits->tpsLimit, 0.0); - - if (g_network->isSimulated() && g_simulator.speedUpSimulation) { - limits->tpsLimit = std::max(limits->tpsLimit, 100.0); - } - - int64_t totalDiskUsageBytes = 0; - for (auto& t : self->tlogQueueInfo) { - if (t.value.valid) { - totalDiskUsageBytes += t.value.lastReply.storageBytes.used; - } - } - for (auto& s : self->storageQueueInfo) { - if (s.value.valid) { - totalDiskUsageBytes += s.value.lastReply.storageBytes.used; - } - } - - if (now() - self->lastSSListFetchedTimestamp > SERVER_KNOBS->STORAGE_SERVER_LIST_FETCH_TIMEOUT) { - limits->tpsLimit = 0.0; - limitReason = limitReason_t::storage_server_list_fetch_failed; - reasonID = UID(); - TraceEvent(SevWarnAlways, "RkSSListFetchTimeout", self->id).suppressFor(1.0); - } else if (limits->tpsLimit == std::numeric_limits::infinity()) { - limits->tpsLimit = 
SERVER_KNOBS->RATEKEEPER_DEFAULT_LIMIT; - } - - limits->tpsLimitMetric = std::min(limits->tpsLimit, 1e6); - limits->reasonMetric = limitReason; - - if (deterministicRandom()->random01() < 0.1) { - const std::string& name = limits->rkUpdateEventCacheHolder.getPtr()->trackingKey; - TraceEvent(name.c_str(), self->id) - .detail("TPSLimit", limits->tpsLimit) - .detail("Reason", limitReason) - .detail("ReasonServerID", reasonID == UID() ? std::string() : Traceable::toString(reasonID)) - .detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate()) - .detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate()) - .detail("TPSBasis", actualTps) - .detail("StorageServers", sscount) - .detail("GrvProxies", self->grvProxyInfo.size()) - .detail("TLogs", tlcount) - .detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer) - .detail("WorstFreeSpaceTLog", worstFreeSpaceTLog) - .detail("WorstStorageServerQueue", worstStorageQueueStorageServer) - .detail("LimitingStorageServerQueue", limitingStorageQueueStorageServer) - .detail("WorstTLogQueue", worstStorageQueueTLog) - .detail("TotalDiskUsageBytes", totalDiskUsageBytes) - .detail("WorstStorageServerVersionLag", worstVersionLag) - .detail("LimitingStorageServerVersionLag", limitingVersionLag) - .detail("WorstStorageServerDurabilityLag", worstDurabilityLag) - .detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag) - .detail("TagsAutoThrottled", self->throttledTags.autoThrottleCount()) - .detail("TagsAutoThrottledBusyRead", self->throttledTags.busyReadTagCount) - .detail("TagsAutoThrottledBusyWrite", self->throttledTags.busyWriteTagCount) - .detail("TagsManuallyThrottled", self->throttledTags.manualThrottleCount()) - .detail("AutoThrottlingEnabled", self->autoThrottlingEnabled) - .trackLatest(name); - } -} - -static void updateCommitCostEstimation(RatekeeperData* self, - UIDTransactionTagMap const& costEstimation) { - for (auto it = self->storageQueueInfo.begin(); it != 
self->storageQueueInfo.end(); ++it) { - auto tagCostIt = costEstimation.find(it->key); - if (tagCostIt == costEstimation.end()) - continue; - for (const auto& [tagName, cost] : tagCostIt->second) { - it->value.tagCostEst[tagName] += cost; - it->value.totalWriteCosts += cost.getCostSum(); - it->value.totalWriteOps += cost.getOpsSum(); - } - } -} - -ACTOR Future configurationMonitor(RatekeeperData* self) { - loop { - state ReadYourWritesTransaction tr(self->db); - - loop { - try { - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - RangeResult results = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY)); - ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY); - - self->configuration.fromKeyValues((VectorRef)results); - - state Future watchFuture = - tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || - tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) || - tr.watch(failedLocalityVersionKey); - wait(tr.commit()); - wait(watchFuture); - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - } -} - -ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference const> dbInfo) { - state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); - state Future timeout = Void(); - state std::vector> tlogTrackers; - state std::vector tlogInterfs; - state Promise err; - state Future collection = actorCollection(self.addActor.getFuture()); - - TraceEvent("RatekeeperStarting", rkInterf.id()); - self.addActor.send(waitFailureServer(rkInterf.waitFailure.getFuture())); - self.addActor.send(configurationMonitor(&self)); - - PromiseStream>> serverChanges; - self.addActor.send(monitorServerListChange(&self, serverChanges)); - self.addActor.send(trackEachStorageServer(&self, serverChanges.getFuture())); - self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); - - 
self.addActor.send(monitorThrottlingChanges(&self)); - RatekeeperData* selfPtr = &self; // let flow compiler capture self - self.addActor.send( - recurring([selfPtr]() { refreshStorageServerCommitCost(selfPtr); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); - - TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) - .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) - .detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG) - .detail("Rate", - (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / - ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + - 2.0)); - - TraceEvent("RkStorageServerQueueSizeParameters", rkInterf.id()) - .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER) - .detail("Spring", SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) - .detail("EBrake", SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES) - .detail("Rate", - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) / - ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + - 2.0)); - - tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); - tlogTrackers.reserve(tlogInterfs.size()); - for (int i = 0; i < tlogInterfs.size(); i++) { - tlogTrackers.push_back(splitError(trackTLogQueueInfo(&self, tlogInterfs[i]), err)); - } - - self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); - - try { - state bool lastLimited = false; - loop choose { - when(wait(timeout)) { - updateRate(&self, &self.normalLimits); - updateRate(&self, &self.batchLimits); - - lastLimited = self.smoothReleasedTransactions.smoothRate() > - SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit; - double tooOld = now() - 1.0; - for (auto p = self.grvProxyInfo.begin(); p != self.grvProxyInfo.end();) { - if (p->second.lastUpdateTime < tooOld) - p = self.grvProxyInfo.erase(p); - else - ++p; - } - timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE); - } - 
when(GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) { - GetRateInfoReply reply; - - auto& p = self.grvProxyInfo[req.requesterID]; - //TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.totalTransactions).detail("Delta", req.totalReleasedTransactions - p.totalTransactions); - if (p.totalTransactions > 0) { - self.smoothReleasedTransactions.addDelta(req.totalReleasedTransactions - p.totalTransactions); - - for (auto tag : req.throttledTagCounts) { - self.throttledTags.addRequests(tag.first, tag.second); - } - } - if (p.batchTransactions > 0) { - self.smoothBatchReleasedTransactions.addDelta(req.batchReleasedTransactions - p.batchTransactions); - } - - p.totalTransactions = req.totalReleasedTransactions; - p.batchTransactions = req.batchReleasedTransactions; - p.lastUpdateTime = now(); - - reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size(); - reply.batchTransactionRate = self.batchLimits.tpsLimit / self.grvProxyInfo.size(); - reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE; - - if (p.lastThrottledTagChangeId != self.throttledTagChangeId || - now() > p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) { - p.lastThrottledTagChangeId = self.throttledTagChangeId; - p.lastTagPushTime = now(); - - reply.throttledTags = self.throttledTags.getClientRates(self.autoThrottlingEnabled); - bool returningTagsToProxy = reply.throttledTags.present() && reply.throttledTags.get().size() > 0; - TEST(returningTagsToProxy); // Returning tag throttles to a proxy - } - - reply.healthMetrics.update(self.healthMetrics, true, req.detailed); - reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit; - reply.healthMetrics.batchLimited = lastLimited; - - req.reply.send(reply); - } - when(HaltRatekeeperRequest req = waitNext(rkInterf.haltRatekeeper.getFuture())) { - req.reply.send(Void()); - TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID); - break; - } 
- when(ReportCommitCostEstimationRequest req = waitNext(rkInterf.reportCommitCostEstimation.getFuture())) { - updateCommitCostEstimation(&self, req.ssTrTagCommitCost); - req.reply.send(Void()); - } - when(wait(err.getFuture())) {} - when(wait(dbInfo->onChange())) { - if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) { - tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); - tlogTrackers = std::vector>(); - for (int i = 0; i < tlogInterfs.size(); i++) - tlogTrackers.push_back(splitError(trackTLogQueueInfo(&self, tlogInterfs[i]), err)); - } - self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); - } - when(wait(collection)) { - ASSERT(false); - throw internal_error(); - } - } - } catch (Error& err) { - TraceEvent("RatekeeperDied", rkInterf.id()).error(err, true); - } - return Void(); -} diff --git a/fdbserver/RatekeeperData.actor.cpp b/fdbserver/RatekeeperData.actor.cpp index 09e4c56f1f..e644240a16 100644 --- a/fdbserver/RatekeeperData.actor.cpp +++ b/fdbserver/RatekeeperData.actor.cpp @@ -21,6 +21,39 @@ #include "fdbserver/RatekeeperData.h" #include "flow/actorcompiler.h" // must be last include +const char* limitReasonName[] = { "workload", + "storage_server_write_queue_size", + "storage_server_write_bandwidth_mvcc", + "storage_server_readable_behind", + "log_server_mvcc_write_bandwidth", + "log_server_write_queue", + "storage_server_min_free_space", + "storage_server_min_free_space_ratio", + "log_server_min_free_space", + "log_server_min_free_space_ratio", + "storage_server_durability_lag", + "storage_server_list_fetch_failed" }; +static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size"); + +int limitReasonEnd = limitReason_t_end; + +// NOTE: This has a corresponding table in Script.cs (see RatekeeperReason graph) +// IF UPDATING THIS ARRAY, UPDATE SCRIPT.CS! 
+const char* limitReasonDesc[] = { "Workload or read performance.", + "Storage server performance (storage queue).", + "Storage server MVCC memory.", + "Storage server version falling behind.", + "Log server MVCC memory.", + "Storage server performance (log queue).", + "Storage server running out of space (approaching 100MB limit).", + "Storage server running out of space (approaching 5% limit).", + "Log server running out of space (approaching 100MB limit).", + "Log server running out of space (approaching 5% limit).", + "Storage server durable version falling behind.", + "Unable to fetch storage server list." }; + +static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size"); + ACTOR static Future splitError(Future in, Promise errOut) { try { wait(in); @@ -228,6 +261,258 @@ public: } } + ACTOR static Future monitorThrottlingChanges(RatekeeperData* self) { + state bool committed = false; + loop { + state ReadYourWritesTransaction tr(self->db); + + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + state Future throttledTagKeys = tr.getRange(tagThrottleKeys, CLIENT_KNOBS->TOO_MANY); + state Future> autoThrottlingEnabled = tr.get(tagThrottleAutoEnabledKey); + + if (!committed) { + BinaryWriter limitWriter(Unversioned()); + limitWriter << SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS; + tr.set(tagThrottleLimitKey, limitWriter.toValue()); + } + + wait(success(throttledTagKeys) && success(autoThrottlingEnabled)); + + if (autoThrottlingEnabled.get().present() && + autoThrottlingEnabled.get().get() == LiteralStringRef("0")) { + TEST(true); // Auto-throttling disabled + if (self->autoThrottlingEnabled) { + TraceEvent("AutoTagThrottlingDisabled", self->id).log(); + } + self->autoThrottlingEnabled = false; + } else if (autoThrottlingEnabled.get().present() && + autoThrottlingEnabled.get().get() == LiteralStringRef("1")) 
{ + TEST(true); // Auto-throttling enabled + if (!self->autoThrottlingEnabled) { + TraceEvent("AutoTagThrottlingEnabled", self->id).log(); + } + self->autoThrottlingEnabled = true; + } else { + TEST(true); // Auto-throttling unspecified + if (autoThrottlingEnabled.get().present()) { + TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id) + .detail("Value", autoThrottlingEnabled.get().get()); + } + self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED; + if (!committed) + tr.set(tagThrottleAutoEnabledKey, + LiteralStringRef(self->autoThrottlingEnabled ? "1" : "0")); + } + + RkTagThrottleCollection updatedTagThrottles; + + TraceEvent("RatekeeperReadThrottledTags", self->id) + .detail("NumThrottledTags", throttledTagKeys.get().size()); + for (auto entry : throttledTagKeys.get()) { + TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key); + TagThrottleValue tagValue = TagThrottleValue::fromValue(entry.value); + + ASSERT(tagKey.tags.size() == 1); // Currently, only 1 tag per throttle is supported + + if (tagValue.expirationTime == 0 || + tagValue.expirationTime > now() + tagValue.initialDuration) { + TEST(true); // Converting tag throttle duration to absolute time + tagValue.expirationTime = now() + tagValue.initialDuration; + BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValueReason())); + wr << tagValue; + state Value value = wr.toValue(); + + tr.set(entry.key, value); + } + + if (tagValue.expirationTime > now()) { + TransactionTag tag = *tagKey.tags.begin(); + Optional oldLimits = + self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority); + + if (tagKey.throttleType == TagThrottleType::AUTO) { + updatedTagThrottles.autoThrottleTag( + self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime); + if (tagValue.reason == TagThrottledReason::BUSY_READ) { + updatedTagThrottles.busyReadTagCount++; + } else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) { + updatedTagThrottles.busyWriteTagCount++; + 
} + } else { + updatedTagThrottles.manualThrottleTag(self->id, + tag, + tagKey.priority, + tagValue.tpsRate, + tagValue.expirationTime, + oldLimits); + } + } + } + + self->throttledTags = std::move(updatedTagThrottles); + ++self->throttledTagChangeId; + + state Future watchFuture = tr.watch(tagThrottleSignalKey); + wait(tr.commit()); + committed = true; + + wait(watchFuture); + TraceEvent("RatekeeperThrottleSignaled", self->id).log(); + TEST(true); // Tag throttle changes detected + break; + } catch (Error& e) { + TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e); + wait(tr.onError(e)); + } + } + } + } + + /* Ratekeeper main actor. Builds a RatekeeperData for rkInterf.id(), hands its background actors to self.addActor, starts a trackTLogQueueInfo tracker per local tlog, then serves requests in the loop below until a HaltRatekeeperRequest breaks out. An Error thrown inside the try is traced as "RatekeeperDied" and not rethrown, so Void is returned in both cases. NOTE(review): template arguments look stripped in this text (e.g. "Future run", "Reference const>") - confirm against the real file. */ + ACTOR static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo) { + state RatekeeperData self(rkInterf.id(), + openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); + state Future timeout = Void(); + state std::vector> tlogTrackers; + state std::vector tlogInterfs; + state Promise err; + state Future collection = actorCollection(self.addActor.getFuture()); + + TraceEvent("RatekeeperStarting", rkInterf.id()); + self.addActor.send(waitFailureServer(rkInterf.waitFailure.getFuture())); + self.addActor.send(self.configurationMonitor()); + + PromiseStream>> serverChanges; + self.addActor.send(self.monitorServerListChange(serverChanges)); + self.addActor.send(self.trackEachStorageServer(serverChanges.getFuture())); + self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id())); + + self.addActor.send(self.monitorThrottlingChanges()); + RatekeeperData* selfPtr = &self; // let flow compiler capture self + self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); }, + SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + + /* Trace-only: the next two events log the tlog and storage-server queue target/spring knobs plus a rate computed from them. */ + TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) + .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG) + .detail("Spring", SERVER_KNOBS->SPRING_BYTES_TLOG) + .detail( + "Rate", + (SERVER_KNOBS->TARGET_BYTES_PER_TLOG - SERVER_KNOBS->SPRING_BYTES_TLOG) / + 
((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + + 2.0)); + + TraceEvent("RkStorageServerQueueSizeParameters", rkInterf.id()) + .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER) + .detail("Spring", SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) + .detail("EBrake", SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES) + .detail( + "Rate", + (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER) / + ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) / SERVER_KNOBS->VERSIONS_PER_SECOND) + + 2.0)); + + /* Start one queue tracker per local tlog, each wrapped in splitError with the shared err promise. */ + tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); + tlogTrackers.reserve(tlogInterfs.size()); + for (int i = 0; i < tlogInterfs.size(); i++) { + tlogTrackers.push_back(splitError(self.trackTLogQueueInfo(tlogInterfs[i]), err)); + } + + self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); + + try { + state bool lastLimited = false; + loop choose { + /* Periodic tick: recompute normal and batch limits, refresh lastLimited, erase proxies whose lastUpdateTime is more than 1.0 behind now(), then re-arm the timer. */ + when(wait(timeout)) { + self.updateRate(&self.normalLimits); + self.updateRate(&self.batchLimits); + + lastLimited = self.smoothReleasedTransactions.smoothRate() > + SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit; + double tooOld = now() - 1.0; + for (auto p = self.grvProxyInfo.begin(); p != self.grvProxyInfo.end();) { + if (p->second.lastUpdateTime < tooOld) + p = self.grvProxyInfo.erase(p); + else + ++p; + } + timeout = delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE); + } + /* Rate request: add released-transaction deltas to the smoothers only when the stored counter is already positive (presumably skipping a proxy's first report - confirm the entry's default values), record the new counters, and reply with the limits divided evenly by the number of tracked proxies. */ + when(GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) { + GetRateInfoReply reply; + + auto& p = self.grvProxyInfo[req.requesterID]; + //TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.totalTransactions).detail("Delta", req.totalReleasedTransactions - p.totalTransactions); + if (p.totalTransactions > 0) { + self.smoothReleasedTransactions.addDelta(req.totalReleasedTransactions - p.totalTransactions); + + for (auto tag : req.throttledTagCounts) { + 
self.throttledTags.addRequests(tag.first, tag.second); + } + } + if (p.batchTransactions > 0) { + self.smoothBatchReleasedTransactions.addDelta(req.batchReleasedTransactions - + p.batchTransactions); + } + + p.totalTransactions = req.totalReleasedTransactions; + p.batchTransactions = req.batchReleasedTransactions; + p.lastUpdateTime = now(); + + reply.transactionRate = self.normalLimits.tpsLimit / self.grvProxyInfo.size(); + reply.batchTransactionRate = self.batchLimits.tpsLimit / self.grvProxyInfo.size(); + reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE; + + /* Send throttled tags only if the change id differs from what this proxy last saw or TAG_THROTTLE_PUSH_INTERVAL has passed since the last push. */ + if (p.lastThrottledTagChangeId != self.throttledTagChangeId || + now() > p.lastTagPushTime + SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL) { + p.lastThrottledTagChangeId = self.throttledTagChangeId; + p.lastTagPushTime = now(); + + reply.throttledTags = self.throttledTags.getClientRates(self.autoThrottlingEnabled); + bool returningTagsToProxy = + reply.throttledTags.present() && reply.throttledTags.get().size() > 0; + TEST(returningTagsToProxy); // Returning tag throttles to a proxy + } + + reply.healthMetrics.update(self.healthMetrics, true, req.detailed); + reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit; + reply.healthMetrics.batchLimited = lastLimited; + + req.reply.send(reply); + } + /* Halt: acknowledge the request, trace it, and leave the loop. */ + when(HaltRatekeeperRequest req = waitNext(rkInterf.haltRatekeeper.getFuture())) { + req.reply.send(Void()); + TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID); + break; + } + when(ReportCommitCostEstimationRequest req = + waitNext(rkInterf.reportCommitCostEstimation.getFuture())) { + self.updateCommitCostEstimation(req.ssTrTagCommitCost); + req.reply.send(Void()); + } + /* err is the promise passed to splitError above; NOTE(review): presumably a tracker error is thrown from here into the catch below - confirm splitError semantics. */ + when(wait(err.getFuture())) {} + /* On a dbInfo change, rebuild the trackers only when the local log list differs; remoteDC is refreshed every time. */ + when(wait(dbInfo->onChange())) { + if (tlogInterfs != dbInfo->get().logSystemConfig.allLocalLogs()) { + tlogInterfs = dbInfo->get().logSystemConfig.allLocalLogs(); + tlogTrackers = std::vector>(); + for (int i = 0; i < tlogInterfs.size(); i++) + 
tlogTrackers.push_back(splitError(self.trackTLogQueueInfo(tlogInterfs[i]), err)); + } + self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId(); + } + /* Completion of the actor collection is unexpected: assert, then throw internal_error. */ + when(wait(collection)) { + ASSERT(false); + throw internal_error(); + } + } + } catch (Error& err) { + TraceEvent("RatekeeperDied", rkInterf.id()).error(err, true); + } + return Void(); + } + }; // class RatekeeperDataImpl Future RatekeeperData::configurationMonitor() { @@ -252,6 +537,14 @@ Future RatekeeperData::trackTLogQueueInfo(TLogInterface tli) { return RatekeeperDataImpl::trackTLogQueueInfo(this, tli); } +/* Member entry points that forward to the RatekeeperDataImpl actors. */ +Future RatekeeperData::monitorThrottlingChanges() { + return RatekeeperDataImpl::monitorThrottlingChanges(this); +} + +Future RatekeeperData::run(RatekeeperInterface rkInterf, Reference const> dbInfo) { + return RatekeeperDataImpl::run(rkInterf, dbInfo); +} + RatekeeperData::RatekeeperData(UID id, Database db) : id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), @@ -863,3 +1156,8 @@ void RatekeeperData::tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQue } } } + +/* Free-function entry point: waits for RatekeeperData::run and returns Void. */ +ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference const> dbInfo) { + wait(RatekeeperData::run(rkInterf, dbInfo)); + return Void(); +} diff --git a/fdbserver/RatekeeperData.h b/fdbserver/RatekeeperData.h index 1247e03a0e..4340796a2e 100644 --- a/fdbserver/RatekeeperData.h +++ b/fdbserver/RatekeeperData.h @@ -563,6 +563,7 @@ struct RatekeeperData { void tryAutoThrottleTag(TransactionTag, double rate, double busyness, TagThrottledReason); void tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag); + Future monitorThrottlingChanges(); static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo); };