Fix the issue in CheckStatus where it stuck as we cannot talk to any of the coordinators
This commit is contained in:
parent
99cc042d9c
commit
af5643c998
|
@ -1179,7 +1179,14 @@ void printStatus(StatusObjectReader statusObj,
|
|||
return;
|
||||
}
|
||||
|
||||
ACTOR Future<bool> statusCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens, bool isExecMode) {
|
||||
// "db" is the handler to the multiversion databse
|
||||
// localDb is the native Database object
|
||||
// localDb is rarely needed except the "db" has not establised a connection to the cluster where the operation will
|
||||
// return Never as we expect status command to always return, we use "localDb" to return the default result
|
||||
ACTOR Future<bool> statusCommandActor(Reference<IDatabase> db,
|
||||
Database localDb,
|
||||
std::vector<StringRef> tokens,
|
||||
bool isExecMode) {
|
||||
|
||||
state StatusClient::StatusLevel level;
|
||||
if (tokens.size() == 1)
|
||||
|
@ -1195,15 +1202,21 @@ ACTOR Future<bool> statusCommandActor(Reference<IDatabase> db, std::vector<Strin
|
|||
return false;
|
||||
}
|
||||
|
||||
state StatusObject s;
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
state ThreadFuture<Optional<Value>> statusValueF = tr->get(LiteralStringRef("\xff\xff/status/json"));
|
||||
Optional<Value> statusValue = wait(safeThreadFutureToFuture(statusValueF));
|
||||
if (!statusValue.present()) {
|
||||
fprintf(stderr, "ERROR: Failed to get status json from the cluster\n");
|
||||
if (!tr->isValid()) {
|
||||
StatusObject _s = wait(StatusClient::statusFetcher(localDb));
|
||||
s = _s;
|
||||
} else {
|
||||
state ThreadFuture<Optional<Value>> statusValueF = tr->get(LiteralStringRef("\xff\xff/status/json"));
|
||||
Optional<Value> statusValue = wait(safeThreadFutureToFuture(statusValueF));
|
||||
if (!statusValue.present()) {
|
||||
fprintf(stderr, "ERROR: Failed to get status json from the cluster\n");
|
||||
}
|
||||
json_spirit::mValue mv;
|
||||
json_spirit::read_string(statusValue.get().toString(), mv);
|
||||
s = StatusObject(mv.get_obj());
|
||||
}
|
||||
json_spirit::mValue mv;
|
||||
json_spirit::read_string(statusValue.get().toString(), mv);
|
||||
StatusObject s = StatusObject(mv.get_obj());
|
||||
|
||||
if (!isExecMode)
|
||||
printf("\n");
|
||||
|
|
|
@ -661,18 +661,27 @@ ACTOR Future<Void> timeWarning(double when, const char* msg) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkStatus(Future<Void> f, Reference<IDatabase> db, bool displayDatabaseAvailable = true) {
|
||||
ACTOR Future<Void> checkStatus(Future<Void> f,
|
||||
Reference<IDatabase> db,
|
||||
Database localDb,
|
||||
bool displayDatabaseAvailable = true) {
|
||||
wait(f);
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
state ThreadFuture<Optional<Value>> statusValueF = tr->get(LiteralStringRef("\xff\xff/status/json"));
|
||||
Optional<Optional<Value>> statusValue = wait(timeout(safeThreadFutureToFuture(statusValueF), 5.0));
|
||||
if (!statusValue.present() || !statusValue.get().present()) {
|
||||
fprintf(stderr, "ERROR: Failed to get status json from the cluster\n");
|
||||
return Void();
|
||||
state StatusObject s;
|
||||
if (!tr->isValid()) {
|
||||
StatusObject _s = wait(StatusClient::statusFetcher(localDb));
|
||||
s = _s;
|
||||
} else {
|
||||
state ThreadFuture<Optional<Value>> statusValueF = tr->get(LiteralStringRef("\xff\xff/status/json"));
|
||||
Optional<Value> statusValue = wait(safeThreadFutureToFuture(statusValueF));
|
||||
if (!statusValue.present()) {
|
||||
fprintf(stderr, "ERROR: Failed to get status json from the cluster\n");
|
||||
return Void();
|
||||
}
|
||||
json_spirit::mValue mv;
|
||||
json_spirit::read_string(statusValue.get().toString(), mv);
|
||||
s = StatusObject(mv.get_obj());
|
||||
}
|
||||
json_spirit::mValue mv;
|
||||
json_spirit::read_string(statusValue.get().get().toString(), mv);
|
||||
StatusObject s = StatusObject(mv.get_obj());
|
||||
printf("\n");
|
||||
printStatus(s, StatusClient::MINIMAL, displayDatabaseAvailable);
|
||||
printf("\n");
|
||||
|
@ -1987,6 +1996,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
}
|
||||
|
||||
// used to catch the first cluster_version_changed error when using external clients
|
||||
// when using external clients, it will throw cluster_version_changed for the first time establish the connection to
|
||||
// the cluster. Thus, we catch it by doing a get version request to establish the connection
|
||||
// The 3.0 timeout is a guard to avoid waiting forever when the cli cannot talk to any coordinators
|
||||
loop {
|
||||
try {
|
||||
getTransaction(db2, tr, options, intrans);
|
||||
|
@ -1994,13 +2006,20 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
wait(delay(3.0) || success(safeThreadFutureToFuture(tr->getReadVersion())));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
if (e.code() == error_code_cluster_version_changed) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
} else {
|
||||
// unexpected errors
|
||||
fprintf(stderr, "ERROR: unexpected error %d while initializing the multiversion database\n", e.code());
|
||||
tr->reset();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!opt.exec.present()) {
|
||||
if (opt.initialStatusCheck) {
|
||||
Future<Void> checkStatusF = checkStatus(Void(), db2);
|
||||
Future<Void> checkStatusF = checkStatus(Void(), db2, db);
|
||||
wait(makeInterruptable(success(checkStatusF)));
|
||||
} else {
|
||||
printf("\n");
|
||||
|
@ -2038,7 +2057,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
linenoise.historyAdd(line);
|
||||
}
|
||||
|
||||
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db2);
|
||||
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db2, db);
|
||||
|
||||
try {
|
||||
state UID randomID = deterministicRandom()->randomUniqueID();
|
||||
|
@ -2192,7 +2211,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
// printStatus(s, level);
|
||||
// if (!opt.exec.present())
|
||||
// printf("\n");
|
||||
bool _result = wait(makeInterruptable(statusCommandActor(db2, tokens, opt.exec.present())));
|
||||
bool _result = wait(makeInterruptable(statusCommandActor(db2, db, tokens, opt.exec.present())));
|
||||
if (!_result)
|
||||
is_error = true;
|
||||
continue;
|
||||
|
@ -2301,7 +2320,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
fflush(stdout);
|
||||
Optional<std::string> input =
|
||||
wait(linenoise.read(format("Repeat the above passphrase if you would like to proceed:")));
|
||||
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db2);
|
||||
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db2, db);
|
||||
if (input.present() && input.get() == passPhrase) {
|
||||
UID unlockUID = UID::fromString(tokens[1].toString());
|
||||
try {
|
||||
|
|
|
@ -137,7 +137,10 @@ ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<Str
|
|||
// snapshot command
|
||||
ACTOR Future<bool> snapshotCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
|
||||
// status command
|
||||
ACTOR Future<bool> statusCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens, bool isExecMode = false);
|
||||
ACTOR Future<bool> statusCommandActor(Reference<IDatabase> db,
|
||||
Database localDb,
|
||||
std::vector<StringRef> tokens,
|
||||
bool isExecMode = false);
|
||||
// suspend command
|
||||
ACTOR Future<bool> suspendCommandActor(Reference<IDatabase> db,
|
||||
Reference<ITransaction> tr,
|
||||
|
|
|
@ -92,6 +92,10 @@ public:
|
|||
// used in template functions as returned Future type
|
||||
template <class Type>
|
||||
using FutureT = ThreadFuture<Type>;
|
||||
// internal use only, return true by default
|
||||
// Only if it's a MultiVersionTransaction and the underlying transaction handler is null,
|
||||
// it will return false
|
||||
virtual bool isValid() { return true; }
|
||||
};
|
||||
|
||||
// An interface that represents a connection to a cluster made by a client
|
||||
|
|
|
@ -877,6 +877,11 @@ void MultiVersionTransaction::reset() {
|
|||
updateTransaction();
|
||||
}
|
||||
|
||||
bool MultiVersionTransaction::isValid() {
|
||||
auto tr = getTransaction();
|
||||
return tr.transaction ? true : false;
|
||||
}
|
||||
|
||||
// MultiVersionDatabase
|
||||
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
|
||||
int threadIdx,
|
||||
|
|
|
@ -388,6 +388,9 @@ public:
|
|||
void addref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
|
||||
|
||||
// return true if the underlying transaction pointer is not empty
|
||||
bool isValid() override;
|
||||
|
||||
private:
|
||||
const Reference<MultiVersionDatabase> db;
|
||||
ThreadSpinLock lock;
|
||||
|
|
Loading…
Reference in New Issue