Added two new counters for transaction throttled error and remove the verbose trace event logging. Also changed a chain of 'if' statements into 'if-else' statements since they are mutal exclusive
This commit is contained in:
parent
800d2e560c
commit
8d28c2a7f0
|
@ -172,6 +172,7 @@ public:
|
|||
Counter transactionsMaybeCommitted;
|
||||
Counter transactionsResourceConstrained;
|
||||
Counter transactionsProcessBehind;
|
||||
Counter transactionsThrottled;
|
||||
|
||||
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit;
|
||||
|
||||
|
|
|
@ -510,19 +510,24 @@ ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bo
|
|||
Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
|
||||
return getHealthMetricsActor(this, detailed);
|
||||
}
|
||||
DatabaseContext::DatabaseContext(
|
||||
Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
|
||||
TaskPriority taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, bool internal, int apiVersion, bool switchable )
|
||||
: connectionFile(connectionFile),clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance),
|
||||
lockAware(lockAware), apiVersion(apiVersion), switchable(switchable), provisional(false), cc("TransactionMetrics"),
|
||||
transactionReadVersions("ReadVersions", cc), transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
|
||||
transactionCommittedMutations("CommittedMutations", cc), transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionsCommitStarted("CommitStarted", cc),
|
||||
transactionsCommitCompleted("CommitCompleted", cc), transactionsTooOld("TooOld", cc), transactionsFutureVersions("FutureVersions", cc),
|
||||
transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc),
|
||||
transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0),
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal)
|
||||
{
|
||||
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
|
||||
Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
|
||||
TaskPriority taskID, LocalityData const& clientLocality,
|
||||
bool enableLocalityLoadBalance, bool lockAware, bool internal, int apiVersion,
|
||||
bool switchable)
|
||||
: connectionFile(connectionFile), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID),
|
||||
clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
|
||||
apiVersion(apiVersion), switchable(switchable), provisional(false), cc("TransactionMetrics"),
|
||||
transactionReadVersions("ReadVersions", cc), transactionLogicalReads("LogicalUncachedReads", cc),
|
||||
transactionPhysicalReads("PhysicalReadRequests", cc), transactionCommittedMutations("CommittedMutations", cc),
|
||||
transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionsCommitStarted("CommitStarted", cc),
|
||||
transactionsCommitCompleted("CommitCompleted", cc), transactionsTooOld("TooOld", cc),
|
||||
transactionsFutureVersions("FutureVersions", cc), transactionsNotCommitted("NotCommitted", cc),
|
||||
transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc),
|
||||
transactionsThrottled("Throttled", cc), transactionsProcessBehind("ProcessBehind", cc), outstandingWatches(0),
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
|
||||
bytesPerCommit(1000), mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
internal(internal) {
|
||||
dbId = deterministicRandom()->randomUniqueID();
|
||||
connected = clientInfo->get().proxies.size() ? Void() : clientInfo->onChange();
|
||||
|
||||
|
@ -3145,12 +3150,14 @@ Future<Void> Transaction::onError( Error const& e ) {
|
|||
{
|
||||
if(e.code() == error_code_not_committed)
|
||||
++cx->transactionsNotCommitted;
|
||||
if(e.code() == error_code_commit_unknown_result)
|
||||
else if (e.code() == error_code_commit_unknown_result)
|
||||
++cx->transactionsMaybeCommitted;
|
||||
if (e.code() == error_code_proxy_memory_limit_exceeded || e.code() == error_code_batch_transaction_throttled)
|
||||
else if (e.code() == error_code_proxy_memory_limit_exceeded)
|
||||
++cx->transactionsResourceConstrained;
|
||||
if (e.code() == error_code_process_behind)
|
||||
else if (e.code() == error_code_process_behind)
|
||||
++cx->transactionsProcessBehind;
|
||||
else if (e.code() == error_code_batch_transaction_throttled)
|
||||
++cx->transactionsThrottled;
|
||||
|
||||
double backoff = getBackoff(e.code());
|
||||
reset();
|
||||
|
|
|
@ -56,6 +56,7 @@ struct ProxyStats {
|
|||
Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
|
||||
Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess;
|
||||
Counter txnConflicts;
|
||||
Counter txnThrottled;
|
||||
Counter commitBatchIn, commitBatchOut;
|
||||
Counter mutationBytes;
|
||||
Counter mutations;
|
||||
|
@ -80,8 +81,8 @@ struct ProxyStats {
|
|||
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
|
||||
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
|
||||
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc),
|
||||
commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc),
|
||||
mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
|
||||
txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc),
|
||||
mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
|
||||
keyServerLocationRequests("KeyServerLocationRequests", cc), lastCommitVersionAssigned(0),
|
||||
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
|
||||
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
|
||||
|
@ -186,10 +187,8 @@ ACTOR Future<Void> queueTransactionStartRequests(
|
|||
// Return error for batch_priority GRV requests
|
||||
int64_t proxiesCount = std::max((int)db->get().client.proxies.size(), 1);
|
||||
if (batchRateInfo->rate <= (1.0 / proxiesCount)) {
|
||||
TraceEvent(SevInfo, "RejectedBatchGRV")
|
||||
.detail("CurrentBatchRateLimit", batchRateInfo->rate)
|
||||
.suppressFor(5);
|
||||
req.reply.sendError(batch_transaction_throttled());
|
||||
stats->txnThrottled += req.transactionCount;
|
||||
continue;
|
||||
}
|
||||
stats->txnBatchPriorityStartIn += req.transactionCount;
|
||||
|
|
Loading…
Reference in New Issue