Remove using the RYWTransaction object in fdbcli

This commit is contained in:
Chaoguang Lin 2021-07-13 01:16:54 +00:00
parent 46fd774d8c
commit 389e7051af
1 changed files with 38 additions and 82 deletions

View File

@ -165,16 +165,14 @@ std::string lineWrap(const char* text, int col) {
class FdbOptions {
public:
// Prints an error and throws invalid_option or invalid_option_value if the option could not be set
// TODO: remove Reference<ReadYourWritesTransaction> after we refactor all fdbcli code
void setOption(Reference<ReadYourWritesTransaction> tr,
Reference<ITransaction> tr2,
void setOption(Reference<ITransaction> tr,
StringRef optionStr,
bool enabled,
Optional<StringRef> arg,
bool intrans) {
auto transactionItr = transactionOptions.legalOptions.find(optionStr.toString());
if (transactionItr != transactionOptions.legalOptions.end())
setTransactionOption(tr, tr2, transactionItr->second, enabled, arg, intrans);
setTransactionOption(tr, transactionItr->second, enabled, arg, intrans);
else {
fprintf(stderr,
"ERROR: invalid option '%s'. Try `help options' for a list of available options.\n",
@ -184,13 +182,6 @@ public:
}
// Applies all enabled transaction options to the given transaction
void apply(Reference<ReadYourWritesTransaction> tr) {
for (const auto& [name, value] : transactionOptions.options) {
tr->setOption(name, value.castTo<StringRef>());
}
}
// TODO: replace the above function after we refactor all fdbcli code
void apply(Reference<ITransaction> tr) {
for (const auto& [name, value] : transactionOptions.options) {
tr->setOption(name, value.castTo<StringRef>());
@ -217,9 +208,7 @@ public:
private:
// Sets a transaction option. If intrans == true, then this option is also applied to the passed in transaction.
// TODO: remove Reference<ReadYourWritesTransaction> after we refactor all fdbcli code
void setTransactionOption(Reference<ReadYourWritesTransaction> tr,
Reference<ITransaction> tr2,
void setTransactionOption(Reference<ITransaction> tr,
FDBTransactionOptions::Option option,
bool enabled,
Optional<StringRef> arg,
@ -231,7 +220,6 @@ private:
if (intrans) {
tr->setOption(option, arg);
tr2->setOption(option, arg);
}
transactionOptions.setOption(option, enabled, arg.castTo<StringRef>());
@ -1844,16 +1832,6 @@ Future<T> makeInterruptable(Future<T> f) {
}
}
ACTOR Future<Void> commitTransaction(Reference<ReadYourWritesTransaction> tr) {
wait(makeInterruptable(tr->commit()));
auto ver = tr->getCommittedVersion();
if (ver != invalidVersion)
printf("Committed (%" PRId64 ")\n", ver);
else
printf("Nothing to commit\n");
return Void();
}
ACTOR Future<Void> commitTransaction(Reference<ITransaction> tr) {
wait(makeInterruptable(safeThreadFutureToFuture(tr->commit())));
auto ver = tr->getCommittedVersion();
@ -2629,37 +2607,19 @@ ACTOR Future<bool> createSnapshot(Database db, std::vector<StringRef> tokens) {
return false;
}
Reference<ReadYourWritesTransaction> getTransaction(Database db,
Reference<ReadYourWritesTransaction>& tr,
FdbOptions* options,
bool intrans) {
if (!tr || !intrans) {
tr = makeReference<ReadYourWritesTransaction>(db);
options->apply(tr);
}
return tr;
}
// TODO: Update the function to get rid of Database and ReadYourWritesTransaction after refactoring
// The original ReadYourWritesTransaciton handle "tr" is needed as some commands can be called inside a
// transaction and "tr" holds the pointer to the ongoing transaction object. As it's not easy to get ride of "tr" in
// one shot and we are refactoring the code to use Reference<ITransaction> (tr2), we need to let "tr2" point to the same
// underlying transaction like "tr". Thus everytime we need to use "tr2", we first update "tr" and let "tr2" points to
// "tr1". "tr2" is always having the same lifetime as "tr1"
Reference<ITransaction> getTransaction(Database db,
Reference<ReadYourWritesTransaction>& tr,
Reference<ITransaction>& tr2,
// TODO: Update the function to get rid of the Database after refactoring
Reference<ITransaction> getTransaction(Reference<IDatabase> db,
Reference<ITransaction>& tr,
FdbOptions* options,
bool intrans) {
// Update "tr" to point to a brand new transaction object when it's not initialized or "intrans" flag is "false",
// which indicates we need a new transaction object
if (!tr || !intrans) {
tr = makeReference<ReadYourWritesTransaction>(db);
tr = db->createTransaction();
options->apply(tr);
}
tr2 = Reference<ITransaction>(new ThreadSafeTransaction(tr.getPtr()));
return tr2;
return tr;
}
std::string newCompletion(const char* base, const char* name) {
@ -3104,10 +3064,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state bool intrans = false;
state Database db;
state Reference<ReadYourWritesTransaction> tr;
// TODO: refactoring work, will replace db, tr when we have all commands through the general fdb interface
// TODO: refactoring work, will replace db when we have all commands through the general fdb interface
state Reference<IDatabase> db2;
state Reference<ITransaction> tr2;
state Reference<ITransaction> tr;
state bool writeMode = false;
@ -3313,8 +3272,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "waitopen")) {
wait(success(
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)->getReadVersion())));
wait(
success(safeThreadFutureToFuture(getTransaction(db2, tr, options, intrans)->getReadVersion())));
continue;
}
@ -3506,7 +3465,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
} else {
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
getTransaction(db, tr, tr2, options, false);
getTransaction(db2, tr, options, false);
intrans = true;
printf("Transaction started\n");
}
@ -3521,7 +3480,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
wait(commitTransaction(tr2));
wait(commitTransaction(tr));
intrans = false;
options = &globalOptions;
}
@ -3538,11 +3497,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
is_error = true;
} else {
tr->reset();
tr2->reset();
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
options->apply(tr);
options->apply(tr2);
printf("Transaction reset\n");
}
continue;
@ -3569,7 +3526,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
is_error = true;
} else {
Optional<Standalone<StringRef>> v = wait(makeInterruptable(
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)->get(tokens[1]))));
safeThreadFutureToFuture(getTransaction(db2, tr, options, intrans)->get(tokens[1]))));
if (v.present())
printf("`%s' is `%s'\n", printable(tokens[1]).c_str(), printable(v.get()).c_str());
@ -3585,7 +3542,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
is_error = true;
} else {
Version v = wait(makeInterruptable(
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)->getReadVersion())));
safeThreadFutureToFuture(getTransaction(db2, tr, options, intrans)->getReadVersion())));
printf("%ld\n", v);
}
continue;
@ -3599,16 +3556,16 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "kill")) {
getTransaction(db, tr, tr2, options, intrans);
bool _result = wait(makeInterruptable(killCommandActor(db2, tr2, tokens, &address_interface)));
getTransaction(db2, tr, options, intrans);
bool _result = wait(makeInterruptable(killCommandActor(db2, tr, tokens, &address_interface)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "suspend")) {
getTransaction(db, tr, tr2, options, intrans);
bool _result = wait(makeInterruptable(suspendCommandActor(db2, tr2, tokens, &address_interface)));
getTransaction(db2, tr, options, intrans);
bool _result = wait(makeInterruptable(suspendCommandActor(db2, tr, tokens, &address_interface)));
if (!_result)
is_error = true;
continue;
@ -3629,25 +3586,25 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "consistencycheck")) {
getTransaction(db, tr, tr2, options, intrans);
bool _result = wait(makeInterruptable(consistencyCheckCommandActor(tr2, tokens, intrans)));
getTransaction(db2, tr, options, intrans);
bool _result = wait(makeInterruptable(consistencyCheckCommandActor(tr, tokens, intrans)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "profile")) {
getTransaction(db, tr, tr2, options, intrans);
bool _result = wait(makeInterruptable(profileCommandActor(tr2, tokens, intrans)));
getTransaction(db2, tr, options, intrans);
bool _result = wait(makeInterruptable(profileCommandActor(tr, tokens, intrans)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "expensive_data_check")) {
getTransaction(db, tr, tr2, options, intrans);
getTransaction(db2, tr, options, intrans);
bool _result =
wait(makeInterruptable(expensiveDataCheckCommandActor(db2, tr2, tokens, &address_interface)));
wait(makeInterruptable(expensiveDataCheckCommandActor(db2, tr, tokens, &address_interface)));
if (!_result)
is_error = true;
continue;
@ -3706,7 +3663,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
RangeResult kvs = wait(makeInterruptable(
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)
safeThreadFutureToFuture(getTransaction(db2, tr, options, intrans)
->getRange(KeyRangeRef(tokens[1], endKey), limit))));
printf("\nRange limited to %d keys\n", limit);
@ -3750,11 +3707,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printUsage(tokens[0]);
is_error = true;
} else {
getTransaction(db, tr, tr2, options, intrans);
tr2->set(tokens[1], tokens[2]);
getTransaction(db2, tr, options, intrans);
tr->set(tokens[1], tokens[2]);
if (!intrans) {
wait(commitTransaction(tr2));
wait(commitTransaction(tr));
}
}
continue;
@ -3771,11 +3728,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printUsage(tokens[0]);
is_error = true;
} else {
getTransaction(db, tr, tr2, options, intrans);
tr2->clear(tokens[1]);
getTransaction(db2, tr, options, intrans);
tr->clear(tokens[1]);
if (!intrans) {
wait(commitTransaction(tr2));
wait(commitTransaction(tr));
}
}
continue;
@ -3792,11 +3749,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printUsage(tokens[0]);
is_error = true;
} else {
getTransaction(db, tr, tr2, options, intrans);
tr2->clear(KeyRangeRef(tokens[1], tokens[2]));
getTransaction(db2, tr, options, intrans);
tr->clear(KeyRangeRef(tokens[1], tokens[2]));
if (!intrans) {
wait(commitTransaction(tr2));
wait(commitTransaction(tr));
}
}
continue;
@ -3853,7 +3810,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
Optional<StringRef> arg = (tokens.size() > 3) ? tokens[3] : Optional<StringRef>();
try {
options->setOption(tr, tr2, tokens[2], isOn, arg, intrans);
options->setOption(tr, tokens[2], isOn, arg, intrans);
printf("Option %s for %s\n",
isOn ? "enabled" : "disabled",
intrans ? "current transaction" : "all transactions");
@ -3896,7 +3853,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
intrans = false;
options = &globalOptions;
options->apply(tr);
options->apply(tr2);
}
}