diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 4e34b2a89f..4b6f227857 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -87,7 +87,7 @@ static inline bool isNonAssociativeOp(MutationRef::Type mutationType) { } struct CommitTransactionRef { - CommitTransactionRef() {} + CommitTransactionRef() : read_snapshot(0) {} CommitTransactionRef(Arena &a, const CommitTransactionRef &from) : read_conflict_ranges(a, from.read_conflict_ranges), write_conflict_ranges(a, from.write_conflict_ranges), diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index ff733d04f7..41e5ead251 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -1554,6 +1554,14 @@ public: loop { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); + // dumpData's commits are unstoppable, and we need to make sure that no dumpData commits + // happen after this transaction, as it would mean that the applyMutationsBeginRange read we + // do isn't the final value, and thus a greater version of commits could have been applied. + // Thus, we need to commit it against the same proxy that all dumpData transactions were + // submitted to. The transaction above will stop any further dumpData calls from adding + // transactions to the proxy's commit promise stream, so our commit will come after all + // dumpData transactions. + tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY); try { // Ensure that we're at a version higher than the data that we've written. Optional lastApplied = wait(tr->get(logUid.withPrefix(applyMutationsBeginRange.begin))); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 508f6730b6..b49d136619 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2317,13 +2317,12 @@ void Transaction::setupWatches() { } } -ACTOR static Future tryCommit( Database cx, Reference trLogInfo, CommitTransactionRequest req, Future readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, bool causalWriteRisky ) { +ACTOR static Future tryCommit( Database cx, Reference trLogInfo, CommitTransactionRequest req, Future readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) { state TraceInterval interval( "TransactionCommit" ); state double startTime; if (info.debugID.present()) TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() ); - req.transaction.read_snapshot = 0; try { Version v = wait( readVersion ); req.transaction.read_snapshot = v; @@ -2337,7 +2336,13 @@ ACTOR static Future tryCommit( Database cx, Reference } req.debugID = commitID; - state Future reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true ); + state Future reply; + if (options.commitOnFirstProxy) { + const std::vector& proxies = cx->clientInfo->get().proxies; + reply = proxies.size() ? throwErrorOr ( brokenPromiseToMaybeDelivered ( proxies[0].commit.tryGetReply(req) ) ) : Never(); + } else { + reply = loadBalance( cx->getMasterProxies(), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true ); + } choose { when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) { @@ -2382,7 +2387,7 @@ ACTOR static Future tryCommit( Database cx, Reference if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) { // We don't know if the commit happened, and it might even still be in flight. - if (!causalWriteRisky) { + if (!options.causalWriteRisky) { // Make sure it's not still in flight, either by ensuring the master we submitted to is dead, or the version we submitted with is dead, or by committing a conflicting transaction successfully //if ( cx->getMasterProxies()->masterGeneration <= originalMasterGeneration ) @@ -2470,7 +2475,7 @@ Future Transaction::commitMutations() { tr.isLockAware = options.lockAware; - Future commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options.causalWriteRisky ); + Future commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options ); if (isCheckingWrites) { Promise committed; @@ -2553,6 +2558,11 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional 100) { throw invalid_option_value(); } diff --git a/fdbclient/NativeAPI.h b/fdbclient/NativeAPI.h index 785e6f3be9..2abc66ad87 100644 --- a/fdbclient/NativeAPI.h +++ b/fdbclient/NativeAPI.h @@ -142,17 +142,24 @@ struct StorageMetrics; struct TransactionOptions { double maxBackoff; - uint32_t getReadVersionFlags : 32; - uint32_t customTransactionSizeLimit : 32; + uint32_t getReadVersionFlags; + uint32_t customTransactionSizeLimit; bool checkWritesEnabled : 1; bool causalWriteRisky : 1; + bool commitOnFirstProxy : 1; bool debugDump : 1; bool lockAware : 1; bool readOnly : 1; - - TransactionOptions() { reset(); } - void reset() { - memset(this, 0, sizeof(*this)); + + TransactionOptions() { + reset(); + if (BUGGIFY) { + commitOnFirstProxy = true; + } + } + + void reset() { + memset(this, 0, sizeof(*this)); maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF; } }; @@ -319,4 +326,4 @@ std::string unprintable( const std::string& ); int64_t extractIntOption( Optional value, int64_t minValue = std::numeric_limits::min(), int64_t maxValue = std::numeric_limits::max() ); -#endif \ No newline at end of file +#endif diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options index fdbd721c81..17a5af781b 100644 --- a/fdbclient/vexillographer/fdb.options +++ b/fdbclient/vexillographer/fdb.options @@ -131,6 +131,8 @@ description is not currently required but encouraged.