Merge branch 'main' of github.com:apple/foundationdb into jfu-tenant-rename

Jon Fu 2022-06-29 13:20:19 -07:00
commit 5e7bb0aa21
43 changed files with 1697 additions and 287 deletions

View File

@ -30,7 +30,7 @@ public:
ApiCorrectnessWorkload(const WorkloadConfig& config) : ApiWorkload(config) {}
private:
enum OpType { OP_INSERT, OP_GET, OP_CLEAR, OP_CLEAR_RANGE, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ };
enum OpType { OP_INSERT, OP_GET, OP_CLEAR, OP_GET_RANGE, OP_CLEAR_RANGE, OP_COMMIT_READ, OP_LAST = OP_COMMIT_READ };
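// OP_GET_RANGE is added ahead of OP_CLEAR_RANGE; OP_LAST still aliases OP_COMMIT_READ,
// so the new op is covered by randomOperation()'s random selection below.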
void randomCommitReadOp(TTaskFct cont) {
int numKeys = Random::get().randomInt(1, maxKeysPerTransaction);
@ -125,6 +125,71 @@ private:
});
}
void getRangeLoop(std::shared_ptr<ITransactionContext> ctx,
fdb::KeySelector begin,
fdb::KeySelector end,
std::shared_ptr<std::vector<fdb::KeyValue>> results) {
auto f = ctx->tx().getRange(begin,
end,
0 /*limit*/,
0 /*target_bytes*/,
FDB_STREAMING_MODE_WANT_ALL,
0 /*iteration*/,
false /*snapshot*/,
false /*reverse*/);
ctx->continueAfter(f, [this, ctx, f, end, results]() {
auto out = copyKeyValueArray(f.get());
results->insert(results->end(), out.first.begin(), out.first.end());
const bool more = out.second;
if (more) {
// Fetch the remaining results.
getRangeLoop(ctx, fdb::key_select::firstGreaterThan(results->back().key), end, results);
} else {
ctx->done();
}
});
}
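// Note: a single getRange call may return only a prefix of the requested range (out.second,
// the "more" flag, is set); the recursion above resumes from firstGreaterThan(last key read)
// until the whole [begin, end) range has been collected into *results.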
void randomGetRangeOp(TTaskFct cont) {
auto begin = randomKey(readExistingKeysRatio);
auto end = randomKey(readExistingKeysRatio);
auto results = std::make_shared<std::vector<fdb::KeyValue>>();
execTransaction(
[this, begin, end, results](auto ctx) {
// Clear the results vector, in case the transaction is retried.
results->clear();
getRangeLoop(ctx,
fdb::key_select::firstGreaterOrEqual(begin),
fdb::key_select::firstGreaterOrEqual(end),
results);
},
[this, begin, end, results, cont]() {
auto expected = store.getRange(begin, end, results->size() + 10, false);
if (results->size() != expected.size()) {
error(fmt::format("randomGetRangeOp mismatch. expected {} keys, actual {} keys",
expected.size(),
results->size()));
} else {
auto expected_kv = expected.begin();
for (auto actual_kv : *results) {
if (actual_kv.key != expected_kv->key || actual_kv.value != expected_kv->value) {
error(fmt::format(
"randomGetRangeOp mismatch. expected key: {} actual key: {} expected value: "
"{:.80} actual value: {:.80}",
fdb::toCharsRef(expected_kv->key),
fdb::toCharsRef(actual_kv.key),
fdb::toCharsRef(expected_kv->value),
fdb::toCharsRef(actual_kv.value)));
}
expected_kv++;
}
}
schedule(cont);
});
}
void randomOperation(TTaskFct cont) {
OpType txType = (store.size() == 0) ? OP_INSERT : (OpType)Random::get().randomInt(0, OP_LAST);
switch (txType) {
@ -137,6 +202,9 @@ private:
case OP_CLEAR:
randomClearOp(cont);
break;
case OP_GET_RANGE:
randomGetRangeOp(cont);
break;
case OP_CLEAR_RANGE:
randomClearRangeOp(cont);
break;

View File

@ -0,0 +1,178 @@
/*
* QuotaCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "flow/actorcompiler.h" // This must be the last include
namespace {
enum class LimitType { RESERVED, TOTAL };
enum class OpType { READ, WRITE };
Optional<TransactionTag> parseTag(StringRef token) {
if (token.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) {
return {};
} else {
return token;
}
}
Optional<LimitType> parseLimitType(StringRef token) {
if (token == "reserved"_sr) {
return LimitType::RESERVED;
} else if (token == "total"_sr) {
return LimitType::TOTAL;
} else {
return {};
}
}
Optional<OpType> parseOpType(StringRef token) {
if (token == "read"_sr) {
return OpType::READ;
} else if (token == "write"_sr) {
return OpType::WRITE;
} else {
return {};
}
}
Optional<double> parseLimitValue(StringRef token) {
try {
return std::stod(token.toString());
} catch (...) {
return {};
}
}
ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, OpType opType) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
try {
state ThreadFuture<Optional<Value>> resultFuture = tr->get(tag.withPrefix(tagQuotaPrefix));
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
if (!v.present()) {
fmt::print("<empty>\n");
} else {
auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
if (limitType == LimitType::TOTAL && opType == OpType::READ) {
fmt::print("{}\n", quota.totalReadQuota);
} else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) {
fmt::print("{}\n", quota.totalWriteQuota);
} else if (limitType == LimitType::RESERVED && opType == OpType::READ) {
fmt::print("{}\n", quota.reservedReadQuota);
} else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) {
fmt::print("{}\n", quota.reservedWriteQuota);
}
}
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR Future<Void> setQuota(Reference<IDatabase> db,
TransactionTag tag,
LimitType limitType,
OpType opType,
double value) {
state Reference<ITransaction> tr = db->createTransaction();
state Key key = tag.withPrefix(tagQuotaPrefix);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
state ThreadFuture<Optional<Value>> resultFuture = tr->get(key);
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
ThrottleApi::TagQuotaValue quota;
if (v.present()) {
quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
}
if (limitType == LimitType::TOTAL && opType == OpType::READ) {
quota.totalReadQuota = value;
} else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) {
quota.totalWriteQuota = value;
} else if (limitType == LimitType::RESERVED && opType == OpType::READ) {
quota.reservedReadQuota = value;
} else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) {
quota.reservedWriteQuota = value;
}
ThrottleApi::setTagQuota(tr,
tag,
quota.reservedReadQuota,
quota.totalReadQuota,
quota.reservedWriteQuota,
quota.totalWriteQuota);
wait(safeThreadFutureToFuture(tr->commit()));
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
constexpr auto usage =
"quota [get <tag> [reserved|total] [read|write]|set <tag> [reserved|total] [read|write] <value>]";
bool exitFailure() {
fmt::print(usage);
return false;
}
} // namespace
namespace fdb_cli {
ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
state bool result = true;
if (tokens.size() != 5 && tokens.size() != 6) {
return exitFailure();
} else {
auto tag = parseTag(tokens[2]);
auto limitType = parseLimitType(tokens[3]);
auto opType = parseOpType(tokens[4]);
if (!tag.present() || !limitType.present() || !opType.present()) {
return exitFailure();
}
if (tokens[1] == "get"_sr) {
if (tokens.size() != 5) {
return exitFailure();
}
wait(getQuota(db, tag.get(), limitType.get(), opType.get()));
return true;
} else if (tokens[1] == "set"_sr) {
if (tokens.size() != 6) {
return exitFailure();
}
auto const limitValue = parseLimitValue(tokens[5]);
if (!limitValue.present()) {
return exitFailure();
}
wait(setQuota(db, tag.get(), limitType.get(), opType.get(), limitValue.get()));
return true;
} else {
return exitFailure();
}
}
}
} // namespace fdb_cli
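// An illustrative fdbcli session for the new command (hypothetical tag name and value;
// the syntax follows the usage string defined above):
//   fdb> quota set myTag total read 100
//   fdb> quota get myTag total read
//   100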

View File

@ -275,6 +275,8 @@ ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<St
} else {
fprintf(stderr, "ERROR: %s\n", errorStr.c_str());
}
return false;
}
}
}

View File

@ -509,6 +509,10 @@ void initHelp() {
CommandHelp("getversion",
"Fetch the current read version",
"Displays the current read version of the database or currently running transaction.");
helpMap["quota"] =
CommandHelp("quota",
"quota [get <tag> [reserved|total] [read|write]|set <tag> [reserved|total] [read|write] <value>]",
"Get or modify the throughput quota for the specified tag.");
helpMap["reset"] =
CommandHelp("reset",
"reset the current transaction",
@ -1468,6 +1472,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "quota")) {
bool _result = wait(makeInterruptable(quotaCommandActor(db, tokens)));
if (!_result) {
is_error = true;
}
continue;
}
if (tokencmp(tokens[0], "reset")) {
if (tokens.size() != 1) {
printUsage(tokens[0]);

View File

@ -220,6 +220,8 @@ ACTOR Future<bool> profileCommandActor(Database db,
bool intrans);
// renametenant command
ACTOR Future<bool> renameTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// quota command
ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// setclass command
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// snapshot command

View File

@ -450,16 +450,21 @@ bool isCompleteConfiguration(std::map<std::string, std::string> const& options)
options.count(p + "storage_engine") == 1;
}
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Transaction* tr) {
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
RangeResult res = wait(tr->getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(res.size() < CLIENT_KNOBS->TOO_MANY);
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>)res);
return config;
}
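// The new Transaction* overload lets callers read the configuration inside an existing
// transaction; getStatefulWorkers() in DataDistribution.actor.cpp (later in this commit)
// uses it to read the configuration and the storage server list at one read version.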
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Database cx) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
RangeResult res = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(res.size() < CLIENT_KNOBS->TOO_MANY);
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>)res);
DatabaseConfiguration config = wait(getDatabaseConfiguration(&tr));
return config;
} catch (Error& e) {
wait(tr.onError(e));

View File

@ -5151,8 +5151,9 @@ Future<Optional<Value>> Transaction::get(const Key& key, Snapshot snapshot) {
if (!ver.isReady() || metadataVersion.isSet()) {
return metadataVersion.getFuture();
} else {
if (ver.isError())
if (ver.isError()) {
return ver.getError();
}
if (ver.get() == trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].first) {
return trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].second;
}
@ -5756,6 +5757,10 @@ void Transaction::resetImpl(bool generateNewSpan) {
cancelWatches();
}
TagSet const& Transaction::getTags() const {
return trState->options.tags;
}
void Transaction::reset() {
resetImpl(false);
}
@ -7060,6 +7065,25 @@ Future<ProtocolVersion> DatabaseContext::getClusterProtocol(Optional<ProtocolVer
return getClusterProtocolImpl(coordinator, expectedVersion);
}
double ClientTagThrottleData::throttleDuration() const {
if (expiration <= now()) {
return 0.0;
}
double capacity =
(smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW;
if (capacity >= 1) {
return 0.0;
}
if (tpsRate == 0) {
return std::max(0.0, expiration - now());
}
return std::min(expiration - now(), capacity / tpsRate);
}
uint32_t Transaction::getSize() {
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
tr.transaction.write_conflict_ranges.expectedSize();

View File

@ -117,7 +117,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( MAX_FORKED_PROCESS_OUTPUT, 1024 );
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
init( SNAP_CREATE_MAX_TIMEOUT, isSimulated ? 70.0 : 300.0 );
init( SNAP_MINIMUM_TIME_GAP, 5.0 );
init( SNAP_NETWORK_FAILURE_RETRY_LIMIT, 10 );
init( MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE, 1 );
init( MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE, 1 );
@ -181,7 +183,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
/*
The bytesRead/byteSize ratio. Will be declared as read hot when larger than this. 8.0 was chosen to avoid reporting table scans as read hot.
*/
init ( SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS, 1666667 * 1000);
init ( SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS, 1666667 * 1000);
/*
The read bandwidth of a given shard needs to be larger than this value in order to be evaluated for read hotness. The roughly 1.67MB per second is calculated as follows:
- Heuristic data suggests that each storage process can do max 500K read operations per second
@ -662,6 +664,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( AUTO_TAG_THROTTLE_UPDATE_FREQUENCY, 10.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_UPDATE_FREQUENCY = 0.5;
init( TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL, 30.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL = 1.0;
init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false;
init( SS_THROTTLE_TAGS_TRACKED, 1 ); if(randomize && BUGGIFY) SS_THROTTLE_TAGS_TRACKED = deterministicRandom()->randomInt(1, 10);
init( GLOBAL_TAG_THROTTLING, false );
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
init( GLOBAL_TAG_THROTTLING_TRACE_INTERVAL, 5.0 );
//Storage Metrics
init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 );

View File

@ -18,12 +18,16 @@
* limitations under the License.
*/
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/Tuple.h"
#include "flow/actorcompiler.h" // has to be last include
double const ClientTagThrottleLimits::NO_EXPIRATION = std::numeric_limits<double>::max();
void TagSet::addTag(TransactionTagRef tag) {
ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256); // Tag length is encoded with a single byte
ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION < 256); // Number of tags is encoded with a single byte
@ -124,6 +128,53 @@ TagThrottleValue TagThrottleValue::fromValue(const ValueRef& value) {
return throttleValue;
}
KeyRangeRef const tagQuotaKeys = KeyRangeRef("\xff/tagQuota/"_sr, "\xff/tagQuota0"_sr);
KeyRef const tagQuotaPrefix = tagQuotaKeys.begin;
Key ThrottleApi::getTagQuotaKey(TransactionTagRef tag) {
return tag.withPrefix(tagQuotaPrefix);
}
bool ThrottleApi::TagQuotaValue::isValid() const {
return reservedReadQuota <= totalReadQuota && reservedWriteQuota <= totalWriteQuota && reservedReadQuota >= 0 &&
reservedWriteQuota >= 0;
}
Value ThrottleApi::TagQuotaValue::toValue() const {
Tuple tuple;
tuple.appendDouble(reservedReadQuota);
tuple.appendDouble(totalReadQuota);
tuple.appendDouble(reservedWriteQuota);
tuple.appendDouble(totalWriteQuota);
return tuple.pack();
}
ThrottleApi::TagQuotaValue ThrottleApi::TagQuotaValue::fromValue(ValueRef value) {
auto tuple = Tuple::unpack(value);
if (tuple.size() != 4) {
throw invalid_throttle_quota_value();
}
TagQuotaValue result;
try {
result.reservedReadQuota = tuple.getDouble(0);
result.totalReadQuota = tuple.getDouble(1);
result.reservedWriteQuota = tuple.getDouble(2);
result.totalWriteQuota = tuple.getDouble(3);
} catch (Error& e) {
TraceEvent(SevWarnAlways, "TagQuotaValueFailedToDeserialize").error(e);
throw invalid_throttle_quota_value();
}
if (!result.isValid()) {
TraceEvent(SevWarnAlways, "TagQuotaValueInvalidQuotas")
.detail("ReservedReadQuota", result.reservedReadQuota)
.detail("TotalReadQuota", result.totalReadQuota)
.detail("ReservedWriteQuota", result.reservedWriteQuota)
.detail("TotalWriteQuota", result.totalWriteQuota);
throw invalid_throttle_quota_value();
}
return result;
}
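// On the wire, a quota is a 4-element Tuple of doubles packed in the order
// (reservedReadQuota, totalReadQuota, reservedWriteQuota, totalWriteQuota); fromValue()
// rejects any other arity and any value where a reserved quota is negative or exceeds
// its total.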
FDB_DEFINE_BOOLEAN_PARAM(ContainsRecommended);
FDB_DEFINE_BOOLEAN_PARAM(Capitalize);

View File

@ -116,23 +116,7 @@ public:
bool canRecheck() const { return lastCheck < now() - CLIENT_KNOBS->TAG_THROTTLE_RECHECK_INTERVAL; }
double throttleDuration() const {
if (expiration <= now()) {
return 0.0;
}
double capacity =
(smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW;
if (capacity >= 1) {
return 0.0;
}
if (tpsRate == 0) {
return std::max(0.0, expiration - now());
}
return std::min(expiration - now(), capacity / tpsRate);
}
double throttleDuration() const;
};
struct WatchParameters : public ReferenceCounted<WatchParameters> {

View File

@ -41,6 +41,7 @@ standard API and some knowledge of the contents of the system key space.
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // has to be last include
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Transaction* tr);
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Database cx);
ACTOR Future<Void> waitForFullReplication(Database cx);

View File

@ -465,6 +465,7 @@ public:
Reference<TransactionState> trState;
std::vector<Reference<Watch>> watches;
TagSet const& getTags() const;
Span span;
// used in template functions as returned Future type

View File

@ -196,6 +196,7 @@ public:
Transaction& getTransaction() { return tr; }
Optional<TenantName> getTenant() { return tr.getTenant(); }
TagSet const& getTags() const { return tr.getTags(); }
// used in template functions as returned Future type
template <typename Type>

View File

@ -177,7 +177,7 @@ public:
SHARD_MIN_BYTES_PER_KSEC, // Shards with more than this bandwidth will not be merged
SHARD_SPLIT_BYTES_PER_KSEC; // When splitting a shard, it is split into pieces with less than this bandwidth
double SHARD_MAX_READ_DENSITY_RATIO;
int64_t SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS;
int64_t SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS;
double SHARD_MAX_BYTES_READ_PER_KSEC_JITTER;
double STORAGE_METRIC_TIMEOUT;
double METRIC_DELAY;
@ -564,6 +564,7 @@ public:
int64_t TLOG_RECOVER_MEMORY_LIMIT;
double TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
// Tag throttling
int64_t MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
int64_t MAX_AUTO_THROTTLED_TRANSACTION_TAGS;
double MIN_TAG_COST;
@ -576,6 +577,17 @@ public:
double AUTO_TAG_THROTTLE_UPDATE_FREQUENCY;
double TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL;
bool AUTO_TAG_THROTTLING_ENABLED;
// Limit to the number of throttling tags each storage server
// will track and send to the ratekeeper
int64_t SS_THROTTLE_TAGS_TRACKED;
// Use the global tag throttling strategy, i.e. throttle based on the cluster-wide
// throughput for tags and their associated quotas.
bool GLOBAL_TAG_THROTTLING;
// Minimum number of transactions per second that the global tag throttler must allow for each tag
double GLOBAL_TAG_THROTTLING_MIN_RATE;
// Folding time for the smoothers used by global tag throttling counters
double GLOBAL_TAG_THROTTLING_FOLDING_TIME;
double GLOBAL_TAG_THROTTLING_TRACE_INTERVAL;
double MAX_TRANSACTIONS_PER_BYTE;
@ -603,7 +615,12 @@ public:
// disk snapshot
int64_t MAX_FORKED_PROCESS_OUTPUT;
// retry limit after network failures
int64_t SNAP_NETWORK_FAILURE_RETRY_LIMIT;
// time limit for creating snapshot
double SNAP_CREATE_MAX_TIMEOUT;
// minimum gap time between two snapshot requests for the same process
double SNAP_MINIMUM_TIME_GAP;
// Maximum number of storage servers a snapshot can fail to
// capture while still succeeding
int64_t MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE;

View File

@ -389,6 +389,8 @@ extern const KeyRef tagThrottleSignalKey;
extern const KeyRef tagThrottleAutoEnabledKey;
extern const KeyRef tagThrottleLimitKey;
extern const KeyRef tagThrottleCountKey;
extern const KeyRangeRef tagQuotaKeys;
extern const KeyRef tagQuotaPrefix;
// Log Range constant variables
// Used in the backup pipeline to track mutations

View File

@ -207,6 +207,8 @@ struct ClientTagThrottleLimits {
double tpsRate;
double expiration;
static double const NO_EXPIRATION;
ClientTagThrottleLimits() : tpsRate(0), expiration(0) {}
ClientTagThrottleLimits(double tpsRate, double expiration) : tpsRate(tpsRate), expiration(expiration) {}
@ -595,6 +597,38 @@ Future<Void> enableAuto(Reference<DB> db, bool enabled) {
}
}
class TagQuotaValue {
public:
double reservedReadQuota{ 0.0 };
double totalReadQuota{ 0.0 };
double reservedWriteQuota{ 0.0 };
double totalWriteQuota{ 0.0 };
bool isValid() const;
Value toValue() const;
static TagQuotaValue fromValue(ValueRef);
};
Key getTagQuotaKey(TransactionTagRef);
template <class Tr>
void setTagQuota(Reference<Tr> tr,
TransactionTagRef tag,
double reservedReadQuota,
double totalReadQuota,
double reservedWriteQuota,
double totalWriteQuota) {
TagQuotaValue tagQuotaValue;
tagQuotaValue.reservedReadQuota = reservedReadQuota;
tagQuotaValue.totalReadQuota = totalReadQuota;
tagQuotaValue.reservedWriteQuota = reservedWriteQuota;
tagQuotaValue.totalWriteQuota = totalWriteQuota;
if (!tagQuotaValue.isValid()) {
throw invalid_throttle_quota_value();
}
tr->set(getTagQuotaKey(tag), tagQuotaValue.toValue());
signalThrottleChange(tr);
}
}; // namespace ThrottleApi
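// A minimal usage sketch (hypothetical tag and values; assumes a transaction `tr` opened
// with ACCESS_SYSTEM_KEYS, as in QuotaCommand.actor.cpp above):
//
//     ThrottleApi::setTagQuota(tr, "web"_sr, /*reservedRead*/ 0.0, /*totalRead*/ 100.0,
//                              /*reservedWrite*/ 0.0, /*totalWrite*/ 50.0);
//
// setTagQuota validates the quota (reserved <= total, throwing
// invalid_throttle_quota_value() otherwise), writes the packed value, and signals the
// throttle change.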
template <class Value>

View File

@ -107,6 +107,10 @@ public:
Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }
bool isSpawnedKVProcess() const {
// SOMEDAY: using a separate bool may be better
return name == "remote flow process";
}
bool isReliable() const {
return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0 && !failedDisk &&
(!machine || (machine->machineProcess->fault_injection_p1 == 0 &&

View File

@ -1328,7 +1328,8 @@ public:
std::vector<LocalityData> primaryLocalitiesDead, primaryLocalitiesLeft;
for (auto processInfo : getAllProcesses()) {
if (processInfo->isAvailableClass() && processInfo->locality.dcId() == dcId) {
if (!processInfo->isSpawnedKVProcess() && processInfo->isAvailableClass() &&
processInfo->locality.dcId() == dcId) {
if (processInfo->isExcluded() || processInfo->isCleared() || !processInfo->isAvailable()) {
primaryProcessesDead.add(processInfo->locality);
primaryLocalitiesDead.push_back(processInfo->locality);
@ -1348,7 +1349,6 @@ public:
if (usableRegions > 1 && remoteTLogPolicy && !primaryTLogsDead) {
primaryTLogsDead = primaryProcessesDead.validate(remoteTLogPolicy);
}
return primaryTLogsDead || primaryProcessesDead.validate(storagePolicy);
}
@ -1602,7 +1602,7 @@ public:
.detail("Protected", protectedAddresses.count(machine->address))
.backtrace();
// This will remove all the "tracked" messages that came from the machine being killed
if (machine->name != "remote flow process")
if (!machine->isSpawnedKVProcess())
latestEventCache.clear();
machine->failed = true;
} else if (kt == InjectFaults) {
@ -1631,8 +1631,7 @@ public:
} else {
ASSERT(false);
}
ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting ||
machine->name == "remote flow process");
ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting || machine->isSpawnedKVProcess());
}
void rebootProcess(ProcessInfo* process, KillType kt) override {
if (kt == RebootProcessAndDelete && protectedAddresses.count(process->address)) {
@ -2498,7 +2497,7 @@ ACTOR void doReboot(ISimulator::ProcessInfo* p, ISimulator::KillType kt) {
.detail("Rebooting", p->rebooting)
.detail("Reliable", p->isReliable());
return;
} else if (p->name == "remote flow process") {
} else if (p->isSpawnedKVProcess()) {
TraceEvent(SevDebug, "DoRebootFailed").detail("Name", p->name).detail("Address", p->address);
return;
} else if (p->getChilds().size()) {

View File

@ -2086,21 +2086,32 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
throw snap_log_anti_quorum_unsupported();
}
// send a snap request to DD
if (!commitData->db->get().distributor.present()) {
TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest");
throw dd_not_found();
}
state Future<ErrorOr<Void>> ddSnapReq = commitData->db->get().distributor.get().distributorSnapReq.tryGetReply(
DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID));
try {
wait(throwErrorOr(ddSnapReq));
} catch (Error& e) {
TraceEvent("SnapCommitProxy_DDSnapResponseError")
.errorUnsuppressed(e)
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
throw e;
state int snapReqRetry = 0;
state double snapRetryBackoff = FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY;
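// request_maybe_delivered errors are retried with exponential backoff, starting at
// PREVENT_FAST_SPIN_DELAY and doubling each attempt, up to
// SNAP_NETWORK_FAILURE_RETRY_LIMIT retries (10 by default, per the knob above).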
loop {
// send a snap request to DD
if (!commitData->db->get().distributor.present()) {
TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest");
throw dd_not_found();
}
try {
Future<ErrorOr<Void>> ddSnapReq =
commitData->db->get().distributor.get().distributorSnapReq.tryGetReply(
DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID));
wait(throwErrorOr(ddSnapReq));
break;
} catch (Error& e) {
TraceEvent("SnapCommitProxy_DDSnapResponseError")
.errorUnsuppressed(e)
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// Retry if we have network issues
if (e.code() != error_code_request_maybe_delivered ||
++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT)
throw e;
wait(delay(snapRetryBackoff));
snapRetryBackoff = snapRetryBackoff * 2; // exponential backoff
}
}
snapReq.reply.send(Void());
} catch (Error& e) {
@ -2314,13 +2325,6 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveContext* pContext,
TxnStateRequest request) {
state const TxnStateRequest& req = request;
state ProxyCommitData& commitData = *pContext->pCommitData;
state PromiseStream<Future<Void>>& addActor = *pContext->pActors;
state Sequence& maxSequence = pContext->maxSequence;
state ReplyPromise<Void> reply = req.reply;
state std::unordered_set<Sequence>& txnSequences = pContext->receivedSequences;
ASSERT(pContext->pCommitData != nullptr);
ASSERT(pContext->pActors != nullptr);

View File

@ -876,14 +876,26 @@ Future<Void> sendSnapReq(RequestStream<Req> stream, Req req, Error e) {
return Void();
}
ACTOR template <class Req>
Future<ErrorOr<Void>> trySendSnapReq(RequestStream<Req> stream, Req req) {
ErrorOr<REPLY_TYPE(Req)> reply = wait(stream.tryGetReply(req));
if (reply.isError()) {
TraceEvent("SnapDataDistributor_ReqError")
.errorUnsuppressed(reply.getError())
.detail("Peer", stream.getEndpoint().getPrimaryAddress());
return ErrorOr<Void>(reply.getError());
ACTOR Future<ErrorOr<Void>> trySendSnapReq(RequestStream<WorkerSnapRequest> stream, WorkerSnapRequest req) {
state int snapReqRetry = 0;
state double snapRetryBackoff = FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY;
loop {
ErrorOr<REPLY_TYPE(WorkerSnapRequest)> reply = wait(stream.tryGetReply(req));
if (reply.isError()) {
TraceEvent("SnapDataDistributor_ReqError")
.errorUnsuppressed(reply.getError())
.detail("Peer", stream.getEndpoint().getPrimaryAddress());
if (reply.getError().code() != error_code_request_maybe_delivered ||
++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT)
return ErrorOr<Void>(reply.getError());
else {
// retry network failures with the same snap UID to avoid snapshotting twice
req = WorkerSnapRequest(req.snapPayload, req.snapUID, req.role);
wait(delay(snapRetryBackoff));
snapRetryBackoff = snapRetryBackoff * 2;
}
} else
break;
}
return ErrorOr<Void>(Void());
}
@ -906,6 +918,124 @@ ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures
return Void();
}
ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>> getStatefulWorkers(
Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::vector<TLogInterface>* tlogs,
int* storageFaultTolerance) {
state std::map<NetworkAddress, std::pair<WorkerInterface, std::string>> result;
state std::map<NetworkAddress, WorkerInterface> workersMap;
state Transaction tr(cx);
state DatabaseConfiguration configuration;
loop {
try {
// necessary options
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// get database configuration
DatabaseConfiguration _configuration = wait(getDatabaseConfiguration(&tr));
configuration = _configuration;
// get storages
RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
state std::vector<StorageServerInterface> storageServers;
storageServers.reserve(serverList.size());
for (int i = 0; i < serverList.size(); i++)
storageServers.push_back(decodeServerListValue(serverList[i].value));
// get workers
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
for (const auto& worker : workers) {
workersMap[worker.interf.address()] = worker.interf;
}
Optional<Value> regionsValue =
wait(tr.get(LiteralStringRef("usable_regions").withPrefix(configKeysPrefix)));
int usableRegions = 1;
if (regionsValue.present()) {
usableRegions = atoi(regionsValue.get().toString().c_str());
}
auto masterDcId = dbInfo->get().master.locality.dcId();
int storageFailures = 0;
for (const auto& server : storageServers) {
TraceEvent(SevDebug, "StorageServerDcIdInfo")
.detail("Address", server.address().toString())
.detail("ServerLocalityID", server.locality.dcId())
.detail("MasterDcID", masterDcId);
if (usableRegions == 1 || server.locality.dcId() == masterDcId) {
auto itr = workersMap.find(server.address());
if (itr == workersMap.end()) {
TraceEvent(SevWarn, "GetStorageWorkers")
.detail("Reason", "Could not find worker for storage server")
.detail("SS", server.id());
++storageFailures;
} else {
if (result.count(server.address())) {
ASSERT(itr->second.id() == result[server.address()].first.id());
if (result[server.address()].second.find("storage") == std::string::npos)
result[server.address()].second.append(",storage");
} else {
result[server.address()] = std::make_pair(itr->second, "storage");
}
}
}
}
// calculate fault tolerance
*storageFaultTolerance = std::min(static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE),
configuration.storageTeamSize - 1) -
storageFailures;
if (*storageFaultTolerance < 0) {
TEST(true); // Too many failed storage servers to complete snapshot
throw snap_storage_failed();
}
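// Example: with MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE = 1 (the default set above) and
// storageTeamSize = 3, the snapshot tolerates min(1, 3 - 1) = 1 storage failure, less
// any servers already missing a worker (storageFailures).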
// tlogs
for (const auto& tlog : *tlogs) {
TraceEvent(SevDebug, "GetStatefulWorkersTlog").detail("Addr", tlog.address());
if (workersMap.find(tlog.address()) == workersMap.end()) {
TraceEvent(SevError, "MissingTlogWorkerInterface").detail("TlogAddress", tlog.address());
throw snap_tlog_failed();
}
if (result.count(tlog.address())) {
ASSERT(workersMap[tlog.address()].id() == result[tlog.address()].first.id());
result[tlog.address()].second.append(",tlog");
} else {
result[tlog.address()] = std::make_pair(workersMap[tlog.address()], "tlog");
}
}
// get coordinators
Optional<Value> coordinators = wait(tr.get(coordinatorsKey));
if (!coordinators.present()) {
throw operation_failed();
}
ClusterConnectionString ccs(coordinators.get().toString());
std::vector<NetworkAddress> coordinatorsAddr = wait(ccs.tryResolveHostnames());
std::set<NetworkAddress> coordinatorsAddrSet(coordinatorsAddr.begin(), coordinatorsAddr.end());
for (const auto& worker : workers) {
// Note: secondary addresses are only considered for coordinators,
// since primary addresses were already used for the storage and tlog interfaces above
NetworkAddress primary = worker.interf.address();
Optional<NetworkAddress> secondary = worker.interf.tLog.getEndpoint().addresses.secondaryAddress;
if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end() ||
(secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) {
if (result.count(primary)) {
ASSERT(workersMap[primary].id() == result[primary].first.id());
result[primary].second.append(",coord");
} else {
result[primary] = std::make_pair(workersMap[primary], "coord");
}
}
}
return result;
} catch (Error& e) {
wait(tr.onError(e));
result.clear();
}
}
}
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
@ -942,47 +1072,44 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
TraceEvent("SnapDataDistributor_AfterDisableTLogPop")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap local storage nodes
// TODO: Atomically read configuration and storage worker list in a single transaction
state DatabaseConfiguration configuration = wait(getDatabaseConfiguration(cx));
std::pair<std::vector<WorkerInterface>, int> storageWorkersAndFailures =
wait(transformErrors(getStorageWorkers(cx, db, true /* localOnly */), snap_storage_failed()));
const auto& [storageWorkers, storageFailures] = storageWorkersAndFailures;
auto const storageFaultTolerance =
std::min(static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE),
configuration.storageTeamSize - 1) -
storageFailures;
if (storageFaultTolerance < 0) {
TEST(true); // Too many failed storage servers to complete snapshot
throw snap_storage_failed();
}
TraceEvent("SnapDataDistributor_GotStorageWorkers")
state int storageFaultTolerance;
// snap stateful nodes
state std::map<NetworkAddress, std::pair<WorkerInterface, std::string>> statefulWorkers =
wait(transformErrors(getStatefulWorkers(cx, db, &tlogs, &storageFaultTolerance), snap_storage_failed()));
TraceEvent("SnapDataDistributor_GotStatefulWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// we need to snapshot storage nodes before snapshot any tlogs
std::vector<Future<ErrorOr<Void>>> storageSnapReqs;
storageSnapReqs.reserve(storageWorkers.size());
for (const auto& worker : storageWorkers) {
storageSnapReqs.push_back(trySendSnapReq(
worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr)));
for (const auto& [addr, entry] : statefulWorkers) {
auto& [interf, role] = entry;
if (role.find("storage") != std::string::npos)
storageSnapReqs.push_back(trySendSnapReq(
interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr)));
}
wait(waitForMost(storageSnapReqs, storageFaultTolerance, snap_storage_failed()));
TraceEvent("SnapDataDistributor_AfterSnapStorage")
.detail("FaultTolerance", storageFaultTolerance)
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap local tlog nodes
std::vector<Future<Void>> tLogSnapReqs;
std::vector<Future<ErrorOr<Void>>> tLogSnapReqs;
tLogSnapReqs.reserve(tlogs.size());
for (const auto& tlog : tlogs) {
tLogSnapReqs.push_back(sendSnapReq(tlog.snapRequest,
TLogSnapRequest{ snapReq.snapPayload, snapReq.snapUID, "tlog"_sr },
snap_tlog_failed()));
for (const auto& [addr, entry] : statefulWorkers) {
auto& [interf, role] = entry;
if (role.find("tlog") != std::string::npos)
tLogSnapReqs.push_back(trySendSnapReq(
interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "tlog"_sr)));
}
wait(waitForAll(tLogSnapReqs));
wait(waitForMost(tLogSnapReqs, 0, snap_tlog_failed()));
TraceEvent("SnapDataDistributor_AfterTLogStorage")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// enable tlog pop on local tlog nodes
std::vector<Future<Void>> enablePops;
enablePops.reserve(tlogs.size());
@ -995,20 +1122,18 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
TraceEvent("SnapDataDistributor_AfterEnableTLogPops")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap the coordinators
std::vector<WorkerInterface> coordWorkers = wait(getCoordWorkers(cx, db));
TraceEvent("SnapDataDistributor_GotCoordWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
std::vector<Future<ErrorOr<Void>>> coordSnapReqs;
coordSnapReqs.reserve(coordWorkers.size());
for (const auto& worker : coordWorkers) {
coordSnapReqs.push_back(trySendSnapReq(
worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr)));
for (const auto& [addr, entry] : statefulWorkers) {
auto& [interf, role] = entry;
if (role.find("coord") != std::string::npos)
coordSnapReqs.push_back(trySendSnapReq(
interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr)));
}
auto const coordFaultTolerance = std::min<int>(std::max<int>(0, coordSnapReqs.size() / 2 - 1),
SERVER_KNOBS->MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE);
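// Example: with 5 coordinators, min(max(0, 5/2 - 1), MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE)
// = min(1, 1) = 1, so one coordinator snapshot may fail (the knob defaults to 1 above).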
wait(waitForMost(coordSnapReqs, coordFaultTolerance, snap_coord_failed()));
TraceEvent("SnapDataDistributor_AfterSnapCoords")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
@ -1056,37 +1181,48 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
return Void();
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState) {
ACTOR Future<Void> ddSnapCreate(
DistributorSnapRequest snapReq,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState,
std::map<UID, DistributorSnapRequest>* ddSnapMap /* ongoing snapshot requests */,
std::map<UID, ErrorOr<Void>>*
ddSnapResultMap /* finished snapshot requests, expired in SNAP_MINIMUM_TIME_GAP seconds */) {
state Future<Void> dbInfoChange = db->onChange();
if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
// Disable DD before doing snapCreate; if a previous snap request has already disabled DD,
// this operation fails here
TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck").log();
snapReq.reply.sendError(operation_failed());
TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck").detail("SnapUID", snapReq.snapUID);
ddSnapMap->at(snapReq.snapUID).reply.sendError(operation_failed());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(operation_failed());
return Void();
}
double delayTime = g_network->isSimulated() ? 70.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
try {
choose {
when(wait(dbInfoChange)) {
TraceEvent("SnapDDCreateDBInfoChanged")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.sendError(snap_with_recovery_unsupported());
ddSnapMap->at(snapReq.snapUID).reply.sendError(snap_with_recovery_unsupported());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(snap_with_recovery_unsupported());
}
when(wait(ddSnapCreateCore(snapReq, db))) {
TraceEvent("SnapDDCreateSuccess")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.send(Void());
ddSnapMap->at(snapReq.snapUID).reply.send(Void());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(Void());
}
when(wait(delay(delayTime))) {
when(wait(delay(SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT))) {
TraceEvent("SnapDDCreateTimedOut")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.sendError(timed_out());
ddSnapMap->at(snapReq.snapUID).reply.sendError(timed_out());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(timed_out());
}
}
} catch (Error& e) {
@ -1095,7 +1231,9 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
ddSnapMap->at(snapReq.snapUID).reply.sendError(e);
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(e);
} else {
// enable DD should always succeed
bool success = ddEnabledState->setDDEnabled(true, snapReq.snapUID);
@ -1246,6 +1384,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
state ActorCollection actors(false);
state DDEnabledState ddEnabledState;
state std::map<UID, DistributorSnapRequest> ddSnapReqMap;
state std::map<UID, ErrorOr<Void>> ddSnapReqResultMap;
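// ddSnapReqMap tracks in-flight snapshot requests by snap UID; ddSnapReqResultMap retains
// finished results for SNAP_MINIMUM_TIME_GAP (5.0s by default, set above) so retried
// requests can be answered without snapshotting twice.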
self->addActor.send(actors.getResult());
self->addActor.send(traceRole(Role::DATA_DISTRIBUTOR, di.id()));
@ -1273,7 +1413,30 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
actors.add(ddGetMetrics(req, getShardMetricsList));
}
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db, &ddEnabledState));
auto& snapUID = snapReq.snapUID;
if (ddSnapReqResultMap.count(snapUID)) {
TEST(true); // Data distributor received a duplicate finished snap request
auto result = ddSnapReqResultMap[snapUID];
result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get());
TraceEvent("RetryFinishedDistributorSnapRequest")
.detail("SnapUID", snapUID)
.detail("Result", result.isError() ? result.getError().code() : 0);
} else if (ddSnapReqMap.count(snapReq.snapUID)) {
TEST(true); // Data distributor received a duplicate ongoing snap request
TraceEvent("RetryOngoingDistributorSnapRequest").detail("SnapUID", snapUID);
ASSERT(snapReq.snapPayload == ddSnapReqMap[snapUID].snapPayload);
ddSnapReqMap[snapUID] = snapReq;
} else {
ddSnapReqMap[snapUID] = snapReq;
actors.add(ddSnapCreate(snapReq, db, &ddEnabledState, &ddSnapReqMap, &ddSnapReqResultMap));
auto* ddSnapReqResultMapPtr = &ddSnapReqResultMap;
actors.add(fmap(
[ddSnapReqResultMapPtr, snapUID](Void _) {
ddSnapReqResultMapPtr->erase(snapUID);
return Void();
},
delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
}
}
when(DistributorExclusionSafetyCheckRequest exclCheckReq =
waitNext(di.distributorExclCheckReq.getFuture())) {

View File

@ -43,7 +43,7 @@ BandwidthStatus getBandwidthStatus(StorageMetrics const& metrics) {
}
ReadBandwidthStatus getReadBandwidthStatus(StorageMetrics const& metrics) {
if (metrics.bytesReadPerKSecond <= SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS ||
if (metrics.bytesReadPerKSecond <= SERVER_KNOBS->SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS ||
metrics.bytesReadPerKSecond <= SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO * metrics.bytes *
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS) {
return ReadBandwidthStatusNormal;
@ -238,7 +238,7 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self,
std::max((int64_t)(SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO * bytes *
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS *
(1.0 + SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC_JITTER)),
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
SERVER_KNOBS->SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS);
bounds.min.bytesReadPerKSecond = 0;
bounds.permittedError.bytesReadPerKSecond = bounds.min.bytesReadPerKSecond / 4;
} else if (readBandwidthStatus == ReadBandwidthStatusHigh) {
@ -291,7 +291,7 @@ ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self,
.detail("Keys", keys)
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwithStatus", getBandwidthStatus(metrics))
.detail("BandwidthStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)

View File

@ -426,14 +426,12 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, UID snapUID, std::stri
} else {
// copy the files
state std::string folderFrom = folder + "/.";
state std::string folderTo = folder + "-snap-" + uidStr.toString();
double maxSimDelayTime = 10.0;
folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
state std::string folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
std::vector<std::string> paramList;
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(mkdirBin);
paramList.push_back(folderTo);
cmdErr = spawnProcess(mkdirBin, paramList, maxWaitTime, false /*isSync*/, maxSimDelayTime);
cmdErr = spawnProcess(mkdirBin, paramList, maxWaitTime, false /*isSync*/, 10.0);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {

View File

@ -0,0 +1,533 @@
/*
* GlobalTagThrottler.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbrpc/Smoother.h"
#include "fdbserver/TagThrottler.h"
#include <limits>
#include "flow/actorcompiler.h" // must be last include
class GlobalTagThrottlerImpl {
class QuotaAndCounters {
Optional<ThrottleApi::TagQuotaValue> quota;
std::unordered_map<UID, double> ssToReadCostRate;
std::unordered_map<UID, double> ssToWriteCostRate;
Smoother totalReadCostRate;
Smoother totalWriteCostRate;
Smoother transactionCounter;
Smoother perClientRate;
Optional<double> getReadTPSLimit() const {
if (totalReadCostRate.smoothTotal() > 0) {
return quota.get().totalReadQuota * transactionCounter.smoothRate() / totalReadCostRate.smoothTotal();
} else {
return {};
}
}
Optional<double> getWriteTPSLimit() const {
if (totalWriteCostRate.smoothTotal() > 0) {
return quota.get().totalWriteQuota * transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal();
} else {
return {};
}
}
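// Worked example (mirroring the unit tests below): with totalReadQuota = 100 and a
// workload whose transactions each cost 6 read units, the read TPS limit works out to
// 100 * txnRate / (6 * txnRate) = 100 / 6 ≈ 16.7 transactions per second.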
public:
QuotaAndCounters()
: totalReadCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
totalWriteCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
void setQuota(ThrottleApi::TagQuotaValue const& quota) { this->quota = quota; }
void updateReadCostRate(UID ssId, double newReadCostRate) {
auto& currentReadCostRate = ssToReadCostRate[ssId];
auto diff = newReadCostRate - currentReadCostRate;
currentReadCostRate += diff;
totalReadCostRate.addDelta(diff);
}
void updateWriteCostRate(UID ssId, double newWriteCostRate) {
auto& currentWriteCostRate = ssToWriteCostRate[ssId];
auto diff = newWriteCostRate - currentWriteCostRate;
currentWriteCostRate += diff;
totalWriteCostRate.addDelta(diff);
}
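// Each storage server reports its current per-tag cost rate; only the delta against that
// server's previous report is added to the cluster-wide Smoother, so re-reports from the
// same server do not double count.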
void addTransactions(int count) { transactionCounter.addDelta(count); }
Optional<double> getTargetTotalTPSLimit() const {
if (!quota.present())
return {};
auto readLimit = getReadTPSLimit();
auto writeLimit = getWriteTPSLimit();
// TODO: Implement expiration logic
if (!readLimit.present() && !writeLimit.present()) {
return {};
} else {
if (!readLimit.present()) {
return writeLimit.get();
} else if (!writeLimit.present()) {
return readLimit.get();
} else {
return std::min(readLimit.get(), writeLimit.get());
}
}
}
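// Feedback control: each update scales the advertised per-client rate by
// (target rate / observed transaction rate), clamped to
// [GLOBAL_TAG_THROTTLING_MIN_RATE, target]; if clients collectively exceed the target,
// the per-client limit shrinks proportionally.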
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit() {
auto targetRate = getTargetTotalTPSLimit();
if (targetRate.present() && transactionCounter.smoothRate() > 0) {
auto newPerClientRate = std::max(
SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE,
std::min(targetRate.get(),
(targetRate.get() / transactionCounter.smoothRate()) * perClientRate.smoothTotal()));
perClientRate.setTotal(newPerClientRate);
return ClientTagThrottleLimits(perClientRate.getTotal(), ClientTagThrottleLimits::NO_EXPIRATION);
} else {
return {};
}
}
void processTraceEvent(TraceEvent& te) const {
if (quota.present()) {
te.detail("ProvidedReadTPSLimit", getReadTPSLimit())
.detail("ProvidedWriteTPSLimit", getWriteTPSLimit())
.detail("ReadCostRate", totalReadCostRate.smoothTotal())
.detail("WriteCostRate", totalWriteCostRate.smoothTotal())
.detail("TotalReadQuota", quota.get().totalReadQuota)
.detail("ReservedReadQuota", quota.get().reservedReadQuota)
.detail("TotalWriteQuota", quota.get().totalWriteQuota)
.detail("ReservedWriteQuota", quota.get().reservedWriteQuota);
}
}
};
Database db;
UID id;
std::map<TransactionTag, QuotaAndCounters> trackedTags;
uint64_t throttledTagChangeId{ 0 };
Future<Void> traceActor;
ACTOR static Future<Void> tracer(GlobalTagThrottlerImpl const* self) {
loop {
for (const auto& [tag, quotaAndCounters] : self->trackedTags) {
TraceEvent te("GlobalTagThrottling");
te.detail("Tag", tag);
quotaAndCounters.processTraceEvent(te);
}
wait(delay(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TRACE_INTERVAL));
}
}
ACTOR static Future<Void> monitorThrottlingChanges(GlobalTagThrottlerImpl* self) {
loop {
state ReadYourWritesTransaction tr(self->db);
loop {
// TODO: Clean up quotas that have been removed
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state RangeResult currentQuotas = wait(tr.getRange(tagQuotaKeys, CLIENT_KNOBS->TOO_MANY));
TraceEvent("GlobalTagThrottler_ReadCurrentQuotas").detail("Size", currentQuotas.size());
for (auto const kv : currentQuotas) {
auto const tag = kv.key.removePrefix(tagQuotaPrefix);
auto const quota = ThrottleApi::TagQuotaValue::fromValue(kv.value);
self->trackedTags[tag].setQuota(quota);
}
++self->throttledTagChangeId;
// FIXME: Should wait on watch instead
// wait(tr.watch(tagThrottleSignalKey));
wait(delay(5.0));
TraceEvent("GlobalTagThrottler_ChangeSignaled");
TEST(true); // Global tag throttler detected quota changes
break;
} catch (Error& e) {
TraceEvent("GlobalTagThrottlerMonitoringChangesError", self->id).error(e);
wait(tr.onError(e));
}
}
}
}
public:
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) { traceActor = tracer(this); }
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
void addRequests(TransactionTag tag, int count) { trackedTags[tag].addTransactions(count); }
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
// TODO: For now, only enforce total throttling rates.
// We should use reserved quotas as well.
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
for (auto& [tag, quotaAndCounters] : trackedTags) {
// Currently there is no differentiation between batch priority and default priority transactions
auto const limit = quotaAndCounters.updateAndGetPerClientLimit();
if (limit.present()) {
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get();
}
}
return result;
}
int64_t autoThrottleCount() const { return trackedTags.size(); }
uint32_t busyReadTagCount() const {
// TODO: Implement
return 0;
}
uint32_t busyWriteTagCount() const {
// TODO: Implement
return 0;
}
int64_t manualThrottleCount() const { return trackedTags.size(); }
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
for (const auto& busyReadTag : ss.busiestReadTags) {
trackedTags[busyReadTag.tag].updateReadCostRate(ss.id, busyReadTag.rate);
}
for (const auto& busyWriteTag : ss.busiestWriteTags) {
trackedTags[busyWriteTag.tag].updateWriteCostRate(ss.id, busyWriteTag.rate);
}
// TODO: Call ThrottleApi::throttleTags
return Void();
}
void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
trackedTags[tag].setQuota(tagQuotaValue);
}
};
GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl<GlobalTagThrottlerImpl>::create(db, id)) {}
GlobalTagThrottler::~GlobalTagThrottler() = default;
Future<Void> GlobalTagThrottler::monitorThrottlingChanges() {
return impl->monitorThrottlingChanges();
}
void GlobalTagThrottler::addRequests(TransactionTag tag, int count) {
return impl->addRequests(tag, count);
}
uint64_t GlobalTagThrottler::getThrottledTagChangeId() const {
return impl->getThrottledTagChangeId();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> GlobalTagThrottler::getClientRates() {
return impl->getClientRates();
}
int64_t GlobalTagThrottler::autoThrottleCount() const {
return impl->autoThrottleCount();
}
uint32_t GlobalTagThrottler::busyReadTagCount() const {
return impl->busyReadTagCount();
}
uint32_t GlobalTagThrottler::busyWriteTagCount() const {
return impl->busyWriteTagCount();
}
int64_t GlobalTagThrottler::manualThrottleCount() const {
return impl->manualThrottleCount();
}
bool GlobalTagThrottler::isAutoThrottlingEnabled() const {
return true;
}
Future<Void> GlobalTagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
return impl->tryUpdateAutoThrottling(ss);
}
void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
return impl->setQuota(tag, tagQuotaValue);
}
namespace GlobalTagThrottlerTesting {
Optional<double> getTPSLimit(GlobalTagThrottler& globalTagThrottler, TransactionTag tag) {
auto clientRates = globalTagThrottler.getClientRates();
auto it1 = clientRates.find(TransactionPriority::DEFAULT);
if (it1 != clientRates.end()) {
auto it2 = it1->second.find(tag);
if (it2 != it1->second.end()) {
return it2->second.tpsRate;
}
}
return {};
}
class StorageServerCollection {
class Cost {
Smoother smoother;
public:
Cost() : smoother(5.0) {}
Cost& operator+=(double delta) {
smoother.addDelta(delta);
return *this;
}
double smoothRate() const { return smoother.smoothRate(); }
};
std::vector<std::map<TransactionTag, Cost>> readCosts;
std::vector<std::map<TransactionTag, Cost>> writeCosts;
public:
StorageServerCollection(size_t size) : readCosts(size), writeCosts(size) { ASSERT_GT(size, 0); }
void addReadCost(TransactionTag tag, double cost) {
auto const costPerSS = cost / readCosts.size();
for (auto& readCost : readCosts) {
readCost[tag] += costPerSS;
}
}
void addWriteCost(TransactionTag tag, double cost) {
auto const costPerSS = cost / writeCosts.size();
for (auto& writeCost : writeCosts) {
writeCost[tag] += costPerSS;
}
}
std::vector<StorageQueueInfo> getStorageQueueInfos() const {
std::vector<StorageQueueInfo> result;
result.reserve(readCosts.size());
for (int i = 0; i < readCosts.size(); ++i) {
StorageQueueInfo sqInfo(UID(i, i), LocalityData{});
for (const auto& [tag, readCost] : readCosts[i]) {
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
sqInfo.busiestReadTags.emplace_back(tag, readCost.smoothRate(), fractionalBusyness);
}
for (const auto& [tag, writeCost] : writeCosts[i]) {
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
sqInfo.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness);
}
result.push_back(sqInfo);
}
return result;
}
};
ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
StorageServerCollection* storageServers,
TransactionTag tag,
double desiredTpsRate,
double costPerTransaction,
bool write) {
loop {
auto tpsLimit = getTPSLimit(*globalTagThrottler, tag);
state double tpsRate = tpsLimit.present() ? std::min<double>(desiredTpsRate, tpsLimit.get()) : desiredTpsRate;
wait(delay(1 / tpsRate));
if (write) {
storageServers->addWriteCost(tag, costPerTransaction);
} else {
storageServers->addReadCost(tag, costPerTransaction);
}
globalTagThrottler->addRequests(tag, 1);
}
}
ACTOR static Future<Void> monitorClientRates(GlobalTagThrottler* globalTagThrottler,
TransactionTag tag,
double desiredTPSLimit) {
state int successes = 0;
loop {
wait(delay(1.0));
auto currentTPSLimit = getTPSLimit(*globalTagThrottler, tag);
if (currentTPSLimit.present()) {
TraceEvent("GlobalTagThrottling_RateMonitor")
.detail("Tag", tag)
.detail("CurrentTPSRate", currentTPSLimit.get())
.detail("DesiredTPSRate", desiredTPSLimit);
if (abs(currentTPSLimit.get() - desiredTPSLimit) < 0.1) {
if (++successes == 3) {
return Void();
}
} else {
successes = 0;
}
} else {
successes = 0;
}
}
}
ACTOR static Future<Void> updateGlobalTagThrottler(GlobalTagThrottler* globalTagThrottler,
StorageServerCollection const* storageServers) {
loop {
wait(delay(1.0));
auto const storageQueueInfos = storageServers->getStorageQueueInfos();
for (const auto& sq : storageQueueInfos) {
globalTagThrottler->tryUpdateAutoThrottling(sq);
}
}
}
} // namespace GlobalTagThrottlerTesting
TEST_CASE("/GlobalTagThrottler/Simple") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/WriteThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalWriteQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, true);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/MultiTagThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag1 = "sampleTag1"_sr;
TransactionTag testTag2 = "sampleTag2"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag1, tagQuotaValue);
globalTagThrottler.setQuota(testTag2, tagQuotaValue);
state std::vector<Future<Void>> futures;
state std::vector<Future<Void>> monitorFutures;
futures.push_back(
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag1, 5.0, 6.0, false));
futures.push_back(
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag2, 5.0, 6.0, false));
futures.push_back(GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers));
monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag1, 100.0 / 6.0));
monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag2, 100.0 / 6.0));
wait(timeoutError(waitForAny(futures) || waitForAll(monitorFutures), 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 10.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/MultiClientThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> client2 =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/MultiClientActiveThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
state Future<Void> client2 =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 5.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
// Global transaction rate should be 20.0, with a distribution of (5, 15) between the 2 clients
TEST_CASE("/GlobalTagThrottler/SkewedMultiClientActiveThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 5.0, false);
state Future<Void> client2 =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 25.0, 5.0, false);
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 15.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
// Test that the tag throttler can reach equilibrium, then adjust to a new equilibrium once the quota is changed
TEST_CASE("/GlobalTagThrottler/UpdateQuota") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
state ThrottleApi::TagQuotaValue tagQuotaValue;
state TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
tagQuotaValue.totalReadQuota = 50.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 50.0 / 6.0);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}

View File

@ -668,7 +668,7 @@ public:
TraceEvent("RocksDB").detail("Info", "DBDestroyed");
}
rocksdb::DB* getDb() { return db; }
rocksdb::DB* getDb() const { return db; }
std::unordered_map<std::string, std::shared_ptr<PhysicalShard>>* getAllShards() { return &physicalShards; }
@ -2092,11 +2092,13 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
}
StorageBytes getStorageBytes() const override {
uint64_t total_live = 0;
int64_t total_free = 0;
int64_t total_space = 0;
uint64_t live = 0;
ASSERT(shardManager.getDb()->GetAggregatedIntProperty(rocksdb::DB::Properties::kLiveSstFilesSize, &live));
return StorageBytes(total_free, total_space, total_live, total_free);
int64_t free;
int64_t total;
g_network->getDiskBytes(path, free, total);
return StorageBytes(free, total, live, free);
}
std::vector<std::string> removeRange(KeyRangeRef range) override { return shardManager.removeRange(range); }
@ -2118,7 +2120,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
ShardManager shardManager;
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
std::string path;
const std::string dataPath;
UID id;
Reference<IThreadPool> writeThread;
Reference<IThreadPool> readThreads;

View File

@ -227,11 +227,6 @@ public:
}
}
ACTOR static Future<Void> monitorThrottlingChanges(Ratekeeper* self) {
wait(self->tagThrottler->monitorThrottlingChanges());
return Void();
}
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void();
@ -408,7 +403,7 @@ Future<Void> Ratekeeper::trackTLogQueueInfo(TLogInterface tli) {
}
Future<Void> Ratekeeper::monitorThrottlingChanges() {
return RatekeeperImpl::monitorThrottlingChanges(this);
return tagThrottler->monitorThrottlingChanges();
}
Future<Void> Ratekeeper::run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
@ -436,7 +431,11 @@ Ratekeeper::Ratekeeper(UID id, Database db)
SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH,
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH) {
tagThrottler = std::make_unique<TagThrottler>(db, id);
if (SERVER_KNOBS->GLOBAL_TAG_THROTTLING) {
tagThrottler = std::make_unique<GlobalTagThrottler>(db, id);
} else {
tagThrottler = std::make_unique<TagThrottler>(db, id);
}
}
void Ratekeeper::updateCommitCostEstimation(

View File

@ -584,13 +584,6 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveContext* pContext,
TxnStateRequest request) {
state const TxnStateRequest& req = request;
state Resolver& resolverData = *pContext->pResolverData;
state PromiseStream<Future<Void>>& addActor = *pContext->pActors;
state Sequence& maxSequence = pContext->maxSequence;
state ReplyPromise<Void> reply = req.reply;
state std::unordered_set<Sequence>& txnSequences = pContext->receivedSequences;
ASSERT(pContext->pResolverData.getPtr() != nullptr);
ASSERT(pContext->pActors != nullptr);

View File

@ -22,7 +22,7 @@
#include "fdbserver/Knobs.h"
#include "fdbserver/RkTagThrottleCollection.h"
double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional<double> requestRate) {
double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional<double> requestRate) const {
if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) {
return limits.tpsRate;
} else {
@ -347,10 +347,12 @@ int64_t RkTagThrottleCollection::manualThrottleCount() const {
return count;
}
void RkTagThrottleCollection::updateBusyTagCount(TagThrottledReason reason) {
void RkTagThrottleCollection::incrementBusyTagCount(TagThrottledReason reason) {
if (reason == TagThrottledReason::BUSY_READ) {
++busyReadTagCount;
} else if (reason == TagThrottledReason::BUSY_WRITE) {
++busyWriteTagCount;
} else {
ASSERT(false);
}
}

View File

@ -366,9 +366,9 @@ struct TLogData : NonCopyable {
// the set and for callers that unset will
// be able to match it up
std::string dataFolder; // folder where data is stored
Reference<AsyncVar<bool>> degraded;
// End of fields used by snapshot based backup and restore
Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
Reference<Histogram> commitLatencyDist;
@ -2569,42 +2569,6 @@ void getQueuingMetrics(TLogData* self, Reference<LogData> logData, TLogQueuingMe
req.reply.send(reply);
}
ACTOR Future<Void> tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != snapReq.snapUID.toString()) {
snapReq.reply.sendError(operation_failed());
return Void();
}
ExecCmdValueString snapArg(snapReq.snapPayload);
try {
int err = wait(execHelper(&snapArg, snapReq.snapUID, self->dataFolder, snapReq.role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", self->dataFolder)
.detail("ExecPayload", snapReq.snapPayload)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
if (err != 0) {
throw operation_failed();
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("TLogExecHelperError").errorUnsuppressed(e);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
throw e;
}
}
return Void();
}
ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != enablePopReq.snapUID.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
@ -2731,9 +2695,6 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogEnablePopRequest enablePopReq = waitNext(tli.enablePopRequest.getFuture())) {
logData->addActor.send(tLogEnablePopReq(enablePopReq, self, logData));
}
when(TLogSnapRequest snapReq = waitNext(tli.snapRequest.getFuture())) {
logData->addActor.send(tLogSnapCreate(snapReq, self, logData));
}
}
}

View File

@ -21,6 +21,7 @@
#include "fdbserver/TagThrottler.h"
#include "fdbserver/RkTagThrottleCollection.h"
#include "flow/actorcompiler.h" // must be last include
class TagThrottlerImpl {
Database db;
@ -106,7 +107,7 @@ class TagThrottlerImpl {
if (tagKey.throttleType == TagThrottleType::AUTO) {
updatedTagThrottles.autoThrottleTag(
self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
updatedTagThrottles.updateBusyTagCount(tagValue.reason);
updatedTagThrottles.incrementBusyTagCount(tagValue.reason);
} else {
updatedTagThrottles.manualThrottleTag(self->id,
tag,
@ -143,6 +144,7 @@ class TagThrottlerImpl {
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
TEST(true); // Transaction tag auto-throttled
Optional<double> clientRate = throttledTags.autoThrottleTag(id, tag, busyness);
// TODO: Increment tag throttle counts here?
if (clientRate.present()) {
TagSet tags;
tags.addTag(tag);
@ -185,23 +187,21 @@ public:
// the future
auto storageQueue = ss.getStorageQueueBytes();
auto storageDurabilityLag = ss.getDurabilityLag();
std::vector<Future<Void>> futures;
if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
// TODO: Update once size is potentially > 1
ASSERT_WE_THINK(ss.busiestWriteTags.size() <= 1);
ASSERT_WE_THINK(ss.busiestReadTags.size() <= 1);
for (const auto& busyWriteTag : ss.busiestWriteTags) {
return tryUpdateAutoThrottling(busyWriteTag.tag,
busyWriteTag.rate,
busyWriteTag.fractionalBusyness,
TagThrottledReason::BUSY_WRITE);
futures.push_back(tryUpdateAutoThrottling(busyWriteTag.tag,
busyWriteTag.rate,
busyWriteTag.fractionalBusyness,
TagThrottledReason::BUSY_WRITE));
}
for (const auto& busyReadTag : ss.busiestReadTags) {
return tryUpdateAutoThrottling(
busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ);
futures.push_back(tryUpdateAutoThrottling(
busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ));
}
}
return Void();
return waitForAll(futures);
}
}; // class TagThrottlerImpl

View File

@ -18,50 +18,193 @@
* limitations under the License.
*/
#include "fdbserver/Knobs.h"
#include "fdbserver/TransactionTagCounter.h"
#include "flow/Trace.h"
TransactionTagCounter::TransactionTagCounter(UID thisServerID)
: thisServerID(thisServerID),
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
namespace {
void TransactionTagCounter::addRequest(Optional<TagSet> const& tags, int64_t bytes) {
if (tags.present()) {
TEST(true); // Tracking transaction tag in counter
double cost = costFunction(bytes);
for (auto& tag : tags.get()) {
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
count += cost;
if (count > busiestTagCount) {
busiestTagCount = count;
busiestTag = tag;
class TopKTags {
public:
struct TagAndCount {
TransactionTag tag;
int64_t count;
bool operator<(TagAndCount const& other) const { return count < other.count; }
explicit TagAndCount(TransactionTag tag, int64_t count) : tag(tag), count(count) {}
};
private:
// Because the number of tracked is expected to be small, they can be tracked
// in a simple vector. If the number of tracked tags increases, a more sophisticated
// data structure will be required.
std::vector<TagAndCount> topTags;
int limit;
public:
explicit TopKTags(int limit) : limit(limit) {
ASSERT_GT(limit, 0);
topTags.reserve(limit);
}
void incrementCount(TransactionTag tag, int previousCount, int increase) {
auto iter = std::find_if(topTags.begin(), topTags.end(), [tag](const auto& tc) { return tc.tag == tag; });
if (iter != topTags.end()) {
ASSERT_EQ(previousCount, iter->count);
iter->count += increase;
} else if (topTags.size() < limit) {
ASSERT_EQ(previousCount, 0);
topTags.emplace_back(tag, increase);
} else {
auto toReplace = std::min_element(topTags.begin(), topTags.end());
ASSERT_GE(toReplace->count, previousCount);
if (toReplace->count < previousCount + increase) {
toReplace->tag = tag;
toReplace->count = previousCount + increase;
}
}
intervalTotalSampledCount += cost;
}
std::vector<StorageQueuingMetricsReply::TagInfo> getBusiestTags(double elapsed, double totalSampleCount) const {
std::vector<StorageQueuingMetricsReply::TagInfo> result;
for (auto const& tagAndCounter : topTags) {
auto rate = (tagAndCounter.count / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE) / elapsed;
if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) {
result.emplace_back(tagAndCounter.tag, rate, tagAndCounter.count / totalSampleCount);
}
}
return result;
}
void clear() { topTags.clear(); }
};
} // namespace
class TransactionTagCounterImpl {
UID thisServerID;
TransactionTagMap<int64_t> intervalCounts;
int64_t intervalTotalSampledCount = 0;
TopKTags topTags;
double intervalStart = 0;
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
Reference<EventCacheHolder> busiestReadTagEventHolder;
static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
public:
TransactionTagCounterImpl(UID thisServerID)
: thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
if (tags.present()) {
TEST(true); // Tracking transaction tag in counter
double cost = costFunction(bytes);
for (auto& tag : tags.get()) {
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
topTags.incrementCount(tag, count, cost);
count += cost;
}
intervalTotalSampledCount += cost;
}
}
void startNewInterval() {
double elapsed = now() - intervalStart;
previousBusiestTags.clear();
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
previousBusiestTags = topTags.getBusiestTags(elapsed, intervalTotalSampledCount);
TraceEvent("BusiestReadTag", thisServerID)
.detail("Elapsed", elapsed)
//.detail("Tag", printable(busiestTag))
//.detail("TagCost", busiestTagCount)
.detail("TotalSampledCost", intervalTotalSampledCount)
.detail("Reported", previousBusiestTags.size())
.trackLatest(busiestReadTagEventHolder->trackingKey);
}
intervalCounts.clear();
intervalTotalSampledCount = 0;
topTags.clear();
intervalStart = now();
}
std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const { return previousBusiestTags; }
};
TransactionTagCounter::TransactionTagCounter(UID thisServerID)
: impl(PImpl<TransactionTagCounterImpl>::create(thisServerID)) {}
TransactionTagCounter::~TransactionTagCounter() = default;
void TransactionTagCounter::addRequest(Optional<TagSet> const& tags, int64_t bytes) {
return impl->addRequest(tags, bytes);
}
void TransactionTagCounter::startNewInterval() {
double elapsed = now() - intervalStart;
previousBusiestTags.clear();
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) {
previousBusiestTags.emplace_back(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
}
TraceEvent("BusiestReadTag", thisServerID)
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagCost", busiestTagCount)
.detail("TotalSampledCost", intervalTotalSampledCount)
.detail("Reported", !previousBusiestTags.empty())
.trackLatest(busiestReadTagEventHolder->trackingKey);
}
intervalCounts.clear();
intervalTotalSampledCount = 0;
busiestTagCount = 0;
intervalStart = now();
return impl->startNewInterval();
}
std::vector<StorageQueuingMetricsReply::TagInfo> const& TransactionTagCounter::getBusiestTags() const {
return impl->getBusiestTags();
}
TEST_CASE("/TransactionTagCounter/TopKTags") {
TopKTags topTags(2);
// Ensure that costs are larger enough to show up
auto const costMultiplier =
std::max<double>(1.0, 2 * SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE * CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0);
topTags.incrementCount("a"_sr, 0, 1 * costMultiplier);
{
auto const busiestTags = topTags.getBusiestTags(1.0, 1 * costMultiplier);
ASSERT_EQ(busiestTags.size(), 1);
ASSERT_EQ(std::count_if(busiestTags.begin(),
busiestTags.end(),
[](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
1);
}
topTags.incrementCount("b"_sr, 0, 2 * costMultiplier);
topTags.incrementCount("c"_sr, 0, 3 * costMultiplier);
{
auto busiestTags = topTags.getBusiestTags(1.0, 6 * costMultiplier);
ASSERT_EQ(busiestTags.size(), 2);
ASSERT_EQ(std::count_if(busiestTags.begin(),
busiestTags.end(),
[](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
0);
ASSERT_EQ(std::count_if(busiestTags.begin(),
busiestTags.end(),
[](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }),
1);
ASSERT_EQ(std::count_if(busiestTags.begin(),
busiestTags.end(),
[](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }),
1);
}
topTags.incrementCount("a"_sr, 1 * costMultiplier, 3 * costMultiplier);
{
auto busiestTags = topTags.getBusiestTags(1.0, 9 * costMultiplier);
ASSERT_EQ(busiestTags.size(), 2);
ASSERT_EQ(std::count_if(busiestTags.begin(),
busiestTags.end(),
[](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
1);
ASSERT_EQ(std::count_if(busiestTags.begin(),
busiestTags.end(),
[](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }),
0);
ASSERT_EQ(std::count_if(busiestTags.begin(),
busiestTags.end(),
[](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }),
1);
}
topTags.clear();
ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0);
return Void();
}

View File

@ -148,7 +148,7 @@ class Ratekeeper {
double lastWarning;
double lastSSListFetchedTimestamp;
std::unique_ptr<class TagThrottler> tagThrottler;
std::unique_ptr<class ITagThrottler> tagThrottler;
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;

View File

@ -42,7 +42,7 @@ class RkTagThrottleCollection : NonCopyable {
bool rateSet = false;
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
double getTargetRate(Optional<double> requestRate);
double getTargetRate(Optional<double> requestRate) const;
Optional<double> updateAndGetClientRate(Optional<double> requestRate);
};
@ -83,7 +83,7 @@ public:
void addRequests(TransactionTag const& tag, int requests);
int64_t autoThrottleCount() const { return autoThrottledTags.size(); }
int64_t manualThrottleCount() const;
void updateBusyTagCount(TagThrottledReason);
void incrementBusyTagCount(TagThrottledReason);
auto getBusyReadTagCount() const { return busyReadTagCount; }
auto getBusyWriteTagCount() const { return busyWriteTagCount; }
};

View File

@ -532,7 +532,7 @@ struct StorageServerMetrics {
auto _ranges = getReadHotRanges(req.keys,
SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO,
SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE,
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
SERVER_KNOBS->SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS);
reply.readHotRanges = VectorRef(_ranges.data(), _ranges.size());
req.reply.send(reply);
}

View File

@ -23,32 +23,72 @@
#include "fdbclient/PImpl.h"
#include "fdbserver/Ratekeeper.h"
class TagThrottler {
class ITagThrottler {
public:
virtual ~ITagThrottler() = default;
// Poll the system keyspace looking for updates made through the tag throttling API
virtual Future<Void> monitorThrottlingChanges() = 0;
// Increment the number of known requests associated with the specified tag
virtual void addRequests(TransactionTag tag, int count) = 0;
// This throttled tag change ID is used to coordinate updates with the GRV proxies
virtual uint64_t getThrottledTagChangeId() const = 0;
// For each tag and priority combination, return the throughput limit and expiration time
// Also, erase expired tags
virtual PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() = 0;
virtual int64_t autoThrottleCount() const = 0;
virtual uint32_t busyReadTagCount() const = 0;
virtual uint32_t busyWriteTagCount() const = 0;
virtual int64_t manualThrottleCount() const = 0;
virtual bool isAutoThrottlingEnabled() const = 0;
// Based on the busiest read and write tags in the provided storage queue info, update
// tag throttling limits.
virtual Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) = 0;
};
class TagThrottler : public ITagThrottler {
PImpl<class TagThrottlerImpl> impl;
public:
TagThrottler(Database db, UID id);
~TagThrottler();
// Poll the system keyspace looking for updates made through the tag throttling API
Future<Void> monitorThrottlingChanges();
// Increment the number of known requests associated with the specified tag
void addRequests(TransactionTag tag, int count);
// This throttled tag change ID is used to coordinate updates with the GRV proxies
uint64_t getThrottledTagChangeId() const;
// For each tag and priority combination, return the throughput limit and expiration time
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates();
int64_t autoThrottleCount() const;
uint32_t busyReadTagCount() const;
uint32_t busyWriteTagCount() const;
int64_t manualThrottleCount() const;
bool isAutoThrottlingEnabled() const;
// Based on the busiest read and write tags in the provided storage queue info, update
// tag throttling limits.
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&);
Future<Void> monitorThrottlingChanges() override;
void addRequests(TransactionTag tag, int count) override;
uint64_t getThrottledTagChangeId() const override;
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
int64_t autoThrottleCount() const override;
uint32_t busyReadTagCount() const override;
uint32_t busyWriteTagCount() const override;
int64_t manualThrottleCount() const override;
bool isAutoThrottlingEnabled() const override;
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
};
class GlobalTagThrottler : public ITagThrottler {
PImpl<class GlobalTagThrottlerImpl> impl;
public:
GlobalTagThrottler(Database db, UID id);
~GlobalTagThrottler();
Future<Void> monitorThrottlingChanges() override;
void addRequests(TransactionTag tag, int count) override;
uint64_t getThrottledTagChangeId() const override;
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
int64_t autoThrottleCount() const override;
uint32_t busyReadTagCount() const override;
uint32_t busyWriteTagCount() const override;
int64_t manualThrottleCount() const override;
bool isAutoThrottlingEnabled() const override;
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
// testing only
public:
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
};

View File

@ -20,25 +20,23 @@
#pragma once
#include "fdbclient/PImpl.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbserver/Knobs.h"
class TransactionTagCounter {
TransactionTagMap<int64_t> intervalCounts;
int64_t intervalTotalSampledCount = 0;
TransactionTag busiestTag;
int64_t busiestTagCount = 0;
double intervalStart = 0;
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
UID thisServerID;
Reference<EventCacheHolder> busiestReadTagEventHolder;
PImpl<class TransactionTagCounterImpl> impl;
public:
TransactionTagCounter(UID thisServerID);
static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
~TransactionTagCounter();
// Update counters tracking the busyness of each tag in the current interval
void addRequest(Optional<TagSet> const& tags, int64_t bytes);
// Save current set of busy tags and reset counters for next interval
void startNewInterval();
std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const { return previousBusiestTags; }
// Returns the set of busiest tags as of the end of the last interval
std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const;
};

View File

@ -1415,10 +1415,16 @@ ACTOR Future<Void> traceRole(Role role, UID roleId) {
}
}
ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, Standalone<StringRef> snapFolder) {
ACTOR Future<Void> workerSnapCreate(
WorkerSnapRequest snapReq,
std::string snapFolder,
std::map<std::string, WorkerSnapRequest>* snapReqMap /* ongoing snapshot requests */,
std::map<std::string, ErrorOr<Void>>*
snapReqResultMap /* finished snapshot requests, expired in SNAP_MINIMUM_TIME_GAP seconds */) {
state ExecCmdValueString snapArg(snapReq.snapPayload);
state std::string snapReqKey = snapReq.snapUID.toString() + snapReq.role.toString();
try {
int err = wait(execHelper(&snapArg, snapReq.snapUID, snapFolder.toString(), snapReq.role.toString()));
int err = wait(execHelper(&snapArg, snapReq.snapUID, snapFolder, snapReq.role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceWorker")
.detail("Uid", uidStr)
@ -1432,11 +1438,15 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, Standalone<String
if (snapReq.role.toString() == "storage") {
printStorageVersionInfo();
}
snapReq.reply.send(Void());
snapReqMap->at(snapReqKey).reply.send(Void());
snapReqMap->erase(snapReqKey);
(*snapReqResultMap)[snapReqKey] = ErrorOr<Void>(Void());
} catch (Error& e) {
TraceEvent("ExecHelperError").errorUnsuppressed(e);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
snapReqMap->at(snapReqKey).reply.sendError(e);
snapReqMap->erase(snapReqKey);
(*snapReqResultMap)[snapReqKey] = ErrorOr<Void>(e);
} else {
throw e;
}
@ -1584,6 +1594,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state WorkerCache<InitializeBackupReply> backupWorkerCache;
state WorkerCache<InitializeBlobWorkerReply> blobWorkerCache;
state WorkerSnapRequest lastSnapReq;
// Here the key is UID+role, as we still send duplicate requests to a process which is both storage and tlog
state std::map<std::string, WorkerSnapRequest> snapReqMap;
state std::map<std::string, ErrorOr<Void>> snapReqResultMap;
state double lastSnapTime = -SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP; // always successful for the first Snap Request
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf(locality);
@ -2497,11 +2512,49 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
loggingTrigger = delay(loggingDelay, TaskPriority::FlushTrace);
}
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
Standalone<StringRef> snapFolder = StringRef(folder);
if (snapReq.role.toString() == "coord") {
snapFolder = coordFolder;
std::string snapUID = snapReq.snapUID.toString() + snapReq.role.toString();
if (snapReqResultMap.count(snapUID)) {
TEST(true); // Worker received a duplicate finished snap request
auto result = snapReqResultMap[snapUID];
result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get());
TraceEvent("RetryFinishedWorkerSnapRequest")
.detail("SnapUID", snapUID)
.detail("Role", snapReq.role)
.detail("Result", result.isError() ? result.getError().code() : 0);
} else if (snapReqMap.count(snapUID)) {
TEST(true); // Worker received a duplicate ongoing snap request
TraceEvent("RetryOngoingWorkerSnapRequest").detail("SnapUID", snapUID).detail("Role", snapReq.role);
ASSERT(snapReq.role == snapReqMap[snapUID].role);
ASSERT(snapReq.snapPayload == snapReqMap[snapUID].snapPayload);
snapReqMap[snapUID] = snapReq;
} else {
snapReqMap[snapUID] = snapReq; // set map point to the request
if (g_network->isSimulated() && (now() - lastSnapTime) < SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP) {
// only allow duplicate snapshots on same process in a short time for different roles
auto okay = (lastSnapReq.snapUID == snapReq.snapUID) && lastSnapReq.role != snapReq.role;
TraceEvent(okay ? SevInfo : SevError, "RapidSnapRequestsOnSameProcess")
.detail("CurrSnapUID", snapUID)
.detail("PrevSnapUID", lastSnapReq.snapUID)
.detail("CurrRole", snapReq.role)
.detail("PrevRole", lastSnapReq.role)
.detail("GapTime", now() - lastSnapTime);
}
errorForwarders.add(workerSnapCreate(snapReq,
snapReq.role.toString() == "coord" ? coordFolder : folder,
&snapReqMap,
&snapReqResultMap));
auto* snapReqResultMapPtr = &snapReqResultMap;
errorForwarders.add(fmap(
[snapReqResultMapPtr, snapUID](Void _) {
snapReqResultMapPtr->erase(snapUID);
return Void();
},
delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
if (g_network->isSimulated()) {
lastSnapReq = snapReq;
lastSnapTime = now();
}
}
errorForwarders.add(workerSnapCreate(snapReq, snapFolder));
}
when(wait(errorForwarders.getResult())) {}
when(wait(handleErrors)) {}

View File

@ -0,0 +1,74 @@
/*
* GlobalTagThrottling.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/TagThrottle.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class GlobalTagThrottlingWorkload : public TestWorkload {
TransactionTag transactionTag;
double reservedReadQuota{ 0.0 };
double totalReadQuota{ 0.0 };
double reservedWriteQuota{ 0.0 };
double totalWriteQuota{ 0.0 };
ACTOR static Future<Void> setup(GlobalTagThrottlingWorkload* self, Database cx) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
TraceEvent("GlobalTagThrottlingWorkload_SettingTagQuota")
.detail("Tag", self->transactionTag)
.detail("ReservedReadQuota", self->reservedReadQuota)
.detail("TotalReadQuota", self->totalReadQuota)
.detail("ReservedWriteQuota", self->reservedWriteQuota)
.detail("TotalWriteQuota", self->totalWriteQuota);
ThrottleApi::setTagQuota(tr,
self->transactionTag,
self->reservedReadQuota,
self->totalReadQuota,
self->reservedWriteQuota,
self->totalWriteQuota);
wait(tr->commit());
return Void();
} catch (Error& e) {
wait(tr->onError(e));
}
};
}
public:
explicit GlobalTagThrottlingWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
transactionTag = getOption(options, "transactionTag"_sr, "sampleTag"_sr);
reservedReadQuota = getOption(options, "reservedReadQuota"_sr, 0.0);
totalReadQuota = getOption(options, "totalReadQuota"_sr, 0.0);
reservedWriteQuota = getOption(options, "reservedWriteQuota"_sr, 0.0);
totalWriteQuota = getOption(options, "totalWriteQuota"_sr, 0.0);
}
std::string description() const override { return "GlobalTagThrottling"; }
Future<Void> setup(Database const& cx) override { return clientId ? Void() : setup(this, cx); }
Future<Void> start(Database const& cx) override { return Void(); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<GlobalTagThrottlingWorkload> GlobalTagThrottlingWorkloadFactory("GlobalTagThrottling");

View File

@ -101,7 +101,7 @@ struct ReadHotDetectionWorkload : TestWorkload {
StorageMetrics sm = wait(cx->getStorageMetrics(self->wholeRange, 100));
// TraceEvent("RHDCheckPhaseLog")
// .detail("KeyRangeSize", sm.bytes)
// .detail("KeyRangeReadBandwith", sm.bytesReadPerKSecond);
// .detail("KeyRangeReadBandwidth", sm.bytesReadPerKSecond);
Standalone<VectorRef<ReadHotRangeWithMetrics>> keyRanges = wait(cx->getReadHotRanges(self->wholeRange));
// TraceEvent("RHDCheckPhaseLog")
// .detail("KeyRangesSize", keyRanges.size())

View File

@ -61,6 +61,7 @@ struct ReadWriteCommonImpl {
throw;
}
}
ACTOR static Future<Void> tracePeriodically(ReadWriteCommon* self) {
state double start = now();
state double elapsed = 0.0;
@ -376,6 +377,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
bool adjacentReads; // keys are adjacent within a transaction
bool adjacentWrites;
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
Optional<Key> transactionTag;
int transactionsTagThrottled{ 0 };
// hot traffic pattern
double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting
@ -397,6 +401,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
if (hasOption(options, LiteralStringRef("transactionTag"))) {
transactionTag = getOption(options, LiteralStringRef("transactionTag"), ""_sr);
}
if (rampUpConcurrency)
ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down
@ -415,15 +422,18 @@ struct ReadWriteWorkload : ReadWriteCommon {
}
}
std::string description() const override { return descriptionString.toString(); }
template <class Trans>
void setupTransaction(Trans* tr) {
void setupTransaction(Trans& tr) {
if (batchPriority) {
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
tr.setOption(FDBTransactionOptions::PRIORITY_BATCH);
}
if (transactionTag.present() && tr.getTags().size() == 0) {
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, transactionTag.get());
}
}
std::string description() const override { return descriptionString.toString(); }
void getMetrics(std::vector<PerfMetric>& m) override {
ReadWriteCommon::getMetrics(m);
if (!rampUpLoad) {
@ -449,6 +459,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True);
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True);
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True);
if (transactionTag.present()) {
m.emplace_back("Transaction Tag Throttled", transactionsTagThrottled, Averaged::False);
}
}
}
@ -494,11 +507,14 @@ struct ReadWriteWorkload : ReadWriteCommon {
state Transaction tr(cx);
try {
self->setupTransaction(&tr);
self->setupTransaction(tr);
wait(self->readOp(&tr, keys, self, false));
wait(tr.warmRange(allKeys));
break;
} catch (Error& e) {
if (e.code() == error_code_tag_throttled) {
++self->transactionsTagThrottled;
}
wait(tr.onError(e));
}
}
@ -625,7 +641,7 @@ struct ReadWriteWorkload : ReadWriteCommon {
loop {
try {
self->setupTransaction(&tr);
self->setupTransaction(tr);
GRVStartTime = now();
self->transactionFailureMetric->startLatency = -1;

View File

@ -71,14 +71,12 @@ struct SaveAndKillWorkload : TestWorkload {
std::map<NetworkAddress, ISimulator::ProcessInfo*> rebootingProcesses = g_simulator.currentlyRebootingProcesses;
std::map<std::string, ISimulator::ProcessInfo*> allProcessesMap;
for (const auto& [_, process] : rebootingProcesses) {
if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() &&
process->name != "remote flow process") {
if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() && !process->isSpawnedKVProcess()) {
allProcessesMap[process->dataFolder] = process;
}
}
for (const auto& process : processes) {
if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() &&
process->name != "remote flow process") {
if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() && !process->isSpawnedKVProcess()) {
allProcessesMap[process->dataFolder] = process;
}
}

View File

@ -196,7 +196,7 @@ ERROR( key_not_tuple, 2041, "The key cannot be parsed as a tuple" );
ERROR( value_not_tuple, 2042, "The value cannot be parsed as a tuple" );
ERROR( mapper_not_tuple, 2043, "The mapper cannot be parsed as a tuple" );
ERROR( invalid_checkpoint_format, 2044, "Invalid checkpoint format" )
ERROR( invalid_throttle_quota_value, 2045, "Failed to deserialize or initialize throttle quota value" )
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )

View File

@ -208,6 +208,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES rare/CycleWithKills.toml)
add_fdb_test(TEST_FILES rare/CycleWithDeadHall.toml)
add_fdb_test(TEST_FILES rare/FuzzTest.toml)
add_fdb_test(TEST_FILES rare/GlobalTagThrottling.toml IGNORE)
add_fdb_test(TEST_FILES rare/HighContentionPrefixAllocator.toml)
add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml)
add_fdb_test(TEST_FILES rare/LargeApiCorrectness.toml)

View File

@ -0,0 +1,41 @@
[[test]]
testTitle='GlobalTagThrottling'
[[test.knobs]]
min_tag_read_pages_rate=1.0
global_tag_throttling=true
[[test.workload]]
testName='GlobalTagThrottling'
transactionTag='sampleTag1'
totalReadQuota=1.0
[[test.workload]]
testName='ReadWrite'
testDuration=600.0
transactionsPerSecond=100
writesPerTransactionA=0
readsPerTransactionA=10
writesPerTransactionB=0
readsPerTransactionB=0
alpha=0.0
nodeCount=10000
valueBytes=1000
minValueBytes=1000
warmingDelay=60.0
transactionTag='sampleTag1'
[[test.workload]]
testName='ReadWrite'
testDuration=600.0
transactionsPerSecond=100
writesPerTransactionA=0
readsPerTransactionA=10
writesPerTransactionB=0
readsPerTransactionB=0
alpha=0.0
nodeCount=10000
valueBytes=1000
minValueBytes=1000
warmingDelay=60.0
transactionTag='sampleTag2'