Merge pull request #5458 from sfc-gh-clin/refactor-transaction-related-commands
Refactor transaction related commands
This commit is contained in:
commit
0806309230
|
@ -12,7 +12,7 @@ set(FDBCLI_SRCS
|
|||
SetClassCommand.actor.cpp
|
||||
SnapshotCommand.actor.cpp
|
||||
ThrottleCommand.actor.cpp
|
||||
Util.cpp
|
||||
Util.actor.cpp
|
||||
linenoise/linenoise.h)
|
||||
|
||||
if(NOT WIN32)
|
||||
|
|
|
@ -71,15 +71,22 @@ ACTOR Future<Void> printProcessClass(Reference<IDatabase> db) {
|
|||
};
|
||||
|
||||
ACTOR Future<bool> setProcessClass(Reference<IDatabase> db, KeyRef network_address, KeyRef class_type) {
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
try {
|
||||
tr->set(network_address.withPrefix(fdb_cli::processClassTypeSpecialKeyRange.begin), class_type);
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
return true;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
tr->set(network_address.withPrefix(fdb_cli::processClassTypeSpecialKeyRange.begin), class_type);
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
return true;
|
||||
} catch (Error& e) {
|
||||
state Error err(e);
|
||||
if (e.code() == error_code_special_keys_api_failure) {
|
||||
std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
|
||||
// error message already has \n at the end
|
||||
fprintf(stderr, "%s", errorMsgStr.c_str());
|
||||
return false;
|
||||
}
|
||||
wait(safeThreadFutureToFuture(tr->onError(err)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -103,8 +110,8 @@ ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<Str
|
|||
} else if (tokens.size() == 1) {
|
||||
wait(printProcessClass(db));
|
||||
} else {
|
||||
bool successful = wait(setProcessClass(db, tokens[1], tokens[2]));
|
||||
return successful;
|
||||
bool successful = wait(setProcessClass(db, tokens[1], tokens[2]));
|
||||
return successful;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Util.cpp
|
||||
* Util.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -19,9 +19,14 @@
|
|||
*/
|
||||
|
||||
#include "fdbcli/fdbcli.actor.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/Schemas.h"
|
||||
#include "fdbclient/Status.h"
|
||||
|
||||
#include "flow/Arena.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace fdb_cli {
|
||||
|
||||
bool tokencmp(StringRef token, const char* command) {
|
||||
|
@ -40,4 +45,18 @@ void printUsage(StringRef command) {
|
|||
fprintf(stderr, "ERROR: Unknown command `%s'\n", command.toString().c_str());
|
||||
}
|
||||
|
||||
ACTOR Future<std::string> getSpecialKeysFailureErrorMessage(Reference<ITransaction> tr) {
|
||||
Optional<Value> errorMsg = wait(safeThreadFutureToFuture(tr->get(fdb_cli::errorMsgSpecialKey)));
|
||||
// Error message should be present
|
||||
ASSERT(errorMsg.present());
|
||||
// Read the json string
|
||||
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
|
||||
// verify schema
|
||||
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
|
||||
std::string errorStr;
|
||||
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
|
||||
// return the error message
|
||||
return valueObj["message"].get_str();
|
||||
}
|
||||
|
||||
} // namespace fdb_cli
|
|
@ -44,6 +44,7 @@
|
|||
#include "flow/Platform.h"
|
||||
|
||||
#include "flow/TLSConfig.actor.h"
|
||||
#include "flow/ThreadHelper.actor.h"
|
||||
#include "flow/SimpleOpt.h"
|
||||
|
||||
#include "fdbcli/FlowLineNoise.h"
|
||||
|
@ -1963,6 +1964,16 @@ ACTOR Future<Void> commitTransaction(Reference<ReadYourWritesTransaction> tr) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commitTransaction(Reference<ITransaction> tr) {
|
||||
wait(makeInterruptable(safeThreadFutureToFuture(tr->commit())));
|
||||
auto ver = tr->getCommittedVersion();
|
||||
if (ver != invalidVersion)
|
||||
printf("Committed (%" PRId64 ")\n", ver);
|
||||
else
|
||||
printf("Nothing to commit\n");
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<bool> configure(Database db,
|
||||
std::vector<StringRef> tokens,
|
||||
Reference<ClusterConnectionFile> ccf,
|
||||
|
@ -3442,7 +3453,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
}
|
||||
|
||||
if (tokencmp(tokens[0], "waitopen")) {
|
||||
wait(success(getTransaction(db, tr, options, intrans)->getReadVersion()));
|
||||
wait(success(
|
||||
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)->getReadVersion())));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -3652,7 +3664,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
} else {
|
||||
activeOptions = FdbOptions(globalOptions);
|
||||
options = &activeOptions;
|
||||
getTransaction(db, tr, options, false);
|
||||
getTransaction(db, tr, tr2, options, false);
|
||||
intrans = true;
|
||||
printf("Transaction started\n");
|
||||
}
|
||||
|
@ -3667,7 +3679,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
fprintf(stderr, "ERROR: No active transaction\n");
|
||||
is_error = true;
|
||||
} else {
|
||||
wait(commitTransaction(tr));
|
||||
wait(commitTransaction(tr2));
|
||||
intrans = false;
|
||||
options = &globalOptions;
|
||||
}
|
||||
|
@ -3684,9 +3696,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
is_error = true;
|
||||
} else {
|
||||
tr->reset();
|
||||
tr2->reset();
|
||||
activeOptions = FdbOptions(globalOptions);
|
||||
options = &activeOptions;
|
||||
options->apply(tr);
|
||||
options->apply(tr2);
|
||||
printf("Transaction reset\n");
|
||||
}
|
||||
continue;
|
||||
|
@ -3712,8 +3726,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
} else {
|
||||
Optional<Standalone<StringRef>> v =
|
||||
wait(makeInterruptable(getTransaction(db, tr, options, intrans)->get(tokens[1])));
|
||||
Optional<Standalone<StringRef>> v = wait(makeInterruptable(
|
||||
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)->get(tokens[1]))));
|
||||
|
||||
if (v.present())
|
||||
printf("`%s' is `%s'\n", printable(tokens[1]).c_str(), printable(v.get()).c_str());
|
||||
|
@ -3728,7 +3742,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
} else {
|
||||
Version v = wait(makeInterruptable(getTransaction(db, tr, options, intrans)->getReadVersion()));
|
||||
Version v = wait(makeInterruptable(
|
||||
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)->getReadVersion())));
|
||||
printf("%ld\n", v);
|
||||
}
|
||||
continue;
|
||||
|
@ -4233,7 +4248,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
}
|
||||
|
||||
RangeResult kvs = wait(makeInterruptable(
|
||||
getTransaction(db, tr, options, intrans)->getRange(KeyRangeRef(tokens[1], endKey), limit)));
|
||||
safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans)
|
||||
->getRange(KeyRangeRef(tokens[1], endKey), limit))));
|
||||
|
||||
printf("\nRange limited to %d keys\n", limit);
|
||||
for (auto iter = kvs.begin(); iter < kvs.end(); iter++) {
|
||||
|
@ -4276,11 +4292,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
} else {
|
||||
getTransaction(db, tr, options, intrans);
|
||||
tr->set(tokens[1], tokens[2]);
|
||||
getTransaction(db, tr, tr2, options, intrans);
|
||||
tr2->set(tokens[1], tokens[2]);
|
||||
|
||||
if (!intrans) {
|
||||
wait(commitTransaction(tr));
|
||||
wait(commitTransaction(tr2));
|
||||
}
|
||||
}
|
||||
continue;
|
||||
|
@ -4297,11 +4313,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
} else {
|
||||
getTransaction(db, tr, options, intrans);
|
||||
tr->clear(tokens[1]);
|
||||
getTransaction(db, tr, tr2, options, intrans);
|
||||
tr2->clear(tokens[1]);
|
||||
|
||||
if (!intrans) {
|
||||
wait(commitTransaction(tr));
|
||||
wait(commitTransaction(tr2));
|
||||
}
|
||||
}
|
||||
continue;
|
||||
|
@ -4318,11 +4334,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
printUsage(tokens[0]);
|
||||
is_error = true;
|
||||
} else {
|
||||
getTransaction(db, tr, options, intrans);
|
||||
tr->clear(KeyRangeRef(tokens[1], tokens[2]));
|
||||
getTransaction(db, tr, tr2, options, intrans);
|
||||
tr2->clear(KeyRangeRef(tokens[1], tokens[2]));
|
||||
|
||||
if (!intrans) {
|
||||
wait(commitTransaction(tr));
|
||||
wait(commitTransaction(tr2));
|
||||
}
|
||||
}
|
||||
continue;
|
||||
|
@ -4422,6 +4438,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
intrans = false;
|
||||
options = &globalOptions;
|
||||
options->apply(tr);
|
||||
options->apply(tr2);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -70,12 +70,17 @@ extern const KeyRef ignoreSSFailureSpecialKey;
|
|||
// setclass
|
||||
extern const KeyRangeRef processClassSourceSpecialKeyRange;
|
||||
extern const KeyRangeRef processClassTypeSpecialKeyRange;
|
||||
// Other special keys
|
||||
inline const KeyRef errorMsgSpecialKey = LiteralStringRef("\xff\xff/error_message");
|
||||
// help functions (Copied from fdbcli.actor.cpp)
|
||||
|
||||
// compare StringRef with the given c string
|
||||
bool tokencmp(StringRef token, const char* command);
|
||||
// print the usage of the specified command
|
||||
void printUsage(StringRef command);
|
||||
// Pre: tr failed with special_keys_api_failure error
|
||||
// Read the error message special key and return the message
|
||||
ACTOR Future<std::string> getSpecialKeysFailureErrorMessage(Reference<ITransaction> tr);
|
||||
|
||||
// All fdbcli commands (alphabetically)
|
||||
// advanceversion command
|
||||
|
|
Loading…
Reference in New Issue