addressed code review comments: renamed variables, small functional changes, style changes
This commit is contained in:
parent
7d17e00003
commit
458e708272
|
@ -199,7 +199,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
|
|||
Optional<Value> metadataVersion;
|
||||
int64_t midShardSize = 0;
|
||||
uint32_t queueIterations = 0;
|
||||
bool rkThrottled;
|
||||
bool rkThrottled = false;
|
||||
|
||||
TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
|
||||
|
||||
|
|
|
@ -470,16 +470,16 @@ public:
|
|||
|
||||
// GRV Cache
|
||||
// Database-level read version cache storing the most recent successful GRV as well as the time it was requested.
|
||||
double lastTimedGrv;
|
||||
Version cachedRv;
|
||||
void updateCachedRV(double t, Version v);
|
||||
Version getCachedRV();
|
||||
double getLastTimedGRV();
|
||||
double lastTimedRkThrottle;
|
||||
double lastGrvTime;
|
||||
Version cachedReadVersion;
|
||||
void updateCachedReadVersion(double t, Version v);
|
||||
Version getCachedReadVersion();
|
||||
double getLastGrvTime();
|
||||
double lastRkThrottleTime;
|
||||
// Cached RVs can be updated through commits, and using cached RVs avoids the proxies altogether
|
||||
// Because our checks for ratekeeper throttling requires communication with the proxies,
|
||||
// we want to track the last time in order to periodically contact the proxy to check for throttling
|
||||
double lastProxyRequest;
|
||||
double lastProxyRequestTime;
|
||||
|
||||
int snapshotRywEnabled;
|
||||
|
||||
|
|
|
@ -206,30 +206,30 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
|
|||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::updateCachedRV(double t, Version v) {
|
||||
if (v >= cachedRv) {
|
||||
TraceEvent("CheckpointCacheUpdate")
|
||||
void DatabaseContext::updateCachedReadVersion(double t, Version v) {
|
||||
if (v >= cachedReadVersion) {
|
||||
TraceEvent(SevDebug, "CachedReadVersionUpdate")
|
||||
.detail("Version", v)
|
||||
.detail("CurTime", t)
|
||||
.detail("LastVersion", cachedRv)
|
||||
.detail("LastTime", lastTimedGrv);
|
||||
cachedRv = v;
|
||||
.detail("GrvStartTime", t)
|
||||
.detail("LastVersion", cachedReadVersion)
|
||||
.detail("LastTime", lastGrvTime);
|
||||
cachedReadVersion = v;
|
||||
// Since the time is based on the start of the request, it's possible that we
|
||||
// get a newer version with an older time.
|
||||
// (Request started earlier, but was latest to reach the proxy)
|
||||
// Only update time when strictly increasing (?)
|
||||
if (t > lastTimedGrv) {
|
||||
lastTimedGrv = t;
|
||||
if (t > lastGrvTime) {
|
||||
lastGrvTime = t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Version DatabaseContext::getCachedRV() {
|
||||
return cachedRv;
|
||||
Version DatabaseContext::getCachedReadVersion() {
|
||||
return cachedReadVersion;
|
||||
}
|
||||
|
||||
double DatabaseContext::getLastTimedGRV() {
|
||||
return lastTimedGrv;
|
||||
double DatabaseContext::getLastGrvTime() {
|
||||
return lastGrvTime;
|
||||
}
|
||||
|
||||
Reference<StorageServerInfo> StorageServerInfo::getInterface(DatabaseContext* cx,
|
||||
|
@ -1036,16 +1036,18 @@ ACTOR static Future<Void> backgroundGrvUpdater(DatabaseContext* cx) {
|
|||
state double grvDelay = 0.001;
|
||||
try {
|
||||
loop {
|
||||
if (CLIENT_KNOBS->FORCE_GRV_CACHE_OFF)
|
||||
return Void();
|
||||
wait(refreshTransaction(cx, &tr));
|
||||
state double curTime = now();
|
||||
state double lastTime = cx->getLastTimedGRV();
|
||||
state double lastProxyTime = cx->lastProxyRequest;
|
||||
state double lastTime = cx->getLastGrvTime();
|
||||
state double lastProxyTime = cx->lastProxyRequestTime;
|
||||
TraceEvent(SevDebug, "BackgroundGrvUpdaterBefore")
|
||||
.detail("CurTime", curTime)
|
||||
.detail("LastTime", lastTime)
|
||||
.detail("GrvDelay", grvDelay)
|
||||
.detail("CachedRv", cx->getCachedRV())
|
||||
.detail("CachedTime", cx->getLastTimedGRV())
|
||||
.detail("CachedReadVersion", cx->getCachedReadVersion())
|
||||
.detail("CachedTime", cx->getLastGrvTime())
|
||||
.detail("Gap", curTime - lastTime)
|
||||
.detail("Bound", CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay);
|
||||
if (curTime - lastTime >= (CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay) ||
|
||||
|
@ -1053,18 +1055,19 @@ ACTOR static Future<Void> backgroundGrvUpdater(DatabaseContext* cx) {
|
|||
try {
|
||||
tr.setOption(FDBTransactionOptions::SKIP_GRV_CACHE);
|
||||
wait(success(tr.getReadVersion()));
|
||||
cx->lastProxyRequest = curTime;
|
||||
cx->lastProxyRequestTime = curTime;
|
||||
grvDelay = (grvDelay + (now() - curTime)) / 2.0;
|
||||
TraceEvent(SevDebug, "BackgroundGrvUpdaterSuccess")
|
||||
.detail("GrvDelay", grvDelay)
|
||||
.detail("CachedRv", cx->getCachedRV())
|
||||
.detail("CachedTime", cx->getLastTimedGRV());
|
||||
.detail("CachedReadVersion", cx->getCachedReadVersion())
|
||||
.detail("CachedTime", cx->getLastGrvTime());
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevInfo, "BackgroundGrvUpdaterTxnError").error(e, true);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
} else {
|
||||
wait(delay(0.001 + (CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay) - (curTime - lastTime)));
|
||||
wait(delay(0.001 + std::min(CLIENT_KNOBS->MAX_PROXY_CONTACT_LAG - (curTime - lastProxyTime),
|
||||
(CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay) - (curTime - lastTime))));
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
@ -1301,8 +1304,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
||||
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
|
||||
bytesPerCommit(1000), outstandingWatches(0), lastTimedGrv(0.0), cachedRv(0), lastTimedRkThrottle(0.0),
|
||||
lastProxyRequest(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
|
||||
bytesPerCommit(1000), outstandingWatches(0), lastGrvTime(0.0), cachedReadVersion(0), lastRkThrottleTime(0.0),
|
||||
lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
|
||||
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
|
@ -5352,7 +5355,7 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
|
|||
if (CLIENT_BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
trState->cx->updateCachedRV(grvTime, v);
|
||||
trState->cx->updateCachedReadVersion(grvTime, v);
|
||||
if (debugID.present())
|
||||
TraceEvent(interval.end()).detail("CommittedVersion", v);
|
||||
trState->committedVersion = v;
|
||||
|
@ -5959,10 +5962,10 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
|
|||
state Span span(spanContext, location, { trState->spanID });
|
||||
GetReadVersionReply rep = wait(f);
|
||||
double latency = now() - trState->startTime;
|
||||
trState->cx->lastProxyRequest = trState->startTime;
|
||||
trState->cx->updateCachedRV(trState->startTime, rep.version);
|
||||
trState->cx->lastProxyRequestTime = trState->startTime;
|
||||
trState->cx->updateCachedReadVersion(trState->startTime, rep.version);
|
||||
if (rep.rkThrottled && trState->options.priority != TransactionPriority::IMMEDIATE) {
|
||||
trState->cx->lastTimedRkThrottle = now();
|
||||
trState->cx->lastRkThrottleTime = now();
|
||||
}
|
||||
trState->cx->GRVLatencies.addSample(latency);
|
||||
if (trState->trLogInfo)
|
||||
|
@ -6021,10 +6024,10 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
|
|||
}
|
||||
|
||||
bool rkThrottlingCooledDown(DatabaseContext* cx) {
|
||||
if (cx->lastTimedRkThrottle == 0.0) {
|
||||
if (cx->lastRkThrottleTime == 0.0) {
|
||||
return true;
|
||||
}
|
||||
return (now() - cx->lastTimedRkThrottle > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN);
|
||||
return (now() - cx->lastRkThrottleTime > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN);
|
||||
}
|
||||
|
||||
Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
||||
|
@ -6037,8 +6040,8 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
|||
if (!trState->cx->grvUpdateHandler.isValid()) {
|
||||
trState->cx->grvUpdateHandler = backgroundGrvUpdater(getDatabase().getPtr());
|
||||
}
|
||||
Version rv = trState->cx->getCachedRV();
|
||||
double lastTime = trState->cx->getLastTimedGRV();
|
||||
Version rv = trState->cx->getCachedReadVersion();
|
||||
double lastTime = trState->cx->getLastGrvTime();
|
||||
double requestTime = now();
|
||||
if (requestTime - lastTime <= CLIENT_KNOBS->MAX_VERSION_CACHE_LAG && rv != Version(0)) {
|
||||
ASSERT(!debug_checkVersionTime(rv, requestTime, "CheckStaleness"));
|
||||
|
@ -6046,9 +6049,6 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
|||
return readVersion;
|
||||
} // else go through regular GRV path
|
||||
}
|
||||
if (CLIENT_KNOBS->FORCE_GRV_CACHE_OFF && trState->cx->grvUpdateHandler.isValid()) {
|
||||
trState->cx->grvUpdateHandler.cancel();
|
||||
}
|
||||
++trState->cx->transactionReadVersions;
|
||||
flags |= trState->options.getReadVersionFlags;
|
||||
switch (trState->options.priority) {
|
||||
|
|
|
@ -2410,12 +2410,6 @@ void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option opti
|
|||
validateOptionValueNotPresent(value);
|
||||
options.bypassUnreadable = true;
|
||||
break;
|
||||
case FDBTransactionOptions::USE_GRV_CACHE:
|
||||
validateOptionValueNotPresent(value);
|
||||
options.useGrvCache = true;
|
||||
case FDBTransactionOptions::SKIP_GRV_CACHE:
|
||||
validateOptionValueNotPresent(value);
|
||||
options.skipGrvCache = true;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -293,9 +293,9 @@ description is not currently required but encouraged.
|
|||
<Option name="bypass_unreadable" code="1100"
|
||||
description="Allows ``get`` operations to read from sections of keyspace that have become unreadable because of versionstamp operations. These reads will view versionstamp operations as if they were set operations that did not fill in the versionstamp." />
|
||||
<Option name="use_grv_cache" code="1101"
|
||||
description="Allows this transaction to use cached GRV from the database context." />
|
||||
description="Allows this transaction to use cached GRV from the database context. Defaults to off. Upon first usage, starts a background updater to periodically update the cache to avoid stale read versions." />
|
||||
<Option name="skip_grv_cache" code="1102"
|
||||
description="Specifically instruct this transaction to NOT use cached GRV." />
|
||||
description="Specifically instruct this transaction to NOT use cached GRV. Primarily used for the read version cache's background updater to avoid attempting to read a cached entry in specific situations." />
|
||||
</Scope>
|
||||
|
||||
<!-- The enumeration values matter - do not change them without
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
|
||||
// used for simulation validations
|
||||
static std::map<std::string, int64_t> validationData;
|
||||
static std::map<int64_t, double> validationData2;
|
||||
static std::map<int64_t, double> timedVersionsValidationData;
|
||||
static std::set<UID> disabledMachines;
|
||||
|
||||
void debug_setVersionCheckEnabled(UID uid, bool enabled) {
|
||||
|
@ -131,23 +131,23 @@ void debug_setCheckRelocationDuration(bool check) {
|
|||
void debug_advanceVersionTimestamp(int64_t version, double t) {
|
||||
if (!g_network->isSimulated() || g_simulator.extraDB)
|
||||
return;
|
||||
validationData2[version] = t;
|
||||
timedVersionsValidationData[version] = t;
|
||||
}
|
||||
|
||||
bool debug_checkVersionTime(int64_t version, double t, std::string context, Severity sev) {
|
||||
if (!g_network->isSimulated() || g_simulator.extraDB)
|
||||
return false;
|
||||
if (!validationData2.count(version)) {
|
||||
if (!timedVersionsValidationData.count(version)) {
|
||||
TraceEvent(SevWarn, (context + "UnknownTime").c_str())
|
||||
.detail("VersionChecking", version)
|
||||
.detail("TimeChecking", t);
|
||||
return false;
|
||||
}
|
||||
if (t > validationData2[version]) {
|
||||
TraceEvent(sev, (context + "DurabilityError").c_str())
|
||||
if (t > timedVersionsValidationData[version]) {
|
||||
TraceEvent(sev, (context + "VersionTimeError").c_str())
|
||||
.detail("VersionChecking", version)
|
||||
.detail("TimeChecking", t)
|
||||
.detail("MaxTime", validationData2[version]);
|
||||
.detail("MaxTime", timedVersionsValidationData[version]);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -674,7 +674,11 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
|
|||
stats->throttleStartTime = now();
|
||||
}
|
||||
} else {
|
||||
stats->lastTxnThrottled = false;
|
||||
// If an immediate priority txn comes in, it may get processed on the first time through and
|
||||
// reset our throttling state prematurely.
|
||||
if (request.priority != TransactionPriority::IMMEDIATE) {
|
||||
stats->lastTxnThrottled = false;
|
||||
}
|
||||
}
|
||||
request.reply.send(reply);
|
||||
++stats->txnRequestOut;
|
||||
|
|
|
@ -24,6 +24,15 @@
|
|||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
/*
|
||||
* This workload is modelled off the Sideband workload, except it uses a single
|
||||
* mutator and checker rather than several. In addition to ordinary consistency checks,
|
||||
* it also checks the consistency of the cached read versions when using the
|
||||
* USE_GRV_CACHE transaction option, specifically when commit_unknown_result
|
||||
* produces a maybe/maybe-not written scenario. It makes sure that a cached read of an
|
||||
* unknown result matches the regular read of that same key and is not too stale.
|
||||
*/
|
||||
|
||||
struct SidebandMessage {
|
||||
constexpr static FileIdentifier file_identifier = 11862047;
|
||||
uint64_t key;
|
||||
|
@ -168,7 +177,6 @@ struct SidebandSingleWorkload : TestWorkload {
|
|||
// second set, the checker should see this, no retries on unknown result
|
||||
loop {
|
||||
try {
|
||||
// tr.setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY);
|
||||
tr.set(messageKey, LiteralStringRef("deadbeef"));
|
||||
wait(tr.commit());
|
||||
commitVersion = tr.getCommittedVersion();
|
||||
|
|
Loading…
Reference in New Issue