/*
 * TagThrottle.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
|
2021-09-10 06:00:36 +08:00
|
|
|
#include "flow/Arena.h"
|
2021-08-11 09:07:36 +08:00
|
|
|
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_TAG_THROTTLE_ACTOR_G_H)
|
|
|
|
#define FDBCLIENT_TAG_THROTTLE_ACTOR_G_H
|
|
|
|
#include "fdbclient/TagThrottle.actor.g.h"
|
|
|
|
#elif !defined(FDBCLIENT_TAG_THROTTLE_ACTOR_H)
|
|
|
|
#define FDBCLIENT_TAG_THROTTLE_ACTOR_H
|
2020-04-10 07:55:56 +08:00
|
|
|
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include "flow/Error.h"
|
|
|
|
#include "flow/flow.h"
|
|
|
|
#include "flow/network.h"
|
2021-08-11 09:07:36 +08:00
|
|
|
#include "flow/ThreadHelper.actor.h"
|
|
|
|
#include "fdbclient/FDBOptions.g.h"
|
2020-04-10 07:55:56 +08:00
|
|
|
#include "fdbclient/FDBTypes.h"
|
2021-08-11 09:07:36 +08:00
|
|
|
#include "fdbclient/CommitTransaction.h"
|
|
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
2020-04-10 07:55:56 +08:00
|
|
|
|
2020-04-17 23:07:01 +08:00
|
|
|
typedef StringRef TransactionTagRef;
|
|
|
|
typedef Standalone<TransactionTagRef> TransactionTag;
|
2020-04-10 07:55:56 +08:00
|
|
|
|
|
|
|
class TagSet {
|
|
|
|
public:
|
2020-06-15 06:23:54 +08:00
|
|
|
typedef std::vector<TransactionTagRef>::const_iterator const_iterator;
|
2020-04-10 07:55:56 +08:00
|
|
|
|
|
|
|
TagSet() : bytes(0) {}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void addTag(TransactionTagRef tag);
|
2020-04-24 11:50:40 +08:00
|
|
|
size_t size() const;
|
2020-04-10 07:55:56 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
const_iterator begin() const { return tags.begin(); }
|
2020-04-10 07:55:56 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
const_iterator end() const { return tags.end(); }
|
2020-04-10 07:55:56 +08:00
|
|
|
|
2020-07-20 11:17:46 +08:00
|
|
|
void clear() {
|
|
|
|
tags.clear();
|
|
|
|
bytes = 0;
|
|
|
|
}
|
2020-04-10 07:55:56 +08:00
|
|
|
|
|
|
|
template <class Context>
|
2020-06-15 05:03:45 +08:00
|
|
|
void save(uint8_t* out, Context& c) const {
|
2021-03-11 02:06:03 +08:00
|
|
|
uint8_t* start = out;
|
2020-06-15 05:03:45 +08:00
|
|
|
for (const auto& tag : *this) {
|
2020-05-03 06:58:45 +08:00
|
|
|
*(out++) = (uint8_t)tag.size();
|
2020-04-10 07:55:56 +08:00
|
|
|
|
|
|
|
std::copy(tag.begin(), tag.end(), out);
|
|
|
|
out += tag.size();
|
|
|
|
}
|
2020-05-05 06:49:55 +08:00
|
|
|
|
2020-06-15 05:03:45 +08:00
|
|
|
ASSERT((size_t)(out - start) == size() + bytes);
|
2020-04-10 07:55:56 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
template <class Context>
|
2020-06-15 05:03:45 +08:00
|
|
|
void load(const uint8_t* data, size_t size, Context& context) {
|
2021-03-11 02:06:03 +08:00
|
|
|
// const uint8_t *start = data;
|
|
|
|
const uint8_t* end = data + size;
|
|
|
|
while (data < end) {
|
2020-05-03 06:58:45 +08:00
|
|
|
uint8_t len = *(data++);
|
2020-06-15 06:23:54 +08:00
|
|
|
// Tags are already deduplicated
|
|
|
|
const auto& tag = tags.emplace_back(context.tryReadZeroCopy(data, len), len);
|
2020-04-10 07:55:56 +08:00
|
|
|
data += len;
|
2020-06-15 05:03:45 +08:00
|
|
|
bytes += tag.size();
|
2020-04-10 07:55:56 +08:00
|
|
|
}
|
|
|
|
|
2020-05-05 06:49:55 +08:00
|
|
|
ASSERT(data == end);
|
2020-05-03 06:58:45 +08:00
|
|
|
|
2020-04-23 05:15:35 +08:00
|
|
|
// Deserialized tag sets share the arena with the request that contained them
|
|
|
|
// For this reason, persisting a TagSet that shares memory with other request
|
|
|
|
// members should be done with caution.
|
2020-06-15 05:03:45 +08:00
|
|
|
arena = context.arena();
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t getBytes() const { return bytes; }
|
|
|
|
|
2020-06-15 06:23:54 +08:00
|
|
|
const Arena& getArena() const { return arena; }
|
2020-06-15 05:03:45 +08:00
|
|
|
|
|
|
|
private:
|
|
|
|
size_t bytes;
|
|
|
|
Arena arena;
|
2020-06-15 10:56:45 +08:00
|
|
|
// Currently there are never >= 256 tags, so
|
|
|
|
// std::vector is faster than std::set. This may
|
|
|
|
// change if we allow more tags in the future.
|
2020-06-15 06:23:54 +08:00
|
|
|
std::vector<TransactionTagRef> tags;
|
2020-06-15 05:03:45 +08:00
|
|
|
};
|
|
|
|
|
|
|
|
template <>
|
|
|
|
struct dynamic_size_traits<TagSet> : std::true_type {
|
|
|
|
// May be called multiple times during one serialization
|
|
|
|
template <class Context>
|
|
|
|
static size_t size(const TagSet& t, Context&) {
|
|
|
|
return t.size() + t.getBytes();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Guaranteed to be called only once during serialization
|
|
|
|
template <class Context>
|
|
|
|
static void save(uint8_t* out, const TagSet& t, Context& c) {
|
|
|
|
t.save(out, c);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Context is an arbitrary type that is plumbed by reference throughout the
|
|
|
|
// load call tree.
|
|
|
|
template <class Context>
|
|
|
|
static void load(const uint8_t* data, size_t size, TagSet& t, Context& context) {
|
|
|
|
t.load(data, size, context);
|
2020-04-10 07:55:56 +08:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// How a throttle was created. Enumerators keep their implicit values (MANUAL = 0, AUTO = 1).
// NOTE(review): these values are presumably encoded into throttle keys; confirm before reordering.
enum class TagThrottleType : uint8_t { MANUAL, AUTO };

// Why a tag is throttled. The value is written by TagThrottleValue::serialize, so existing
// enumerators must keep their numeric values.
enum class TagThrottledReason : uint8_t { UNSET = 0, MANUAL, BUSY_READ, BUSY_WRITE };
|
2020-08-20 14:26:31 +08:00
|
|
|
|
2020-04-24 11:50:40 +08:00
|
|
|
struct TagThrottleKey {
|
|
|
|
TagSet tags;
|
2020-05-16 03:47:55 +08:00
|
|
|
TagThrottleType throttleType;
|
2020-04-25 02:31:16 +08:00
|
|
|
TransactionPriority priority;
|
2020-04-24 11:50:40 +08:00
|
|
|
|
2020-05-16 03:47:55 +08:00
|
|
|
TagThrottleKey() : throttleType(TagThrottleType::MANUAL), priority(TransactionPriority::DEFAULT) {}
|
2021-03-11 02:06:03 +08:00
|
|
|
TagThrottleKey(TagSet tags, TagThrottleType throttleType, TransactionPriority priority)
|
|
|
|
: tags(tags), throttleType(throttleType), priority(priority) {}
|
2020-04-24 11:50:40 +08:00
|
|
|
|
|
|
|
Key toKey() const;
|
|
|
|
static TagThrottleKey fromKey(const KeyRef& key);
|
|
|
|
};
|
|
|
|
|
|
|
|
struct TagThrottleValue {
|
|
|
|
double tpsRate;
|
|
|
|
double expirationTime;
|
|
|
|
double initialDuration;
|
2020-08-21 12:07:25 +08:00
|
|
|
TagThrottledReason reason;
|
2020-04-11 01:12:26 +08:00
|
|
|
|
2020-08-20 14:26:31 +08:00
|
|
|
TagThrottleValue() : tpsRate(0), expirationTime(0), initialDuration(0), reason(TagThrottledReason::UNSET) {}
|
2020-08-21 12:07:25 +08:00
|
|
|
TagThrottleValue(double tpsRate, double expirationTime, double initialDuration, TagThrottledReason reason)
|
2021-03-11 02:06:03 +08:00
|
|
|
: tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration), reason(reason) {}
|
2020-04-10 07:55:56 +08:00
|
|
|
|
2020-04-24 11:50:40 +08:00
|
|
|
static TagThrottleValue fromValue(const ValueRef& value);
|
2020-04-10 07:55:56 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// To change this serialization, ProtocolVersion::TagThrottleValue must be updated, and downgrades need to be
|
|
|
|
// considered
|
|
|
|
template <class Ar>
|
2020-04-24 11:50:40 +08:00
|
|
|
void serialize(Ar& ar) {
|
2021-03-11 02:06:03 +08:00
|
|
|
if (ar.protocolVersion().hasTagThrottleValueReason()) {
|
2020-11-15 11:22:04 +08:00
|
|
|
serializer(ar, tpsRate, expirationTime, initialDuration, reason);
|
2021-03-11 02:06:03 +08:00
|
|
|
} else if (ar.protocolVersion().hasTagThrottleValue()) {
|
2020-08-24 05:03:26 +08:00
|
|
|
serializer(ar, tpsRate, expirationTime, initialDuration);
|
2021-03-11 02:06:03 +08:00
|
|
|
if (ar.isDeserializing) {
|
|
|
|
reason = TagThrottledReason::UNSET;
|
2020-08-24 05:03:26 +08:00
|
|
|
}
|
|
|
|
}
|
2020-04-24 11:50:40 +08:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
struct TagThrottleInfo {
|
|
|
|
TransactionTag tag;
|
2020-05-16 03:47:55 +08:00
|
|
|
TagThrottleType throttleType;
|
2020-04-25 02:31:16 +08:00
|
|
|
TransactionPriority priority;
|
2020-04-24 11:50:40 +08:00
|
|
|
double tpsRate;
|
|
|
|
double expirationTime;
|
|
|
|
double initialDuration;
|
2020-08-21 12:07:25 +08:00
|
|
|
TagThrottledReason reason;
|
2020-04-24 11:50:40 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
TagThrottleInfo(TransactionTag tag,
|
|
|
|
TagThrottleType throttleType,
|
|
|
|
TransactionPriority priority,
|
|
|
|
double tpsRate,
|
|
|
|
double expirationTime,
|
|
|
|
double initialDuration,
|
|
|
|
TagThrottledReason reason = TagThrottledReason::UNSET)
|
|
|
|
: tag(tag), throttleType(throttleType), priority(priority), tpsRate(tpsRate), expirationTime(expirationTime),
|
|
|
|
initialDuration(initialDuration), reason(reason) {}
|
2020-05-02 12:36:28 +08:00
|
|
|
|
2020-08-20 14:26:31 +08:00
|
|
|
TagThrottleInfo(TagThrottleKey key, TagThrottleValue value)
|
2021-03-11 02:06:03 +08:00
|
|
|
: throttleType(key.throttleType), priority(key.priority), tpsRate(value.tpsRate),
|
|
|
|
expirationTime(value.expirationTime), initialDuration(value.initialDuration), reason(value.reason) {
|
2020-04-24 11:50:40 +08:00
|
|
|
ASSERT(key.tags.size() == 1); // Multiple tags per throttle is not currently supported
|
|
|
|
tag = *key.tags.begin();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-06-03 01:13:56 +08:00
|
|
|
struct ClientTagThrottleLimits {
|
|
|
|
double tpsRate;
|
|
|
|
double expiration;
|
|
|
|
|
|
|
|
ClientTagThrottleLimits() : tpsRate(0), expiration(0) {}
|
|
|
|
ClientTagThrottleLimits(double tpsRate, double expiration) : tpsRate(tpsRate), expiration(expiration) {}
|
|
|
|
|
|
|
|
template <class Archive>
|
|
|
|
void serialize(Archive& ar) {
|
|
|
|
// Convert expiration time to a duration to avoid clock differences
|
|
|
|
double duration = 0;
|
|
|
|
if (!ar.isDeserializing) {
|
|
|
|
duration = expiration - now();
|
|
|
|
}
|
|
|
|
|
|
|
|
serializer(ar, tpsRate, duration);
|
|
|
|
|
|
|
|
if (ar.isDeserializing) {
|
|
|
|
expiration = now() + duration;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// Client-side cost figures for a transaction commit. All fields are serialized in declaration
// order.
struct ClientTrCommitCostEstimation {
	int opsCount = 0;
	uint64_t writeCosts = 0;
	// NOTE(review): presumably (mutation index, cost) pairs for clear operations - confirm against the producer.
	std::deque<std::pair<int, uint64_t>> clearIdxCosts;
	uint32_t expensiveCostEstCount = 0;

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, opsCount, writeCosts, clearIdxCosts, expensiveCostEstCount);
	}
};
|
|
|
|
|
2021-08-11 09:07:36 +08:00
|
|
|
// Keys to view and control tag throttling
|
|
|
|
extern const KeyRangeRef tagThrottleKeys;
|
|
|
|
extern const KeyRef tagThrottleKeysPrefix;
|
|
|
|
extern const KeyRef tagThrottleAutoKeysPrefix;
|
|
|
|
extern const KeyRef tagThrottleSignalKey;
|
|
|
|
extern const KeyRef tagThrottleAutoEnabledKey;
|
|
|
|
extern const KeyRef tagThrottleLimitKey;
|
|
|
|
extern const KeyRef tagThrottleCountKey;
|
|
|
|
|
2020-04-24 11:50:40 +08:00
|
|
|
namespace ThrottleApi {
|
2021-08-12 07:59:25 +08:00
|
|
|
|
2021-08-18 04:02:25 +08:00
|
|
|
// The template functions below can be called either with Native API types (DatabaseContext,
// Transaction/ReadYourWritesTransaction) or with IClientAPI types (IDatabase, ITransaction).
|
|
|
|
|
2021-08-12 07:59:25 +08:00
|
|
|
// Reads tagThrottleAutoEnabledKey through `tr` and returns true for "1" and false for "0".
// If the key is missing or holds any other value, the transaction is reset and the read is
// retried after CLIENT_KNOBS->DEFAULT_BACKOFF, so this only returns once a valid value is seen.
// Errors from the read are not caught here; they propagate to the caller.
ACTOR template <class Tr>
Future<bool> getValidAutoEnabled(Reference<Tr> tr) {
	loop {
		// hold the returned standalone object's memory
		state typename Tr::template FutureT<Optional<Value>> valueF = tr->get(tagThrottleAutoEnabledKey);
		Optional<Value> value = wait(safeThreadFutureToFuture(valueF));
		if (value.present()) {
			if (value.get() == LiteralStringRef("1")) {
				return true;
			}
			if (value.get() == LiteralStringRef("0")) {
				return false;
			}
			TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue").detail("Value", value.get());
		}

		// Missing or invalid value: start over with a fresh read after backing off.
		tr->reset();
		wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
	}
}
|
|
|
|
|
|
|
|
// Returns up to `limit` throttles stored in [tagThrottleAutoKeysPrefix, tagThrottleKeys.end).
// When auto-throttling is enabled the result is always empty; the entries in that range are
// only reported as recommendations while auto-throttling is disabled.
// Retries through tr->onError() on transaction errors.
ACTOR template <class DB>
Future<std::vector<TagThrottleInfo>> getRecommendedTags(Reference<DB> db, int limit) {
	state Reference<typename DB::TransactionT> tr = db->createTransaction();
	loop {
		tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
		try {
			bool enableAuto = wait(getValidAutoEnabled(tr));
			if (enableAuto) {
				return std::vector<TagThrottleInfo>();
			}
			// The future is kept in a state variable so it stays alive across the wait below.
			state typename DB::TransactionT::template FutureT<RangeResult> f =
			    tr->getRange(KeyRangeRef(tagThrottleAutoKeysPrefix, tagThrottleKeys.end), limit);
			RangeResult throttles = wait(safeThreadFutureToFuture(f));
			std::vector<TagThrottleInfo> results;
			results.reserve(throttles.size());
			for (const auto& throttle : throttles) {
				results.emplace_back(TagThrottleKey::fromKey(throttle.key),
				                     TagThrottleValue::fromValue(throttle.value));
			}
			return results;
		} catch (Error& e) {
			wait(safeThreadFutureToFuture(tr->onError(e)));
		}
	}
}
|
|
|
|
|
|
|
|
// Returns up to `limit` stored throttles.
//
// If `containsRecommend` is true, the whole tagThrottleKeys range is read. Otherwise the
// auto-throttle range is included only when auto-throttling is currently enabled; when it is
// disabled only [tagThrottleKeysPrefix, tagThrottleAutoKeysPrefix) is read.
// Retries through tr->onError() on transaction errors.
ACTOR template <class DB>
Future<std::vector<TagThrottleInfo>> getThrottledTags(Reference<DB> db, int limit, bool containsRecommend = false) {
	state Reference<typename DB::TransactionT> tr = db->createTransaction();
	state bool reportAuto = containsRecommend;
	loop {
		tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
		try {
			if (!containsRecommend) {
				wait(store(reportAuto, getValidAutoEnabled(tr)));
			}
			// The future is kept in a state variable so it stays alive across the wait below.
			state typename DB::TransactionT::template FutureT<RangeResult> f = tr->getRange(
			    reportAuto ? tagThrottleKeys : KeyRangeRef(tagThrottleKeysPrefix, tagThrottleAutoKeysPrefix), limit);
			RangeResult throttles = wait(safeThreadFutureToFuture(f));
			std::vector<TagThrottleInfo> results;
			results.reserve(throttles.size());
			for (const auto& throttle : throttles) {
				results.emplace_back(TagThrottleKey::fromKey(throttle.key),
				                     TagThrottleValue::fromValue(throttle.value));
			}
			return results;
		} catch (Error& e) {
			wait(safeThreadFutureToFuture(tr->onError(e)));
		}
	}
}
|
|
|
|
|
|
|
|
template <class Tr>
|
2021-08-11 09:07:36 +08:00
|
|
|
void signalThrottleChange(Reference<Tr> tr) {
|
|
|
|
tr->atomicOp(
|
|
|
|
tagThrottleSignalKey, LiteralStringRef("XXXXXXXXXX\x00\x00\x00\x00"), MutationRef::SetVersionstampedValue);
|
|
|
|
}
|
|
|
|
|
2021-08-12 07:59:25 +08:00
|
|
|
// Adds `delta` to the counter stored at tagThrottleCountKey within `tr`.
//
// A missing counter or a missing limit (tagThrottleLimitKey) is treated as 0.
// Throws too_many_tag_throttles if `delta` is positive and the new count would exceed the limit.
// Decrements are never rejected: otherwise throttles could not be removed one by one while the
// stored count is above the limit (for example after the limit has been lowered).
// The caller is responsible for committing `tr`.
ACTOR template <class Tr>
Future<Void> updateThrottleCount(Reference<Tr> tr, int64_t delta) {
	// Both reads are issued before waiting so they can proceed concurrently.
	state typename Tr::template FutureT<Optional<Value>> countVal = tr->get(tagThrottleCountKey);
	state typename Tr::template FutureT<Optional<Value>> limitVal = tr->get(tagThrottleLimitKey);

	wait(success(safeThreadFutureToFuture(countVal)) && success(safeThreadFutureToFuture(limitVal)));

	int64_t count = 0;
	int64_t limit = 0;

	if (countVal.get().present()) {
		BinaryReader reader(countVal.get().get(), Unversioned());
		reader >> count;
	}

	if (limitVal.get().present()) {
		BinaryReader reader(limitVal.get().get(), Unversioned());
		reader >> limit;
	}

	count += delta;

	if (delta > 0 && count > limit) {
		throw too_many_tag_throttles();
	}

	BinaryWriter writer(Unversioned());
	writer << count;

	tr->set(tagThrottleCountKey, writer.toValue());
	return Void();
}
|
|
|
|
|
|
|
|
// Removes throttles whose keys lie in [beginKey, endKey).
//
// priority:             if present, only throttles with this priority are removed.
// onlyExpiredThrottles: if true, only throttles with a non-zero expiration time that is not in
//                       the future are removed.
//
// The range is processed in batches of up to 1000 keys, one commit per batch. Returns true if
// any attempt (including ones that were retried) cleared at least one throttle.
ACTOR template <class DB>
Future<bool> unthrottleMatchingThrottles(Reference<DB> db,
                                         KeyRef beginKey,
                                         KeyRef endKey,
                                         Optional<TransactionPriority> priority,
                                         bool onlyExpiredThrottles) {
	state Reference<typename DB::TransactionT> tr = db->createTransaction();

	state KeySelector begin = firstGreaterOrEqual(beginKey);
	state KeySelector end = firstGreaterOrEqual(endKey);

	state bool removed = false;

	loop {
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		try {
			// holds memory of the RangeResult
			state typename DB::TransactionT::template FutureT<RangeResult> f = tr->getRange(begin, end, 1000);
			state RangeResult tags = wait(safeThreadFutureToFuture(f));
			state uint64_t unthrottledTags = 0;
			// Signed, because it is negated below to form the counter delta.
			int64_t manualUnthrottledTags = 0;
			for (const auto& tag : tags) {
				if (onlyExpiredThrottles) {
					double expirationTime = TagThrottleValue::fromValue(tag.value).expirationTime;
					if (expirationTime == 0 || expirationTime > now()) {
						continue;
					}
				}

				TagThrottleKey key = TagThrottleKey::fromKey(tag.key);
				if (priority.present() && key.priority != priority.get()) {
					continue;
				}

				// Only manual throttles are tracked by the throttle counter.
				if (key.throttleType == TagThrottleType::MANUAL) {
					++manualUnthrottledTags;
				}

				removed = true;
				tr->clear(tag.key);
				unthrottledTags++;
			}

			if (manualUnthrottledTags > 0) {
				wait(updateThrottleCount(tr, -manualUnthrottledTags));
			}

			if (unthrottledTags > 0) {
				signalThrottleChange(tr);
			}

			wait(safeThreadFutureToFuture(tr->commit()));

			if (!tags.more) {
				return removed;
			}

			// Continue with the next batch right after the last key that was read.
			ASSERT(tags.size() > 0);
			begin = KeySelector(firstGreaterThan(tags[tags.size() - 1].key), tags.arena());
		} catch (Error& e) {
			wait(safeThreadFutureToFuture(tr->onError(e)));
		}
	}
}
|
|
|
|
|
|
|
|
template <class DB>
|
|
|
|
Future<bool> expire(DB db) {
|
|
|
|
return unthrottleMatchingThrottles(
|
|
|
|
db, tagThrottleKeys.begin, tagThrottleKeys.end, Optional<TransactionPriority>(), true);
|
|
|
|
}
|
|
|
|
|
|
|
|
template <class DB>
|
|
|
|
Future<bool> unthrottleAll(Reference<DB> db,
|
|
|
|
Optional<TagThrottleType> tagThrottleType,
|
|
|
|
Optional<TransactionPriority> priority) {
|
|
|
|
KeyRef begin = tagThrottleKeys.begin;
|
|
|
|
KeyRef end = tagThrottleKeys.end;
|
|
|
|
|
|
|
|
if (tagThrottleType.present() && tagThrottleType == TagThrottleType::AUTO) {
|
|
|
|
begin = tagThrottleAutoKeysPrefix;
|
|
|
|
} else if (tagThrottleType.present() && tagThrottleType == TagThrottleType::MANUAL) {
|
|
|
|
end = tagThrottleAutoKeysPrefix;
|
|
|
|
}
|
|
|
|
|
|
|
|
return unthrottleMatchingThrottles(db, begin, end, priority, false);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Removes the throttles stored for exactly the tag set `tags`.
//
// throttleType: if present, only that type (AUTO or MANUAL) is removed; otherwise both.
// priority:     if present, only that priority is removed; otherwise all priorities.
//
// Returns true if any attempt found at least one of the candidate keys present.
ACTOR template <class DB>
Future<bool> unthrottleTags(Reference<DB> db,
                            TagSet tags,
                            Optional<TagThrottleType> throttleType,
                            Optional<TransactionPriority> priority) {
	state Reference<typename DB::TransactionT> tr = db->createTransaction();

	// Build every key that could hold a matching throttle.
	state std::vector<Key> keys;
	for (auto p : allTransactionPriorities) {
		if (!priority.present() || priority.get() == p) {
			if (!throttleType.present() || throttleType.get() == TagThrottleType::AUTO) {
				keys.push_back(TagThrottleKey(tags, TagThrottleType::AUTO, p).toKey());
			}
			if (!throttleType.present() || throttleType.get() == TagThrottleType::MANUAL) {
				keys.push_back(TagThrottleKey(tags, TagThrottleType::MANUAL, p).toKey());
			}
		}
	}

	state bool removed = false;

	loop {
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		try {
			// valueFutures keeps the transaction-level futures alive; values holds the matching
			// Flow futures that are actually waited on. Both are indexed like `keys`.
			state std::vector<typename DB::TransactionT::template FutureT<Optional<Value>>> valueFutures;
			state std::vector<Future<Optional<Value>>> values;
			valueFutures.reserve(keys.size());
			values.reserve(keys.size());
			for (const auto& key : keys) {
				valueFutures.push_back(tr->get(key));
				values.push_back(safeThreadFutureToFuture(valueFutures.back()));
			}

			wait(waitForAll(values));

			int delta = 0;
			for (size_t i = 0; i < values.size(); ++i) {
				if (values[i].get().present()) {
					// Only manual throttles are tracked by the throttle counter.
					if (TagThrottleKey::fromKey(keys[i]).throttleType == TagThrottleType::MANUAL) {
						delta -= 1;
					}

					tr->clear(keys[i]);

					// Report that we are removing this tag if we ever see it present.
					// This protects us from getting confused if the transaction is maybe committed.
					// It's ok if someone else actually ends up removing this tag at the same time
					// and we aren't the ones to actually do it.
					removed = true;
				}
			}

			if (delta != 0) {
				wait(updateThrottleCount(tr, delta));
			}
			if (removed) {
				signalThrottleChange(tr);
				wait(safeThreadFutureToFuture(tr->commit()));
			}

			return removed;
		} catch (Error& e) {
			wait(safeThreadFutureToFuture(tr->onError(e)));
		}
	}
}
|
|
|
|
|
|
|
|
// Creates or overwrites the throttle for `tags` with the given type and priority.
//
// tpsRate:         rate stored in the throttle value.
// initialDuration: must be > 0.
// expirationTime:  stored as 0 when absent.
// reason:          forced to MANUAL for manual throttles; stored as UNSET when absent.
//
// For manual throttles the throttle counter is incremented if the key did not exist yet (this
// can throw too_many_tag_throttles via updateThrottleCount), and a throttle-change signal is
// written. Auto throttles do neither.
ACTOR template <class DB>
Future<Void> throttleTags(Reference<DB> db,
                          TagSet tags,
                          double tpsRate,
                          double initialDuration,
                          TagThrottleType throttleType,
                          TransactionPriority priority,
                          Optional<double> expirationTime = Optional<double>(),
                          Optional<TagThrottledReason> reason = Optional<TagThrottledReason>()) {
	state Reference<typename DB::TransactionT> tr = db->createTransaction();
	state Key key = TagThrottleKey(tags, throttleType, priority).toKey();

	ASSERT(initialDuration > 0);

	if (throttleType == TagThrottleType::MANUAL) {
		reason = TagThrottledReason::MANUAL;
	}
	TagThrottleValue throttle(tpsRate,
	                          expirationTime.present() ? expirationTime.get() : 0,
	                          initialDuration,
	                          reason.present() ? reason.get() : TagThrottledReason::UNSET);
	// The value is encoded once, with an explicit protocol version that includes the reason field.
	BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValueReason()));
	wr << throttle;
	state Value value = wr.toValue();

	loop {
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		try {
			if (throttleType == TagThrottleType::MANUAL) {
				// hold the returned standalone object's memory
				state typename DB::TransactionT::template FutureT<Optional<Value>> oldThrottleF = tr->get(key);
				Optional<Value> oldThrottle = wait(safeThreadFutureToFuture(oldThrottleF));
				// Overwriting an existing manual throttle must not count twice.
				if (!oldThrottle.present()) {
					wait(updateThrottleCount(tr, 1));
				}
			}

			tr->set(key, value);

			if (throttleType == TagThrottleType::MANUAL) {
				signalThrottleChange(tr);
			}

			wait(safeThreadFutureToFuture(tr->commit()));
			return Void();
		} catch (Error& e) {
			wait(safeThreadFutureToFuture(tr->onError(e)));
		}
	}
}
|
|
|
|
|
|
|
|
// Sets tagThrottleAutoEnabledKey to "1" (enabled) or "0" (disabled).
// If the key already holds the requested value nothing is written or committed; otherwise the
// new value is stored together with a throttle-change signal.
ACTOR template <class DB>
Future<Void> enableAuto(Reference<DB> db, bool enabled) {
	state Reference<typename DB::TransactionT> tr = db->createTransaction();

	loop {
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		try {
			// hold the returned standalone object's memory
			state typename DB::TransactionT::template FutureT<Optional<Value>> valueF =
			    tr->get(tagThrottleAutoEnabledKey);
			Optional<Value> value = wait(safeThreadFutureToFuture(valueF));

			const StringRef desired = enabled ? LiteralStringRef("1") : LiteralStringRef("0");
			if (!value.present() || value.get() != desired) {
				tr->set(tagThrottleAutoEnabledKey, desired);
				signalThrottleChange<typename DB::TransactionT>(tr);

				wait(safeThreadFutureToFuture(tr->commit()));
			}
			return Void();
		} catch (Error& e) {
			wait(safeThreadFutureToFuture(tr->onError(e)));
		}
	}
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
}; // namespace ThrottleApi
|
2020-04-24 11:50:40 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
template <class Value>
|
2020-04-18 00:06:45 +08:00
|
|
|
using TransactionTagMap = std::unordered_map<TransactionTag, Value, std::hash<TransactionTagRef>>;
|
2020-04-17 23:07:01 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
template <class Value>
|
2020-04-25 02:31:16 +08:00
|
|
|
using PrioritizedTransactionTagMap = std::map<TransactionPriority, TransactionTagMap<Value>>;
|
2020-04-17 23:07:01 +08:00
|
|
|
|
2020-08-30 03:35:31 +08:00
|
|
|
template <class Value>
|
2020-08-01 14:14:52 +08:00
|
|
|
using UIDTransactionTagMap = std::unordered_map<UID, TransactionTagMap<Value>>;
|
2021-08-12 07:59:25 +08:00
|
|
|
|
|
|
|
#include "flow/unactorcompiler.h"
|
2020-06-15 05:03:45 +08:00
|
|
|
#endif
|