Differentiate between different quotas in GlobalTagThrottling workload
This commit is contained in:
parent
2ae5a6c86b
commit
75c858eb2c
|
@ -115,23 +115,7 @@ public:
|
|||
|
||||
bool canRecheck() const { return lastCheck < now() - CLIENT_KNOBS->TAG_THROTTLE_RECHECK_INTERVAL; }
|
||||
|
||||
double throttleDuration() const {
|
||||
if (expiration <= now()) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
double capacity =
|
||||
(smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW;
|
||||
if (capacity >= 1) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
if (tpsRate == 0) {
|
||||
return std::max(0.0, expiration - now());
|
||||
}
|
||||
|
||||
return std::min(expiration - now(), capacity / tpsRate);
|
||||
}
|
||||
double throttleDuration() const;
|
||||
};
|
||||
|
||||
struct WatchParameters : public ReferenceCounted<WatchParameters> {
|
||||
|
|
|
@ -5146,8 +5146,9 @@ Future<Optional<Value>> Transaction::get(const Key& key, Snapshot snapshot) {
|
|||
if (!ver.isReady() || metadataVersion.isSet()) {
|
||||
return metadataVersion.getFuture();
|
||||
} else {
|
||||
if (ver.isError())
|
||||
if (ver.isError()) {
|
||||
return ver.getError();
|
||||
}
|
||||
if (ver.get() == trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].first) {
|
||||
return trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].second;
|
||||
}
|
||||
|
@ -7059,6 +7060,26 @@ Future<ProtocolVersion> DatabaseContext::getClusterProtocol(Optional<ProtocolVer
|
|||
return getClusterProtocolImpl(coordinator, expectedVersion);
|
||||
}
|
||||
|
||||
double ClientTagThrottleData::throttleDuration() const {
|
||||
if (expiration <= now()) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
double capacity =
|
||||
(smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW;
|
||||
|
||||
if (capacity >= 1) {
|
||||
TraceEvent("DEBUG_ReturningEarly1");
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
if (tpsRate == 0) {
|
||||
return std::max(0.0, expiration - now());
|
||||
}
|
||||
|
||||
return std::min(expiration - now(), capacity / tpsRate);
|
||||
}
|
||||
|
||||
uint32_t Transaction::getSize() {
|
||||
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
|
||||
tr.transaction.write_conflict_ranges.expectedSize();
|
||||
|
|
|
@ -87,15 +87,16 @@ class GlobalTagThrottlerImpl {
|
|||
}
|
||||
|
||||
void processTraceEvent(TraceEvent& te) const {
|
||||
ASSERT(quota.present());
|
||||
te.detail("ProvidedReadLimit", getReadLimit())
|
||||
.detail("ProvidedWriteLimit", getWriteLimit())
|
||||
.detail("ReadCostRate", readCostCounter.smoothRate())
|
||||
.detail("WriteCostRate", writeCostCounter.smoothRate())
|
||||
.detail("TotalReadQuota", quota.get().totalReadQuota)
|
||||
.detail("ReservedReadQuota", quota.get().reservedReadQuota)
|
||||
.detail("TotalWriteQuota", quota.get().totalWriteQuota)
|
||||
.detail("ReservedWriteQuota", quota.get().reservedWriteQuota);
|
||||
if (quota.present()) {
|
||||
te.detail("ProvidedReadLimit", getReadLimit())
|
||||
.detail("ProvidedWriteLimit", getWriteLimit())
|
||||
.detail("ReadCostRate", readCostCounter.smoothRate())
|
||||
.detail("WriteCostRate", writeCostCounter.smoothRate())
|
||||
.detail("TotalReadQuota", quota.get().totalReadQuota)
|
||||
.detail("ReservedReadQuota", quota.get().reservedReadQuota)
|
||||
.detail("TotalWriteQuota", quota.get().totalWriteQuota)
|
||||
.detail("ReservedWriteQuota", quota.get().reservedWriteQuota);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -25,7 +25,10 @@
|
|||
|
||||
class GlobalTagThrottlingWorkload : public TestWorkload {
|
||||
TransactionTag transactionTag;
|
||||
double quota;
|
||||
double reservedReadQuota{ 0.0 };
|
||||
double totalReadQuota{ 0.0 };
|
||||
double reservedWriteQuota{ 0.0 };
|
||||
double totalWriteQuota{ 0.0 };
|
||||
|
||||
ACTOR static Future<Void> setup(GlobalTagThrottlingWorkload* self, Database cx) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
|
@ -34,8 +37,16 @@ class GlobalTagThrottlingWorkload : public TestWorkload {
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
TraceEvent("GlobalTagThrottlingWorkload_SettingTagQuota")
|
||||
.detail("Tag", self->transactionTag)
|
||||
.detail("Quota", self->quota);
|
||||
ThrottleApi::setTagQuota(tr, self->transactionTag, self->quota, self->quota, 0, 0);
|
||||
.detail("ReservedReadQuota", self->reservedReadQuota)
|
||||
.detail("TotalReadQuota", self->totalReadQuota)
|
||||
.detail("ReservedWriteQuota", self->reservedWriteQuota)
|
||||
.detail("TotalWriteQuota", self->totalWriteQuota);
|
||||
ThrottleApi::setTagQuota(tr,
|
||||
self->transactionTag,
|
||||
self->reservedReadQuota,
|
||||
self->totalReadQuota,
|
||||
self->reservedWriteQuota,
|
||||
self->totalWriteQuota);
|
||||
wait(tr->commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
|
@ -47,7 +58,10 @@ class GlobalTagThrottlingWorkload : public TestWorkload {
|
|||
public:
|
||||
explicit GlobalTagThrottlingWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
transactionTag = getOption(options, "transactionTag"_sr, "sampleTag"_sr);
|
||||
quota = getOption(options, "quota"_sr, 100.0);
|
||||
reservedReadQuota = getOption(options, "reservedReadQuota"_sr, 0.0);
|
||||
totalReadQuota = getOption(options, "totalReadQuota"_sr, 0.0);
|
||||
reservedWriteQuota = getOption(options, "reservedWriteQuota"_sr, 0.0);
|
||||
totalWriteQuota = getOption(options, "totalWriteQuota"_sr, 0.0);
|
||||
}
|
||||
|
||||
std::string description() const override { return "GlobalTagThrottling"; }
|
||||
|
|
|
@ -379,6 +379,8 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
|
||||
Optional<Key> transactionTag;
|
||||
|
||||
int transactionsTagThrottled{ 0 };
|
||||
|
||||
// hot traffic pattern
|
||||
double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting
|
||||
|
||||
|
@ -399,6 +401,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
|
||||
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
|
||||
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
|
||||
if (hasOption(options, LiteralStringRef("transactionTag"))) {
|
||||
transactionTag = getOption(options, LiteralStringRef("transactionTag"), ""_sr);
|
||||
}
|
||||
|
||||
if (rampUpConcurrency)
|
||||
ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down
|
||||
|
@ -454,6 +459,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True);
|
||||
if (transactionTag.present()) {
|
||||
m.emplace_back("Transaction Tag Throttled", transactionsTagThrottled, Averaged::False);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -504,6 +512,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
wait(tr.warmRange(allKeys));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_tag_throttled) {
|
||||
++self->transactionsTagThrottled;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ testTitle='GlobalTagThrottling'
|
|||
[[test.workload]]
|
||||
testName='GlobalTagThrottling'
|
||||
transactionTag='sampleTag1'
|
||||
quota=100.0
|
||||
totalReadQuota=1.0
|
||||
|
||||
[[test.workload]]
|
||||
testName='ReadWrite'
|
||||
|
|
Loading…
Reference in New Issue