Transactions would not honor the transaction timeout option if the MVC did not have an active database.

This commit is contained in:
A.J. Beamon 2021-09-14 19:30:52 -07:00 committed by Markus Pilman
parent 2a0415e640
commit b5fbd07155
2 changed files with 87 additions and 32 deletions

View File

@ -606,7 +606,7 @@ void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParame
// MultiVersionTransaction
MultiVersionTransaction::MultiVersionTransaction(Reference<MultiVersionDatabase> db,
UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions)
: db(db) {
: db(db), startTime(timer_monotonic()), timeoutFuture(new ThreadSingleAssignmentVar<Void>()) {
setDefaultOptions(defaultOptions);
updateTransaction();
}
@ -622,20 +622,23 @@ void MultiVersionTransaction::updateTransaction() {
TransactionInfo newTr;
if (currentDb.value) {
newTr.transaction = currentDb.value->createTransaction();
}
Optional<StringRef> timeout;
for (auto option : persistentOptions) {
if (option.first == FDBTransactionOptions::TIMEOUT) {
timeout = option.second.castTo<StringRef>();
} else {
newTr.transaction->setOption(option.first, option.second.castTo<StringRef>());
}
Optional<StringRef> timeout;
for (auto option : persistentOptions) {
if (option.first == FDBTransactionOptions::TIMEOUT) {
timeout = option.second.castTo<StringRef>();
} else if (currentDb.value) {
newTr.transaction->setOption(option.first, option.second.castTo<StringRef>());
}
}
// Setting a timeout can immediately cause a transaction to fail. The only timeout
// that matters is the one most recently set, so we ignore any earlier set timeouts
// that might inadvertently fail the transaction.
if (timeout.present()) {
// Setting a timeout can immediately cause a transaction to fail. The only timeout
// that matters is the one most recently set, so we ignore any earlier set timeouts
// that might inadvertently fail the transaction.
if (timeout.present()) {
setTimeout(timeout);
if (currentDb.value) {
newTr.transaction->setOption(FDBTransactionOptions::TIMEOUT, timeout);
}
}
@ -670,19 +673,19 @@ void MultiVersionTransaction::setVersion(Version v) {
}
ThreadFuture<Version> MultiVersionTransaction::getReadVersion() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getReadVersion() : ThreadFuture<Version>(Never());
auto f = tr.transaction ? tr.transaction->getReadVersion() : makeTimeout<Version>();
return abortableFuture(f, tr.onChange);
}
ThreadFuture<Optional<Value>> MultiVersionTransaction::get(const KeyRef& key, bool snapshot) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->get(key, snapshot) : ThreadFuture<Optional<Value>>(Never());
auto f = tr.transaction ? tr.transaction->get(key, snapshot) : makeTimeout<Optional<Value>>();
return abortableFuture(f, tr.onChange);
}
ThreadFuture<Key> MultiVersionTransaction::getKey(const KeySelectorRef& key, bool snapshot) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getKey(key, snapshot) : ThreadFuture<Key>(Never());
auto f = tr.transaction ? tr.transaction->getKey(key, snapshot) : makeTimeout<Key>();
return abortableFuture(f, tr.onChange);
}
@ -692,8 +695,8 @@ ThreadFuture<RangeResult> MultiVersionTransaction::getRange(const KeySelectorRef
bool snapshot,
bool reverse) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getRange(begin, end, limit, snapshot, reverse)
: ThreadFuture<RangeResult>(Never());
auto f =
tr.transaction ? tr.transaction->getRange(begin, end, limit, snapshot, reverse) : makeTimeout<RangeResult>();
return abortableFuture(f, tr.onChange);
}
@ -703,8 +706,8 @@ ThreadFuture<RangeResult> MultiVersionTransaction::getRange(const KeySelectorRef
bool snapshot,
bool reverse) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getRange(begin, end, limits, snapshot, reverse)
: ThreadFuture<RangeResult>(Never());
auto f =
tr.transaction ? tr.transaction->getRange(begin, end, limits, snapshot, reverse) : makeTimeout<RangeResult>();
return abortableFuture(f, tr.onChange);
}
@ -713,8 +716,7 @@ ThreadFuture<RangeResult> MultiVersionTransaction::getRange(const KeyRangeRef& k
bool snapshot,
bool reverse) {
auto tr = getTransaction();
auto f =
tr.transaction ? tr.transaction->getRange(keys, limit, snapshot, reverse) : ThreadFuture<RangeResult>(Never());
auto f = tr.transaction ? tr.transaction->getRange(keys, limit, snapshot, reverse) : makeTimeout<RangeResult>();
return abortableFuture(f, tr.onChange);
}
@ -723,21 +725,20 @@ ThreadFuture<RangeResult> MultiVersionTransaction::getRange(const KeyRangeRef& k
bool snapshot,
bool reverse) {
auto tr = getTransaction();
auto f =
tr.transaction ? tr.transaction->getRange(keys, limits, snapshot, reverse) : ThreadFuture<RangeResult>(Never());
auto f = tr.transaction ? tr.transaction->getRange(keys, limits, snapshot, reverse) : makeTimeout<RangeResult>();
return abortableFuture(f, tr.onChange);
}
ThreadFuture<Standalone<StringRef>> MultiVersionTransaction::getVersionstamp() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getVersionstamp() : ThreadFuture<Standalone<StringRef>>(Never());
auto f = tr.transaction ? tr.transaction->getVersionstamp() : makeTimeout<Standalone<StringRef>>();
return abortableFuture(f, tr.onChange);
}
ThreadFuture<Standalone<VectorRef<const char*>>> MultiVersionTransaction::getAddressesForKey(const KeyRef& key) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getAddressesForKey(key)
: ThreadFuture<Standalone<VectorRef<const char*>>>(Never());
auto f =
tr.transaction ? tr.transaction->getAddressesForKey(key) : makeTimeout<Standalone<VectorRef<const char*>>>();
return abortableFuture(f, tr.onChange);
}
@ -750,7 +751,7 @@ void MultiVersionTransaction::addReadConflictRange(const KeyRangeRef& keys) {
ThreadFuture<int64_t> MultiVersionTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getEstimatedRangeSizeBytes(keys) : ThreadFuture<int64_t>(Never());
auto f = tr.transaction ? tr.transaction->getEstimatedRangeSizeBytes(keys) : makeTimeout<int64_t>();
return abortableFuture(f, tr.onChange);
}
@ -758,7 +759,7 @@ ThreadFuture<Standalone<VectorRef<KeyRef>>> MultiVersionTransaction::getRangeSpl
int64_t chunkSize) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getRangeSplitPoints(range, chunkSize)
: ThreadFuture<Standalone<VectorRef<KeyRef>>>(Never());
: makeTimeout<Standalone<VectorRef<KeyRef>>>();
return abortableFuture(f, tr.onChange);
}
@ -799,7 +800,7 @@ void MultiVersionTransaction::clear(const KeyRef& key) {
ThreadFuture<Void> MultiVersionTransaction::watch(const KeyRef& key) {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->watch(key) : ThreadFuture<Void>(Never());
auto f = tr.transaction ? tr.transaction->watch(key) : makeTimeout<Void>();
return abortableFuture(f, tr.onChange);
}
@ -812,7 +813,7 @@ void MultiVersionTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
ThreadFuture<Void> MultiVersionTransaction::commit() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->commit() : ThreadFuture<Void>(Never());
auto f = tr.transaction ? tr.transaction->commit() : makeTimeout<Void>();
return abortableFuture(f, tr.onChange);
}
@ -827,7 +828,7 @@ Version MultiVersionTransaction::getCommittedVersion() {
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture<int64_t>(Never());
auto f = tr.transaction ? tr.transaction->getApproximateSize() : makeTimeout<int64_t>();
return abortableFuture(f, tr.onChange);
}
@ -841,6 +842,11 @@ void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Op
if (MultiVersionApi::apiVersionAtLeast(610) && itr->second.persistent) {
persistentOptions.emplace_back(option, value.castTo<Standalone<StringRef>>());
}
if (itr->first == FDBTransactionOptions::TIMEOUT) {
setTimeout(value);
}
auto tr = getTransaction();
if (tr.transaction) {
tr.transaction->setOption(option, value);
@ -853,7 +859,7 @@ ThreadFuture<Void> MultiVersionTransaction::onError(Error const& e) {
return ThreadFuture<Void>(Void());
} else {
auto tr = getTransaction();
auto f = tr.transaction ? tr.transaction->onError(e) : ThreadFuture<Void>(Never());
auto f = tr.transaction ? tr.transaction->onError(e) : makeTimeout<Void>();
f = abortableFuture(f, tr.onChange);
return flatMapThreadFuture<Void, Void>(f, [this, e](ErrorOr<Void> ready) {
@ -871,6 +877,46 @@ ThreadFuture<Void> MultiVersionTransaction::onError(Error const& e) {
}
}
ACTOR Future<Void> timeoutImpl(Reference<ThreadSingleAssignmentVarBase> tsav, double duration) {
wait(delay(duration));
if (!tsav->isReady()) {
tsav->sendError(transaction_timed_out());
}
return Void();
}
void MultiVersionTransaction::setTimeout(Optional<StringRef> value) {
double timeoutDuration = extractIntOption(value, 0, std::numeric_limits<int>::max()) / 1000.0;
ThreadFuture<Void> prevTimeout;
{
ThreadSpinLockHolder holder(timeoutLock);
prevTimeout = currentTimeout;
currentTimeout = onMainThread([this, timeoutDuration]() {
return timeoutImpl(Reference<ThreadSingleAssignmentVarBase>::addRef(timeoutFuture.getPtr()),
timeoutDuration - std::max(0.0, now() - startTime));
});
}
if (prevTimeout.isValid()) {
prevTimeout.cancel();
}
}
template <class T>
ThreadFuture<T> MultiVersionTransaction::makeTimeout() {
return mapThreadFuture<Void, T>(timeoutFuture, [](ErrorOr<Void> v) {
ASSERT(v.isError());
if (v.getError().code() == error_code_transaction_timed_out) {
return ErrorOr<T>(v.getError());
} else {
return ErrorOr<T>(transaction_cancelled());
}
});
}
void MultiVersionTransaction::reset() {
persistentOptions.clear();
setDefaultOptions(db->dbState->transactionDefaultOptions);

View File

@ -400,6 +400,15 @@ private:
ThreadFuture<Void> onChange;
};
double startTime;
ThreadSpinLock timeoutLock;
ThreadFuture<Void> timeoutFuture;
ThreadFuture<Void> currentTimeout;
void setTimeout(Optional<StringRef> value);
template <class T>
ThreadFuture<T> makeTimeout();
TransactionInfo transaction;
TransactionInfo getTransaction();