Add extra tracking in DatabaseContext to periodically contact GRV proxies
This commit is contained in:
parent
8714480370
commit
1e2060a52b
|
@ -114,6 +114,7 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( LOG_RANGE_BLOCK_SIZE, CORE_VERSIONSPERSECOND );
|
||||
init( MUTATION_BLOCK_SIZE, 10000);
|
||||
init( MAX_VERSION_CACHE_LAG, 0.1 );
|
||||
init( MAX_PROXY_CONTACT_LAG, 1.0 );
|
||||
init( DEBUG_USE_GRV_CACHE_CHANCE, -0.1 ); // Since we check <= for 100% chance at 1.0, we don't want the default to be 0. This also means 0 is not 0%.
|
||||
init( GRV_CACHE_RK_COOLDOWN, 5.0 );
|
||||
init( GRV_SUSTAINED_THROTTLING_THRESHOLD, 0.01 );
|
||||
|
|
|
@ -116,11 +116,12 @@ public:
|
|||
int64_t CORE_VERSIONSPERSECOND; // This is defined within the server but used for knobs based on server value
|
||||
int LOG_RANGE_BLOCK_SIZE;
|
||||
int MUTATION_BLOCK_SIZE;
|
||||
double MAX_VERSION_CACHE_LAG;
|
||||
double MAX_VERSION_CACHE_LAG; // The upper bound in seconds for OK amount of staleness when using a cached RV
|
||||
double MAX_PROXY_CONTACT_LAG; // The upper bound in seconds for how often we want a response from the GRV proxies
|
||||
double DEBUG_USE_GRV_CACHE_CHANCE; // Debug setting to change the chance for a regular GRV request to use the cache
|
||||
double GRV_CACHE_RK_COOLDOWN; // Required number of seconds to pass after throttling to re-allow cache use
|
||||
double GRV_SUSTAINED_THROTTLING_THRESHOLD; // Adjust what amount of time is considered "sustained" throttling on
|
||||
// proxy for a GRV
|
||||
// proxy for a GRV which leads to disabling the cache
|
||||
|
||||
// Taskbucket
|
||||
double TASKBUCKET_LOGGING_DELAY;
|
||||
|
|
|
@ -390,6 +390,10 @@ public:
|
|||
Version cachedRv;
|
||||
void updateCachedRV(double t, Version v);
|
||||
double lastTimedRkThrottle;
|
||||
// Cached RVs can be updated through commits, and using cached RVs avoids the proxies altogether
|
||||
// Because our checks for ratekeeper throttling requires communication with the proxies,
|
||||
// we want to track the last time in order to periodically contact the proxy to check for throttling
|
||||
double lastProxyRequest;
|
||||
|
||||
int snapshotRywEnabled;
|
||||
|
||||
|
|
|
@ -948,6 +948,7 @@ ACTOR static Future<Void> backgroundGrvUpdater(DatabaseContext* cx) {
|
|||
wait(refreshTransaction(cx, &tr));
|
||||
state double curTime = now();
|
||||
state double lastTime = cx->lastTimedGrv.get();
|
||||
state double lastProxyTime = cx->lastProxyRequest;
|
||||
TraceEvent("BackgroundGrvUpdaterBefore")
|
||||
.detail("CurTime", curTime)
|
||||
.detail("LastTime", lastTime)
|
||||
|
@ -956,10 +957,12 @@ ACTOR static Future<Void> backgroundGrvUpdater(DatabaseContext* cx) {
|
|||
.detail("CachedTime", cx->lastTimedGrv.get())
|
||||
.detail("Gap", curTime - lastTime)
|
||||
.detail("Bound", CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay);
|
||||
if (curTime - lastTime >= (CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay)) {
|
||||
if (curTime - lastTime >= (CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay) ||
|
||||
curTime - lastProxyTime > CLIENT_KNOBS->MAX_PROXY_CONTACT_LAG) {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::SKIP_GRV_CACHE);
|
||||
wait(success(tr.getReadVersion()));
|
||||
cx->lastProxyRequest = curTime;
|
||||
grvDelay = (grvDelay + (now() - curTime)) / 2.0;
|
||||
TraceEvent("BackgroundGrvUpdaterSuccess")
|
||||
.detail("GrvDelay", grvDelay)
|
||||
|
@ -1205,9 +1208,9 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
|
||||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), latencies(1000), readLatencies(1000),
|
||||
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), outstandingWatches(0),
|
||||
lastTimedGrv(0.0), cachedRv(0), lastTimedRkThrottle(0.0), transactionTracingEnabled(true), taskID(taskID),
|
||||
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion),
|
||||
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
lastTimedGrv(0.0), cachedRv(0), lastTimedRkThrottle(0.0), lastProxyRequest(0.0), transactionTracingEnabled(true),
|
||||
taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator),
|
||||
apiVersion(apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)) {
|
||||
dbId = deterministicRandom()->randomUniqueID();
|
||||
|
@ -5731,6 +5734,7 @@ ACTOR Future<Version> extractReadVersion(Location location,
|
|||
state Span span(spanContext, location, { parent });
|
||||
GetReadVersionReply rep = wait(f);
|
||||
double latency = now() - startTime;
|
||||
cx->lastProxyRequest = startTime;
|
||||
cx->updateCachedRV(startTime, rep.version);
|
||||
// use startTime instead?
|
||||
// maybe this also requires tracking number of loops processed in queue?
|
||||
|
|
Loading…
Reference in New Issue