Separate the excluded-server and failed-server lists into two functions. Expose a transaction-based interface from the Management API.
This commit is contained in:
parent
5f6093b59e
commit
b8251878c4
|
@ -38,7 +38,6 @@
|
|||
#include "fdbrpc/Replication.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
ACTOR static Future<vector<AddressExclusion>> getExcludedServers(Transaction* tr);
|
||||
|
||||
bool isInteger(const std::string& s) {
|
||||
if( s.empty() ) return false;
|
||||
|
@ -1385,11 +1384,9 @@ ACTOR Future<Void> setClass( Database cx, AddressExclusion server, ProcessClass
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<vector<AddressExclusion>> getExcludedServers( Transaction* tr ) {
|
||||
ACTOR Future<std::vector<AddressExclusion>> getExcludedServers( Transaction* tr ) {
|
||||
state Standalone<RangeResultRef> r = wait( tr->getRange( excludedServersKeys, CLIENT_KNOBS->TOO_MANY ) );
|
||||
ASSERT( !r.more && r.size() < CLIENT_KNOBS->TOO_MANY );
|
||||
state Standalone<RangeResultRef> r2 = wait( tr->getRange( failedServersKeys, CLIENT_KNOBS->TOO_MANY ) );
|
||||
ASSERT( !r2.more && r2.size() < CLIENT_KNOBS->TOO_MANY );
|
||||
|
||||
vector<AddressExclusion> exclusions;
|
||||
for(auto i = r.begin(); i != r.end(); ++i) {
|
||||
|
@ -1397,23 +1394,33 @@ ACTOR static Future<vector<AddressExclusion>> getExcludedServers( Transaction* t
|
|||
if (a.isValid())
|
||||
exclusions.push_back( a );
|
||||
}
|
||||
for(auto i = r2.begin(); i != r2.end(); ++i) {
|
||||
auto a = decodeFailedServersKey( i->key );
|
||||
if (a.isValid())
|
||||
exclusions.push_back( a );
|
||||
}
|
||||
uniquify(exclusions);
|
||||
return exclusions;
|
||||
}
|
||||
|
||||
ACTOR Future<vector<AddressExclusion>> getExcludedServers( Database cx ) {
|
||||
// Reads the failed-server key range (failedServersKeys) through the caller's transaction and returns every
// entry that decodes to a valid AddressExclusion.
// No transaction options are set and no retry loop is run here; both are left to the caller.
ACTOR Future<std::vector<AddressExclusion>> getFailedServers( Transaction* tr ) {
	state Standalone<RangeResultRef> r = wait( tr->getRange( failedServersKeys, CLIENT_KNOBS->TOO_MANY ) );
	ASSERT( !r.more && r.size() < CLIENT_KNOBS->TOO_MANY );

	std::vector<AddressExclusion> failedServers;
	for(auto i = r.begin(); i != r.end(); ++i) {
		// These keys come from the failed-servers range, so they must go through the failed-servers
		// decoder rather than the excluded-servers one.
		auto a = decodeFailedServersKey( i->key );
		if (a.isValid())
			failedServers.push_back( a );
	}
	return failedServers;
}
|
||||
|
||||
ACTOR Future<std::vector<AddressExclusion>> getCombinedExcludedServers( Database cx ) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
|
||||
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
|
||||
vector<AddressExclusion> exclusions = wait( getExcludedServers(&tr) );
|
||||
state std::vector<AddressExclusion> exclusions = wait( getExcludedServers(&tr) );
|
||||
state std::vector<AddressExclusion> failures = wait( getFailedServers(&tr) );
|
||||
exclusions.insert(exclusions.end(), failures.begin(), failures.end());
|
||||
uniquify(exclusions); // sort and uniquify
|
||||
return exclusions;
|
||||
} catch (Error& e) {
|
||||
wait( tr.onError(e) );
|
||||
|
@ -1555,62 +1562,65 @@ ACTOR Future<int> setDDMode( Database cx, int mode ) {
|
|||
}
|
||||
}
|
||||
|
||||
// Runs one pass of the exclusion check inside the caller's transaction.
// Returns true when `exclusions` is empty, or when neither the storage server list nor the log list read
// through `tr` contains an address matched by `exclusions`. Otherwise returns false and records each
// offending address in `inProgressExclusion`, which must be empty on entry.
// Log addresses are only inspected when no storage server address matched.
ACTOR Future<bool> checkForExcludingServersTxActor(Transaction* tr, std::set<AddressExclusion>* exclusions, std::set<NetworkAddress>* inProgressExclusion) {
	// The output set has to be cleared by the caller before every invocation
	ASSERT(inProgressExclusion->size() == 0);
	if (exclusions->size() == 0) {
		return true;
	}

	tr->setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
	tr->setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
	tr->setOption( FDBTransactionOptions::LOCK_AWARE );

	// Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed recovery

	// Step 1: look for storage servers whose primary or secondary address violates the exclusions
	Standalone<RangeResultRef> storageEntries = wait( tr->getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
	ASSERT( !storageEntries.more && storageEntries.size() < CLIENT_KNOBS->TOO_MANY );

	state bool noneInProgress = true;
	for (auto& entry : storageEntries) {
		auto addrs = decodeServerListValue( entry.value ).getKeyValues.getEndpoint().addresses;
		if ( addressExcluded(*exclusions, addrs.address) ) {
			noneInProgress = false;
			inProgressExclusion->insert(addrs.address);
		}
		if ( addrs.secondaryAddress.present() && addressExcluded(*exclusions, addrs.secondaryAddress.get()) ) {
			noneInProgress = false;
			inProgressExclusion->insert(addrs.secondaryAddress.get());
		}
	}

	// A storage server already matched, so the log list is not consulted
	if (!noneInProgress) {
		return false;
	}

	// Step 2: look for logs that have a default-constructed address or an excluded address
	Optional<Standalone<StringRef>> logsValue = wait( tr->get(logsKey) );
	ASSERT(logsValue.present());
	auto decodedLogs = decodeLogsValue(logsValue.get());

	for (auto const& entry : decodedLogs.first) {
		if (entry.second == NetworkAddress() || addressExcluded(*exclusions, entry.second)) {
			noneInProgress = false;
			inProgressExclusion->insert(entry.second);
		}
	}
	for (auto const& entry : decodedLogs.second) {
		if (entry.second == NetworkAddress() || addressExcluded(*exclusions, entry.second)) {
			noneInProgress = false;
			inProgressExclusion->insert(entry.second);
		}
	}

	return noneInProgress;
}
|
||||
|
||||
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> excl,
|
||||
bool waitForAllExcluded) {
|
||||
state std::set<AddressExclusion> exclusions( excl.begin(), excl.end() );
|
||||
state std::set<NetworkAddress> inProgressExclusion;
|
||||
|
||||
if (!excl.size()) return inProgressExclusion;
|
||||
|
||||
loop {
|
||||
state Transaction tr(cx);
|
||||
|
||||
inProgressExclusion.clear();
|
||||
try {
|
||||
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
|
||||
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
|
||||
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
|
||||
|
||||
// Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed recovery
|
||||
|
||||
// Check that there aren't any storage servers with addresses violating the exclusions
|
||||
Standalone<RangeResultRef> serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
|
||||
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
|
||||
|
||||
state bool ok = true;
|
||||
inProgressExclusion.clear();
|
||||
for(auto& s : serverList) {
|
||||
auto addresses = decodeServerListValue( s.value ).getKeyValues.getEndpoint().addresses;
|
||||
if ( addressExcluded(exclusions, addresses.address) ) {
|
||||
ok = false;
|
||||
inProgressExclusion.insert(addresses.address);
|
||||
}
|
||||
if ( addresses.secondaryAddress.present() && addressExcluded(exclusions, addresses.secondaryAddress.get()) ) {
|
||||
ok = false;
|
||||
inProgressExclusion.insert(addresses.secondaryAddress.get());
|
||||
}
|
||||
}
|
||||
|
||||
if (ok) {
|
||||
Optional<Standalone<StringRef>> value = wait( tr.get(logsKey) );
|
||||
ASSERT(value.present());
|
||||
auto logs = decodeLogsValue(value.get());
|
||||
for( auto const& log : logs.first ) {
|
||||
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
|
||||
ok = false;
|
||||
inProgressExclusion.insert(log.second);
|
||||
}
|
||||
}
|
||||
for( auto const& log : logs.second ) {
|
||||
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
|
||||
ok = false;
|
||||
inProgressExclusion.insert(log.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ok = wait(checkForExcludingServersTxActor(&tr, &exclusions, &inProgressExclusion));
|
||||
if (ok) return inProgressExclusion;
|
||||
if (!waitForAllExcluded) break;
|
||||
|
||||
wait( delayJittered( 1.0 ) ); // SOMEDAY: watches!
|
||||
} catch (Error& e) {
|
||||
wait( tr.onError(e) );
|
||||
|
|
|
@ -153,14 +153,17 @@ ACTOR Future<Void> includeServers(Database cx, vector<AddressExclusion> servers,
|
|||
// Set the process class of processes with the given address. A NetworkAddress with a port of 0 means all servers on the given IP.
|
||||
ACTOR Future<Void> setClass( Database cx, AddressExclusion server, ProcessClass processClass );
|
||||
|
||||
// Get the current list of excluded servers
|
||||
ACTOR Future<vector<AddressExclusion>> getExcludedServers( Database cx );
|
||||
// Get the current list of excluded and failed servers
|
||||
ACTOR Future<std::vector<AddressExclusion>> getCombinedExcludedServers( Database cx );
|
||||
// Get the list of excluded servers by reading the excluded-servers key range through the caller's transaction
ACTOR Future<std::vector<AddressExclusion>> getExcludedServers(Transaction* tr);
|
||||
// Get the list of failed servers by reading the failed-servers key range through the caller's transaction
ACTOR Future<std::vector<AddressExclusion>> getFailedServers(Transaction* tr);
|
||||
|
||||
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForAllExcluded is
|
||||
// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and
|
||||
// unless any of them are explicitly included with includeServers()
|
||||
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> servers,
|
||||
bool waitForAllExcluded);
|
||||
// Single-transaction form of the exclusion check. Returns true if `exclusions` is empty or if no storage server or
// log address read through `tr` is matched by `exclusions`; otherwise returns false and adds the offending addresses
// to `inProgressExclusion`. `inProgressExclusion` must be empty when this is called.
ACTOR Future<bool> checkForExcludingServersTxActor(Transaction* tr, std::set<AddressExclusion>* exclusions, std::set<NetworkAddress>* inProgressExclusion);
|
||||
|
||||
// Gets a list of all workers in the cluster (excluding testers)
|
||||
ACTOR Future<vector<ProcessData>> getWorkers( Database cx );
|
||||
|
|
Loading…
Reference in New Issue