Make key backed types generic

This commit is contained in:
A.J. Beamon 2022-06-27 15:34:47 -07:00
parent eccf244ba6
commit 7ec1590452
8 changed files with 229 additions and 127 deletions

View File

@ -1920,11 +1920,11 @@ ACTOR Future<Void> submitBackup(Database db,
if (dryRun) { if (dryRun) {
state KeyBackedTag tag = makeBackupTag(tagName); state KeyBackedTag tag = makeBackupTag(tagName);
Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db)); Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db.getReference()));
if (uidFlag.present()) { if (uidFlag.present()) {
BackupConfig config(uidFlag.get().first); BackupConfig config(uidFlag.get().first);
EBackupState backupStatus = wait(config.stateEnum().getOrThrow(db)); EBackupState backupStatus = wait(config.stateEnum().getOrThrow(db.getReference()));
// Throw error if a backup is currently running until we support parallel backups // Throw error if a backup is currently running until we support parallel backups
if (BackupAgentBase::isRunnable(backupStatus)) { if (BackupAgentBase::isRunnable(backupStatus)) {
@ -2883,7 +2883,7 @@ ACTOR Future<Void> modifyBackup(Database db, std::string tagName, BackupModifyOp
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db)); state Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db.getReference()));
if (!uidFlag.present()) { if (!uidFlag.present()) {
fprintf(stderr, "No backup exists on tag '%s'\n", tagName.c_str()); fprintf(stderr, "No backup exists on tag '%s'\n", tagName.c_str());

View File

@ -1514,4 +1514,16 @@ struct StorageWiggleValue {
serializer(ar, id); serializer(ar, id);
} }
}; };
// Can be used to identify types (e.g. IDatabase) that can be used to create transactions with a `createTransaction`
// function
template <typename, typename = void>
struct transaction_creator_traits : std::false_type {};
template <typename T>
struct transaction_creator_traits<T, std::void_t<typename T::TransactionT>> : std::true_type {};
template <typename T>
constexpr bool is_transaction_creator = transaction_creator_traits<T>::value;
#endif #endif

View File

@ -1221,7 +1221,7 @@ struct BackupRangeTaskFunc : BackupTaskFuncBase {
// Don't need to check keepRunning(task) here because we will do that while finishing each output file, but if // Don't need to check keepRunning(task) here because we will do that while finishing each output file, but if
// bc is false then clearly the backup is no longer in progress // bc is false then clearly the backup is no longer in progress
state Reference<IBackupContainer> bc = wait(backup.backupContainer().getD(cx)); state Reference<IBackupContainer> bc = wait(backup.backupContainer().getD(cx.getReference()));
if (!bc) { if (!bc) {
return Void(); return Void();
} }
@ -5497,7 +5497,7 @@ public:
} }
} }
Reference<IBackupContainer> bc = wait(backupConfig.backupContainer().getOrThrow(cx)); Reference<IBackupContainer> bc = wait(backupConfig.backupContainer().getOrThrow(cx.getReference()));
if (fastRestore) { if (fastRestore) {
TraceEvent("AtomicParallelRestoreStartRestore").log(); TraceEvent("AtomicParallelRestoreStartRestore").log();

View File

@ -26,6 +26,7 @@
#include "fdbclient/GenericTransactionHelper.h" #include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/IClientApi.h" #include "fdbclient/IClientApi.h"
#include "fdbclient/ReadYourWrites.h" #include "fdbclient/ReadYourWrites.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/Subspace.h" #include "fdbclient/Subspace.h"
#include "flow/ObjectSerializer.h" #include "flow/ObjectSerializer.h"
#include "flow/genericactors.actor.h" #include "flow/genericactors.actor.h"
@ -154,23 +155,31 @@ template <typename T>
class KeyBackedProperty { class KeyBackedProperty {
public: public:
KeyBackedProperty(KeyRef key) : key(key) {} KeyBackedProperty(KeyRef key) : key(key) {}
Future<Optional<T>> get(Reference<ReadYourWritesTransaction> tr, Snapshot snapshot = Snapshot::False) const {
return map(tr->get(key, snapshot), [](Optional<Value> const& val) -> Optional<T> { template <class Transaction>
if (val.present()) typename std::enable_if<!is_transaction_creator<Transaction>, Future<Optional<T>>>::type get(
return Codec<T>::unpack(Tuple::unpack(val.get())); Transaction tr,
return {}; Snapshot snapshot = Snapshot::False) const {
}); typename transaction_future_type<Transaction, Optional<Value>>::type getFuture = tr->get(key, snapshot);
return holdWhile(getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> Optional<T> {
if (val.present())
return Codec<T>::unpack(Tuple::unpack(val.get()));
return {};
}));
} }
// Get property's value or defaultValue if it doesn't exist // Get property's value or defaultValue if it doesn't exist
Future<T> getD(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
Snapshot snapshot = Snapshot::False, typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
T defaultValue = T()) const { getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; }); return map(get(tr, snapshot), [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; });
} }
// Get property's value or throw error if it doesn't exist // Get property's value or throw error if it doesn't exist
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
Snapshot snapshot = Snapshot::False, typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
Error err = key_not_found()) const { getOrThrow(Transaction tr, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T { return map(get(tr, snapshot), [=](Optional<T> val) -> T {
if (!val.present()) { if (!val.present()) {
throw err; throw err;
@ -180,8 +189,11 @@ public:
}); });
} }
Future<Optional<T>> get(Database cx, Snapshot snapshot = Snapshot::False) const { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<Optional<T>>>::type get(
Reference<DB> db,
Snapshot snapshot = Snapshot::False) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -189,8 +201,11 @@ public:
}); });
} }
Future<T> getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getD(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
T defaultValue = T()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -198,8 +213,11 @@ public:
}); });
} }
Future<T> getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getOrThrow(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
Error err = key_not_found()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -207,10 +225,14 @@ public:
}); });
} }
void set(Reference<ReadYourWritesTransaction> tr, T const& val) { return tr->set(key, Codec<T>::pack(val).pack()); } template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type set(Transaction tr, T const& val) {
return tr->set(key, Codec<T>::pack(val).pack());
}
Future<Void> set(Database cx, T const& val) { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<Void>>::type set(Reference<DB> db, T const& val) {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
self->set(tr, val); self->set(tr, val);
@ -218,7 +240,11 @@ public:
}); });
} }
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(key); } template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type clear(Transaction tr) {
return tr->clear(key);
}
Key key; Key key;
}; };
@ -229,27 +255,39 @@ template <typename T>
class KeyBackedBinaryValue { class KeyBackedBinaryValue {
public: public:
KeyBackedBinaryValue(KeyRef key) : key(key) {} KeyBackedBinaryValue(KeyRef key) : key(key) {}
Future<Optional<T>> get(Reference<ReadYourWritesTransaction> tr, Snapshot snapshot = Snapshot::False) const {
return map(tr->get(key, snapshot), [](Optional<Value> const& val) -> Optional<T> { template <class Transaction>
if (val.present()) Future<Optional<T>> get(Transaction tr, Snapshot snapshot = Snapshot::False) const {
return BinaryReader::fromStringRef<T>(val.get(), Unversioned()); typename transaction_future_type<Transaction, Optional<Value>>::type getFuture = tr->get(key, snapshot);
return {};
}); return holdWhile(getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> Optional<T> {
if (val.present())
return BinaryReader::fromStringRef<T>(val.get(), Unversioned());
return {};
}));
} }
// Get property's value or defaultValue if it doesn't exist // Get property's value or defaultValue if it doesn't exist
Future<T> getD(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
Snapshot snapshot = Snapshot::False, Future<T> getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
T defaultValue = T()) const {
return map(get(tr, Snapshot::False), return map(get(tr, Snapshot::False),
[=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; }); [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; });
} }
void set(Reference<ReadYourWritesTransaction> tr, T const& val) {
template <class Transaction>
void set(Transaction tr, T const& val) {
return tr->set(key, BinaryWriter::toValue<T>(val, Unversioned())); return tr->set(key, BinaryWriter::toValue<T>(val, Unversioned()));
} }
void atomicOp(Reference<ReadYourWritesTransaction> tr, T const& val, MutationRef::Type type) {
template <class Transaction>
void atomicOp(Transaction tr, T const& val, MutationRef::Type type) {
return tr->atomicOp(key, BinaryWriter::toValue<T>(val, Unversioned()), type); return tr->atomicOp(key, BinaryWriter::toValue<T>(val, Unversioned()), type);
} }
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(key); }
template <class Transaction>
void clear(Transaction tr) {
return tr->clear(key);
}
Key key; Key key;
}; };
@ -266,63 +304,70 @@ public:
typedef std::vector<PairType> PairsType; typedef std::vector<PairType> PairsType;
// If end is not present one key past the end of the map is used. // If end is not present one key past the end of the map is used.
Future<PairsType> getRange(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
KeyType const& begin, Future<PairsType> getRange(Transaction tr,
Optional<KeyType> const& begin,
Optional<KeyType> const& end, Optional<KeyType> const& end,
int limit, int limit,
Snapshot snapshot = Snapshot::False, Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) const { Reverse reverse = Reverse::False) const {
Subspace s = space; // 'this' could be invalid inside lambda Subspace s = space; // 'this' could be invalid inside lambda
Key endKey = end.present() ? s.pack(Codec<KeyType>::pack(end.get())) : space.range().end;
return map( Key beginKey = begin.present() ? s.pack(Codec<KeyType>::pack(begin.get())) : s.range().begin;
tr->getRange( Key endKey = end.present() ? s.pack(Codec<KeyType>::pack(end.get())) : s.range().end;
KeyRangeRef(s.pack(Codec<KeyType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse),
[s](RangeResult const& kvs) -> PairsType { typename transaction_future_type<Transaction, RangeResult>::type getRangeFuture =
PairsType results; tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse);
for (int i = 0; i < kvs.size(); ++i) {
KeyType key = Codec<KeyType>::unpack(s.unpack(kvs[i].key)); return holdWhile(getRangeFuture, map(safeThreadFutureToFuture(getRangeFuture), [s](RangeResult const& kvs) -> PairsType {
ValueType val = Codec<ValueType>::unpack(Tuple::unpack(kvs[i].value)); PairsType results;
results.push_back(PairType(key, val)); for (int i = 0; i < kvs.size(); ++i) {
} KeyType key = Codec<KeyType>::unpack(s.unpack(kvs[i].key));
return results; ValueType val = Codec<ValueType>::unpack(Tuple::unpack(kvs[i].value));
}); results.push_back(PairType(key, val));
}
return results;
}));
} }
Future<Optional<ValueType>> get(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
KeyType const& key, Future<Optional<ValueType>> get(Transaction tr, KeyType const& key, Snapshot snapshot = Snapshot::False) const {
Snapshot snapshot = Snapshot::False) const { typename transaction_future_type<Transaction, Optional<Value>>::type getFuture =
return map(tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot), tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot);
[](Optional<Value> const& val) -> Optional<ValueType> {
if (val.present()) return holdWhile(getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> Optional<ValueType> {
return Codec<ValueType>::unpack(Tuple::unpack(val.get())); if (val.present())
return {}; return Codec<ValueType>::unpack(Tuple::unpack(val.get()));
}); return {};
}));
} }
// Returns a Property that can be get/set that represents key's entry in this this. // Returns a Property that can be get/set that represents key's entry in this this.
KeyBackedProperty<ValueType> getProperty(KeyType const& key) const { return space.pack(Codec<KeyType>::pack(key)); } KeyBackedProperty<ValueType> getProperty(KeyType const& key) const { return space.pack(Codec<KeyType>::pack(key)); }
// Returns the expectedSize of the set key // Returns the expectedSize of the set key
int set(Reference<ReadYourWritesTransaction> tr, KeyType const& key, ValueType const& val) { template <class Transaction>
int set(Transaction tr, KeyType const& key, ValueType const& val) {
Key k = space.pack(Codec<KeyType>::pack(key)); Key k = space.pack(Codec<KeyType>::pack(key));
Value v = Codec<ValueType>::pack(val).pack(); Value v = Codec<ValueType>::pack(val).pack();
tr->set(k, v); tr->set(k, v);
return k.expectedSize() + v.expectedSize(); return k.expectedSize() + v.expectedSize();
} }
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& key) { template <class Transaction>
void erase(Transaction tr, KeyType const& key) {
return tr->clear(space.pack(Codec<KeyType>::pack(key))); return tr->clear(space.pack(Codec<KeyType>::pack(key)));
} }
void erase(Reference<ITransaction> tr, KeyType const& key) { template <class Transaction>
return tr->clear(space.pack(Codec<KeyType>::pack(key))); void erase(Transaction tr, KeyType const& begin, KeyType const& end) {
}
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& begin, KeyType const& end) {
return tr->clear(KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), space.pack(Codec<KeyType>::pack(end)))); return tr->clear(KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), space.pack(Codec<KeyType>::pack(end))));
} }
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(space.range()); } template <class Transaction>
void clear(Transaction tr) {
return tr->clear(space.range());
}
Subspace space; Subspace space;
}; };
@ -333,25 +378,30 @@ template <typename T, typename VersionOptions>
class KeyBackedObjectProperty { class KeyBackedObjectProperty {
public: public:
KeyBackedObjectProperty(KeyRef key, VersionOptions versionOptions) : key(key), versionOptions(versionOptions) {} KeyBackedObjectProperty(KeyRef key, VersionOptions versionOptions) : key(key), versionOptions(versionOptions) {}
Future<Optional<T>> get(Reference<ReadYourWritesTransaction> tr, Snapshot snapshot = Snapshot::False) const {
return map(tr->get(key, snapshot), [vo = versionOptions](Optional<Value> const& val) -> Optional<T> { template <class Transaction>
if (val.present()) typename std::enable_if<!is_transaction_creator<Transaction>, Future<Optional<T>>>::type get(
return ObjectReader::fromStringRef<T>(val.get(), vo); Transaction tr,
return {}; Snapshot snapshot = Snapshot::False) const {
}); typename transaction_future_type<Transaction, Optional<Value>>::type getFuture = tr->get(key, snapshot);
return holdWhile(getFuture, map(safeThreadFutureToFuture(getFuture), [vo = versionOptions](Optional<Value> const& val) -> Optional<T> {
if (val.present())
return ObjectReader::fromStringRef<T>(val.get(), vo);
return {};
}));
} }
// Get property's value or defaultValue if it doesn't exist // Get property's value or defaultValue if it doesn't exist
Future<T> getD(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
Snapshot snapshot = Snapshot::False, typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
T defaultValue = T()) const { getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; }); return map(get(tr, snapshot), [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; });
} }
// Get property's value or throw error if it doesn't exist // Get property's value or throw error if it doesn't exist
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
Snapshot snapshot = Snapshot::False, typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
Error err = key_not_found()) const { getOrThrow(Transaction tr, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T { return map(get(tr, snapshot), [=](Optional<T> val) -> T {
if (!val.present()) { if (!val.present()) {
throw err; throw err;
@ -361,8 +411,11 @@ public:
}); });
} }
Future<Optional<T>> get(Database cx, Snapshot snapshot = Snapshot::False) const { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<Optional<T>>>::type get(
Reference<DB> db,
Snapshot snapshot = Snapshot::False) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -370,8 +423,11 @@ public:
}); });
} }
Future<T> getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getD(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
T defaultValue = T()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -379,8 +435,11 @@ public:
}); });
} }
Future<T> getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getOrThrow(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
Error err = key_not_found()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -388,12 +447,14 @@ public:
}); });
} }
void set(Reference<ReadYourWritesTransaction> tr, T const& val) { template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type set(Transaction tr, T const& val) {
return tr->set(key, ObjectWriter::toValue(val, versionOptions)); return tr->set(key, ObjectWriter::toValue(val, versionOptions));
} }
Future<Void> set(Database cx, T const& val) { template <class DB>
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) { typename std::enable_if<is_transaction_creator<DB>, Future<Void>>::type set(Reference<DB> db, T const& val) {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
self.set(tr, val); self.set(tr, val);
@ -401,7 +462,10 @@ public:
}); });
} }
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(key); } template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type clear(Transaction tr) {
return tr->clear(key);
}
Key key; Key key;
VersionOptions versionOptions; VersionOptions versionOptions;
@ -420,18 +484,22 @@ public:
typedef std::pair<KeyType, ValueType> PairType; typedef std::pair<KeyType, ValueType> PairType;
typedef std::vector<PairType> PairsType; typedef std::vector<PairType> PairsType;
// If end is not present one key past the end of the map is used. template <class Transaction>
Future<PairsType> getRange(Reference<ReadYourWritesTransaction> tr, Future<PairsType> getRange(Transaction tr,
KeyType const& begin, Optional<KeyType> const& begin,
Optional<KeyType> const& end, Optional<KeyType> const& end,
int limit, int limit,
Snapshot snapshot = Snapshot::False, Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) const { Reverse reverse = Reverse::False) const {
Key beginKey = begin.present() ? space.pack(Codec<KeyType>::pack(begin.get())) : space.range().begin;
Key endKey = end.present() ? space.pack(Codec<KeyType>::pack(end.get())) : space.range().end; Key endKey = end.present() ? space.pack(Codec<KeyType>::pack(end.get())) : space.range().end;
return map(
tr->getRange( typename transaction_future_type<Transaction, RangeResult>::type getRangeFuture =
KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse), tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse);
[self = *this](RangeResult const& kvs) -> PairsType {
return holdWhile(
getRangeFuture,
map(safeThreadFutureToFuture(getRangeFuture), [self = *this](RangeResult const& kvs) -> PairsType {
PairsType results; PairsType results;
for (int i = 0; i < kvs.size(); ++i) { for (int i = 0; i < kvs.size(); ++i) {
KeyType key = Codec<KeyType>::unpack(self.space.unpack(kvs[i].key)); KeyType key = Codec<KeyType>::unpack(self.space.unpack(kvs[i].key));
@ -439,18 +507,21 @@ public:
results.push_back(PairType(key, val)); results.push_back(PairType(key, val));
} }
return results; return results;
}); }));
} }
Future<Optional<ValueType>> get(Reference<ReadYourWritesTransaction> tr, template <class Transaction>
KeyType const& key, Future<Optional<ValueType>> get(Transaction tr, KeyType const& key, Snapshot snapshot = Snapshot::False) const {
Snapshot snapshot = Snapshot::False) const { typename transaction_future_type<Transaction, Optional<Value>>::type getFuture =
return map(tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot), tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot);
[vo = versionOptions](Optional<Value> const& val) -> Optional<ValueType> {
if (val.present()) return holdWhile(getFuture,
return ObjectReader::fromStringRef<ValueType>(val.get(), vo); map(safeThreadFutureToFuture(getFuture),
return {}; [vo = versionOptions](Optional<Value> const& val) -> Optional<ValueType> {
}); if (val.present())
return ObjectReader::fromStringRef<ValueType>(val.get(), vo);
return {};
}));
} }
// Returns a Property that can be get/set that represents key's entry in this this. // Returns a Property that can be get/set that represents key's entry in this this.
@ -460,7 +531,8 @@ public:
} }
// Returns the expectedSize of the set key // Returns the expectedSize of the set key
int set(Reference<ReadYourWritesTransaction> tr, KeyType const& key, ValueType const& val) { template <class Transaction>
int set(Transaction tr, KeyType const& key, ValueType const& val) {
Key k = space.pack(Codec<KeyType>::pack(key)); Key k = space.pack(Codec<KeyType>::pack(key));
Value v = ObjectWriter::toValue(val, versionOptions); Value v = ObjectWriter::toValue(val, versionOptions);
tr->set(k, v); tr->set(k, v);
@ -471,19 +543,20 @@ public:
Value serializeValue(ValueType const& val) { return ObjectWriter::toValue(val, versionOptions); } Value serializeValue(ValueType const& val) { return ObjectWriter::toValue(val, versionOptions); }
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& key) { template <class Transaction>
void erase(Transaction tr, KeyType const& key) {
return tr->clear(space.pack(Codec<KeyType>::pack(key))); return tr->clear(space.pack(Codec<KeyType>::pack(key)));
} }
void erase(Reference<ITransaction> tr, KeyType const& key) { template <class Transaction>
return tr->clear(space.pack(Codec<KeyType>::pack(key))); void erase(Transaction tr, KeyType const& begin, KeyType const& end) {
}
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& begin, KeyType const& end) {
return tr->clear(KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), space.pack(Codec<KeyType>::pack(end)))); return tr->clear(KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), space.pack(Codec<KeyType>::pack(end))));
} }
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(space.range()); } template <class Transaction>
void clear(Transaction tr) {
return tr->clear(space.range());
}
Subspace space; Subspace space;
VersionOptions versionOptions; VersionOptions versionOptions;
@ -497,7 +570,6 @@ public:
typedef _ValueType ValueType; typedef _ValueType ValueType;
typedef std::vector<ValueType> Values; typedef std::vector<ValueType> Values;
// If end is not present one key past the end of the map is used.
template <class Transaction> template <class Transaction>
Future<Values> getRange(Transaction tr, Future<Values> getRange(Transaction tr,
Optional<ValueType> const& begin, Optional<ValueType> const& begin,

View File

@ -52,6 +52,24 @@ Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>())
} }
} }
ACTOR template <class Function, class DB>
Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())> runTransaction(
Reference<DB> db,
Function func) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
// func should be idempodent; otherwise, retry will get undefined result
state decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue()) result =
wait(func(tr));
wait(safeThreadFutureToFuture(tr->commit()));
return result;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Function> ACTOR template <class Function>
Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())> Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
runRYWTransactionFailIfLocked(Database cx, Function func) { runRYWTransactionFailIfLocked(Database cx, Function func) {

View File

@ -104,8 +104,8 @@ struct BackupData {
PerBackupInfo(BackupData* data, UID uid, Version v) : self(data), startVersion(v) { PerBackupInfo(BackupData* data, UID uid, Version v) : self(data), startVersion(v) {
// Open the container and get key ranges // Open the container and get key ranges
BackupConfig config(uid); BackupConfig config(uid);
container = config.backupContainer().get(data->cx); container = config.backupContainer().get(data->cx.getReference());
ranges = config.backupRanges().get(data->cx); ranges = config.backupRanges().get(data->cx.getReference());
if (self->backupEpoch == self->recruitedEpoch) { if (self->backupEpoch == self->recruitedEpoch) {
// Only current epoch's worker update the number of backup workers. // Only current epoch's worker update the number of backup workers.
updateWorker = _updateStartedWorkers(this, data, uid); updateWorker = _updateStartedWorkers(this, data, uid);

View File

@ -466,11 +466,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("AbortAndRestartAfter", self->abortAndRestartAfter); .detail("AbortAndRestartAfter", self->abortAndRestartAfter);
state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString()); state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString());
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx)); UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference()));
state UID logUid = uidFlag.first; state UID logUid = uidFlag.first;
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx)); state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference()));
state Reference<IBackupContainer> lastBackupContainer = state Reference<IBackupContainer> lastBackupContainer =
wait(BackupConfig(logUid).backupContainer().getD(cx)); wait(BackupConfig(logUid).backupContainer().getD(cx.getReference()));
// Occasionally start yet another backup that might still be running when we restore // Occasionally start yet another backup that might still be running when we restore
if (!self->locked && BUGGIFY) { if (!self->locked && BUGGIFY) {

View File

@ -513,11 +513,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
.detail("AbortAndRestartAfter", self->abortAndRestartAfter); .detail("AbortAndRestartAfter", self->abortAndRestartAfter);
state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString()); state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString());
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx)); UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference()));
state UID logUid = uidFlag.first; state UID logUid = uidFlag.first;
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx)); state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference()));
state Reference<IBackupContainer> lastBackupContainer = state Reference<IBackupContainer> lastBackupContainer =
wait(BackupConfig(logUid).backupContainer().getD(cx)); wait(BackupConfig(logUid).backupContainer().getD(cx.getReference()));
// Occasionally start yet another backup that might still be running when we restore // Occasionally start yet another backup that might still be running when we restore
if (!self->locked && BUGGIFY) { if (!self->locked && BUGGIFY) {