Open a connection to processes before attempting to kill them, to improve the reliability of the kill command

Secondary addresses are now included in the list of processes that can be killed
Evan Tschannen 2020-01-03 16:10:44 -08:00
parent 032797ca5c
commit 9a3dfec7c5
3 changed files with 41 additions and 9 deletions
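In rough terms, the change makes `kill` (and `expensive_data_check`) probe every candidate worker before acting: each worker interface is contacted with a getLeader request, at most CLI_CONNECT_PARALLELISM (20) probes run at once behind a FlowLock, and workers that do not reply within one second are left out of the kill list. A minimal plain-C++ sketch of that pattern (not Flow; Worker, probe(), and reachableWorkers() are illustrative stand-ins, not fdbcli code):

// Sketch only: bounded-parallelism probing, analogous to the FlowLock +
// addInterface() change below. Worker and probe() are hypothetical stand-ins.
#include <chrono>
#include <future>
#include <map>
#include <semaphore>
#include <string>
#include <utility>
#include <vector>

struct Worker { std::string address; };

// Stand-in for the real connection attempt (a getLeader request in fdbcli).
bool probe(const Worker&) { return true; }

std::map<std::string, Worker> reachableWorkers(const std::vector<Worker>& candidates) {
    // At most 20 probes run concurrently, mirroring CLI_CONNECT_PARALLELISM.
    std::counting_semaphore<20> connectLock(20);

    std::vector<std::pair<Worker, std::future<bool>>> probes;
    for (const Worker& w : candidates) {
        probes.emplace_back(w, std::async(std::launch::async, [&connectLock, w] {
            connectLock.acquire();   // FlowLock::take()
            bool ok = probe(w);      // try to reach the process
            connectLock.release();   // FlowLock::Releaser
            return ok;
        }));
    }

    std::map<std::string, Worker> reachable;
    for (auto& [w, reply] : probes) {
        // Keep only workers that answered in time, like the
        // choose { getLeader reply | delay(1.0) } race in addInterface().
        if (reply.wait_for(std::chrono::seconds(1)) == std::future_status::ready && reply.get())
            reachable[w.address] = w;
    }
    return reachable;
}

The bound keeps a large cluster from opening hundreds of connections at once, while the one-second race keeps a single unreachable process from stalling the whole command.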


@@ -2553,6 +2553,27 @@ Future<T> stopNetworkAfter( Future<T> what ) {
}
}
+ACTOR Future<Void> addInterface( std::map<Key,std::pair<Value,ClientLeaderRegInterface>>* address_interface, Reference<FlowLock> connectLock, KeyValue kv) {
+wait(connectLock->take());
+state FlowLock::Releaser releaser(*connectLock);
+state ClientWorkerInterface workerInterf = BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
+state ClientLeaderRegInterface leaderInterf(workerInterf.address());
+choose {
+when( Optional<LeaderInfo> rep = wait( brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())) ) ) {
+StringRef ip_port = kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key;
+(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
+if(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
+Key full_ip_port2 = StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
+StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls")) ? full_ip_port2.removeSuffix(LiteralStringRef(":tls")) : full_ip_port2;
+(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
+}
+}
+when( wait(delay(1.0)) ) {}
+}
+return Void();
+}
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state LineNoise& linenoise = *plinenoise;
state bool intrans = false;
@@ -2563,7 +2584,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state bool writeMode = false;
state std::string clusterConnectString;
-state std::map<Key,Value> address_interface;
+state std::map<Key,std::pair<Value,ClientLeaderRegInterface>> address_interface;
state FdbOptions globalOptions;
state FdbOptions activeOptions;
@@ -2956,10 +2977,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {
Standalone<RangeResultRef> kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) );
+Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
+std::vector<Future<Void>> addInterfs;
for( auto it : kvs ) {
-auto ip_port = it.key.endsWith(LiteralStringRef(":tls")) ? it.key.removeSuffix(LiteralStringRef(":tls")) : it.key;
-address_interface[ip_port] = it.value;
+addInterfs.push_back(addInterface(&address_interface, connectLock, it));
}
+wait( waitForAll(addInterfs) );
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if(address_interface.size() == 0) {
@@ -2975,7 +2998,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("\n");
} else if (tokencmp(tokens[1], "all")) {
for( auto it : address_interface ) {
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second);
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to kill. You must run the `kill command before running `kill all.\n");
@@ -2993,7 +3016,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(!is_error) {
for(int i = 1; i < tokens.size(); i++) {
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), address_interface[tokens[i]]);
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), address_interface[tokens[i]].first);
}
printf("Attempted to kill %zu processes\n", tokens.size() - 1);
}
@@ -3268,9 +3291,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {
Standalone<RangeResultRef> kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) );
+Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
+std::vector<Future<Void>> addInterfs;
for( auto it : kvs ) {
-address_interface[it.key] = it.value;
+addInterfs.push_back(addInterface(&address_interface, connectLock, it));
}
+wait( waitForAll(addInterfs) );
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if(address_interface.size() == 0) {
@@ -3286,7 +3312,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("\n");
} else if (tokencmp(tokens[1], "all")) {
for( auto it : address_interface ) {
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second);
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to check. You must run the `expensive_data_check command before running `expensive_data_check all.\n");
@@ -3304,7 +3330,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(!is_error) {
for(int i = 1; i < tokens.size(); i++) {
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), address_interface[tokens[i]]);
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), address_interface[tokens[i]].first);
}
printf("Attempted to kill and check %zu processes\n", tokens.size() - 1);
}


@@ -197,6 +197,9 @@ ClientKnobs::ClientKnobs(bool randomize) {
}
init(CSI_STATUS_DELAY, 10.0 );
init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec
init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
+//fdbcli
+init( CLI_CONNECT_PARALLELISM, 20 );
}


@@ -189,6 +189,9 @@ public:
int CONSISTENCY_CHECK_RATE_LIMIT_MAX;
int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME;
+//fdbcli
+int CLI_CONNECT_PARALLELISM;
ClientKnobs(bool randomize = false);
};