simplify test workload and adjust ratekeeper throttling strategy

This commit is contained in:
Jon Fu 2022-02-11 16:41:14 -05:00
parent 2e63ac6963
commit a63d218e9d
5 changed files with 64 additions and 98 deletions

View File

@ -198,8 +198,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
bool locked;
Optional<Value> metadataVersion;
int64_t midShardSize = 0;
uint32_t queueIterations = 0;
bool rkThrottled = false;
bool rkDefaultThrottled = false;
bool rkBatchThrottled = false;
TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
@ -214,8 +214,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
metadataVersion,
tagThrottleInfo,
midShardSize,
queueIterations,
rkThrottled);
rkDefaultThrottled,
rkBatchThrottled);
}
};
@ -238,22 +238,21 @@ struct GetReadVersionRequest : TimedRequest {
uint32_t transactionCount;
uint32_t flags;
TransactionPriority priority;
uint32_t queueIterations;
TransactionTagMap<uint32_t> tags;
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;
GetReadVersionRequest() : transactionCount(1), flags(0), queueIterations(0) {}
GetReadVersionRequest() : transactionCount(1), flags(0) {}
GetReadVersionRequest(SpanID spanContext,
uint32_t transactionCount,
TransactionPriority priority,
uint32_t flags = 0,
TransactionTagMap<uint32_t> tags = TransactionTagMap<uint32_t>(),
Optional<UID> debugID = Optional<UID>())
: spanContext(spanContext), transactionCount(transactionCount), flags(flags), priority(priority),
queueIterations(0), tags(tags), debugID(debugID) {
: spanContext(spanContext), transactionCount(transactionCount), flags(flags), priority(priority), tags(tags),
debugID(debugID) {
flags = flags & ~FLAG_PRIORITY_MASK;
switch (priority) {
case TransactionPriority::BATCH:
@ -274,7 +273,7 @@ struct GetReadVersionRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext, queueIterations);
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext);
if (ar.isDeserializing) {
if ((flags & PRIORITY_SYSTEM_IMMEDIATE) == PRIORITY_SYSTEM_IMMEDIATE) {

View File

@ -475,7 +475,8 @@ public:
void updateCachedReadVersion(double t, Version v);
Version getCachedReadVersion();
double getLastGrvTime();
double lastRkThrottleTime;
double lastRkBatchThrottleTime;
double lastRkDefaultThrottleTime;
// Cached RVs can be updated through commits, and using cached RVs avoids the proxies altogether
// Because our checks for ratekeeper throttling requires communication with the proxies,
// we want to track the last time in order to periodically contact the proxy to check for throttling

View File

@ -1306,10 +1306,10 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
bytesPerCommit(1000), outstandingWatches(0), lastGrvTime(0.0), cachedReadVersion(0), lastRkThrottleTime(0.0),
lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
bytesPerCommit(1000), outstandingWatches(0), lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0),
lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID),
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion),
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
@ -5963,11 +5963,15 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
Promise<Optional<Value>> metadataVersion) {
state Span span(spanContext, location, { trState->spanID });
GetReadVersionReply rep = wait(f);
double latency = now() - trState->startTime;
double replyTime = now();
double latency = replyTime - trState->startTime;
trState->cx->lastProxyRequestTime = trState->startTime;
trState->cx->updateCachedReadVersion(trState->startTime, rep.version);
if (rep.rkThrottled && trState->options.priority != TransactionPriority::IMMEDIATE) {
trState->cx->lastRkThrottleTime = now();
if (rep.rkBatchThrottled) {
trState->cx->lastRkBatchThrottleTime = replyTime;
}
if (rep.rkDefaultThrottled) {
trState->cx->lastRkDefaultThrottleTime = replyTime;
}
trState->cx->GRVLatencies.addSample(latency);
if (trState->trLogInfo)
@ -6025,11 +6029,21 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
return rep.version;
}
bool rkThrottlingCooledDown(DatabaseContext* cx) {
if (cx->lastRkThrottleTime == 0.0) {
bool rkThrottlingCooledDown(DatabaseContext* cx, TransactionPriority priority) {
if (priority == TransactionPriority::IMMEDIATE) {
return true;
} else if (priority == TransactionPriority::BATCH) {
if (cx->lastRkBatchThrottleTime == 0.0) {
return true;
}
return (now() - cx->lastRkBatchThrottleTime > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN);
} else if (priority == TransactionPriority::DEFAULT) {
if (cx->lastRkDefaultThrottleTime == 0.0) {
return true;
}
return (now() - cx->lastRkDefaultThrottleTime > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN);
}
return (now() - cx->lastRkThrottleTime > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN);
return false;
}
Future<Version> Transaction::getReadVersion(uint32_t flags) {
@ -6037,7 +6051,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
if (!CLIENT_KNOBS->FORCE_GRV_CACHE_OFF && !trState->options.skipGrvCache &&
(deterministicRandom()->random01() <= CLIENT_KNOBS->DEBUG_USE_GRV_CACHE_CHANCE ||
trState->options.useGrvCache) &&
rkThrottlingCooledDown(getDatabase().getPtr())) {
rkThrottlingCooledDown(getDatabase().getPtr(), trState->options.priority)) {
// Upon our first request to use cached RVs, start the background updater
if (!trState->cx->grvUpdateHandler.isValid()) {
trState->cx->grvUpdateHandler = backgroundGrvUpdater(getDatabase().getPtr());

View File

@ -47,8 +47,10 @@ struct GrvProxyStats {
double percentageOfDefaultGRVQueueProcessed;
double percentageOfBatchGRVQueueProcessed;
bool lastTxnThrottled;
double throttleStartTime;
bool lastBatchQueueThrottled;
bool lastDefaultQueueThrottled;
double batchThrottleStartTime;
double defaultThrottleStartTime;
LatencySample defaultTxnGRVTimeInQueue;
LatencySample batchTxnGRVTimeInQueue;
@ -102,7 +104,8 @@ struct GrvProxyStats {
updatesFromRatekeeper("UpdatesFromRatekeeper", cc), leaseTimeouts("LeaseTimeouts", cc), systemGRVQueueSize(0),
defaultGRVQueueSize(0), batchGRVQueueSize(0), transactionRateAllowed(0), batchTransactionRateAllowed(0),
transactionLimit(0), batchTransactionLimit(0), percentageOfDefaultGRVQueueProcessed(0),
percentageOfBatchGRVQueueProcessed(0), lastTxnThrottled(false), throttleStartTime(0.0),
percentageOfBatchGRVQueueProcessed(0), lastBatchQueueThrottled(false), lastDefaultQueueThrottled(false),
batchThrottleStartTime(0.0), defaultThrottleStartTime(0.0),
defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue",
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
@ -653,31 +656,16 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
}
}
reply.queueIterations = request.queueIterations;
TraceEvent(SevDebug, "DebugGrvProxyThrottleCheck")
.detail("QueueIterations", reply.queueIterations)
.detail("ThrottleThreshold", CLIENT_KNOBS->GRV_THROTTLING_THRESHOLD)
.detail("LastTxnThrottled", stats->lastTxnThrottled)
.detail("ThrottleStartTime", format("%.6f", stats->throttleStartTime))
.detail("Diff", now() - stats->throttleStartTime)
.detail("SustainedThrottlingThreshold", CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD);
if (reply.queueIterations >= CLIENT_KNOBS->GRV_THROTTLING_THRESHOLD) {
if (stats->lastTxnThrottled) {
// Check if this throttling has been sustained for a certain amount of time to avoid false positives
if (now() - stats->throttleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) {
reply.rkThrottled = true;
}
} else { // !stats->lastTxnThrottled
// If not previously throttled, this request/reply is our new starting point
// for judging whether we are being actively throttled by ratekeeper now
stats->lastTxnThrottled = true;
stats->throttleStartTime = now();
if (stats->lastBatchQueueThrottled) {
// Check if this throttling has been sustained for a certain amount of time to avoid false positives
if (now() - stats->batchThrottleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) {
reply.rkBatchThrottled = true;
}
} else {
// If an immediate priority txn comes in, it may get processed on the first time through and
// reset our throttling state prematurely.
if (request.priority != TransactionPriority::IMMEDIATE) {
stats->lastTxnThrottled = false;
}
if (stats->lastDefaultQueueThrottled) {
// Check if this throttling has been sustained for a certain amount of time to avoid false positives
if (now() - stats->defaultThrottleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) {
reply.rkDefaultThrottled = true;
}
}
request.reply.send(reply);
@ -841,7 +829,6 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
// transactionQueue->span.swap(span);
auto& req = transactionQueue->front();
req.queueIterations++;
int tc = req.transactionCount;
if (req.priority < TransactionPriority::DEFAULT &&
@ -877,6 +864,18 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
transactionQueue->pop_front();
requestsToStart++;
}
if (!batchQueue.empty()) {
grvProxyData->stats.lastBatchQueueThrottled = true;
grvProxyData->stats.batchThrottleStartTime = now();
} else {
grvProxyData->stats.lastBatchQueueThrottled = false;
}
if (!defaultQueue.empty()) {
grvProxyData->stats.lastDefaultQueueThrottled = true;
grvProxyData->stats.defaultThrottleStartTime = now();
} else {
grvProxyData->stats.lastDefaultQueueThrottled = false;
}
if (!systemQueue.empty() || !defaultQueue.empty() || !batchQueue.empty()) {
forwardPromise(

View File

@ -74,11 +74,7 @@ struct SidebandSingleWorkload : TestWorkload {
}
std::string description() const override { return "SidebandSingleWorkload"; }
Future<Void> setup(Database const& cx) override {
if (clientId != 0)
return Void();
return persistInterface(this, cx);
}
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
if (clientId != 0)
return Void();
@ -108,51 +104,8 @@ struct SidebandSingleWorkload : TestWorkload {
m.push_back(keysUnexpectedlyPresent.getMetric());
}
ACTOR Future<Void> persistInterface(SidebandSingleWorkload* self, Database cx) {
state Transaction tr(cx);
BinaryWriter wr(IncludeVersion());
wr << self->interf;
state Standalone<StringRef> serializedInterface = wr.toValue();
loop {
try {
Optional<Value> val = wait(tr.get(StringRef(format("Sideband/Client/%d", self->clientId))));
if (val.present()) {
if (val.get() != serializedInterface)
throw operation_failed();
break;
}
tr.set(format("Sideband/Client/%d", self->clientId), serializedInterface);
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
TraceEvent("SidebandPersisted", self->interf.id()).detail("ClientIdx", self->clientId);
return Void();
}
ACTOR Future<SidebandInterface> fetchSideband(SidebandSingleWorkload* self, Database cx) {
state Transaction tr(cx);
loop {
try {
Optional<Value> val = wait(tr.get(StringRef(format("Sideband/Client/%d", self->clientId))));
if (!val.present()) {
throw operation_failed();
}
SidebandInterface sideband;
BinaryReader br(val.get(), IncludeVersion());
br >> sideband;
TraceEvent("SidebandFetched", sideband.id()).detail("ClientIdx", self->clientId);
return sideband;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> mutator(SidebandSingleWorkload* self, Database cx) {
state SidebandInterface checker = wait(self->fetchSideband(self, cx));
state SidebandInterface checker = self->interf;
state double lastTime = now();
state Version commitVersion;
state bool unknown = false;