Merge pull request #1311 from etschannen/feature-increase-grv-batch
Increased the GRV client batch size
This commit is contained in:
commit
2d7b48dadc
|
@ -60,7 +60,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
|
||||||
init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - serverKeysPrefixFor(UID()).size() - 1;
|
init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - serverKeysPrefixFor(UID()).size() - 1;
|
||||||
init( METADATA_VERSION_CACHE_SIZE, 1000 );
|
init( METADATA_VERSION_CACHE_SIZE, 1000 );
|
||||||
|
|
||||||
init( MAX_BATCH_SIZE, 20 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1; // Note that SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE is set to match this value
|
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
|
||||||
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
|
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
|
||||||
|
|
||||||
init( LOCATION_CACHE_EVICTION_SIZE, 300000 );
|
init( LOCATION_CACHE_EVICTION_SIZE, 300000 );
|
||||||
|
|
|
@ -249,8 +249,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
||||||
init( START_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION, 0.5 );
|
init( START_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION, 0.5 );
|
||||||
init( START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA, 0.1 );
|
init( START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA, 0.1 );
|
||||||
init( START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, 0.001 );
|
init( START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, 0.001 );
|
||||||
init( START_TRANSACTION_MAX_TRANSACTIONS_TO_START, 10000 );
|
init( START_TRANSACTION_MAX_TRANSACTIONS_TO_START, 100000 );
|
||||||
init( START_TRANSACTION_MAX_BUDGET_SIZE, 20 ); // Currently set to match CLIENT_KNOBS->MAX_BATCH_SIZE
|
init( START_TRANSACTION_MAX_REQUESTS_TO_START, 10000 );
|
||||||
|
|
||||||
init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005;
|
init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005;
|
||||||
init( COMMIT_TRANSACTION_BATCH_INTERVAL_MIN, 0.001 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_MIN = 0.1;
|
init( COMMIT_TRANSACTION_BATCH_INTERVAL_MIN, 0.001 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_MIN = 0.1;
|
||||||
|
|
|
@ -198,7 +198,7 @@ public:
|
||||||
double START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA;
|
double START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA;
|
||||||
double START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL;
|
double START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL;
|
||||||
double START_TRANSACTION_MAX_TRANSACTIONS_TO_START;
|
double START_TRANSACTION_MAX_TRANSACTIONS_TO_START;
|
||||||
double START_TRANSACTION_MAX_BUDGET_SIZE;
|
int START_TRANSACTION_MAX_REQUESTS_TO_START;
|
||||||
|
|
||||||
double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE;
|
double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE;
|
||||||
double COMMIT_TRANSACTION_BATCH_INTERVAL_MIN;
|
double COMMIT_TRANSACTION_BATCH_INTERVAL_MIN;
|
||||||
|
|
|
@ -1069,22 +1069,20 @@ ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
|
||||||
|
|
||||||
struct TransactionRateInfo {
|
struct TransactionRateInfo {
|
||||||
double rate;
|
double rate;
|
||||||
double budget;
|
|
||||||
|
|
||||||
double limit;
|
double limit;
|
||||||
|
|
||||||
TransactionRateInfo(double rate) : rate(rate), budget(0), limit(0) {}
|
TransactionRateInfo(double rate) : rate(rate), limit(0) {}
|
||||||
|
|
||||||
void reset(double elapsed) {
|
void reset(double elapsed) {
|
||||||
this->limit = std::min(rate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START) + budget;
|
limit = std::min(0.0,limit) + std::min(rate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool canStart(int64_t numToStart, int64_t numAlreadyStarted) {
|
bool canStart(int64_t numAlreadyStarted) {
|
||||||
return numToStart + numAlreadyStarted < limit || numToStart * g_random->random01() + numAlreadyStarted < limit - std::max(0.0, budget);
|
return numAlreadyStarted < limit;
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateBudget(int64_t numStarted) {
|
void updateBudget(int64_t numStarted) {
|
||||||
budget = std::max(std::min<double>(limit - numStarted, SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE), -SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE);
|
limit -= numStarted;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1156,14 +1154,15 @@ ACTOR static Future<Void> transactionStarter(
|
||||||
|
|
||||||
double leftToStart = 0;
|
double leftToStart = 0;
|
||||||
double batchLeftToStart = 0;
|
double batchLeftToStart = 0;
|
||||||
while (!transactionQueue.empty()) {
|
int requestsToStart = 0;
|
||||||
|
while (!transactionQueue.empty() && requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
|
||||||
auto& req = transactionQueue.top().first;
|
auto& req = transactionQueue.top().first;
|
||||||
int tc = req.transactionCount;
|
int tc = req.transactionCount;
|
||||||
|
|
||||||
if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
|
if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
|
else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1182,6 +1181,7 @@ ACTOR static Future<Void> transactionStarter(
|
||||||
|
|
||||||
start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
|
start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
|
||||||
transactionQueue.pop();
|
transactionQueue.pop();
|
||||||
|
requestsToStart++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!transactionQueue.empty())
|
if (!transactionQueue.empty())
|
||||||
|
|
Loading…
Reference in New Issue