adjust sideband workload to expose another consistency error

This commit is contained in:
Jon Fu 2021-12-10 16:51:12 -05:00
parent 00ffd99938
commit 7fab9ea5c0
2 changed files with 86 additions and 30 deletions

View File

@ -203,14 +203,20 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
}
// Folds a newly observed read version into the cached read-version state.
//
//   t - time associated with the request that produced `v` (per the original
//       author's note, this is the time the request *started*).
//   v - the version that was observed.
//
// The cached version (cachedRv) never moves backwards: the update is ignored
// entirely when v < cachedRv. The cached timestamp (lastTimedGrv) is tracked
// separately and likewise never moves backwards.
void DatabaseContext::updateCachedRV(double t, Version v) {
	if (v >= cachedRv) {
		// Log the state being replaced together with the incoming values.
		TraceEvent("CheckpointCacheUpdate")
		    .detail("Version", v)
		    .detail("CurTime", t)
		    .detail("LastVersion", cachedRv)
		    .detail("LastTime", lastTimedGrv);
		cachedRv = v;
		// Since the time is based on the start of the request, it is possible to
		// get a newer version with an older time (the request started earlier but
		// was the latest to reach the proxy). Accept the newer version regardless,
		// but only advance the timestamp when it is strictly increasing, so that a
		// late-arriving reply cannot make the cache look older than it already is.
		// NOTE(review): the original comment marked this rule with "(?)" — confirm
		// that keeping the newer timestamp with the newer version is the intended
		// staleness semantics.
		if (t > lastTimedGrv) {
			lastTimedGrv = t;
		}
	}
}
@ -5338,15 +5344,18 @@ ACTOR static Future<Void> tryCommit(Database cx,
}
when(CommitID ci = wait(reply)) {
Version v = ci.version;
if (cx->rvCacheGeneration != tr->getRvGeneration()) {
cx->invalidateRvCache();
} else {
cx->updateCachedRV(grvTime, v);
}
if (v != invalidVersion) {
if (CLIENT_BUGGIFY) {
throw commit_unknown_result();
}
// TraceEvent("CheckpointSideband1");
// if (cx->rvCacheGeneration != tr->getRvGeneration()) {
// TraceEvent("CheckpointSideband1.1");
// cx->invalidateRvCache();
// } else {
TraceEvent("CheckpointSideband1.2");
cx->updateCachedRV(grvTime, v);
// }
if (info.debugID.present())
TraceEvent(interval.end()).detail("CommittedVersion", v);
*pCommittedVersion = v;
@ -5411,6 +5420,7 @@ ACTOR static Future<Void> tryCommit(Database cx,
} catch (Error& e) {
if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) {
// We don't know if the commit happened, and it might even still be in flight.
TraceEvent("DebugSidebandCommitUnknownResult");
// Advance the cached RV generation to create a time boundary where a commit (possibly) failed.
cx->rvCacheGeneration++;
@ -6048,14 +6058,15 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
if (!CLIENT_KNOBS->FORCE_GRV_CACHE_OFF && !options.skipGrvCache &&
(deterministicRandom()->random01() <= CLIENT_KNOBS->DEBUG_USE_GRV_CACHE_CHANCE || options.useGrvCache) &&
rkThrottlingCooledDown(getDatabase().getPtr())) {
TraceEvent("DebugGrvUseCache")
.detail("LastRV", cx->cachedRv)
.detail("LastTime", format("%.6f", cx->lastTimedGrv));
TraceEvent("DebugGrvEnterCachePath");
// Upon our first request to use cached RVs, start the background updater
if (!cx->grvUpdateHandler.isValid()) {
cx->grvUpdateHandler = backgroundGrvUpdater(getDatabase().getPtr());
}
if (now() - cx->lastTimedGrv <= CLIENT_KNOBS->MAX_VERSION_CACHE_LAG) {
if (now() - cx->lastTimedGrv <= CLIENT_KNOBS->MAX_VERSION_CACHE_LAG && cx->cachedRv != Version(0)) {
TraceEvent("DebugGrvUseCache")
.detail("LastRV", cx->cachedRv)
.detail("LastTime", format("%.6f", cx->lastTimedGrv));
readVersion = cx->cachedRv;
return readVersion;
} // else go through regular GRV path
@ -6137,6 +6148,8 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
startTime,
metadataVersion,
options.tags);
} else {
TraceEvent("DebugGrvValidRv");
}
return readVersion;
}

View File

@ -106,7 +106,6 @@ struct SidebandSingleWorkload : TestWorkload {
state Standalone<StringRef> serializedInterface = wr.toValue();
loop {
try {
tr.setOption(FDBTransactionOptions::USE_GRV_CACHE);
Optional<Value> val = wait(tr.get(StringRef(format("Sideband/Client/%d", self->clientId))));
if (val.present()) {
if (val.get() != serializedInterface)
@ -128,7 +127,6 @@ struct SidebandSingleWorkload : TestWorkload {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::USE_GRV_CACHE);
Optional<Value> val = wait(tr.get(StringRef(format("Sideband/Client/%d", self->clientId))));
if (!val.present()) {
throw operation_failed();
@ -148,34 +146,50 @@ struct SidebandSingleWorkload : TestWorkload {
state SidebandInterface checker = wait(self->fetchSideband(self, cx));
state double lastTime = now();
state Version commitVersion;
state bool unknown = false;
loop {
wait(poisson(&lastTime, 1.0 / self->operationsPerSecond));
state Transaction tr0(cx);
state Transaction tr(cx);
state uint64_t key = deterministicRandom()->randomUniqueID().hash();
state Standalone<StringRef> messageKey(format("Sideband/Message/%llx", key));
// first set, this is the "old" value, always retry
loop {
try {
tr.setOption(FDBTransactionOptions::USE_GRV_CACHE);
Optional<Value> val = wait(tr.get(messageKey));
if (val.present()) {
commitVersion = tr.getReadVersion().get();
++self->keysUnexpectedlyPresent;
break;
}
tr.set(messageKey, LiteralStringRef("deadbeef"));
wait(tr.commit());
commitVersion = tr.getCommittedVersion();
tr0.set(messageKey, LiteralStringRef("oldbeef"));
wait(tr0.commit());
break;
} catch (Error& e) {
state Error error = e;
wait(tr.onError(e));
if (error.code() == error_code_commit_unknown_result) {
// something
}
wait(tr0.onError(e));
}
}
// second set, the checker should see this, no retries on unknown result
loop {
try {
// tr.setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY);
tr.set(messageKey, LiteralStringRef("deadbeef"));
TraceEvent("DebugSidebandBeforeCommit");
wait(tr.commit());
commitVersion = tr.getCommittedVersion();
TraceEvent("DebugSidebandAfterCommit").detail("CommitVersion", commitVersion);
break;
} catch (Error& e) {
if (e.code() == error_code_commit_unknown_result) {
TraceEvent("DebugSidebandUnknownResult");
unknown = true;
++self->messages;
checker.updates.send(SidebandMessage(key, invalidVersion));
break;
}
wait(tr.onError(e));
}
}
if (unknown) {
unknown = false;
continue;
}
++self->messages;
checker.updates.send(SidebandMessage(key, commitVersion));
}
@ -188,18 +202,47 @@ struct SidebandSingleWorkload : TestWorkload {
state Transaction tr(cx);
loop {
try {
TraceEvent("DebugSidebandCacheGetBefore");
tr.setOption(FDBTransactionOptions::USE_GRV_CACHE);
Optional<Value> val = wait(tr.get(messageKey));
state Optional<Value> val = wait(tr.get(messageKey));
TraceEvent("DebugSidebandCacheGetAfter");
if (!val.present()) {
TraceEvent(SevError, "CausalConsistencyError", self->interf.id())
TraceEvent(SevError, "CausalConsistencyError1", self->interf.id())
.detail("MessageKey", messageKey.toString().c_str())
.detail("RemoteCommitVersion", message.commitVersion)
.detail("LocalReadVersion",
tr.getReadVersion().get()); // will assert that ReadVersion is set
++self->consistencyErrors;
} else if (val.get() != LiteralStringRef("deadbeef")) {
TraceEvent("DebugSidebandOldBeef");
// check again without cache, and if it's the same, that's expected
state Transaction tr2(cx);
state Optional<Value> val2;
loop {
try {
TraceEvent("DebugSidebandNoCacheGetBefore");
Optional<Value> val2 = wait(tr2.get(messageKey));
TraceEvent("DebugSidebandNoCacheGetAfter");
break;
} catch (Error& e) {
TraceEvent("DebugSidebandNoCacheError").error(e, true);
wait(tr2.onError(e));
}
}
if (val != val2) {
TraceEvent(SevError, "CausalConsistencyError2", self->interf.id())
.detail("MessageKey", messageKey.toString().c_str())
.detail("Val1", val)
.detail("Val2", val2)
.detail("RemoteCommitVersion", message.commitVersion)
.detail("LocalReadVersion",
tr.getReadVersion().get()); // will assert that ReadVersion is set
++self->consistencyErrors;
}
}
break;
} catch (Error& e) {
TraceEvent("DebugSidebandCheckError").error(e, true);
wait(tr.onError(e));
}
}