From b5fbd071552928a5123ad7993fd6e84a56c4515b Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Tue, 14 Sep 2021 19:30:52 -0700 Subject: [PATCH] Transactions would not honor the transaction timeout option if the MVC did not have an active database. --- fdbclient/MultiVersionTransaction.actor.cpp | 110 ++++++++++++++------ fdbclient/MultiVersionTransaction.h | 9 ++ 2 files changed, 87 insertions(+), 32 deletions(-) diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 61425401c2..c763437ced 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -606,7 +606,7 @@ void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParame // MultiVersionTransaction MultiVersionTransaction::MultiVersionTransaction(Reference db, UniqueOrderedOptionList defaultOptions) - : db(db) { + : db(db), startTime(timer_monotonic()), timeoutFuture(new ThreadSingleAssignmentVar()) { setDefaultOptions(defaultOptions); updateTransaction(); } @@ -622,20 +622,23 @@ void MultiVersionTransaction::updateTransaction() { TransactionInfo newTr; if (currentDb.value) { newTr.transaction = currentDb.value->createTransaction(); + } - Optional timeout; - for (auto option : persistentOptions) { - if (option.first == FDBTransactionOptions::TIMEOUT) { - timeout = option.second.castTo(); - } else { - newTr.transaction->setOption(option.first, option.second.castTo()); - } + Optional timeout; + for (auto option : persistentOptions) { + if (option.first == FDBTransactionOptions::TIMEOUT) { + timeout = option.second.castTo(); + } else if (currentDb.value) { + newTr.transaction->setOption(option.first, option.second.castTo()); } + } - // Setting a timeout can immediately cause a transaction to fail. The only timeout - // that matters is the one most recently set, so we ignore any earlier set timeouts - // that might inadvertently fail the transaction. - if (timeout.present()) { + // Setting a timeout can immediately cause a transaction to fail. The only timeout + // that matters is the one most recently set, so we ignore any earlier set timeouts + // that might inadvertently fail the transaction. + if (timeout.present()) { + setTimeout(timeout); + if (currentDb.value) { newTr.transaction->setOption(FDBTransactionOptions::TIMEOUT, timeout); } } @@ -670,19 +673,19 @@ void MultiVersionTransaction::setVersion(Version v) { } ThreadFuture MultiVersionTransaction::getReadVersion() { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getReadVersion() : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->getReadVersion() : makeTimeout(); return abortableFuture(f, tr.onChange); } ThreadFuture> MultiVersionTransaction::get(const KeyRef& key, bool snapshot) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->get(key, snapshot) : ThreadFuture>(Never()); + auto f = tr.transaction ? tr.transaction->get(key, snapshot) : makeTimeout>(); return abortableFuture(f, tr.onChange); } ThreadFuture MultiVersionTransaction::getKey(const KeySelectorRef& key, bool snapshot) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getKey(key, snapshot) : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->getKey(key, snapshot) : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -692,8 +695,8 @@ ThreadFuture MultiVersionTransaction::getRange(const KeySelectorRef bool snapshot, bool reverse) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getRange(begin, end, limit, snapshot, reverse) - : ThreadFuture(Never()); + auto f = + tr.transaction ? tr.transaction->getRange(begin, end, limit, snapshot, reverse) : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -703,8 +706,8 @@ ThreadFuture MultiVersionTransaction::getRange(const KeySelectorRef bool snapshot, bool reverse) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getRange(begin, end, limits, snapshot, reverse) - : ThreadFuture(Never()); + auto f = + tr.transaction ? tr.transaction->getRange(begin, end, limits, snapshot, reverse) : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -713,8 +716,7 @@ ThreadFuture MultiVersionTransaction::getRange(const KeyRangeRef& k bool snapshot, bool reverse) { auto tr = getTransaction(); - auto f = - tr.transaction ? tr.transaction->getRange(keys, limit, snapshot, reverse) : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->getRange(keys, limit, snapshot, reverse) : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -723,21 +725,20 @@ ThreadFuture MultiVersionTransaction::getRange(const KeyRangeRef& k bool snapshot, bool reverse) { auto tr = getTransaction(); - auto f = - tr.transaction ? tr.transaction->getRange(keys, limits, snapshot, reverse) : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->getRange(keys, limits, snapshot, reverse) : makeTimeout(); return abortableFuture(f, tr.onChange); } ThreadFuture> MultiVersionTransaction::getVersionstamp() { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getVersionstamp() : ThreadFuture>(Never()); + auto f = tr.transaction ? tr.transaction->getVersionstamp() : makeTimeout>(); return abortableFuture(f, tr.onChange); } ThreadFuture>> MultiVersionTransaction::getAddressesForKey(const KeyRef& key) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getAddressesForKey(key) - : ThreadFuture>>(Never()); + auto f = + tr.transaction ? tr.transaction->getAddressesForKey(key) : makeTimeout>>(); return abortableFuture(f, tr.onChange); } @@ -750,7 +751,7 @@ void MultiVersionTransaction::addReadConflictRange(const KeyRangeRef& keys) { ThreadFuture MultiVersionTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getEstimatedRangeSizeBytes(keys) : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->getEstimatedRangeSizeBytes(keys) : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -758,7 +759,7 @@ ThreadFuture>> MultiVersionTransaction::getRangeSpl int64_t chunkSize) { auto tr = getTransaction(); auto f = tr.transaction ? tr.transaction->getRangeSplitPoints(range, chunkSize) - : ThreadFuture>>(Never()); + : makeTimeout>>(); return abortableFuture(f, tr.onChange); } @@ -799,7 +800,7 @@ void MultiVersionTransaction::clear(const KeyRef& key) { ThreadFuture MultiVersionTransaction::watch(const KeyRef& key) { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->watch(key) : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->watch(key) : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -812,7 +813,7 @@ void MultiVersionTransaction::addWriteConflictRange(const KeyRangeRef& keys) { ThreadFuture MultiVersionTransaction::commit() { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->commit() : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->commit() : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -827,7 +828,7 @@ Version MultiVersionTransaction::getCommittedVersion() { ThreadFuture MultiVersionTransaction::getApproximateSize() { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->getApproximateSize() : makeTimeout(); return abortableFuture(f, tr.onChange); } @@ -841,6 +842,11 @@ void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Op if (MultiVersionApi::apiVersionAtLeast(610) && itr->second.persistent) { persistentOptions.emplace_back(option, value.castTo>()); } + + if (itr->first == FDBTransactionOptions::TIMEOUT) { + setTimeout(value); + } + auto tr = getTransaction(); if (tr.transaction) { tr.transaction->setOption(option, value); @@ -853,7 +859,7 @@ ThreadFuture MultiVersionTransaction::onError(Error const& e) { return ThreadFuture(Void()); } else { auto tr = getTransaction(); - auto f = tr.transaction ? tr.transaction->onError(e) : ThreadFuture(Never()); + auto f = tr.transaction ? tr.transaction->onError(e) : makeTimeout(); f = abortableFuture(f, tr.onChange); return flatMapThreadFuture(f, [this, e](ErrorOr ready) { @@ -871,6 +877,46 @@ ThreadFuture MultiVersionTransaction::onError(Error const& e) { } } +ACTOR Future timeoutImpl(Reference tsav, double duration) { + wait(delay(duration)); + if (!tsav->isReady()) { + tsav->sendError(transaction_timed_out()); + } + + return Void(); +} + +void MultiVersionTransaction::setTimeout(Optional value) { + double timeoutDuration = extractIntOption(value, 0, std::numeric_limits::max()) / 1000.0; + + ThreadFuture prevTimeout; + { + ThreadSpinLockHolder holder(timeoutLock); + + prevTimeout = currentTimeout; + currentTimeout = onMainThread([this, timeoutDuration]() { + return timeoutImpl(Reference::addRef(timeoutFuture.getPtr()), + timeoutDuration - std::max(0.0, now() - startTime)); + }); + } + + if (prevTimeout.isValid()) { + prevTimeout.cancel(); + } +} + +template +ThreadFuture MultiVersionTransaction::makeTimeout() { + return mapThreadFuture(timeoutFuture, [](ErrorOr v) { + ASSERT(v.isError()); + if (v.getError().code() == error_code_transaction_timed_out) { + return ErrorOr(v.getError()); + } else { + return ErrorOr(transaction_cancelled()); + } + }); +} + void MultiVersionTransaction::reset() { persistentOptions.clear(); setDefaultOptions(db->dbState->transactionDefaultOptions); diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index f3d7f5d8ce..419f3b28c9 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -400,6 +400,15 @@ private: ThreadFuture onChange; }; + double startTime; + ThreadSpinLock timeoutLock; + ThreadFuture timeoutFuture; + ThreadFuture currentTimeout; + void setTimeout(Optional value); + + template + ThreadFuture makeTimeout(); + TransactionInfo transaction; TransactionInfo getTransaction();