diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 42c857f835..ab4bf0fd3d 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -1920,11 +1920,11 @@ ACTOR Future submitBackup(Database db, if (dryRun) { state KeyBackedTag tag = makeBackupTag(tagName); - Optional uidFlag = wait(tag.get(db)); + Optional uidFlag = wait(tag.get(db.getReference())); if (uidFlag.present()) { BackupConfig config(uidFlag.get().first); - EBackupState backupStatus = wait(config.stateEnum().getOrThrow(db)); + EBackupState backupStatus = wait(config.stateEnum().getOrThrow(db.getReference())); // Throw error if a backup is currently running until we support parallel backups if (BackupAgentBase::isRunnable(backupStatus)) { @@ -2883,7 +2883,7 @@ ACTOR Future modifyBackup(Database db, std::string tagName, BackupModifyOp tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); - state Optional uidFlag = wait(tag.get(db)); + state Optional uidFlag = wait(tag.get(db.getReference())); if (!uidFlag.present()) { fprintf(stderr, "No backup exists on tag '%s'\n", tagName.c_str()); diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index b032c62db4..a2a7b07a60 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -1221,7 +1221,7 @@ struct BackupRangeTaskFunc : BackupTaskFuncBase { // Don't need to check keepRunning(task) here because we will do that while finishing each output file, but if // bc is false then clearly the backup is no longer in progress - state Reference bc = wait(backup.backupContainer().getD(cx)); + state Reference bc = wait(backup.backupContainer().getD(cx.getReference())); if (!bc) { return Void(); } @@ -3617,8 +3617,8 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { // Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each // of which is 0 or more blocks). state int taskBatchSize = BUGGIFY ? 1 : CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE; - state RestoreConfig::FileSetT::Values files = - wait(restore.fileSet().getRange(tr, { beginVersion, beginFile }, {}, taskBatchSize)); + state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange( + tr, Optional({ beginVersion, beginFile }), {}, taskBatchSize)); // allPartsDone will be set once all block tasks in the current batch are finished. state Reference allPartsDone; @@ -5497,7 +5497,7 @@ public: } } - Reference bc = wait(backupConfig.backupContainer().getOrThrow(cx)); + Reference bc = wait(backupConfig.backupContainer().getOrThrow(cx.getReference())); if (fastRestore) { TraceEvent("AtomicParallelRestoreStartRestore").log(); diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h index 373e28cd4e..9ba2f66f53 100644 --- a/fdbclient/include/fdbclient/FDBTypes.h +++ b/fdbclient/include/fdbclient/FDBTypes.h @@ -1509,4 +1509,16 @@ struct StorageWiggleValue { serializer(ar, id); } }; + +// Can be used to identify types (e.g. IDatabase) that can be used to create transactions with a `createTransaction` +// function +template +struct transaction_creator_traits : std::false_type {}; + +template +struct transaction_creator_traits> : std::true_type {}; + +template +constexpr bool is_transaction_creator = transaction_creator_traits::value; + #endif diff --git a/fdbclient/include/fdbclient/KeyBackedTypes.h b/fdbclient/include/fdbclient/KeyBackedTypes.h index 3733380ec2..6d564ab168 100644 --- a/fdbclient/include/fdbclient/KeyBackedTypes.h +++ b/fdbclient/include/fdbclient/KeyBackedTypes.h @@ -23,8 +23,10 @@ #include #include +#include "fdbclient/GenericTransactionHelper.h" #include "fdbclient/IClientApi.h" #include "fdbclient/ReadYourWrites.h" +#include "fdbclient/DatabaseContext.h" #include "fdbclient/Subspace.h" #include "flow/ObjectSerializer.h" #include "flow/genericactors.actor.h" @@ -153,23 +155,32 @@ template class KeyBackedProperty { public: KeyBackedProperty(KeyRef key) : key(key) {} - Future> get(Reference tr, Snapshot snapshot = Snapshot::False) const { - return map(tr->get(key, snapshot), [](Optional const& val) -> Optional { - if (val.present()) - return Codec::unpack(Tuple::unpack(val.get())); - return {}; - }); + + template + typename std::enable_if, Future>>::type get( + Transaction tr, + Snapshot snapshot = Snapshot::False) const { + typename transaction_future_type>::type getFuture = tr->get(key, snapshot); + + return holdWhile(getFuture, + map(safeThreadFutureToFuture(getFuture), [](Optional const& val) -> Optional { + if (val.present()) + return Codec::unpack(Tuple::unpack(val.get())); + return {}; + })); } + // Get property's value or defaultValue if it doesn't exist - Future getD(Reference tr, - Snapshot snapshot = Snapshot::False, - T defaultValue = T()) const { + template + typename std::enable_if, Future>::type + getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const { return map(get(tr, snapshot), [=](Optional val) -> T { return val.present() ? val.get() : defaultValue; }); } + // Get property's value or throw error if it doesn't exist - Future getOrThrow(Reference tr, - Snapshot snapshot = Snapshot::False, - Error err = key_not_found()) const { + template + typename std::enable_if, Future>::type + getOrThrow(Transaction tr, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const { return map(get(tr, snapshot), [=](Optional val) -> T { if (!val.present()) { throw err; @@ -179,8 +190,11 @@ public: }); } - Future> get(Database cx, Snapshot snapshot = Snapshot::False) const { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>>::type get( + Reference db, + Snapshot snapshot = Snapshot::False) const { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -188,8 +202,11 @@ public: }); } - Future getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>::type getD(Reference db, + Snapshot snapshot = Snapshot::False, + T defaultValue = T()) const { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -197,8 +214,11 @@ public: }); } - Future getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>::type getOrThrow(Reference db, + Snapshot snapshot = Snapshot::False, + Error err = key_not_found()) const { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -206,10 +226,14 @@ public: }); } - void set(Reference tr, T const& val) { return tr->set(key, Codec::pack(val).pack()); } + template + typename std::enable_if, void>::type set(Transaction tr, T const& val) { + return tr->set(key, Codec::pack(val).pack()); + } - Future set(Database cx, T const& val) { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>::type set(Reference db, T const& val) { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); self->set(tr, val); @@ -217,7 +241,11 @@ public: }); } - void clear(Reference tr) { return tr->clear(key); } + template + typename std::enable_if, void>::type clear(Transaction tr) { + return tr->clear(key); + } + Key key; }; @@ -228,27 +256,40 @@ template class KeyBackedBinaryValue { public: KeyBackedBinaryValue(KeyRef key) : key(key) {} - Future> get(Reference tr, Snapshot snapshot = Snapshot::False) const { - return map(tr->get(key, snapshot), [](Optional const& val) -> Optional { - if (val.present()) - return BinaryReader::fromStringRef(val.get(), Unversioned()); - return {}; - }); + + template + Future> get(Transaction tr, Snapshot snapshot = Snapshot::False) const { + typename transaction_future_type>::type getFuture = tr->get(key, snapshot); + + return holdWhile(getFuture, + map(safeThreadFutureToFuture(getFuture), [](Optional const& val) -> Optional { + if (val.present()) + return BinaryReader::fromStringRef(val.get(), Unversioned()); + return {}; + })); } // Get property's value or defaultValue if it doesn't exist - Future getD(Reference tr, - Snapshot snapshot = Snapshot::False, - T defaultValue = T()) const { + template + Future getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const { return map(get(tr, Snapshot::False), [=](Optional val) -> T { return val.present() ? val.get() : defaultValue; }); } - void set(Reference tr, T const& val) { + + template + void set(Transaction tr, T const& val) { return tr->set(key, BinaryWriter::toValue(val, Unversioned())); } - void atomicOp(Reference tr, T const& val, MutationRef::Type type) { + + template + void atomicOp(Transaction tr, T const& val, MutationRef::Type type) { return tr->atomicOp(key, BinaryWriter::toValue(val, Unversioned()), type); } - void clear(Reference tr) { return tr->clear(key); } + + template + void clear(Transaction tr) { + return tr->clear(key); + } + Key key; }; @@ -265,63 +306,72 @@ public: typedef std::vector PairsType; // If end is not present one key past the end of the map is used. - Future getRange(Reference tr, - KeyType const& begin, + template + Future getRange(Transaction tr, + Optional const& begin, Optional const& end, int limit, Snapshot snapshot = Snapshot::False, Reverse reverse = Reverse::False) const { Subspace s = space; // 'this' could be invalid inside lambda - Key endKey = end.present() ? s.pack(Codec::pack(end.get())) : space.range().end; - return map( - tr->getRange( - KeyRangeRef(s.pack(Codec::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse), - [s](RangeResult const& kvs) -> PairsType { - PairsType results; - for (int i = 0; i < kvs.size(); ++i) { - KeyType key = Codec::unpack(s.unpack(kvs[i].key)); - ValueType val = Codec::unpack(Tuple::unpack(kvs[i].value)); - results.push_back(PairType(key, val)); - } - return results; - }); + + Key beginKey = begin.present() ? s.pack(Codec::pack(begin.get())) : s.range().begin; + Key endKey = end.present() ? s.pack(Codec::pack(end.get())) : s.range().end; + + typename transaction_future_type::type getRangeFuture = + tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse); + + return holdWhile(getRangeFuture, + map(safeThreadFutureToFuture(getRangeFuture), [s](RangeResult const& kvs) -> PairsType { + PairsType results; + for (int i = 0; i < kvs.size(); ++i) { + KeyType key = Codec::unpack(s.unpack(kvs[i].key)); + ValueType val = Codec::unpack(Tuple::unpack(kvs[i].value)); + results.push_back(PairType(key, val)); + } + return results; + })); } - Future> get(Reference tr, - KeyType const& key, - Snapshot snapshot = Snapshot::False) const { - return map(tr->get(space.pack(Codec::pack(key)), snapshot), - [](Optional const& val) -> Optional { - if (val.present()) - return Codec::unpack(Tuple::unpack(val.get())); - return {}; - }); + template + Future> get(Transaction tr, KeyType const& key, Snapshot snapshot = Snapshot::False) const { + typename transaction_future_type>::type getFuture = + tr->get(space.pack(Codec::pack(key)), snapshot); + + return holdWhile( + getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional const& val) -> Optional { + if (val.present()) + return Codec::unpack(Tuple::unpack(val.get())); + return {}; + })); } // Returns a Property that can be get/set that represents key's entry in this this. KeyBackedProperty getProperty(KeyType const& key) const { return space.pack(Codec::pack(key)); } // Returns the expectedSize of the set key - int set(Reference tr, KeyType const& key, ValueType const& val) { + template + int set(Transaction tr, KeyType const& key, ValueType const& val) { Key k = space.pack(Codec::pack(key)); Value v = Codec::pack(val).pack(); tr->set(k, v); return k.expectedSize() + v.expectedSize(); } - void erase(Reference tr, KeyType const& key) { + template + void erase(Transaction tr, KeyType const& key) { return tr->clear(space.pack(Codec::pack(key))); } - void erase(Reference tr, KeyType const& key) { - return tr->clear(space.pack(Codec::pack(key))); - } - - void erase(Reference tr, KeyType const& begin, KeyType const& end) { + template + void erase(Transaction tr, KeyType const& begin, KeyType const& end) { return tr->clear(KeyRangeRef(space.pack(Codec::pack(begin)), space.pack(Codec::pack(end)))); } - void clear(Reference tr) { return tr->clear(space.range()); } + template + void clear(Transaction tr) { + return tr->clear(space.range()); + } Subspace space; }; @@ -332,25 +382,32 @@ template class KeyBackedObjectProperty { public: KeyBackedObjectProperty(KeyRef key, VersionOptions versionOptions) : key(key), versionOptions(versionOptions) {} - Future> get(Reference tr, Snapshot snapshot = Snapshot::False) const { - return map(tr->get(key, snapshot), [vo = versionOptions](Optional const& val) -> Optional { - if (val.present()) - return ObjectReader::fromStringRef(val.get(), vo); - return {}; - }); + template + typename std::enable_if, Future>>::type get( + Transaction tr, + Snapshot snapshot = Snapshot::False) const { + typename transaction_future_type>::type getFuture = tr->get(key, snapshot); + + return holdWhile( + getFuture, + map(safeThreadFutureToFuture(getFuture), [vo = versionOptions](Optional const& val) -> Optional { + if (val.present()) + return ObjectReader::fromStringRef(val.get(), vo); + return {}; + })); } // Get property's value or defaultValue if it doesn't exist - Future getD(Reference tr, - Snapshot snapshot = Snapshot::False, - T defaultValue = T()) const { + template + typename std::enable_if, Future>::type + getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const { return map(get(tr, snapshot), [=](Optional val) -> T { return val.present() ? val.get() : defaultValue; }); } // Get property's value or throw error if it doesn't exist - Future getOrThrow(Reference tr, - Snapshot snapshot = Snapshot::False, - Error err = key_not_found()) const { + template + typename std::enable_if, Future>::type + getOrThrow(Transaction tr, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const { return map(get(tr, snapshot), [=](Optional val) -> T { if (!val.present()) { throw err; @@ -360,8 +417,11 @@ public: }); } - Future> get(Database cx, Snapshot snapshot = Snapshot::False) const { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>>::type get( + Reference db, + Snapshot snapshot = Snapshot::False) const { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -369,8 +429,11 @@ public: }); } - Future getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>::type getD(Reference db, + Snapshot snapshot = Snapshot::False, + T defaultValue = T()) const { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -378,8 +441,11 @@ public: }); } - Future getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>::type getOrThrow(Reference db, + Snapshot snapshot = Snapshot::False, + Error err = key_not_found()) const { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -387,12 +453,14 @@ public: }); } - void set(Reference tr, T const& val) { + template + typename std::enable_if, void>::type set(Transaction tr, T const& val) { return tr->set(key, ObjectWriter::toValue(val, versionOptions)); } - Future set(Database cx, T const& val) { - return runRYWTransaction(cx, [=, self = *this](Reference tr) { + template + typename std::enable_if, Future>::type set(Reference db, T const& val) { + return runTransaction(db, [=, self = *this](Reference tr) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); self.set(tr, val); @@ -400,7 +468,10 @@ public: }); } - void clear(Reference tr) { return tr->clear(key); } + template + typename std::enable_if, void>::type clear(Transaction tr) { + return tr->clear(key); + } Key key; VersionOptions versionOptions; @@ -419,18 +490,22 @@ public: typedef std::pair PairType; typedef std::vector PairsType; - // If end is not present one key past the end of the map is used. - Future getRange(Reference tr, - KeyType const& begin, + template + Future getRange(Transaction tr, + Optional const& begin, Optional const& end, int limit, Snapshot snapshot = Snapshot::False, Reverse reverse = Reverse::False) const { + Key beginKey = begin.present() ? space.pack(Codec::pack(begin.get())) : space.range().begin; Key endKey = end.present() ? space.pack(Codec::pack(end.get())) : space.range().end; - return map( - tr->getRange( - KeyRangeRef(space.pack(Codec::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse), - [self = *this](RangeResult const& kvs) -> PairsType { + + typename transaction_future_type::type getRangeFuture = + tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse); + + return holdWhile( + getRangeFuture, + map(safeThreadFutureToFuture(getRangeFuture), [self = *this](RangeResult const& kvs) -> PairsType { PairsType results; for (int i = 0; i < kvs.size(); ++i) { KeyType key = Codec::unpack(self.space.unpack(kvs[i].key)); @@ -438,18 +513,21 @@ public: results.push_back(PairType(key, val)); } return results; - }); + })); } - Future> get(Reference tr, - KeyType const& key, - Snapshot snapshot = Snapshot::False) const { - return map(tr->get(space.pack(Codec::pack(key)), snapshot), - [vo = versionOptions](Optional const& val) -> Optional { - if (val.present()) - return ObjectReader::fromStringRef(val.get(), vo); - return {}; - }); + template + Future> get(Transaction tr, KeyType const& key, Snapshot snapshot = Snapshot::False) const { + typename transaction_future_type>::type getFuture = + tr->get(space.pack(Codec::pack(key)), snapshot); + + return holdWhile(getFuture, + map(safeThreadFutureToFuture(getFuture), + [vo = versionOptions](Optional const& val) -> Optional { + if (val.present()) + return ObjectReader::fromStringRef(val.get(), vo); + return {}; + })); } // Returns a Property that can be get/set that represents key's entry in this this. @@ -459,7 +537,8 @@ public: } // Returns the expectedSize of the set key - int set(Reference tr, KeyType const& key, ValueType const& val) { + template + int set(Transaction tr, KeyType const& key, ValueType const& val) { Key k = space.pack(Codec::pack(key)); Value v = ObjectWriter::toValue(val, versionOptions); tr->set(k, v); @@ -470,19 +549,20 @@ public: Value serializeValue(ValueType const& val) { return ObjectWriter::toValue(val, versionOptions); } - void erase(Reference tr, KeyType const& key) { + template + void erase(Transaction tr, KeyType const& key) { return tr->clear(space.pack(Codec::pack(key))); } - void erase(Reference tr, KeyType const& key) { - return tr->clear(space.pack(Codec::pack(key))); - } - - void erase(Reference tr, KeyType const& begin, KeyType const& end) { + template + void erase(Transaction tr, KeyType const& begin, KeyType const& end) { return tr->clear(KeyRangeRef(space.pack(Codec::pack(begin)), space.pack(Codec::pack(end)))); } - void clear(Reference tr) { return tr->clear(space.range()); } + template + void clear(Transaction tr) { + return tr->clear(space.range()); + } Subspace space; VersionOptions versionOptions; @@ -496,49 +576,63 @@ public: typedef _ValueType ValueType; typedef std::vector Values; - // If end is not present one key past the end of the map is used. - Future getRange(Reference tr, - ValueType const& begin, + template + Future getRange(Transaction tr, + Optional const& begin, Optional const& end, int limit, - Snapshot snapshot = Snapshot::False) const { + Snapshot snapshot = Snapshot::False, + Reverse reverse = Reverse::False) const { Subspace s = space; // 'this' could be invalid inside lambda + Key beginKey = begin.present() ? s.pack(Codec::pack(begin.get())) : space.range().begin; Key endKey = end.present() ? s.pack(Codec::pack(end.get())) : space.range().end; - return map( - tr->getRange(KeyRangeRef(s.pack(Codec::pack(begin)), endKey), GetRangeLimits(limit), snapshot), - [s](RangeResult const& kvs) -> Values { - Values results; - for (int i = 0; i < kvs.size(); ++i) { - results.push_back(Codec::unpack(s.unpack(kvs[i].key))); - } - return results; - }); + + typename transaction_future_type::type getRangeFuture = + tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse); + + return holdWhile(getRangeFuture, + map(safeThreadFutureToFuture(getRangeFuture), [s](RangeResult const& kvs) -> Values { + Values results; + for (int i = 0; i < kvs.size(); ++i) { + results.push_back(Codec::unpack(s.unpack(kvs[i].key))); + } + return results; + })); } - Future exists(Reference tr, - ValueType const& val, - Snapshot snapshot = Snapshot::False) const { - return map(tr->get(space.pack(Codec::pack(val)), snapshot), - [](Optional const& val) -> bool { return val.present(); }); + template + Future exists(Transaction tr, ValueType const& val, Snapshot snapshot = Snapshot::False) const { + typename transaction_future_type>::type getFuture = + tr->get(space.pack(Codec::pack(val)), snapshot); + + return holdWhile(getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional const& val) -> bool { + return val.present(); + })); } // Returns the expectedSize of the set key - int insert(Reference tr, ValueType const& val) { + template + int insert(Transaction tr, ValueType const& val) { Key k = space.pack(Codec::pack(val)); tr->set(k, StringRef()); return k.expectedSize(); } - void erase(Reference tr, ValueType const& val) { + template + void erase(Transaction tr, ValueType const& val) { return tr->clear(space.pack(Codec::pack(val))); } - void erase(Reference tr, ValueType const& begin, ValueType const& end) { + template + void erase(Transaction tr, ValueType const& begin, ValueType const& end) { return tr->clear( KeyRangeRef(space.pack(Codec::pack(begin)), space.pack(Codec::pack(end)))); } - void clear(Reference tr) { return tr->clear(space.range()); } + template + void clear(Transaction tr) { + return tr->clear(space.range()); + } Subspace space; }; diff --git a/fdbclient/include/fdbclient/RunTransaction.actor.h b/fdbclient/include/fdbclient/RunTransaction.actor.h index 91530a02cd..ee2392eb6d 100644 --- a/fdbclient/include/fdbclient/RunTransaction.actor.h +++ b/fdbclient/include/fdbclient/RunTransaction.actor.h @@ -52,6 +52,24 @@ Future()(Reference()) } } +ACTOR template +Future()(Reference()).getValue())> runTransaction( + Reference db, + Function func) { + state Reference tr = db->createTransaction(); + loop { + try { + // func should be idempodent; otherwise, retry will get undefined result + state decltype(std::declval()(Reference()).getValue()) result = + wait(func(tr)); + wait(safeThreadFutureToFuture(tr->commit())); + return result; + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } +} + ACTOR template Future()(Reference()).getValue())> runRYWTransactionFailIfLocked(Database cx, Function func) { diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 2e8508225e..91c9962577 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -125,8 +125,8 @@ struct BackupData { PerBackupInfo(BackupData* data, UID uid, Version v) : self(data), startVersion(v) { // Open the container and get key ranges BackupConfig config(uid); - container = config.backupContainer().get(data->cx); - ranges = config.backupRanges().get(data->cx); + container = config.backupContainer().get(data->cx.getReference()); + ranges = config.backupRanges().get(data->cx.getReference()); if (self->backupEpoch == self->recruitedEpoch) { // Only current epoch's worker update the number of backup workers. updateWorker = _updateStartedWorkers(this, data, uid); diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index 650ca6f2c6..c1bcd5c5d0 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -466,11 +466,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { .detail("AbortAndRestartAfter", self->abortAndRestartAfter); state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString()); - UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx)); + UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference())); state UID logUid = uidFlag.first; - state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx)); + state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference())); state Reference lastBackupContainer = - wait(BackupConfig(logUid).backupContainer().getD(cx)); + wait(BackupConfig(logUid).backupContainer().getD(cx.getReference())); // Occasionally start yet another backup that might still be running when we restore if (!self->locked && BUGGIFY) { diff --git a/fdbserver/workloads/BackupCorrectness.actor.cpp b/fdbserver/workloads/BackupCorrectness.actor.cpp index 4c82762764..595eca5470 100644 --- a/fdbserver/workloads/BackupCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupCorrectness.actor.cpp @@ -513,11 +513,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { .detail("AbortAndRestartAfter", self->abortAndRestartAfter); state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString()); - UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx)); + UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference())); state UID logUid = uidFlag.first; - state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx)); + state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference())); state Reference lastBackupContainer = - wait(BackupConfig(logUid).backupContainer().getD(cx)); + wait(BackupConfig(logUid).backupContainer().getD(cx.getReference())); // Occasionally start yet another backup that might still be running when we restore if (!self->locked && BUGGIFY) {