Merge pull request #1852 from vishesh/task/issue-1840-non-blocking-exclusion
fdbcli: Add `no_wait` option in `exclude` command to avoid blocking
This commit is contained in:
commit
f6183df8b9
|
@ -479,10 +479,12 @@ void initHelp() {
|
|||
"coordinators auto|<ADDRESS>+ [description=new_cluster_description]",
|
||||
"change cluster coordinators or description",
|
||||
"If 'auto' is specified, coordinator addresses will be choosen automatically to support the configured redundancy level. (If the current set of coordinators are healthy and already support the redundancy level, nothing will be changed.)\n\nOtherwise, sets the coordinators to the list of IP:port pairs specified by <ADDRESS>+. An fdbserver process must be running on each of the specified addresses.\n\ne.g. coordinators 10.0.0.1:4000 10.0.0.2:4000 10.0.0.3:4000\n\nIf 'description=desc' is specified then the description field in the cluster\nfile is changed to desc, which must match [A-Za-z0-9_]+.");
|
||||
helpMap["exclude"] = CommandHelp(
|
||||
"exclude <ADDRESS>*",
|
||||
"exclude servers from the database",
|
||||
"If no addresses are specified, lists the set of excluded servers.\n\nFor each IP address or IP:port pair in <ADDRESS>*, adds the address to the set of excluded servers then waits until all database state has been safely moved away from the specified servers.");
|
||||
helpMap["exclude"] =
|
||||
CommandHelp("exclude [no_wait] <ADDRESS>*", "exclude servers from the database",
|
||||
"If no addresses are specified, lists the set of excluded servers.\n\nFor each IP address or "
|
||||
"IP:port pair in <ADDRESS>*, adds the address to the set of excluded servers then waits until all "
|
||||
"database state has been safely moved away from the specified servers. If 'no_wait' is set, the "
|
||||
"command returns \nimmediately without checking if the exclusions have completed successfully.");
|
||||
helpMap["include"] = CommandHelp(
|
||||
"include all|<ADDRESS>*",
|
||||
"permit previously-excluded servers to rejoin the database",
|
||||
|
@ -1985,13 +1987,18 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
printf("To find out whether it is safe to remove one or more of these\n"
|
||||
"servers from the cluster, type `exclude <addresses>'.\n"
|
||||
"To return one of these servers to the cluster, type `include <addresses>'.\n");
|
||||
|
||||
return false;
|
||||
} else {
|
||||
state std::vector<AddressExclusion> addresses;
|
||||
state std::set<AddressExclusion> exclusions;
|
||||
bool force = false;
|
||||
state bool waitForAllExcluded = true;
|
||||
for(auto t = tokens.begin()+1; t != tokens.end(); ++t) {
|
||||
if(*t == LiteralStringRef("FORCE")) {
|
||||
force = true;
|
||||
} else if (*t == LiteralStringRef("no_wait")) {
|
||||
waitForAllExcluded = false;
|
||||
} else {
|
||||
auto a = AddressExclusion::parse( *t );
|
||||
if (!a.isValid()) {
|
||||
|
@ -2093,13 +2100,16 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
|
||||
wait( makeInterruptable(excludeServers(db,addresses)) );
|
||||
|
||||
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
|
||||
printf("(Interrupting this wait with CTRL+C will not cancel the data movement.)\n");
|
||||
if (waitForAllExcluded) {
|
||||
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
|
||||
printf("(Interrupting this wait with CTRL+C will not cancel the data movement.)\n");
|
||||
}
|
||||
|
||||
if(warn.isValid())
|
||||
warn.cancel();
|
||||
wait( makeInterruptable(waitForExcludedServers(db,addresses)) );
|
||||
|
||||
state std::set<NetworkAddress> notExcludedServers =
|
||||
wait(makeInterruptable(checkForExcludingServers(db, addresses, waitForAllExcluded)));
|
||||
std::vector<ProcessData> workers = wait( makeInterruptable(getWorkers(db)) );
|
||||
std::map<IPAddress, std::set<uint16_t>> workerPorts;
|
||||
for(auto addr : workers)
|
||||
|
@ -2126,9 +2136,18 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
}
|
||||
|
||||
printf("\n");
|
||||
}
|
||||
else
|
||||
} else if (notExcludedServers.empty()) {
|
||||
printf("\nIt is now safe to remove these machines or processes from the cluster.\n");
|
||||
} else {
|
||||
printf("\nWARNING: Exclusion in progress. It is not safe to remove the following machines\n"
|
||||
"or processes from the cluster:\n");
|
||||
for (auto addr : notExcludedServers) {
|
||||
if (addr.port == 0)
|
||||
printf(" %s\n", addr.ip.toString().c_str());
|
||||
else
|
||||
printf(" %s\n", addr.toString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
bool foundCoordinator = false;
|
||||
auto ccs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
|
||||
|
@ -2142,8 +2161,9 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
if (foundCoordinator)
|
||||
printf("Type `help coordinators' for information on how to change the\n"
|
||||
"cluster's coordination servers before removing them.\n");
|
||||
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
ACTOR Future<bool> createSnapshot(Database db, StringRef snapCmd) {
|
||||
|
|
|
@ -1420,13 +1420,16 @@ ACTOR Future<int> setDDMode( Database cx, int mode ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion> excl ) {
|
||||
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> excl,
|
||||
bool waitForAllExcluded) {
|
||||
state std::set<AddressExclusion> exclusions( excl.begin(), excl.end() );
|
||||
state std::set<NetworkAddress> inProgressExclusion;
|
||||
|
||||
if (!excl.size()) return Void();
|
||||
if (!excl.size()) return inProgressExclusion;
|
||||
|
||||
loop {
|
||||
state Transaction tr(cx);
|
||||
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
|
||||
|
@ -1439,11 +1442,12 @@ ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion>
|
|||
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
|
||||
|
||||
state bool ok = true;
|
||||
inProgressExclusion.clear();
|
||||
for(auto& s : serverList) {
|
||||
auto addr = decodeServerListValue( s.value ).address();
|
||||
if ( addressExcluded(exclusions, addr) ) {
|
||||
ok = false;
|
||||
break;
|
||||
inProgressExclusion.insert(addr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1454,24 +1458,27 @@ ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion>
|
|||
for( auto const& log : logs.first ) {
|
||||
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
|
||||
ok = false;
|
||||
break;
|
||||
inProgressExclusion.insert(log.second);
|
||||
}
|
||||
}
|
||||
for( auto const& log : logs.second ) {
|
||||
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
|
||||
ok = false;
|
||||
break;
|
||||
inProgressExclusion.insert(log.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ok) return Void();
|
||||
if (ok) return inProgressExclusion;
|
||||
if (!waitForAllExcluded) break;
|
||||
|
||||
wait( delayJittered( 1.0 ) ); // SOMEDAY: watches!
|
||||
} catch (Error& e) {
|
||||
wait( tr.onError(e) );
|
||||
}
|
||||
}
|
||||
|
||||
return inProgressExclusion;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> mgmtSnapCreate(Database cx, StringRef snapCmd) {
|
||||
|
|
|
@ -154,9 +154,11 @@ ACTOR Future<Void> setClass( Database cx, AddressExclusion server, ProcessClas
|
|||
// Get the current list of excluded servers
|
||||
ACTOR Future<vector<AddressExclusion>> getExcludedServers( Database cx );
|
||||
|
||||
// Wait for the given, previously excluded servers to be evacuated (no longer used for state). Once this returns it is safe to shut down all such
|
||||
// machines without impacting fault tolerance, until and unless any of them are explicitly included with includeServers()
|
||||
ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion> servers );
|
||||
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is
|
||||
// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and
|
||||
// unless any of them are explicitly included with includeServers()
|
||||
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> servers,
|
||||
bool waitForAllExcluded);
|
||||
|
||||
// Gets a list of all workers in the cluster (excluding testers)
|
||||
ACTOR Future<vector<ProcessData>> getWorkers( Database cx );
|
||||
|
|
|
@ -413,7 +413,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
|
|||
if (toKill.size()) {
|
||||
// Wait for removal to be safe
|
||||
TraceEvent("RemoveAndKill", functionId).detail("Step", "Wait For Server Exclusion").detail("Addresses", describe(toKill)).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
wait( waitForExcludedServers( cx, toKillArray ) );
|
||||
wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
|
||||
|
||||
TraceEvent("RemoveAndKill", functionId).detail("Step", "coordinators auto").detail("DesiredCoordinators", g_simulator.desiredCoordinators).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
|
||||
|
|
Loading…
Reference in New Issue