do not prevent computePerOperation from being updated for small computeDurations. Added logging for the compute per operation. Protect against erroneously large compute estimates

This commit is contained in:
Evan Tschannen 2020-10-04 19:19:05 -07:00
parent da26b0411c
commit f546034366
3 changed files with 24 additions and 4 deletions

View File

@ -369,8 +369,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( REQUIRED_MIN_RECOVERY_DURATION, 0.080 ); if( shortRecoveryDuration ) REQUIRED_MIN_RECOVERY_DURATION = 0.01; init( REQUIRED_MIN_RECOVERY_DURATION, 0.080 ); if( shortRecoveryDuration ) REQUIRED_MIN_RECOVERY_DURATION = 0.01;
init( ALWAYS_CAUSAL_READ_RISKY, false ); init( ALWAYS_CAUSAL_READ_RISKY, false );
init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1; init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1;
init( MIN_PROXY_COMPUTE, 0.001 );
init( MAX_PROXY_COMPUTE, 2.0 ); init( MAX_PROXY_COMPUTE, 2.0 );
init( MAX_COMPUTE_PER_OPERATION, 0.1 );
init( PROXY_COMPUTE_BUCKETS, 20000 ); init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 ); init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( TXN_STATE_SEND_AMOUNT, 4 ); init( TXN_STATE_SEND_AMOUNT, 4 );

View File

@ -299,8 +299,8 @@ public:
double REQUIRED_MIN_RECOVERY_DURATION; double REQUIRED_MIN_RECOVERY_DURATION;
bool ALWAYS_CAUSAL_READ_RISKY; bool ALWAYS_CAUSAL_READ_RISKY;
int MAX_COMMIT_UPDATES; int MAX_COMMIT_UPDATES;
double MIN_PROXY_COMPUTE;
double MAX_PROXY_COMPUTE; double MAX_PROXY_COMPUTE;
double MAX_COMPUTE_PER_OPERATION;
int PROXY_COMPUTE_BUCKETS; int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE; double PROXY_COMPUTE_GROWTH_RATE;
int TXN_STATE_SEND_AMOUNT; int TXN_STATE_SEND_AMOUNT;

View File

@ -101,6 +101,9 @@ struct ProxyStats {
double lastBucketBegin; double lastBucketBegin;
double bucketInterval; double bucketInterval;
double maxCompute;
double minCompute;
void updateRequestBuckets() { void updateRequestBuckets() {
while(now() - lastBucketBegin > bucketInterval) { while(now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval; lastBucketBegin += bucketInterval;
@ -121,8 +124,21 @@ struct ProxyStats {
return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now())); return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
} }
double getAndResetMaxCompute() {
double r = maxCompute;
maxCompute = 0;
return r;
}
double getAndResetMinCompute() {
double r = minCompute;
minCompute = 1000;
return r;
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr) explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), : cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
maxCompute(0), minCompute(1000),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS), bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
@ -148,6 +164,8 @@ struct ProxyStats {
specialCounter(cc, "Version", [pVersion](){return *pVersion; }); specialCounter(cc, "Version", [pVersion](){return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); }); specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; }); specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
specialCounter(cc, "MaxCompute", [this](){ return this->getAndResetMaxCompute(); });
specialCounter(cc, "MinCompute", [this](){ return this->getAndResetMinCompute(); });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics"); logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) { for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0); requestBuckets.push_back(0);
@ -1246,13 +1264,15 @@ ACTOR Future<Void> commitBatch(
} }
computeDuration += g_network->timer() - computeStart; computeDuration += g_network->timer() - computeStart;
if(computeDuration > SERVER_KNOBS->MIN_PROXY_COMPUTE && batchOperations > 0) { if(batchOperations > 0) {
double computePerOperation = computeDuration/batchOperations; double computePerOperation = std::min( SERVER_KNOBS->MAX_COMPUTE_PER_OPERATION, computeDuration/batchOperations );
if(computePerOperation <= self->commitComputePerOperation[latencyBucket]) { if(computePerOperation <= self->commitComputePerOperation[latencyBucket]) {
self->commitComputePerOperation[latencyBucket] = computePerOperation; self->commitComputePerOperation[latencyBucket] = computePerOperation;
} else { } else {
self->commitComputePerOperation[latencyBucket] = SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE*computePerOperation + ((1.0-SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE)*self->commitComputePerOperation[latencyBucket]); self->commitComputePerOperation[latencyBucket] = SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE*computePerOperation + ((1.0-SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE)*self->commitComputePerOperation[latencyBucket]);
} }
self->stats.maxCompute = std::max(self->stats.maxCompute, self->commitComputePerOperation[latencyBucket]);
self->stats.minCompute = std::min(self->stats.minCompute, self->commitComputePerOperation[latencyBucket]);
} }
/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above)) /////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))