Add cache generations to create time boundaries when commits time out or fail
This commit is contained in:
parent
2b0ade5250
commit
1fb537cde5
|
@ -435,11 +435,13 @@ public:
|
|||
NotifiedDouble lastTimedGrv;
|
||||
Version cachedRv;
|
||||
void updateCachedRV(double t, Version v);
|
||||
void invalidateCache();
|
||||
double lastTimedRkThrottle;
|
||||
// Cached RVs can be updated through commits, and using cached RVs avoids the proxies altogether
|
||||
// Because our checks for ratekeeper throttling requires communication with the proxies,
|
||||
// we want to track the last time in order to periodically contact the proxy to check for throttling
|
||||
double lastProxyRequest;
|
||||
Version rvCacheGeneration;
|
||||
|
||||
int snapshotRywEnabled;
|
||||
|
||||
|
|
|
@ -214,6 +214,11 @@ void DatabaseContext::updateCachedRV(double t, Version v) {
|
|||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::invalidateCache() {
|
||||
cachedRv = 0;
|
||||
lastTimedGrv = 0.0;
|
||||
}
|
||||
|
||||
Reference<StorageServerInfo> StorageServerInfo::getInterface(DatabaseContext* cx,
|
||||
StorageServerInterface const& ssi,
|
||||
LocalityData const& locality) {
|
||||
|
@ -1291,9 +1296,9 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
|
||||
bytesPerCommit(1000), outstandingWatches(0), lastTimedGrv(0.0), cachedRv(0), lastTimedRkThrottle(0.0),
|
||||
lastProxyRequest(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
|
||||
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
lastProxyRequest(0.0), rvCacheGeneration(0), transactionTracingSample(false), taskID(taskID),
|
||||
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion),
|
||||
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)) {
|
||||
dbId = deterministicRandom()->randomUniqueID();
|
||||
|
@ -5328,12 +5333,17 @@ ACTOR static Future<Void> tryCommit(Database cx,
|
|||
state double grvTime = now();
|
||||
choose {
|
||||
when(wait(cx->onProxiesChanged())) {
|
||||
cx->rvCacheGeneration++;
|
||||
reply.cancel();
|
||||
throw request_maybe_delivered();
|
||||
}
|
||||
when(CommitID ci = wait(reply)) {
|
||||
Version v = ci.version;
|
||||
cx->updateCachedRV(grvTime, v);
|
||||
if (cx->rvCacheGeneration != tr->getRvGeneration()) {
|
||||
cx->invalidateCache();
|
||||
} else {
|
||||
cx->updateCachedRV(grvTime, v);
|
||||
}
|
||||
if (v != invalidVersion) {
|
||||
if (CLIENT_BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
|
@ -5372,6 +5382,7 @@ ACTOR static Future<Void> tryCommit(Database cx,
|
|||
return Void();
|
||||
} else {
|
||||
// clear the RYW transaction which contains previous conflicting keys
|
||||
cx->rvCacheGeneration++;
|
||||
tr->info.conflictingKeys.reset();
|
||||
if (ci.conflictingKRIndices.present()) {
|
||||
tr->info.conflictingKeys =
|
||||
|
@ -5400,6 +5411,7 @@ ACTOR static Future<Void> tryCommit(Database cx,
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
cx->rvCacheGeneration++;
|
||||
if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) {
|
||||
// We don't know if the commit happened, and it might even still be in flight.
|
||||
|
||||
|
@ -6042,6 +6054,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
|||
cx->grvUpdateHandler = backgroundGrvUpdater(getDatabase().getPtr());
|
||||
}
|
||||
readVersion = getDBCachedReadVersion(getDatabase().getPtr(), now());
|
||||
rvGeneration = cx->rvCacheGeneration;
|
||||
return readVersion;
|
||||
}
|
||||
if (CLIENT_KNOBS->FORCE_GRV_CACHE_OFF && cx->grvUpdateHandler.isValid()) {
|
||||
|
|
|
@ -430,6 +430,7 @@ public:
|
|||
|
||||
void setTransactionID(uint64_t id);
|
||||
void setToken(uint64_t token);
|
||||
Version getRvGeneration() { return rvGeneration; }
|
||||
|
||||
const std::vector<Future<std::pair<Key, Key>>>& getExtraReadConflictRanges() const { return extraConflictRanges; }
|
||||
Standalone<VectorRef<KeyRangeRef>> readConflictRanges() const {
|
||||
|
@ -445,6 +446,7 @@ private:
|
|||
|
||||
double backoff;
|
||||
Version committedVersion{ invalidVersion };
|
||||
Version rvGeneration;
|
||||
CommitTransactionRequest tr;
|
||||
Future<Version> readVersion;
|
||||
Promise<Optional<Value>> metadataVersion;
|
||||
|
|
Loading…
Reference in New Issue