Add support for different transaction types (Reference<ITransaction>, Reference<ReadYourWritesTransaction>) to key backed types

This commit is contained in:
A.J. Beamon 2022-07-01 10:02:39 -07:00
parent 89a5eeb8fc
commit 1dc78a3b3f
8 changed files with 274 additions and 150 deletions

View File

@ -1920,11 +1920,11 @@ ACTOR Future<Void> submitBackup(Database db,
if (dryRun) {
state KeyBackedTag tag = makeBackupTag(tagName);
Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db));
Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db.getReference()));
if (uidFlag.present()) {
BackupConfig config(uidFlag.get().first);
EBackupState backupStatus = wait(config.stateEnum().getOrThrow(db));
EBackupState backupStatus = wait(config.stateEnum().getOrThrow(db.getReference()));
// Throw error if a backup is currently running until we support parallel backups
if (BackupAgentBase::isRunnable(backupStatus)) {
@ -2883,7 +2883,7 @@ ACTOR Future<Void> modifyBackup(Database db, std::string tagName, BackupModifyOp
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db));
state Optional<UidAndAbortedFlagT> uidFlag = wait(tag.get(db.getReference()));
if (!uidFlag.present()) {
fprintf(stderr, "No backup exists on tag '%s'\n", tagName.c_str());

View File

@ -1221,7 +1221,7 @@ struct BackupRangeTaskFunc : BackupTaskFuncBase {
// Don't need to check keepRunning(task) here because we will do that while finishing each output file, but if
// bc is false then clearly the backup is no longer in progress
state Reference<IBackupContainer> bc = wait(backup.backupContainer().getD(cx));
state Reference<IBackupContainer> bc = wait(backup.backupContainer().getD(cx.getReference()));
if (!bc) {
return Void();
}
@ -3617,8 +3617,8 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase {
// Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each
// of which is 0 or more blocks).
state int taskBatchSize = BUGGIFY ? 1 : CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE;
state RestoreConfig::FileSetT::Values files =
wait(restore.fileSet().getRange(tr, { beginVersion, beginFile }, {}, taskBatchSize));
state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(
tr, Optional<RestoreConfig::RestoreFile>({ beginVersion, beginFile }), {}, taskBatchSize));
// allPartsDone will be set once all block tasks in the current batch are finished.
state Reference<TaskFuture> allPartsDone;
@ -5497,7 +5497,7 @@ public:
}
}
Reference<IBackupContainer> bc = wait(backupConfig.backupContainer().getOrThrow(cx));
Reference<IBackupContainer> bc = wait(backupConfig.backupContainer().getOrThrow(cx.getReference()));
if (fastRestore) {
TraceEvent("AtomicParallelRestoreStartRestore").log();

View File

@ -1509,4 +1509,16 @@ struct StorageWiggleValue {
serializer(ar, id);
}
};
// Can be used to identify types (e.g. IDatabase) that can be used to create transactions with a `createTransaction`
// function
template <typename, typename = void>
struct transaction_creator_traits : std::false_type {};
template <typename T>
struct transaction_creator_traits<T, std::void_t<typename T::TransactionT>> : std::true_type {};
template <typename T>
constexpr bool is_transaction_creator = transaction_creator_traits<T>::value;
#endif

View File

@ -23,8 +23,10 @@
#include <utility>
#include <vector>
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/Subspace.h"
#include "flow/ObjectSerializer.h"
#include "flow/genericactors.actor.h"
@ -153,23 +155,32 @@ template <typename T>
class KeyBackedProperty {
public:
KeyBackedProperty(KeyRef key) : key(key) {}
Future<Optional<T>> get(Reference<ReadYourWritesTransaction> tr, Snapshot snapshot = Snapshot::False) const {
return map(tr->get(key, snapshot), [](Optional<Value> const& val) -> Optional<T> {
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, Future<Optional<T>>>::type get(
Transaction tr,
Snapshot snapshot = Snapshot::False) const {
typename transaction_future_type<Transaction, Optional<Value>>::type getFuture = tr->get(key, snapshot);
return holdWhile(getFuture,
map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> Optional<T> {
if (val.present())
return Codec<T>::unpack(Tuple::unpack(val.get()));
return {};
});
}));
}
// Get property's value or defaultValue if it doesn't exist
Future<T> getD(Reference<ReadYourWritesTransaction> tr,
Snapshot snapshot = Snapshot::False,
T defaultValue = T()) const {
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; });
}
// Get property's value or throw error if it doesn't exist
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr,
Snapshot snapshot = Snapshot::False,
Error err = key_not_found()) const {
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
getOrThrow(Transaction tr, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T {
if (!val.present()) {
throw err;
@ -179,8 +190,11 @@ public:
});
}
Future<Optional<T>> get(Database cx, Snapshot snapshot = Snapshot::False) const {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<Optional<T>>>::type get(
Reference<DB> db,
Snapshot snapshot = Snapshot::False) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -188,8 +202,11 @@ public:
});
}
Future<T> getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getD(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
T defaultValue = T()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -197,8 +214,11 @@ public:
});
}
Future<T> getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getOrThrow(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
Error err = key_not_found()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -206,10 +226,14 @@ public:
});
}
void set(Reference<ReadYourWritesTransaction> tr, T const& val) { return tr->set(key, Codec<T>::pack(val).pack()); }
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type set(Transaction tr, T const& val) {
return tr->set(key, Codec<T>::pack(val).pack());
}
Future<Void> set(Database cx, T const& val) {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<Void>>::type set(Reference<DB> db, T const& val) {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
self->set(tr, val);
@ -217,7 +241,11 @@ public:
});
}
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(key); }
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type clear(Transaction tr) {
return tr->clear(key);
}
Key key;
};
@ -228,27 +256,40 @@ template <typename T>
class KeyBackedBinaryValue {
public:
KeyBackedBinaryValue(KeyRef key) : key(key) {}
Future<Optional<T>> get(Reference<ReadYourWritesTransaction> tr, Snapshot snapshot = Snapshot::False) const {
return map(tr->get(key, snapshot), [](Optional<Value> const& val) -> Optional<T> {
template <class Transaction>
Future<Optional<T>> get(Transaction tr, Snapshot snapshot = Snapshot::False) const {
typename transaction_future_type<Transaction, Optional<Value>>::type getFuture = tr->get(key, snapshot);
return holdWhile(getFuture,
map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> Optional<T> {
if (val.present())
return BinaryReader::fromStringRef<T>(val.get(), Unversioned());
return {};
});
}));
}
// Get property's value or defaultValue if it doesn't exist
Future<T> getD(Reference<ReadYourWritesTransaction> tr,
Snapshot snapshot = Snapshot::False,
T defaultValue = T()) const {
template <class Transaction>
Future<T> getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
return map(get(tr, Snapshot::False),
[=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; });
}
void set(Reference<ReadYourWritesTransaction> tr, T const& val) {
template <class Transaction>
void set(Transaction tr, T const& val) {
return tr->set(key, BinaryWriter::toValue<T>(val, Unversioned()));
}
void atomicOp(Reference<ReadYourWritesTransaction> tr, T const& val, MutationRef::Type type) {
template <class Transaction>
void atomicOp(Transaction tr, T const& val, MutationRef::Type type) {
return tr->atomicOp(key, BinaryWriter::toValue<T>(val, Unversioned()), type);
}
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(key); }
template <class Transaction>
void clear(Transaction tr) {
return tr->clear(key);
}
Key key;
};
@ -265,18 +306,23 @@ public:
typedef std::vector<PairType> PairsType;
// If end is not present one key past the end of the map is used.
Future<PairsType> getRange(Reference<ReadYourWritesTransaction> tr,
KeyType const& begin,
template <class Transaction>
Future<PairsType> getRange(Transaction tr,
Optional<KeyType> const& begin,
Optional<KeyType> const& end,
int limit,
Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) const {
Subspace s = space; // 'this' could be invalid inside lambda
Key endKey = end.present() ? s.pack(Codec<KeyType>::pack(end.get())) : space.range().end;
return map(
tr->getRange(
KeyRangeRef(s.pack(Codec<KeyType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse),
[s](RangeResult const& kvs) -> PairsType {
Key beginKey = begin.present() ? s.pack(Codec<KeyType>::pack(begin.get())) : s.range().begin;
Key endKey = end.present() ? s.pack(Codec<KeyType>::pack(end.get())) : s.range().end;
typename transaction_future_type<Transaction, RangeResult>::type getRangeFuture =
tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse);
return holdWhile(getRangeFuture,
map(safeThreadFutureToFuture(getRangeFuture), [s](RangeResult const& kvs) -> PairsType {
PairsType results;
for (int i = 0; i < kvs.size(); ++i) {
KeyType key = Codec<KeyType>::unpack(s.unpack(kvs[i].key));
@ -284,44 +330,48 @@ public:
results.push_back(PairType(key, val));
}
return results;
});
}));
}
Future<Optional<ValueType>> get(Reference<ReadYourWritesTransaction> tr,
KeyType const& key,
Snapshot snapshot = Snapshot::False) const {
return map(tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot),
[](Optional<Value> const& val) -> Optional<ValueType> {
template <class Transaction>
Future<Optional<ValueType>> get(Transaction tr, KeyType const& key, Snapshot snapshot = Snapshot::False) const {
typename transaction_future_type<Transaction, Optional<Value>>::type getFuture =
tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot);
return holdWhile(
getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> Optional<ValueType> {
if (val.present())
return Codec<ValueType>::unpack(Tuple::unpack(val.get()));
return {};
});
}));
}
// Returns a Property that can be get/set that represents key's entry in this this.
KeyBackedProperty<ValueType> getProperty(KeyType const& key) const { return space.pack(Codec<KeyType>::pack(key)); }
// Returns the expectedSize of the set key
int set(Reference<ReadYourWritesTransaction> tr, KeyType const& key, ValueType const& val) {
template <class Transaction>
int set(Transaction tr, KeyType const& key, ValueType const& val) {
Key k = space.pack(Codec<KeyType>::pack(key));
Value v = Codec<ValueType>::pack(val).pack();
tr->set(k, v);
return k.expectedSize() + v.expectedSize();
}
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& key) {
template <class Transaction>
void erase(Transaction tr, KeyType const& key) {
return tr->clear(space.pack(Codec<KeyType>::pack(key)));
}
void erase(Reference<ITransaction> tr, KeyType const& key) {
return tr->clear(space.pack(Codec<KeyType>::pack(key)));
}
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& begin, KeyType const& end) {
template <class Transaction>
void erase(Transaction tr, KeyType const& begin, KeyType const& end) {
return tr->clear(KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), space.pack(Codec<KeyType>::pack(end))));
}
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(space.range()); }
template <class Transaction>
void clear(Transaction tr) {
return tr->clear(space.range());
}
Subspace space;
};
@ -332,25 +382,32 @@ template <typename T, typename VersionOptions>
class KeyBackedObjectProperty {
public:
KeyBackedObjectProperty(KeyRef key, VersionOptions versionOptions) : key(key), versionOptions(versionOptions) {}
Future<Optional<T>> get(Reference<ReadYourWritesTransaction> tr, Snapshot snapshot = Snapshot::False) const {
return map(tr->get(key, snapshot), [vo = versionOptions](Optional<Value> const& val) -> Optional<T> {
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, Future<Optional<T>>>::type get(
Transaction tr,
Snapshot snapshot = Snapshot::False) const {
typename transaction_future_type<Transaction, Optional<Value>>::type getFuture = tr->get(key, snapshot);
return holdWhile(
getFuture,
map(safeThreadFutureToFuture(getFuture), [vo = versionOptions](Optional<Value> const& val) -> Optional<T> {
if (val.present())
return ObjectReader::fromStringRef<T>(val.get(), vo);
return {};
});
}));
}
// Get property's value or defaultValue if it doesn't exist
Future<T> getD(Reference<ReadYourWritesTransaction> tr,
Snapshot snapshot = Snapshot::False,
T defaultValue = T()) const {
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
getD(Transaction tr, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T { return val.present() ? val.get() : defaultValue; });
}
// Get property's value or throw error if it doesn't exist
Future<T> getOrThrow(Reference<ReadYourWritesTransaction> tr,
Snapshot snapshot = Snapshot::False,
Error err = key_not_found()) const {
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, Future<T>>::type
getOrThrow(Transaction tr, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
return map(get(tr, snapshot), [=](Optional<T> val) -> T {
if (!val.present()) {
throw err;
@ -360,8 +417,11 @@ public:
});
}
Future<Optional<T>> get(Database cx, Snapshot snapshot = Snapshot::False) const {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<Optional<T>>>::type get(
Reference<DB> db,
Snapshot snapshot = Snapshot::False) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -369,8 +429,11 @@ public:
});
}
Future<T> getD(Database cx, Snapshot snapshot = Snapshot::False, T defaultValue = T()) const {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getD(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
T defaultValue = T()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -378,8 +441,11 @@ public:
});
}
Future<T> getOrThrow(Database cx, Snapshot snapshot = Snapshot::False, Error err = key_not_found()) const {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<T>>::type getOrThrow(Reference<DB> db,
Snapshot snapshot = Snapshot::False,
Error err = key_not_found()) const {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -387,12 +453,14 @@ public:
});
}
void set(Reference<ReadYourWritesTransaction> tr, T const& val) {
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type set(Transaction tr, T const& val) {
return tr->set(key, ObjectWriter::toValue(val, versionOptions));
}
Future<Void> set(Database cx, T const& val) {
return runRYWTransaction(cx, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
template <class DB>
typename std::enable_if<is_transaction_creator<DB>, Future<Void>>::type set(Reference<DB> db, T const& val) {
return runTransaction(db, [=, self = *this](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
self.set(tr, val);
@ -400,7 +468,10 @@ public:
});
}
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(key); }
template <class Transaction>
typename std::enable_if<!is_transaction_creator<Transaction>, void>::type clear(Transaction tr) {
return tr->clear(key);
}
Key key;
VersionOptions versionOptions;
@ -419,18 +490,22 @@ public:
typedef std::pair<KeyType, ValueType> PairType;
typedef std::vector<PairType> PairsType;
// If end is not present one key past the end of the map is used.
Future<PairsType> getRange(Reference<ReadYourWritesTransaction> tr,
KeyType const& begin,
template <class Transaction>
Future<PairsType> getRange(Transaction tr,
Optional<KeyType> const& begin,
Optional<KeyType> const& end,
int limit,
Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) const {
Key beginKey = begin.present() ? space.pack(Codec<KeyType>::pack(begin.get())) : space.range().begin;
Key endKey = end.present() ? space.pack(Codec<KeyType>::pack(end.get())) : space.range().end;
return map(
tr->getRange(
KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot, reverse),
[self = *this](RangeResult const& kvs) -> PairsType {
typename transaction_future_type<Transaction, RangeResult>::type getRangeFuture =
tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse);
return holdWhile(
getRangeFuture,
map(safeThreadFutureToFuture(getRangeFuture), [self = *this](RangeResult const& kvs) -> PairsType {
PairsType results;
for (int i = 0; i < kvs.size(); ++i) {
KeyType key = Codec<KeyType>::unpack(self.space.unpack(kvs[i].key));
@ -438,18 +513,21 @@ public:
results.push_back(PairType(key, val));
}
return results;
});
}));
}
Future<Optional<ValueType>> get(Reference<ReadYourWritesTransaction> tr,
KeyType const& key,
Snapshot snapshot = Snapshot::False) const {
return map(tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot),
template <class Transaction>
Future<Optional<ValueType>> get(Transaction tr, KeyType const& key, Snapshot snapshot = Snapshot::False) const {
typename transaction_future_type<Transaction, Optional<Value>>::type getFuture =
tr->get(space.pack(Codec<KeyType>::pack(key)), snapshot);
return holdWhile(getFuture,
map(safeThreadFutureToFuture(getFuture),
[vo = versionOptions](Optional<Value> const& val) -> Optional<ValueType> {
if (val.present())
return ObjectReader::fromStringRef<ValueType>(val.get(), vo);
return {};
});
}));
}
// Returns a Property that can be get/set that represents key's entry in this this.
@ -459,7 +537,8 @@ public:
}
// Returns the expectedSize of the set key
int set(Reference<ReadYourWritesTransaction> tr, KeyType const& key, ValueType const& val) {
template <class Transaction>
int set(Transaction tr, KeyType const& key, ValueType const& val) {
Key k = space.pack(Codec<KeyType>::pack(key));
Value v = ObjectWriter::toValue(val, versionOptions);
tr->set(k, v);
@ -470,19 +549,20 @@ public:
Value serializeValue(ValueType const& val) { return ObjectWriter::toValue(val, versionOptions); }
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& key) {
template <class Transaction>
void erase(Transaction tr, KeyType const& key) {
return tr->clear(space.pack(Codec<KeyType>::pack(key)));
}
void erase(Reference<ITransaction> tr, KeyType const& key) {
return tr->clear(space.pack(Codec<KeyType>::pack(key)));
}
void erase(Reference<ReadYourWritesTransaction> tr, KeyType const& begin, KeyType const& end) {
template <class Transaction>
void erase(Transaction tr, KeyType const& begin, KeyType const& end) {
return tr->clear(KeyRangeRef(space.pack(Codec<KeyType>::pack(begin)), space.pack(Codec<KeyType>::pack(end))));
}
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(space.range()); }
template <class Transaction>
void clear(Transaction tr) {
return tr->clear(space.range());
}
Subspace space;
VersionOptions versionOptions;
@ -496,49 +576,63 @@ public:
typedef _ValueType ValueType;
typedef std::vector<ValueType> Values;
// If end is not present one key past the end of the map is used.
Future<Values> getRange(Reference<ReadYourWritesTransaction> tr,
ValueType const& begin,
template <class Transaction>
Future<Values> getRange(Transaction tr,
Optional<ValueType> const& begin,
Optional<ValueType> const& end,
int limit,
Snapshot snapshot = Snapshot::False) const {
Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) const {
Subspace s = space; // 'this' could be invalid inside lambda
Key beginKey = begin.present() ? s.pack(Codec<ValueType>::pack(begin.get())) : space.range().begin;
Key endKey = end.present() ? s.pack(Codec<ValueType>::pack(end.get())) : space.range().end;
return map(
tr->getRange(KeyRangeRef(s.pack(Codec<ValueType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot),
[s](RangeResult const& kvs) -> Values {
typename transaction_future_type<Transaction, RangeResult>::type getRangeFuture =
tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse);
return holdWhile(getRangeFuture,
map(safeThreadFutureToFuture(getRangeFuture), [s](RangeResult const& kvs) -> Values {
Values results;
for (int i = 0; i < kvs.size(); ++i) {
results.push_back(Codec<ValueType>::unpack(s.unpack(kvs[i].key)));
}
return results;
});
}));
}
Future<bool> exists(Reference<ReadYourWritesTransaction> tr,
ValueType const& val,
Snapshot snapshot = Snapshot::False) const {
return map(tr->get(space.pack(Codec<ValueType>::pack(val)), snapshot),
[](Optional<Value> const& val) -> bool { return val.present(); });
template <class Transaction>
Future<bool> exists(Transaction tr, ValueType const& val, Snapshot snapshot = Snapshot::False) const {
typename transaction_future_type<Transaction, Optional<Value>>::type getFuture =
tr->get(space.pack(Codec<ValueType>::pack(val)), snapshot);
return holdWhile(getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> bool {
return val.present();
}));
}
// Returns the expectedSize of the set key
int insert(Reference<ReadYourWritesTransaction> tr, ValueType const& val) {
template <class Transaction>
int insert(Transaction tr, ValueType const& val) {
Key k = space.pack(Codec<ValueType>::pack(val));
tr->set(k, StringRef());
return k.expectedSize();
}
void erase(Reference<ReadYourWritesTransaction> tr, ValueType const& val) {
template <class Transaction>
void erase(Transaction tr, ValueType const& val) {
return tr->clear(space.pack(Codec<ValueType>::pack(val)));
}
void erase(Reference<ReadYourWritesTransaction> tr, ValueType const& begin, ValueType const& end) {
template <class Transaction>
void erase(Transaction tr, ValueType const& begin, ValueType const& end) {
return tr->clear(
KeyRangeRef(space.pack(Codec<ValueType>::pack(begin)), space.pack(Codec<ValueType>::pack(end))));
}
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(space.range()); }
template <class Transaction>
void clear(Transaction tr) {
return tr->clear(space.range());
}
Subspace space;
};

View File

@ -52,6 +52,24 @@ Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>())
}
}
ACTOR template <class Function, class DB>
Future<decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue())> runTransaction(
Reference<DB> db,
Function func) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
// func should be idempodent; otherwise, retry will get undefined result
state decltype(std::declval<Function>()(Reference<typename DB::TransactionT>()).getValue()) result =
wait(func(tr));
wait(safeThreadFutureToFuture(tr->commit()));
return result;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Function>
Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
runRYWTransactionFailIfLocked(Database cx, Function func) {

View File

@ -125,8 +125,8 @@ struct BackupData {
PerBackupInfo(BackupData* data, UID uid, Version v) : self(data), startVersion(v) {
// Open the container and get key ranges
BackupConfig config(uid);
container = config.backupContainer().get(data->cx);
ranges = config.backupRanges().get(data->cx);
container = config.backupContainer().get(data->cx.getReference());
ranges = config.backupRanges().get(data->cx.getReference());
if (self->backupEpoch == self->recruitedEpoch) {
// Only current epoch's worker update the number of backup workers.
updateWorker = _updateStartedWorkers(this, data, uid);

View File

@ -466,11 +466,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("AbortAndRestartAfter", self->abortAndRestartAfter);
state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString());
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx));
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference()));
state UID logUid = uidFlag.first;
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx));
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference()));
state Reference<IBackupContainer> lastBackupContainer =
wait(BackupConfig(logUid).backupContainer().getD(cx));
wait(BackupConfig(logUid).backupContainer().getD(cx.getReference()));
// Occasionally start yet another backup that might still be running when we restore
if (!self->locked && BUGGIFY) {

View File

@ -513,11 +513,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
.detail("AbortAndRestartAfter", self->abortAndRestartAfter);
state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString());
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx));
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference()));
state UID logUid = uidFlag.first;
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx));
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference()));
state Reference<IBackupContainer> lastBackupContainer =
wait(BackupConfig(logUid).backupContainer().getD(cx));
wait(BackupConfig(logUid).backupContainer().getD(cx.getReference()));
// Occasionally start yet another backup that might still be running when we restore
if (!self->locked && BUGGIFY) {