Removed SYSTEM_PRIORITY_IMMEDIATE from KeyBackedTypes and all options from KeyBackedRangeMap database functions. Added SystemTransactionGenerator<> for wrapping Database types and generating transactions with selected system level options.
This commit is contained in:
parent
46cde666a5
commit
639d4d05ef
|
@ -31,7 +31,7 @@
|
|||
|
||||
namespace fdb_cli {
|
||||
|
||||
ACTOR Future<bool> rangeConfigCommandActor(Database localDb, std::vector<StringRef> tokens) {
|
||||
ACTOR Future<bool> rangeConfigCommandActor(Database cx, std::vector<StringRef> tokens) {
|
||||
state std::function<bool(std::string)> fail = [&](std::string msg) {
|
||||
if (!msg.empty()) {
|
||||
fmt::print(stderr, "ERROR: {}\n", msg);
|
||||
|
@ -72,8 +72,8 @@ ACTOR Future<bool> rangeConfigCommandActor(Database localDb, std::vector<StringR
|
|||
}
|
||||
}
|
||||
|
||||
DDConfiguration::RangeConfigMapSnapshot config =
|
||||
wait(DDConfiguration().userRangeConfig().getSnapshot(localDb.getReference(), allKeys.begin, allKeys.end));
|
||||
DDConfiguration::RangeConfigMapSnapshot config = wait(DDConfiguration().userRangeConfig().getSnapshot(
|
||||
SystemDBLockWriteNow(cx.getReference()), allKeys.begin, allKeys.end));
|
||||
fmt::print(
|
||||
"{}\n",
|
||||
json_spirit::write_string(DDConfiguration::toJSON(config, includeDefault), json_spirit::pretty_print));
|
||||
|
@ -110,7 +110,7 @@ ACTOR Future<bool> rangeConfigCommandActor(Database localDb, std::vector<StringR
|
|||
}
|
||||
|
||||
wait(DDConfiguration().userRangeConfig().updateRange(
|
||||
localDb.getReference(), begin, end, rangeConfig, cmd == "set"_sr));
|
||||
SystemDBLockWriteNow(cx.getReference()), begin, end, rangeConfig, cmd == "set"_sr));
|
||||
}
|
||||
} else {
|
||||
return fail(fmt::format("Unknown command: '{}'", cmd.printable()));
|
||||
|
|
|
@ -311,10 +311,6 @@ public:
|
|||
ValueType const& valueUpdate,
|
||||
bool replace = false) const {
|
||||
return runTransaction(db, [=, self = *this](Reference<typename DB::TransactionT> tr) {
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return self.updateRange(tr, begin, end, valueUpdate, replace);
|
||||
});
|
||||
}
|
||||
|
@ -380,10 +376,6 @@ public:
|
|||
typename std::enable_if<is_transaction_creator<DB>, Future<LocalSnapshot>>::type
|
||||
getSnapshot(Reference<DB> db, KeyType const& begin, KeyType const& end) const {
|
||||
return runTransaction(db, [=, self = *this](Reference<typename DB::TransactionT> tr) {
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
return self.getSnapshot(tr, begin, end);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include <utility>
|
||||
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
ACTOR template <class Function, class DB>
|
||||
|
@ -66,5 +67,52 @@ Future<Void> runTransactionVoid(Reference<DB> db, Function func) {
|
|||
}
|
||||
}
|
||||
|
||||
// SystemTransactionGenerator is a Database-like wrapper which produces transactions which have selected
|
||||
// options set for lock awareness, reading and optionally writing the system keys, and immediate priority.
|
||||
// All options are false by default.
|
||||
template <typename DB>
|
||||
struct SystemTransactionGenerator : ReferenceCounted<SystemTransactionGenerator<DB>> {
|
||||
typedef typename DB::TransactionT TransactionT;
|
||||
|
||||
SystemTransactionGenerator(Reference<DB> db, bool write, bool lockAware, bool immediate)
|
||||
: db(db), write(write), lockAware(lockAware), immediate(immediate) {}
|
||||
|
||||
Reference<TransactionT> createTransaction() const {
|
||||
Reference<TransactionT> tr = db->createTransaction();
|
||||
|
||||
if (write) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
} else {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
}
|
||||
|
||||
if (immediate) {
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
}
|
||||
|
||||
if (lockAware) {
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
}
|
||||
return tr;
|
||||
}
|
||||
|
||||
Reference<DB> db;
|
||||
bool write;
|
||||
bool lockAware;
|
||||
bool immediate;
|
||||
};
|
||||
|
||||
// Convenient wrapper for creating SystemTransactionGenerators.
|
||||
template <typename DB>
|
||||
auto SystemDB(Reference<DB> db, bool write = false, bool lockAware = false, bool immediate = false) {
|
||||
return makeReference<SystemTransactionGenerator<DB>>(db, write, lockAware, immediate);
|
||||
}
|
||||
|
||||
// SystemDB with all options true
|
||||
template <typename DB>
|
||||
auto SystemDBLockWriteNow(Reference<DB> db) {
|
||||
return SystemDB(db, true, true, true);
|
||||
}
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -442,7 +442,8 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
|
|||
}
|
||||
|
||||
state DDConfiguration::RangeConfigMapSnapshot userRangeConfig =
|
||||
wait(DDConfiguration().userRangeConfig().getSnapshot(cx.getReference(), allKeys.begin, allKeys.end));
|
||||
wait(DDConfiguration().userRangeConfig().getSnapshot(
|
||||
SystemDBLockWriteNow(cx.getReference()), allKeys.begin, allKeys.end));
|
||||
|
||||
for (; i < ranges.size(); i++) {
|
||||
state int shard = shardOrder[i];
|
||||
|
|
|
@ -244,7 +244,8 @@ class DDTxnProcessorImpl {
|
|||
|
||||
if (ddLargeTeamEnabled()) {
|
||||
wait(store(*result->userRangeConfig,
|
||||
DDConfiguration().userRangeConfig().getSnapshot(cx.getReference(), allKeys.begin, allKeys.end)));
|
||||
DDConfiguration().userRangeConfig().getSnapshot(
|
||||
SystemDBLockWriteNow(cx.getReference()), allKeys.begin, allKeys.end)));
|
||||
}
|
||||
state std::map<UID, Optional<Key>> server_dc;
|
||||
state std::map<std::vector<UID>, std::pair<std::vector<UID>, std::vector<UID>>> team_cache;
|
||||
|
|
|
@ -721,7 +721,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
// Start watching for changes before reading the config in init() below
|
||||
state Promise<Version> configChangeWatching;
|
||||
state Future<Void> onConfigChange =
|
||||
map(DDConfiguration().trigger.onChange(cx.getReference(), {}, configChangeWatching), [](Version v) {
|
||||
map(DDConfiguration().trigger.onChange(SystemDBLockWriteNow(cx.getReference()), {}, configChangeWatching),
|
||||
[](Version v) {
|
||||
CODE_PROBE(true, "DataDistribution change detected");
|
||||
TraceEvent("DataDistributionConfigChanged").detail("ChangeVersion", v);
|
||||
throw dd_config_changed();
|
||||
|
|
Loading…
Reference in New Issue