Merge pull request #6304 from sfc-gh-ajbeamon/reset-copy-trstate

Copy transaction state during reset
This commit is contained in:
A.J. Beamon 2022-01-31 09:11:04 -08:00 committed by GitHub
commit d7819f8387
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 29 deletions

View File

@ -2634,6 +2634,42 @@ ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange
return Void();
}
SpanID generateSpanID(bool transactionTracingSample, SpanID parentContext = SpanID()) {
uint64_t txnId = deterministicRandom()->randomUInt64();
if (parentContext.isValid()) {
if (parentContext.first() > 0) {
txnId = parentContext.first();
}
uint64_t tokenId = parentContext.second() > 0 ? deterministicRandom()->randomUInt64() : 0;
return SpanID(txnId, tokenId);
} else if (transactionTracingSample) {
uint64_t tokenId = deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE
? deterministicRandom()->randomUInt64()
: 0;
return SpanID(txnId, tokenId);
} else {
return SpanID(txnId, 0);
}
}
Reference<TransactionState> TransactionState::cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo,
bool generateNewSpan) const {
SpanID newSpanID = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanID;
Reference<TransactionState> newState = makeReference<TransactionState>(cx, cx->taskID, newSpanID, newTrLogInfo);
if (!cx->apiVersionAtLeast(16)) {
newState->options = options;
}
newState->numErrors = numErrors;
newState->startTime = startTime;
newState->committedVersion = committedVersion;
newState->conflictingKeys = conflictingKeys;
return newState;
}
Future<Void> Transaction::warmRange(KeyRange keys) {
return warmRange_impl(trState, keys);
}
@ -4281,24 +4317,6 @@ void debugAddTags(Reference<TransactionState> trState) {
}
}
SpanID generateSpanID(bool transactionTracingSample, SpanID parentContext = SpanID()) {
uint64_t txnId = deterministicRandom()->randomUInt64();
if (parentContext.isValid()) {
if (parentContext.first() > 0) {
txnId = parentContext.first();
}
uint64_t tokenId = parentContext.second() > 0 ? deterministicRandom()->randomUInt64() : 0;
return SpanID(txnId, tokenId);
} else if (transactionTracingSample) {
uint64_t tokenId = deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE
? deterministicRandom()->randomUInt64()
: 0;
return SpanID(txnId, tokenId);
} else {
return SpanID(txnId, 0);
}
}
Transaction::Transaction()
: trState(makeReference<TransactionState>(TaskPriority::DefaultEndpoint, generateSpanID(false))) {}
@ -4931,28 +4949,24 @@ void TransactionOptions::reset(Database const& cx) {
}
}
void Transaction::reset() {
void Transaction::resetImpl(bool generateNewSpan) {
flushTrLogsIfEnabled();
trState = trState->cloneAndReset(createTrLogInfoProbabilistically(trState->cx), generateNewSpan);
tr = CommitTransactionRequest(trState->spanID);
readVersion = Future<Version>();
metadataVersion = Promise<Optional<Key>>();
extraConflictRanges.clear();
commitResult = Promise<Void>();
committing = Future<Void>();
flushTrLogsIfEnabled();
trState->versionstampPromise = Promise<Standalone<StringRef>>();
trState->taskID = trState->cx->taskID;
trState->debugID = Optional<UID>();
trState->trLogInfo = Reference<TransactionLogInfo>(createTrLogInfoProbabilistically(trState->cx));
cancelWatches();
}
if (apiVersionAtLeast(16)) {
trState->options.reset(trState->cx);
}
void Transaction::reset() {
resetImpl(false);
}
void Transaction::fullReset() {
trState->spanID = generateSpanID(trState->cx->transactionTracingSample);
reset();
resetImpl(true);
span = Span(trState->spanID, "Transaction"_loc);
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
}

View File

@ -258,6 +258,8 @@ struct TransactionState : ReferenceCounted<TransactionState> {
TransactionState(Database cx, TaskPriority taskID, SpanID spanID, Reference<TransactionLogInfo> trLogInfo)
: cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID) {}
Reference<TransactionState> cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo, bool generateNewSpan) const;
};
class Transaction : NonCopyable {
@ -441,6 +443,8 @@ private:
Snapshot snapshot,
Reverse reverse);
void resetImpl(bool generateNewSpan);
double backoff;
CommitTransactionRequest tr;
Future<Version> readVersion;