From 1ce2457f7eafe398aaf53cadba7dafbc3aa9f4f0 Mon Sep 17 00:00:00 2001 From: Jon Fu Date: Tue, 17 Aug 2021 15:12:02 -0400 Subject: [PATCH 001/210] initial commit --- fdbclient/DatabaseContext.h | 6 ++++ fdbclient/Knobs.cpp | 2 ++ fdbclient/Knobs.h | 1 + fdbclient/NativeAPI.actor.cpp | 45 ++++++++++++++++++++++++++++ fdbclient/NativeAPI.actor.h | 1 + fdbclient/ReadYourWrites.actor.cpp | 3 ++ fdbclient/ReadYourWrites.h | 1 + fdbclient/vexillographer/fdb.options | 2 ++ 8 files changed, 61 insertions(+) diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 52c1945f8e..d06e0a2cda 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -276,6 +276,7 @@ public: Future monitorTssInfoChange; Future tssMismatchHandler; PromiseStream tssMismatchStream; + Future grvUpdateHandler; Reference commitProxies; Reference grvProxies; bool proxyProvisional; // Provisional commit proxy and grv proxy are used at the same time. @@ -328,6 +329,11 @@ public: // map from tssid -> metrics for that tss pair std::unordered_map> tssMetrics; + // Database-level read version cache storing the most recent successful GRV + Future cachedReadVersion; + double lastGrvUpdateTime; + void updateCachedRV(Future v); + UID dbId; bool internal; // Only contexts created through the C client and fdbcli are non-internal diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp index 462c6f733c..9b058392d8 100644 --- a/fdbclient/Knobs.cpp +++ b/fdbclient/Knobs.cpp @@ -114,6 +114,8 @@ void ClientKnobs::initialize(bool randomize) { init( CORE_VERSIONSPERSECOND, 1e6 ); init( LOG_RANGE_BLOCK_SIZE, 1e6 ); //Dependent on CORE_VERSIONSPERSECOND init( MUTATION_BLOCK_SIZE, 10000 ); + init( MAX_VERSION_CACHE_LAG, 10.0 ); // If subtracting time from now(), what unit is being measured? + // TaskBucket init( TASKBUCKET_LOGGING_DELAY, 5.0 ); diff --git a/fdbclient/Knobs.h b/fdbclient/Knobs.h index f01624d7fe..ea1d3e41bc 100644 --- a/fdbclient/Knobs.h +++ b/fdbclient/Knobs.h @@ -110,6 +110,7 @@ public: int64_t CORE_VERSIONSPERSECOND; // This is defined within the server but used for knobs based on server value int LOG_RANGE_BLOCK_SIZE; int MUTATION_BLOCK_SIZE; + double MAX_VERSION_CACHE_LAG; // Taskbucket double TASKBUCKET_LOGGING_DELAY; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 3f6ed7530b..3d11310fc9 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -53,6 +53,7 @@ #include "fdbrpc/LoadBalance.h" #include "fdbrpc/Net2FileSystem.h" #include "fdbrpc/simulator.h" +#include "fdbserver/Knobs.h" #include "flow/Arena.h" #include "flow/ActorCollection.h" #include "flow/DeterministicRandom.h" @@ -168,6 +169,11 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) { } } +void DatabaseContext::updateCachedRV(Future v) { + lastGrvUpdateTime = now(); + cachedReadVersion = v; +} + Reference StorageServerInfo::getInterface(DatabaseContext* cx, StorageServerInterface const& ssi, LocalityData const& locality) { @@ -879,6 +885,28 @@ ACTOR static Future handleTssMismatches(DatabaseContext* cx) { } } +ACTOR static Future backgroundGrvUpdater(DatabaseContext* cx) { + cx->lastGrvUpdateTime = 0.0; + state Transaction tr; + loop { + wait(refreshTransaction(cx, &tr)); + auto curTime = now(); + if (curTime - cx->lastGrvUpdateTime > CLIENT_KNOBS->MAX_VERSION_CACHE_LAG) { + try { + // Is this what the method should be? Maybe skip transaction + // and go straight to proxy like getRawVersion() + Version v = wait(tr.getReadVersion()); + cx->updateCachedRV(v); + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + throw; + } + wait(tr.onError(e)); + } + } + } +} + ACTOR static Future getHealthMetricsActor(DatabaseContext* cx, bool detailed) { if (now() - cx->healthMetricsLastUpdated < CLIENT_KNOBS->AGGREGATE_HEALTH_METRICS_MAX_STALENESS) { if (detailed) { @@ -1127,6 +1155,7 @@ DatabaseContext::DatabaseContext(Referencesecond->notifyContextDestroyed(); ASSERT_ABORT(server_interf.empty()); @@ -3584,6 +3614,7 @@ void Transaction::setVersion(Version v) { if (v <= 0) throw version_invalid(); readVersion = v; + cx->updateCachedRV(v); } Future> Transaction::get(const Key& key, bool snapshot) { @@ -4059,6 +4090,7 @@ void TransactionOptions::clear() { readTags = TagSet{}; priority = TransactionPriority::DEFAULT; expensiveClearCostEstimation = false; + useGrvCache = false; } TransactionOptions::TransactionOptions() { @@ -4596,6 +4628,8 @@ Future Transaction::commitMutations() { Future commitResult = tryCommit(cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options); + // Record committed version if successful (failed commit would go to catch block instead(?)) + cx->updateCachedRV(this->committedVersion); if (isCheckingWrites) { Promise committed; checkWrites(cx, commitResult, committed, tr, this); @@ -4823,6 +4857,12 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional extractReadVersion(Location location, Future Transaction::getReadVersion(uint32_t flags) { if (!readVersion.isValid()) { + if (options.useGrvCache && cx->cachedReadVersion.isValid() && cx->cachedReadVersion.isReady() && + !cx->cachedReadVersion.isError()) { + return cx->cachedReadVersion; + } ++cx->transactionReadVersions; flags |= options.getReadVersionFlags; switch (options.priority) { @@ -5105,6 +5149,7 @@ Future Transaction::getReadVersion(uint32_t flags) { startTime, metadataVersion, options.tags); + cx->updateCachedRV(readVersion); } return readVersion; } diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 2955a792dd..849d937c62 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -148,6 +148,7 @@ struct TransactionOptions { bool includePort : 1; bool reportConflictingKeys : 1; bool expensiveClearCostEstimation : 1; + bool useGrvCache : 1; TransactionPriority priority; diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 6c7b77ff65..e2ededfd5f 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -2239,6 +2239,9 @@ void ReadYourWritesTransaction::setOptionImpl(FDBTransactionOptions::Option opti validateOptionValue(value, false); options.bypassUnreadable = true; break; + case FDBTransactionOptions::USE_GRV_CACHE: + validateOptionValue(value, false); + options.useGrvCache = true; default: break; } diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 977dab1f24..41db431de4 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -43,6 +43,7 @@ struct ReadYourWritesTransactionOptions { int maxRetries; int snapshotRywEnabled; bool bypassUnreadable : 1; + bool useGrvCache : 1; ReadYourWritesTransactionOptions() {} explicit ReadYourWritesTransactionOptions(Transaction const& tr); diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options index 2d3a5b57ce..faa05066ff 100644 --- a/fdbclient/vexillographer/fdb.options +++ b/fdbclient/vexillographer/fdb.options @@ -289,6 +289,8 @@ description is not currently required but encouraged. description="Asks storage servers for how many bytes a clear key range contains. Otherwise uses the location cache to roughly estimate this." />