From 1fb537cde5fc0951016233c301b0f7cf1b30b994 Mon Sep 17 00:00:00 2001 From: Jon Fu Date: Mon, 29 Nov 2021 14:16:49 -0500 Subject: [PATCH] Add cache generations to create time boundaries when commits time out or fail --- fdbclient/DatabaseContext.h | 2 ++ fdbclient/NativeAPI.actor.cpp | 21 +++++++++++++++++---- fdbclient/NativeAPI.actor.h | 2 ++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index fd9aa37f4a..cc5d336ac1 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -435,11 +435,13 @@ public: NotifiedDouble lastTimedGrv; Version cachedRv; void updateCachedRV(double t, Version v); + void invalidateCache(); double lastTimedRkThrottle; // Cached RVs can be updated through commits, and using cached RVs avoids the proxies altogether // Because our checks for ratekeeper throttling requires communication with the proxies, // we want to track the last time in order to periodically contact the proxy to check for throttling double lastProxyRequest; + Version rvCacheGeneration; int snapshotRywEnabled; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 2df4caa810..49470c341c 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -214,6 +214,11 @@ void DatabaseContext::updateCachedRV(double t, Version v) { } } +void DatabaseContext::invalidateCache() { + cachedRv = 0; + lastTimedGrv = 0.0; +} + Reference StorageServerInfo::getInterface(DatabaseContext* cx, StorageServerInterface const& ssi, LocalityData const& locality) { @@ -1291,9 +1296,9 @@ DatabaseContext::DatabaseContext(ReferenceSHARD_STAT_SMOOTH_AMOUNT), specialKeySpace(std::make_unique(specialKeys.begin, specialKeys.end, /* test */ false)) { dbId = deterministicRandom()->randomUniqueID(); @@ -5328,12 +5333,17 @@ ACTOR static Future tryCommit(Database cx, state double grvTime = now(); choose { when(wait(cx->onProxiesChanged())) { + cx->rvCacheGeneration++; reply.cancel(); throw request_maybe_delivered(); } when(CommitID ci = wait(reply)) { Version v = ci.version; - cx->updateCachedRV(grvTime, v); + if (cx->rvCacheGeneration != tr->getRvGeneration()) { + cx->invalidateCache(); + } else { + cx->updateCachedRV(grvTime, v); + } if (v != invalidVersion) { if (CLIENT_BUGGIFY) { throw commit_unknown_result(); @@ -5372,6 +5382,7 @@ ACTOR static Future tryCommit(Database cx, return Void(); } else { // clear the RYW transaction which contains previous conflicting keys + cx->rvCacheGeneration++; tr->info.conflictingKeys.reset(); if (ci.conflictingKRIndices.present()) { tr->info.conflictingKeys = @@ -5400,6 +5411,7 @@ ACTOR static Future tryCommit(Database cx, } } } catch (Error& e) { + cx->rvCacheGeneration++; if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) { // We don't know if the commit happened, and it might even still be in flight. @@ -6042,6 +6054,7 @@ Future Transaction::getReadVersion(uint32_t flags) { cx->grvUpdateHandler = backgroundGrvUpdater(getDatabase().getPtr()); } readVersion = getDBCachedReadVersion(getDatabase().getPtr(), now()); + rvGeneration = cx->rvCacheGeneration; return readVersion; } if (CLIENT_KNOBS->FORCE_GRV_CACHE_OFF && cx->grvUpdateHandler.isValid()) { diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index a36ceb4c57..7c2a799631 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -430,6 +430,7 @@ public: void setTransactionID(uint64_t id); void setToken(uint64_t token); + Version getRvGeneration() { return rvGeneration; } const std::vector>>& getExtraReadConflictRanges() const { return extraConflictRanges; } Standalone> readConflictRanges() const { @@ -445,6 +446,7 @@ private: double backoff; Version committedVersion{ invalidVersion }; + Version rvGeneration; CommitTransactionRequest tr; Future readVersion; Promise> metadataVersion;