Merge pull request #5725 from sfc-gh-jfu/jfu-grv-cache
Add transaction option for clients to use cached read versions
This commit is contained in:
commit
cdebda35ab
|
@ -598,6 +598,11 @@ func (o TransactionOptions) SetBypassUnreadable() error {
|
|||
return o.setOpt(1100, nil)
|
||||
}
|
||||
|
||||
// SetUseGrvCache allows this transaction to use a cached read version (GRV)
// from the database context. Defaults to off. Upon first usage, starts a
// background updater to periodically update the cache to avoid stale read
// versions. The option takes no parameter (it is set with code 1101 and a nil
// value).
|
||||
func (o TransactionOptions) SetUseGrvCache() error {
|
||||
return o.setOpt(1101, nil)
|
||||
}
|
||||
|
||||
type StreamingMode int
|
||||
|
||||
const (
|
||||
|
|
|
@ -119,6 +119,12 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( CORE_VERSIONSPERSECOND, 1e6 );
|
||||
init( LOG_RANGE_BLOCK_SIZE, CORE_VERSIONSPERSECOND );
|
||||
init( MUTATION_BLOCK_SIZE, 10000);
|
||||
init( MAX_VERSION_CACHE_LAG, 0.1 );
|
||||
init( MAX_PROXY_CONTACT_LAG, 0.2 );
|
||||
init( DEBUG_USE_GRV_CACHE_CHANCE, -1.0 ); // For 100% chance at 1.0, this means 0.0 is not 0%. We don't want the default to be 0.
|
||||
init( FORCE_GRV_CACHE_OFF, false );
|
||||
init( GRV_CACHE_RK_COOLDOWN, 60.0 );
|
||||
init( GRV_SUSTAINED_THROTTLING_THRESHOLD, 0.1 );
|
||||
|
||||
// TaskBucket
|
||||
init( TASKBUCKET_LOGGING_DELAY, 5.0 );
|
||||
|
|
|
@ -122,6 +122,13 @@ public:
|
|||
int64_t CORE_VERSIONSPERSECOND; // This is defined within the server but used for knobs based on server value
|
||||
int LOG_RANGE_BLOCK_SIZE;
|
||||
int MUTATION_BLOCK_SIZE;
|
||||
double MAX_VERSION_CACHE_LAG; // The upper bound in seconds for OK amount of staleness when using a cached RV
|
||||
double MAX_PROXY_CONTACT_LAG; // The upper bound in seconds for how often we want a response from the GRV proxies
|
||||
double DEBUG_USE_GRV_CACHE_CHANCE; // Debug setting to change the chance for a regular GRV request to use the cache
|
||||
bool FORCE_GRV_CACHE_OFF; // Panic button to turn off cache. Holds priority over other options.
|
||||
double GRV_CACHE_RK_COOLDOWN; // Required number of seconds to pass after throttling to re-allow cache use
|
||||
double GRV_SUSTAINED_THROTTLING_THRESHOLD; // If ALL GRV requests have been throttled in the last number of seconds
|
||||
// specified here, ratekeeper is throttling and not a false positive
|
||||
|
||||
// Taskbucket
|
||||
double TASKBUCKET_LOGGING_DELAY;
|
||||
|
|
|
@ -195,6 +195,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
|
|||
bool locked;
|
||||
Optional<Value> metadataVersion;
|
||||
int64_t midShardSize = 0;
|
||||
bool rkDefaultThrottled = false;
|
||||
bool rkBatchThrottled = false;
|
||||
|
||||
TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
|
||||
|
||||
|
@ -208,7 +210,9 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
|
|||
locked,
|
||||
metadataVersion,
|
||||
tagThrottleInfo,
|
||||
midShardSize);
|
||||
midShardSize,
|
||||
rkDefaultThrottled,
|
||||
rkBatchThrottled);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -363,6 +363,7 @@ public:
|
|||
Future<Void> monitorTssInfoChange;
|
||||
Future<Void> tssMismatchHandler;
|
||||
PromiseStream<std::pair<UID, std::vector<DetailedTSSMismatch>>> tssMismatchStream;
|
||||
Future<Void> grvUpdateHandler;
|
||||
Reference<CommitProxyInfo> commitProxies;
|
||||
Reference<GrvProxyInfo> grvProxies;
|
||||
bool proxyProvisional; // Provisional commit proxy and grv proxy are used at the same time.
|
||||
|
@ -479,6 +480,20 @@ public:
|
|||
int outstandingWatches;
|
||||
int maxOutstandingWatches;
|
||||
|
||||
// GRV Cache
|
||||
// Database-level read version cache storing the most recent successful GRV as well as the time it was requested.
|
||||
double lastGrvTime;
|
||||
Version cachedReadVersion;
|
||||
void updateCachedReadVersion(double t, Version v);
|
||||
Version getCachedReadVersion();
|
||||
double getLastGrvTime();
|
||||
double lastRkBatchThrottleTime;
|
||||
double lastRkDefaultThrottleTime;
|
||||
// Cached RVs can be updated through commits, and using cached RVs avoids the proxies altogether
|
||||
// Because our checks for ratekeeper throttling requires communication with the proxies,
|
||||
// we want to track the last time in order to periodically contact the proxy to check for throttling
|
||||
double lastProxyRequestTime;
|
||||
|
||||
int snapshotRywEnabled;
|
||||
|
||||
bool transactionTracingSample;
|
||||
|
|
|
@ -63,6 +63,7 @@
|
|||
#include "fdbrpc/LoadBalance.h"
|
||||
#include "fdbrpc/Net2FileSystem.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/DeterministicRandom.h"
|
||||
|
@ -205,6 +206,32 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
|
|||
}
|
||||
}
|
||||
|
||||
void DatabaseContext::updateCachedReadVersion(double t, Version v) {
|
||||
if (v >= cachedReadVersion) {
|
||||
TraceEvent(SevDebug, "CachedReadVersionUpdate")
|
||||
.detail("Version", v)
|
||||
.detail("GrvStartTime", t)
|
||||
.detail("LastVersion", cachedReadVersion)
|
||||
.detail("LastTime", lastGrvTime);
|
||||
cachedReadVersion = v;
|
||||
// Since the time is based on the start of the request, it's possible that we
|
||||
// get a newer version with an older time.
|
||||
// (Request started earlier, but was latest to reach the proxy)
|
||||
// Only update time when strictly increasing (?)
|
||||
if (t > lastGrvTime) {
|
||||
lastGrvTime = t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the version currently held in the database-level read version cache.
// The DatabaseContext constructor initializes it to 0, and Transaction::getReadVersion
// does not use a cached value of Version(0).
Version DatabaseContext::getCachedReadVersion() {
|
||||
return cachedReadVersion;
|
||||
}
|
||||
|
||||
// Returns the timestamp paired with the cached read version: the latest request start
// time accepted by updateCachedReadVersion (0.0 until the first update).
double DatabaseContext::getLastGrvTime() {
|
||||
return lastGrvTime;
|
||||
}
|
||||
|
||||
Reference<StorageServerInfo> StorageServerInfo::getInterface(DatabaseContext* cx,
|
||||
StorageServerInterface const& ssi,
|
||||
LocalityData const& locality) {
|
||||
|
@ -1000,6 +1027,53 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
|
|||
}
|
||||
}
|
||||
|
||||
// Background actor that periodically refreshes the database-level read version cache.
// On each pass it either issues a GRV with SKIP_GRV_CACHE set, or sleeps until the next
// refresh is due. A refresh is due when the cached entry is at least
// (MAX_VERSION_CACHE_LAG - grvDelay) seconds old, or when lastProxyRequestTime is more than
// MAX_PROXY_CONTACT_LAG seconds in the past.
// Returns normally only when FORCE_GRV_CACHE_OFF is set; any error that leaves the loop is
// traced and rethrown.
// NOTE(review): this actor never writes cachedReadVersion itself; presumably the cache is
// updated as a side effect of tr.getReadVersion() (extractReadVersion calls
// updateCachedReadVersion) - confirm.
ACTOR static Future<Void> backgroundGrvUpdater(DatabaseContext* cx) {
|
||||
state Transaction tr;
|
||||
// Smoothed estimate (seconds) of how long a GRV takes; starts at 1ms.
state double grvDelay = 0.001;
|
||||
try {
|
||||
loop {
|
||||
// Panic-button knob: stop the updater entirely.
if (CLIENT_KNOBS->FORCE_GRV_CACHE_OFF)
|
||||
return Void();
|
||||
wait(refreshTransaction(cx, &tr));
|
||||
state double curTime = now();
|
||||
state double lastTime = cx->getLastGrvTime();
|
||||
state double lastProxyTime = cx->lastProxyRequestTime;
|
||||
TraceEvent(SevDebug, "BackgroundGrvUpdaterBefore")
|
||||
.detail("CurTime", curTime)
|
||||
.detail("LastTime", lastTime)
|
||||
.detail("GrvDelay", grvDelay)
|
||||
.detail("CachedReadVersion", cx->getCachedReadVersion())
|
||||
.detail("CachedTime", cx->getLastGrvTime())
|
||||
.detail("Gap", curTime - lastTime)
|
||||
.detail("Bound", CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay);
|
||||
// Refresh early by the estimated GRV latency so the reply can arrive before the cached
// entry exceeds MAX_VERSION_CACHE_LAG, and also whenever the proxies have not been
// contacted for longer than MAX_PROXY_CONTACT_LAG.
if (curTime - lastTime >= (CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay) ||
|
||||
curTime - lastProxyTime > CLIENT_KNOBS->MAX_PROXY_CONTACT_LAG) {
|
||||
try {
|
||||
// The updater itself must not be served from the cache it is refreshing.
tr.setOption(FDBTransactionOptions::SKIP_GRV_CACHE);
|
||||
wait(success(tr.getReadVersion()));
|
||||
cx->lastProxyRequestTime = curTime;
|
||||
// Average the previous estimate with the latency just observed.
grvDelay = (grvDelay + (now() - curTime)) / 2.0;
|
||||
TraceEvent(SevDebug, "BackgroundGrvUpdaterSuccess")
|
||||
.detail("GrvDelay", grvDelay)
|
||||
.detail("CachedReadVersion", cx->getCachedReadVersion())
|
||||
.detail("CachedTime", cx->getLastGrvTime());
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevInfo, "BackgroundGrvUpdaterTxnError").error(e, true);
|
||||
// Wait on onError(); if that itself throws, the outer handler below traces and
// propagates the error.
wait(tr.onError(e));
|
||||
}
|
||||
} else {
|
||||
// Sleep until whichever refresh deadline comes first, but never less than 1ms.
wait(
|
||||
delay(std::max(0.001,
|
||||
std::min(CLIENT_KNOBS->MAX_PROXY_CONTACT_LAG - (curTime - lastProxyTime),
|
||||
(CLIENT_KNOBS->MAX_VERSION_CACHE_LAG - grvDelay) - (curTime - lastTime)))));
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevInfo, "BackgroundGrvUpdaterFailed").error(e, true);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext* cx, bool detailed) {
|
||||
if (now() - cx->healthMetricsLastUpdated < CLIENT_KNOBS->AGGREGATE_HEALTH_METRICS_MAX_STALENESS) {
|
||||
if (detailed) {
|
||||
|
@ -1228,7 +1302,8 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
||||
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
|
||||
bytesPerCommit(1000), outstandingWatches(0), transactionTracingSample(false), taskID(taskID),
|
||||
bytesPerCommit(1000), outstandingWatches(0), lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0),
|
||||
lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID),
|
||||
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion),
|
||||
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
|
@ -1516,6 +1591,9 @@ DatabaseContext::~DatabaseContext() {
|
|||
clientDBInfoMonitor.cancel();
|
||||
monitorTssInfoChange.cancel();
|
||||
tssMismatchHandler.cancel();
|
||||
if (grvUpdateHandler.isValid()) {
|
||||
grvUpdateHandler.cancel();
|
||||
}
|
||||
for (auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
|
||||
it->second->notifyContextDestroyed();
|
||||
ASSERT_ABORT(server_interf.empty());
|
||||
|
@ -4987,6 +5065,8 @@ void TransactionOptions::clear() {
|
|||
readTags = TagSet{};
|
||||
priority = TransactionPriority::DEFAULT;
|
||||
expensiveClearCostEstimation = false;
|
||||
useGrvCache = false;
|
||||
skipGrvCache = false;
|
||||
}
|
||||
|
||||
TransactionOptions::TransactionOptions() {
|
||||
|
@ -5317,7 +5397,7 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
|
|||
TaskPriority::DefaultPromiseEndpoint,
|
||||
AtMostOnce::True);
|
||||
}
|
||||
|
||||
state double grvTime = now();
|
||||
choose {
|
||||
when(wait(trState->cx->onProxiesChanged())) {
|
||||
reply.cancel();
|
||||
|
@ -5329,6 +5409,7 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
|
|||
if (CLIENT_BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
trState->cx->updateCachedReadVersion(grvTime, v);
|
||||
if (debugID.present())
|
||||
TraceEvent(interval.end()).detail("CommittedVersion", v);
|
||||
trState->committedVersion = v;
|
||||
|
@ -5755,6 +5836,18 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional<Strin
|
|||
trState->options.expensiveClearCostEstimation = true;
|
||||
break;
|
||||
|
||||
case FDBTransactionOptions::USE_GRV_CACHE:
|
||||
validateOptionValueNotPresent(value);
|
||||
if (trState->numErrors == 0) {
|
||||
trState->options.useGrvCache = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case FDBTransactionOptions::SKIP_GRV_CACHE:
|
||||
validateOptionValueNotPresent(value);
|
||||
trState->options.skipGrvCache = true;
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -5925,7 +6018,16 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
|
|||
Promise<Optional<Value>> metadataVersion) {
|
||||
state Span span(spanContext, location, { trState->spanID });
|
||||
GetReadVersionReply rep = wait(f);
|
||||
double latency = now() - trState->startTime;
|
||||
double replyTime = now();
|
||||
double latency = replyTime - trState->startTime;
|
||||
trState->cx->lastProxyRequestTime = trState->startTime;
|
||||
trState->cx->updateCachedReadVersion(trState->startTime, rep.version);
|
||||
if (rep.rkBatchThrottled) {
|
||||
trState->cx->lastRkBatchThrottleTime = replyTime;
|
||||
}
|
||||
if (rep.rkDefaultThrottled) {
|
||||
trState->cx->lastRkDefaultThrottleTime = replyTime;
|
||||
}
|
||||
trState->cx->GRVLatencies.addSample(latency);
|
||||
if (trState->trLogInfo)
|
||||
trState->trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V3(
|
||||
|
@ -5982,8 +6084,42 @@ ACTOR Future<Version> extractReadVersion(Reference<TransactionState> trState,
|
|||
return rep.version;
|
||||
}
|
||||
|
||||
// Reports whether enough time has passed since the client last recorded ratekeeper
// throttling for the given priority.
//   IMMEDIATE      - always true.
//   BATCH, DEFAULT - true if no throttling has been recorded for that priority (timestamp
//                    still 0.0), or if more than GRV_CACHE_RK_COOLDOWN seconds have elapsed
//                    since the last recorded throttling.
//   anything else  - false.
bool rkThrottlingCooledDown(DatabaseContext* cx, TransactionPriority priority) {
    double lastThrottled;
    switch (priority) {
    case TransactionPriority::IMMEDIATE:
        return true;
    case TransactionPriority::BATCH:
        lastThrottled = cx->lastRkBatchThrottleTime;
        break;
    case TransactionPriority::DEFAULT:
        lastThrottled = cx->lastRkDefaultThrottleTime;
        break;
    default:
        return false;
    }
    // Short-circuit keeps the "never throttled" case from consulting the clock.
    return lastThrottled == 0.0 || now() - lastThrottled > CLIENT_KNOBS->GRV_CACHE_RK_COOLDOWN;
}
|
||||
|
||||
Future<Version> Transaction::getReadVersion(uint32_t flags) {
|
||||
if (!readVersion.isValid()) {
|
||||
if (!CLIENT_KNOBS->FORCE_GRV_CACHE_OFF && !trState->options.skipGrvCache &&
|
||||
(deterministicRandom()->random01() <= CLIENT_KNOBS->DEBUG_USE_GRV_CACHE_CHANCE ||
|
||||
trState->options.useGrvCache) &&
|
||||
rkThrottlingCooledDown(getDatabase().getPtr(), trState->options.priority)) {
|
||||
// Upon our first request to use cached RVs, start the background updater
|
||||
if (!trState->cx->grvUpdateHandler.isValid()) {
|
||||
trState->cx->grvUpdateHandler = backgroundGrvUpdater(getDatabase().getPtr());
|
||||
}
|
||||
Version rv = trState->cx->getCachedReadVersion();
|
||||
double lastTime = trState->cx->getLastGrvTime();
|
||||
double requestTime = now();
|
||||
if (requestTime - lastTime <= CLIENT_KNOBS->MAX_VERSION_CACHE_LAG && rv != Version(0)) {
|
||||
ASSERT(!debug_checkVersionTime(rv, requestTime, "CheckStaleness"));
|
||||
readVersion = rv;
|
||||
return readVersion;
|
||||
} // else go through regular GRV path
|
||||
}
|
||||
++trState->cx->transactionReadVersions;
|
||||
flags |= trState->options.getReadVersionFlags;
|
||||
switch (trState->options.priority) {
|
||||
|
@ -6165,6 +6301,9 @@ uint32_t Transaction::getSize() {
|
|||
}
|
||||
|
||||
Future<Void> Transaction::onError(Error const& e) {
|
||||
if (g_network->isSimulated() && ++trState->numErrors % 10 == 0) {
|
||||
TraceEvent(SevWarnAlways, "TransactionTooManyRetries").detail("NumRetries", trState->numErrors);
|
||||
}
|
||||
if (e.code() == error_code_success) {
|
||||
return client_invalid_operation();
|
||||
}
|
||||
|
@ -6199,9 +6338,6 @@ Future<Void> Transaction::onError(Error const& e) {
|
|||
return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), trState->taskID);
|
||||
}
|
||||
|
||||
if (g_network->isSimulated() && ++trState->numErrors % 10 == 0)
|
||||
TraceEvent(SevWarnAlways, "TransactionTooManyRetries").detail("NumRetries", trState->numErrors);
|
||||
|
||||
return e;
|
||||
}
|
||||
ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRange keys);
|
||||
|
|
|
@ -157,6 +157,8 @@ struct TransactionOptions {
|
|||
bool includePort : 1;
|
||||
bool reportConflictingKeys : 1;
|
||||
bool expensiveClearCostEstimation : 1;
|
||||
bool useGrvCache : 1;
|
||||
bool skipGrvCache : 1;
|
||||
|
||||
TransactionPriority priority;
|
||||
|
||||
|
|
|
@ -292,6 +292,11 @@ description is not currently required but encouraged.
|
|||
description="Asks storage servers for how many bytes a clear key range contains. Otherwise uses the location cache to roughly estimate this." />
|
||||
<Option name="bypass_unreadable" code="1100"
|
||||
description="Allows ``get`` operations to read from sections of keyspace that have become unreadable because of versionstamp operations. These reads will view versionstamp operations as if they were set operations that did not fill in the versionstamp." />
|
||||
<Option name="use_grv_cache" code="1101"
|
||||
description="Allows this transaction to use cached GRV from the database context. Defaults to off. Upon first usage, starts a background updater to periodically update the cache to avoid stale read versions." />
|
||||
<Option name="skip_grv_cache" code="1102"
|
||||
description="Specifically instruct this transaction to NOT use cached GRV. Primarily used for the read version cache's background updater to avoid attempting to read a cached entry in specific situations."
|
||||
hidden="true"/>
|
||||
</Scope>
|
||||
|
||||
<!-- The enumeration values matter - do not change them without
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
// used for simulation validations
|
||||
static std::map<std::string, int64_t> validationData;
|
||||
static std::map<int64_t, double> timedVersionsValidationData;
|
||||
static std::set<UID> disabledMachines;
|
||||
|
||||
void debug_setVersionCheckEnabled(UID uid, bool enabled) {
|
||||
|
@ -127,3 +128,27 @@ bool debug_isCheckRelocationDuration() {
|
|||
void debug_setCheckRelocationDuration(bool check) {
|
||||
checkRelocationDuration = check;
|
||||
}
|
||||
void debug_advanceVersionTimestamp(int64_t version, double t) {
|
||||
if (!g_network->isSimulated() || g_simulator.extraDB)
|
||||
return;
|
||||
timedVersionsValidationData[version] = t;
|
||||
}
|
||||
|
||||
bool debug_checkVersionTime(int64_t version, double t, std::string context, Severity sev) {
|
||||
if (!g_network->isSimulated() || g_simulator.extraDB)
|
||||
return false;
|
||||
if (!timedVersionsValidationData.count(version)) {
|
||||
TraceEvent(SevWarn, (context + "UnknownTime").c_str())
|
||||
.detail("VersionChecking", version)
|
||||
.detail("TimeChecking", t);
|
||||
return false;
|
||||
}
|
||||
if (t > timedVersionsValidationData[version]) {
|
||||
TraceEvent(sev, (context + "VersionTimeError").c_str())
|
||||
.detail("VersionChecking", version)
|
||||
.detail("TimeChecking", t)
|
||||
.detail("MaxTime", timedVersionsValidationData[version]);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
|
@ -48,4 +48,7 @@ bool debug_checkMaxRestoredVersion(UID id, int64_t version, std::string context,
|
|||
bool debug_isCheckRelocationDuration();
|
||||
void debug_setCheckRelocationDuration(bool check);
|
||||
|
||||
void debug_advanceVersionTimestamp(int64_t version, double t);
|
||||
bool debug_checkVersionTime(int64_t version, double t, std::string context, Severity sev = SevError);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -250,6 +250,7 @@ set(FDBSERVER_SRCS
|
|||
workloads/SelectorCorrectness.actor.cpp
|
||||
workloads/Serializability.actor.cpp
|
||||
workloads/Sideband.actor.cpp
|
||||
workloads/SidebandSingle.actor.cpp
|
||||
workloads/SimpleAtomicAdd.actor.cpp
|
||||
workloads/SlowTaskWorkload.actor.cpp
|
||||
workloads/SnapTest.actor.cpp
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/TransactionLineage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
|
@ -26,6 +27,7 @@
|
|||
#include "fdbclient/GrvProxyInterface.h"
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
|
@ -45,6 +47,11 @@ struct GrvProxyStats {
|
|||
double percentageOfDefaultGRVQueueProcessed;
|
||||
double percentageOfBatchGRVQueueProcessed;
|
||||
|
||||
bool lastBatchQueueThrottled;
|
||||
bool lastDefaultQueueThrottled;
|
||||
double batchThrottleStartTime;
|
||||
double defaultThrottleStartTime;
|
||||
|
||||
LatencySample defaultTxnGRVTimeInQueue;
|
||||
LatencySample batchTxnGRVTimeInQueue;
|
||||
|
||||
|
@ -97,10 +104,12 @@ struct GrvProxyStats {
|
|||
updatesFromRatekeeper("UpdatesFromRatekeeper", cc), leaseTimeouts("LeaseTimeouts", cc), systemGRVQueueSize(0),
|
||||
defaultGRVQueueSize(0), batchGRVQueueSize(0), transactionRateAllowed(0), batchTransactionRateAllowed(0),
|
||||
transactionLimit(0), batchTransactionLimit(0), percentageOfDefaultGRVQueueProcessed(0),
|
||||
percentageOfBatchGRVQueueProcessed(0), defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
percentageOfBatchGRVQueueProcessed(0), lastBatchQueueThrottled(false), lastDefaultQueueThrottled(false),
|
||||
batchThrottleStartTime(0.0), defaultThrottleStartTime(0.0),
|
||||
defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue",
|
||||
id,
|
||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
|
@ -647,6 +656,22 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
|
|||
}
|
||||
}
|
||||
|
||||
if (stats->lastBatchQueueThrottled) {
|
||||
// Check if this throttling has been sustained for a certain amount of time to avoid false positives
|
||||
if (now() - stats->batchThrottleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) {
|
||||
reply.rkBatchThrottled = true;
|
||||
}
|
||||
}
|
||||
if (stats->lastDefaultQueueThrottled) {
|
||||
// Check if this throttling has been sustained for a certain amount of time to avoid false positives
|
||||
if (now() - stats->defaultThrottleStartTime > CLIENT_KNOBS->GRV_SUSTAINED_THROTTLING_THRESHOLD) {
|
||||
// Consider the batch queue throttled if the default is throttled
|
||||
// to deal with a potential lull in activity for that priority.
|
||||
// Avoids mistakenly thinking batch is unthrottled while default is still throttled.
|
||||
reply.rkBatchThrottled = true;
|
||||
reply.rkDefaultThrottled = true;
|
||||
}
|
||||
}
|
||||
request.reply.send(reply);
|
||||
++stats->txnRequestOut;
|
||||
}
|
||||
|
@ -838,12 +863,27 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
|||
grvProxyData->stats.batchTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
|
||||
--grvProxyData->stats.batchGRVQueueSize;
|
||||
}
|
||||
|
||||
start[req.flags & 1].push_back(std::move(req));
|
||||
static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
|
||||
transactionQueue->pop_front();
|
||||
requestsToStart++;
|
||||
}
|
||||
if (!batchQueue.empty()) {
|
||||
if (!grvProxyData->stats.lastBatchQueueThrottled) {
|
||||
grvProxyData->stats.lastBatchQueueThrottled = true;
|
||||
grvProxyData->stats.batchThrottleStartTime = now();
|
||||
}
|
||||
} else {
|
||||
grvProxyData->stats.lastBatchQueueThrottled = false;
|
||||
}
|
||||
if (!defaultQueue.empty()) {
|
||||
if (!grvProxyData->stats.lastDefaultQueueThrottled) {
|
||||
grvProxyData->stats.lastDefaultQueueThrottled = true;
|
||||
grvProxyData->stats.defaultThrottleStartTime = now();
|
||||
}
|
||||
} else {
|
||||
grvProxyData->stats.lastDefaultQueueThrottled = false;
|
||||
}
|
||||
|
||||
if (!systemQueue.empty() || !defaultQueue.empty() || !batchQueue.empty()) {
|
||||
forwardPromise(
|
||||
|
|
|
@ -225,6 +225,12 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
|||
waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
|
||||
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion);
|
||||
if (req.version > self->liveCommittedVersion) {
|
||||
auto curTime = now();
|
||||
// add debug here to change liveCommittedVersion to time bound of now()
|
||||
debug_advanceVersionTimestamp(self->liveCommittedVersion,
|
||||
curTime + CLIENT_KNOBS->MAX_VERSION_CACHE_LAG);
|
||||
// also add req.version but with no time bound
|
||||
debug_advanceVersionTimestamp(req.version, std::numeric_limits<double>::max());
|
||||
self->liveCommittedVersion = req.version;
|
||||
self->databaseLocked = req.locked;
|
||||
self->proxyMetadataVersion = req.metadataVersion;
|
||||
|
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* SidebandSingle.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
/*
|
||||
* This workload is modelled off the Sideband workload, except it uses a single
|
||||
* mutator and checker rather than several. In addition to ordinary consistency checks,
|
||||
* it also checks the consistency of the cached read versions when using the
|
||||
* USE_GRV_CACHE transaction option, specifically when commit_unknown_result
|
||||
* produces a maybe/maybe-not written scenario. It makes sure that a cached read of an
|
||||
* unknown result matches the regular read of that same key and is not too stale.
|
||||
*/
|
||||
|
||||
struct SidebandSingleWorkload : TestWorkload {
|
||||
double testDuration, operationsPerSecond;
|
||||
// Pair represents <Key, commitVersion>
|
||||
PromiseStream<std::pair<uint64_t, Version>> interf;
|
||||
|
||||
std::vector<Future<Void>> clients;
|
||||
PerfIntCounter messages, consistencyErrors, keysUnexpectedlyPresent;
|
||||
|
||||
SidebandSingleWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx), messages("Messages"), consistencyErrors("Causal Consistency Errors"),
|
||||
keysUnexpectedlyPresent("KeysUnexpectedlyPresent") {
|
||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
||||
operationsPerSecond = getOption(options, LiteralStringRef("operationsPerSecond"), 50.0);
|
||||
}
|
||||
|
||||
std::string description() const override { return "SidebandSingleWorkload"; }
|
||||
Future<Void> setup(Database const& cx) override { return Void(); }
|
||||
Future<Void> start(Database const& cx) override {
|
||||
if (clientId != 0)
|
||||
return Void();
|
||||
|
||||
clients.push_back(mutator(this, cx));
|
||||
clients.push_back(checker(this, cx));
|
||||
return delay(testDuration);
|
||||
}
|
||||
Future<bool> check(Database const& cx) override {
|
||||
if (clientId != 0)
|
||||
return true;
|
||||
int errors = 0;
|
||||
for (int c = 0; c < clients.size(); c++) {
|
||||
errors += clients[c].isError();
|
||||
}
|
||||
if (errors)
|
||||
TraceEvent(SevError, "TestFailure").detail("Reason", "There were client errors.");
|
||||
clients.clear();
|
||||
if (consistencyErrors.getValue())
|
||||
TraceEvent(SevError, "TestFailure").detail("Reason", "There were causal consistency errors.");
|
||||
return !errors && !consistencyErrors.getValue();
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||
m.push_back(messages.getMetric());
|
||||
m.push_back(consistencyErrors.getMetric());
|
||||
m.push_back(keysUnexpectedlyPresent.getMetric());
|
||||
}
|
||||
|
||||
ACTOR Future<Void> mutator(SidebandSingleWorkload* self, Database cx) {
|
||||
state double lastTime = now();
|
||||
state Version commitVersion;
|
||||
state bool unknown = false;
|
||||
|
||||
loop {
|
||||
wait(poisson(&lastTime, 1.0 / self->operationsPerSecond));
|
||||
state Transaction tr0(cx);
|
||||
state Transaction tr(cx);
|
||||
state uint64_t key = deterministicRandom()->randomUniqueID().hash();
|
||||
|
||||
state Standalone<StringRef> messageKey(format("Sideband/Message/%llx", key));
|
||||
// first set, this is the "old" value, always retry
|
||||
loop {
|
||||
try {
|
||||
tr0.set(messageKey, LiteralStringRef("oldbeef"));
|
||||
wait(tr0.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr0.onError(e));
|
||||
}
|
||||
}
|
||||
// second set, the checker should see this, no retries on unknown result
|
||||
loop {
|
||||
try {
|
||||
tr.set(messageKey, LiteralStringRef("deadbeef"));
|
||||
wait(tr.commit());
|
||||
commitVersion = tr.getCommittedVersion();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_commit_unknown_result) {
|
||||
unknown = true;
|
||||
++self->messages;
|
||||
self->interf.send(std::pair(key, invalidVersion));
|
||||
break;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
if (unknown) {
|
||||
unknown = false;
|
||||
continue;
|
||||
}
|
||||
++self->messages;
|
||||
self->interf.send(std::pair(key, commitVersion));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checker(SidebandSingleWorkload* self, Database cx) {
|
||||
loop {
|
||||
// Pair represents <Key, commitVersion>
|
||||
state std::pair<uint64_t, Version> message = waitNext(self->interf.getFuture());
|
||||
state Standalone<StringRef> messageKey(format("Sideband/Message/%llx", message.first));
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::USE_GRV_CACHE);
|
||||
state Optional<Value> val = wait(tr.get(messageKey));
|
||||
if (!val.present()) {
|
||||
TraceEvent(SevError, "CausalConsistencyError1")
|
||||
.detail("MessageKey", messageKey.toString().c_str())
|
||||
.detail("RemoteCommitVersion", message.second)
|
||||
.detail("LocalReadVersion",
|
||||
tr.getReadVersion().get()); // will assert that ReadVersion is set
|
||||
++self->consistencyErrors;
|
||||
} else if (val.get() != LiteralStringRef("deadbeef")) {
|
||||
// If we read something NOT "deadbeef" and there was no commit_unknown_result,
|
||||
// the cache somehow read a stale version of our key
|
||||
if (message.second != invalidVersion) {
|
||||
TraceEvent(SevError, "CausalConsistencyError2")
|
||||
.detail("MessageKey", messageKey.toString().c_str());
|
||||
++self->consistencyErrors;
|
||||
break;
|
||||
}
|
||||
// check again without cache, and if it's the same, that's expected
|
||||
state Transaction tr2(cx);
|
||||
state Optional<Value> val2;
|
||||
loop {
|
||||
try {
|
||||
wait(store(val2, tr2.get(messageKey)));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("DebugSidebandNoCacheError").error(e, true);
|
||||
wait(tr2.onError(e));
|
||||
}
|
||||
}
|
||||
if (val != val2) {
|
||||
TraceEvent(SevError, "CausalConsistencyError3")
|
||||
.detail("MessageKey", messageKey.toString().c_str())
|
||||
.detail("Val1", val)
|
||||
.detail("Val2", val2)
|
||||
.detail("RemoteCommitVersion", message.second)
|
||||
.detail("LocalReadVersion",
|
||||
tr.getReadVersion().get()); // will assert that ReadVersion is set
|
||||
++self->consistencyErrors;
|
||||
}
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("DebugSidebandCheckError").error(e, true);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
WorkloadFactory<SidebandSingleWorkload> SidebandSingleWorkloadFactory("SidebandSingle");
|
|
@ -163,6 +163,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES fast/ReportConflictingKeys.toml)
|
||||
add_fdb_test(TEST_FILES fast/SelectorCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES fast/Sideband.toml)
|
||||
add_fdb_test(TEST_FILES fast/SidebandSingle.toml)
|
||||
add_fdb_test(TEST_FILES fast/SidebandWithStatus.toml)
|
||||
add_fdb_test(TEST_FILES fast/SimpleAtomicAdd.toml)
|
||||
add_fdb_test(TEST_FILES fast/SpecialKeySpaceCorrectness.toml)
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
[[test]]
|
||||
testTitle = 'SingleClientCausalConsistencyTest'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'SidebandSingle'
|
||||
testDuration = 30.0
|
||||
operationsPerSecond = 500
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'RandomClogging'
|
||||
testDuration = 30.0
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'Rollback'
|
||||
meanDelay = 10.0
|
||||
testDuration = 30.0
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'Attrition'
|
||||
machinesToKill = 10
|
||||
machinesToLeave = 3
|
||||
reboot = true
|
||||
testDuration = 30.0
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'Attrition'
|
||||
machinesToKill = 10
|
||||
machinesToLeave = 3
|
||||
reboot = true
|
||||
testDuration = 30.0
|
Loading…
Reference in New Issue