Fix VersionStamp problems by instead adding a COMMIT_ON_FIRST_PROXY transaction option.
Simulation identified the fact that we can violate the VersionStamps-are-always-increasing promise via the following series of events: 1. On proxy 0, dumpData adds commit requests to proxy 0's commit promise stream 2. To any proxy, a client submits the first transaction of abortBackup, which stops further dumpData calls on proxy 0. 3. To any proxy that is not proxy 0, submit a transaction that checks if it needs to upgrade the destination version. 4. The transaction from (3) is committed 5. Transactions from (1) are committed This is possible because the dumpData transactions have no read conflict ranges, and thus it's impossible to make them abort due to "conflicting" transactions. There's also no promise that if client C sends a commit to proxy A, and later a client D sends a commit to proxy B, that B must log its commit after A. (We only promise that if C is told it was committed before D is told it was committed, then A committed before B.) There was a failed attempt to fix this problem. We tried to add read conflict ranges to dumpData transactions so that they could be aborted by "conflicting" transactions. However, this failed because this now means that dumpData transactions require conflict resolution, and the stale read version that they use can cause them to be aborted with a transaction_too_old error. (Transactions that don't have read conflict ranges will never return transaction_too_old, because with no reads, the read snapshot version is effectively meaningless.) This was never previously possible, so the existing code doesn't retry commits, and to make things more complicated, the dumpData commits must be applied in order. This would require either adding dependencies to transactions (if A is going to commit then B must also be/have committed), which would be complicated, or submitting transactions with a fixed read version, and replaying the failed commits with a higher read version once we get a transaction_too_old error, which would unacceptably slow down the maximum throughput of dumpData. Thus, we've instead elected to add a special transaction option that bypasses proxy load balancing for commits, and always commits against proxy 0. We can know for certain that after the transaction from (2) is committed, all of the dumpData transactions that will be committed have been added to the commit promise stream on proxy 0. Thus, if we enqueue another transaction against proxy 0, we can know that it will be placed into the promise stream after all of the dumpData transactions, thus providing the semantics that we require: no dumpData transaction can commit after the destination version upgrade transaction.
This commit is contained in:
parent
c7dbd31a1e
commit
b5a6bc0ab7
|
@ -87,7 +87,7 @@ static inline bool isNonAssociativeOp(MutationRef::Type mutationType) {
|
|||
}
|
||||
|
||||
struct CommitTransactionRef {
|
||||
CommitTransactionRef() {}
|
||||
CommitTransactionRef() : read_snapshot(0) {}
|
||||
CommitTransactionRef(Arena &a, const CommitTransactionRef &from)
|
||||
: read_conflict_ranges(a, from.read_conflict_ranges),
|
||||
write_conflict_ranges(a, from.write_conflict_ranges),
|
||||
|
|
|
@ -1554,6 +1554,14 @@ public:
|
|||
loop {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
// dumpData's commits are unstoppable, and we need to make sure that no dumpData commits
|
||||
// happen after this transaction, as it would mean that the applyMutationsBeginRange read we
|
||||
// do isn't the final value, and thus a greater version of commits could have been applied.
|
||||
// Thus, we need to commit it against the same proxy that all dumpData transactions were
|
||||
// submitted to. The transaction above will stop any further dumpData calls from adding
|
||||
// transactions to the proxy's commit promise stream, so our commit will come after all
|
||||
// dumpData transactions.
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
try {
|
||||
// Ensure that we're at a version higher than the data that we've written.
|
||||
Optional<Value> lastApplied = wait(tr->get(logUid.withPrefix(applyMutationsBeginRange.begin)));
|
||||
|
|
|
@ -2317,13 +2317,12 @@ void Transaction::setupWatches() {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, bool causalWriteRisky ) {
|
||||
ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
|
||||
state TraceInterval interval( "TransactionCommit" );
|
||||
state double startTime;
|
||||
if (info.debugID.present())
|
||||
TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
|
||||
|
||||
req.transaction.read_snapshot = 0;
|
||||
try {
|
||||
Version v = wait( readVersion );
|
||||
req.transaction.read_snapshot = v;
|
||||
|
@ -2337,7 +2336,13 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
}
|
||||
|
||||
req.debugID = commitID;
|
||||
state Future<CommitID> reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true );
|
||||
state Future<CommitID> reply;
|
||||
if (options.commitOnFirstProxy) {
|
||||
const std::vector<MasterProxyInterface>& proxies = cx->clientInfo->get().proxies;
|
||||
reply = proxies.size() ? throwErrorOr ( brokenPromiseToMaybeDelivered ( proxies[0].commit.tryGetReply(req) ) ) : Never();
|
||||
} else {
|
||||
reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true );
|
||||
}
|
||||
|
||||
choose {
|
||||
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {
|
||||
|
@ -2382,7 +2387,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) {
|
||||
// We don't know if the commit happened, and it might even still be in flight.
|
||||
|
||||
if (!causalWriteRisky) {
|
||||
if (!options.causalWriteRisky) {
|
||||
// Make sure it's not still in flight, either by ensuring the master we submitted to is dead, or the version we submitted with is dead, or by committing a conflicting transaction successfully
|
||||
//if ( cx->getMasterProxies()->masterGeneration <= originalMasterGeneration )
|
||||
|
||||
|
@ -2470,7 +2475,7 @@ Future<Void> Transaction::commitMutations() {
|
|||
|
||||
tr.isLockAware = options.lockAware;
|
||||
|
||||
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options.causalWriteRisky );
|
||||
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
|
||||
|
||||
if (isCheckingWrites) {
|
||||
Promise<Void> committed;
|
||||
|
@ -2553,6 +2558,11 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
|
|||
options.causalWriteRisky = true;
|
||||
break;
|
||||
|
||||
case FDBTransactionOptions::COMMIT_ON_FIRST_PROXY:
|
||||
validateOptionValue(value, false);
|
||||
options.commitOnFirstProxy = true;
|
||||
break;
|
||||
|
||||
case FDBTransactionOptions::CHECK_WRITES_ENABLE:
|
||||
validateOptionValue(value, false);
|
||||
options.checkWritesEnabled = true;
|
||||
|
@ -2565,7 +2575,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
|
|||
|
||||
case FDBTransactionOptions::TRANSACTION_LOGGING_ENABLE:
|
||||
validateOptionValue(value, true);
|
||||
|
||||
|
||||
if(value.get().size() > 100) {
|
||||
throw invalid_option_value();
|
||||
}
|
||||
|
|
|
@ -142,17 +142,24 @@ struct StorageMetrics;
|
|||
|
||||
struct TransactionOptions {
|
||||
double maxBackoff;
|
||||
uint32_t getReadVersionFlags : 32;
|
||||
uint32_t customTransactionSizeLimit : 32;
|
||||
uint32_t getReadVersionFlags;
|
||||
uint32_t customTransactionSizeLimit;
|
||||
bool checkWritesEnabled : 1;
|
||||
bool causalWriteRisky : 1;
|
||||
bool commitOnFirstProxy : 1;
|
||||
bool debugDump : 1;
|
||||
bool lockAware : 1;
|
||||
bool readOnly : 1;
|
||||
|
||||
TransactionOptions() { reset(); }
|
||||
void reset() {
|
||||
memset(this, 0, sizeof(*this));
|
||||
|
||||
TransactionOptions() {
|
||||
reset();
|
||||
if (BUGGIFY) {
|
||||
commitOnFirstProxy = true;
|
||||
}
|
||||
}
|
||||
|
||||
void reset() {
|
||||
memset(this, 0, sizeof(*this));
|
||||
maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF;
|
||||
}
|
||||
};
|
||||
|
@ -319,4 +326,4 @@ std::string unprintable( const std::string& );
|
|||
|
||||
int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::numeric_limits<int64_t>::min(), int64_t maxValue = std::numeric_limits<int64_t>::max() );
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -131,6 +131,8 @@ description is not currently required but encouraged.
|
|||
<Option name="causal_read_disable" code="21" />
|
||||
<Option name="next_write_no_write_conflict_range" code="30"
|
||||
description="The next write performed on this transaction will not generate a write conflict range. As a result, other transactions which read the key(s) being modified by the next write will not conflict with this transaction. Care needs to be taken when using this option on a transaction that is shared between multiple threads. When setting this option, write conflict ranges will be disabled on the next write operation, regardless of what thread it is on." />
|
||||
<Option name="commit_on_first_proxy" code="40"
|
||||
description="Committing this transaction will bypass the normal load balancing across proxies and go directly to the specifically nominated 'first proxy'." />
|
||||
<Option name="check_writes_enable" code="50" />
|
||||
<Option name="read_your_writes_disable" code="51"
|
||||
description="Reads performed by a transaction will not see any prior mutations that occured in that transaction, instead seeing the value which was in the database at the transaction's read version. This option may provide a small performance benefit for the client, but also disables a number of client-side optimizations which are beneficial for transactions which tend to read and write the same keys within a single transaction."/>
|
||||
|
|
|
@ -216,6 +216,7 @@ ACTOR Future<Void> newProxies( Reference<MasterData> self, Future< RecruitFromCo
|
|||
}
|
||||
|
||||
vector<MasterProxyInterface> newRecruits = wait( getAll( initializationReplies ) );
|
||||
// It is required for the correctness of COMMIT_ON_FIRST_PROXY that self->proxies[0] is the firstProxy.
|
||||
self->proxies = newRecruits;
|
||||
|
||||
return Void();
|
||||
|
|
|
@ -1077,12 +1077,24 @@ ACTOR template <class T> Future<T> brokenPromiseToNever( Future<T> in ) {
|
|||
return t;
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_broken_promise)
|
||||
throw e;
|
||||
throw;
|
||||
Void _ = wait(Never()); // never return
|
||||
throw internal_error(); // does not happen
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class T> Future<T> brokenPromiseToMaybeDelivered( Future<T> in ) {
|
||||
try {
|
||||
T t = wait(in);
|
||||
return t;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_broken_promise) {
|
||||
throw request_maybe_delivered();
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class T> void tagAndForward( Promise<T>* pOutputPromise, T value, Future<Void> signal ) {
|
||||
state Promise<T> out( std::move(*pOutputPromise) );
|
||||
Void _ = wait( signal );
|
||||
|
|
Loading…
Reference in New Issue