Merge pull request #2583 from etschannen/feature-keep-status-connected
Clients should not disconnect from the CC after fetching status
This commit is contained in:
commit
4f1301b2dd
|
@ -1600,9 +1600,9 @@ ACTOR Future<Void> timeWarning( double when, const char* msg ) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkStatus(Future<Void> f, Reference<ClusterConnectionFile> clusterFile, bool displayDatabaseAvailable = true) {
|
||||
ACTOR Future<Void> checkStatus(Future<Void> f, Database db, bool displayDatabaseAvailable = true) {
|
||||
wait(f);
|
||||
StatusObject s = wait(StatusClient::statusFetcher(clusterFile));
|
||||
StatusObject s = wait(StatusClient::statusFetcher(db));
|
||||
printf("\n");
|
||||
printStatus(s, StatusClient::MINIMAL, displayDatabaseAvailable);
|
||||
printf("\n");
|
||||
|
@ -1644,7 +1644,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
|
|||
|
||||
state Optional<ConfigureAutoResult> conf;
|
||||
if( tokens[startToken] == LiteralStringRef("auto") ) {
|
||||
StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( ccf )) );
|
||||
StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( db )) );
|
||||
if(warn.isValid())
|
||||
warn.cancel();
|
||||
|
||||
|
@ -2061,7 +2061,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
}
|
||||
|
||||
if(!force) {
|
||||
StatusObject status = wait( makeInterruptable( StatusClient::statusFetcher( ccf ) ) );
|
||||
StatusObject status = wait( makeInterruptable( StatusClient::statusFetcher( db ) ) );
|
||||
|
||||
state std::string errorString = "ERROR: Could not calculate the impact of this exclude on the total free space in the cluster.\n"
|
||||
"Please try the exclude again in 30 seconds.\n"
|
||||
|
@ -2636,7 +2636,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
|
||||
if (!opt.exec.present()) {
|
||||
if(opt.initialStatusCheck) {
|
||||
Future<Void> checkStatusF = checkStatus(Void(), db->getConnectionFile());
|
||||
Future<Void> checkStatusF = checkStatus(Void(), db);
|
||||
wait(makeInterruptable(success(checkStatusF)));
|
||||
}
|
||||
else {
|
||||
|
@ -2674,7 +2674,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
linenoise.historyAdd(line);
|
||||
}
|
||||
|
||||
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db->getConnectionFile());
|
||||
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db);
|
||||
|
||||
try {
|
||||
state UID randomID = deterministicRandom()->randomUniqueID();
|
||||
|
@ -2819,7 +2819,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
continue;
|
||||
}
|
||||
|
||||
StatusObject s = wait(makeInterruptable(StatusClient::statusFetcher(db->getConnectionFile())));
|
||||
StatusObject s = wait(makeInterruptable(StatusClient::statusFetcher(db)));
|
||||
|
||||
if (!opt.exec.present()) printf("\n");
|
||||
printStatus(s, level);
|
||||
|
|
|
@ -1959,8 +1959,8 @@ public:
|
|||
}
|
||||
|
||||
if (!g_network->isSimulated() && !forceAction) {
|
||||
state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src->getConnectionFile()));
|
||||
StatusObject destStatus = wait(StatusClient::statusFetcher(dest->getConnectionFile()));
|
||||
state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src));
|
||||
StatusObject destStatus = wait(StatusClient::statusFetcher(dest));
|
||||
checkAtomicSwitchOverConfig(srcStatus, destStatus, tagName);
|
||||
}
|
||||
|
||||
|
|
|
@ -191,6 +191,10 @@ public:
|
|||
Future<Void> clientInfoMonitor;
|
||||
Future<Void> connected;
|
||||
|
||||
Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface;
|
||||
Future<Void> statusLeaderMon;
|
||||
double lastStatusFetch;
|
||||
|
||||
int apiVersion;
|
||||
|
||||
int mvCacheInsertLocation;
|
||||
|
|
|
@ -46,6 +46,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
|
|||
init( CLIENT_EXAMPLE_AMOUNT, 20 );
|
||||
init( MAX_CLIENT_STATUS_AGE, 1.0 );
|
||||
init( MAX_PROXY_CONNECTIONS, 5 ); if( randomize && BUGGIFY ) MAX_PROXY_CONNECTIONS = 1;
|
||||
init( STATUS_IDLE_TIMEOUT, 120.0 );
|
||||
|
||||
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ public:
|
|||
int CLIENT_EXAMPLE_AMOUNT;
|
||||
double MAX_CLIENT_STATUS_AGE;
|
||||
int MAX_PROXY_CONNECTIONS;
|
||||
double STATUS_IDLE_TIMEOUT;
|
||||
|
||||
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
|
||||
double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)
|
||||
|
|
|
@ -1165,8 +1165,8 @@ Optional<Value> getValueFromJSON(StatusObject statusObj) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Optional<Value>> getJSON(Reference<ClusterConnectionFile> clusterFile) {
|
||||
StatusObject statusObj = wait(StatusClient::statusFetcher(clusterFile));
|
||||
ACTOR Future<Optional<Value>> getJSON(Database db) {
|
||||
StatusObject statusObj = wait(StatusClient::statusFetcher(db));
|
||||
return getValueFromJSON(statusObj);
|
||||
}
|
||||
|
||||
|
@ -1194,7 +1194,7 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
|
|||
|
||||
if (key == LiteralStringRef("\xff\xff/status/json")){
|
||||
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
|
||||
return getJSON(tr.getDatabase()->getConnectionFile());
|
||||
return getJSON(tr.getDatabase());
|
||||
}
|
||||
else {
|
||||
return Optional<Value>();
|
||||
|
|
|
@ -452,7 +452,7 @@ StatusObject getClientDatabaseStatus(StatusObjectReader client, StatusObjectRead
|
|||
return databaseStatus;
|
||||
}
|
||||
|
||||
ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f ) {
|
||||
ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f, Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface) {
|
||||
if (!g_network) throw network_not_setup();
|
||||
|
||||
state StatusObject statusObj;
|
||||
|
@ -462,13 +462,10 @@ ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f
|
|||
// This could be read from the JSON but doing so safely is ugly so using a real var.
|
||||
state bool quorum_reachable = false;
|
||||
state int coordinatorsFaultTolerance = 0;
|
||||
state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
|
||||
|
||||
try {
|
||||
state int64_t clientTime = time(0);
|
||||
|
||||
state Future<Void> leaderMon = monitorLeader<ClusterInterface>(f, clusterInterface);
|
||||
|
||||
StatusObject _statusObjClient = wait(clientStatusFetcher(f, &clientMessages, &quorum_reachable, &coordinatorsFaultTolerance));
|
||||
statusObjClient = _statusObjClient;
|
||||
|
||||
|
@ -548,6 +545,23 @@ ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f
|
|||
return statusObj;
|
||||
}
|
||||
|
||||
Future<StatusObject> StatusClient::statusFetcher( Reference<ClusterConnectionFile> clusterFile ) {
|
||||
return statusFetcherImpl(clusterFile);
|
||||
ACTOR Future<Void> timeoutMonitorLeader(Database db) {
|
||||
state Future<Void> leadMon = monitorLeader<ClusterInterface>(db->getConnectionFile(), db->statusClusterInterface);
|
||||
loop {
|
||||
wait(delay(CLIENT_KNOBS->STATUS_IDLE_TIMEOUT + 0.00001 + db->lastStatusFetch - now()));
|
||||
if(now() - db->lastStatusFetch > CLIENT_KNOBS->STATUS_IDLE_TIMEOUT) {
|
||||
db->statusClusterInterface = Reference<AsyncVar<Optional<ClusterInterface>>>();
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<StatusObject> StatusClient::statusFetcher( Database db ) {
|
||||
db->lastStatusFetch = now();
|
||||
if(!db->statusClusterInterface) {
|
||||
db->statusClusterInterface = Reference<AsyncVar<Optional<ClusterInterface>>>(new AsyncVar<Optional<ClusterInterface>>);
|
||||
db->statusLeaderMon = timeoutMonitorLeader(db);
|
||||
}
|
||||
|
||||
return statusFetcherImpl(db->getConnectionFile(), db->statusClusterInterface);
|
||||
}
|
||||
|
|
|
@ -23,11 +23,12 @@
|
|||
|
||||
#include "flow/flow.h"
|
||||
#include "fdbclient/Status.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
|
||||
class StatusClient {
|
||||
public:
|
||||
enum StatusLevel { MINIMAL = 0, NORMAL = 1, DETAILED = 2, JSON = 3 };
|
||||
static Future<StatusObject> statusFetcher(Reference<ClusterConnectionFile> clusterFile);
|
||||
static Future<StatusObject> statusFetcher(Database db);
|
||||
};
|
||||
|
||||
#endif
|
|
@ -46,7 +46,7 @@ struct DDMetricsExcludeWorkload : TestWorkload {
|
|||
|
||||
ACTOR static Future<double> getMovingDataAmount(Database cx, DDMetricsExcludeWorkload* self) {
|
||||
try {
|
||||
StatusObject statusObj = wait(StatusClient::statusFetcher(cx->getConnectionFile()));
|
||||
StatusObject statusObj = wait(StatusClient::statusFetcher(cx));
|
||||
StatusObjectReader statusObjCluster;
|
||||
((StatusObjectReader)statusObj).get("cluster", statusObjCluster);
|
||||
StatusObjectReader statusObjData;
|
||||
|
|
|
@ -69,7 +69,7 @@ struct StatusWorkload : TestWorkload {
|
|||
if (clientId != 0)
|
||||
return Void();
|
||||
|
||||
return success(timeout(fetcher(cx->getConnectionFile(), this), testDuration));
|
||||
return success(timeout(fetcher(cx, this), testDuration));
|
||||
}
|
||||
virtual Future<bool> check(Database const& cx) {
|
||||
return errors.getValue() == 0;
|
||||
|
@ -161,7 +161,7 @@ struct StatusWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> fetcher(Reference<ClusterConnectionFile> connFile, StatusWorkload *self) {
|
||||
ACTOR Future<Void> fetcher(Database cx, StatusWorkload *self) {
|
||||
state double lastTime = now();
|
||||
|
||||
loop{
|
||||
|
@ -170,7 +170,7 @@ struct StatusWorkload : TestWorkload {
|
|||
// Since we count the requests that start, we could potentially never really hear back?
|
||||
++self->requests;
|
||||
state double issued = now();
|
||||
StatusObject result = wait(StatusClient::statusFetcher(connFile));
|
||||
StatusObject result = wait(StatusClient::statusFetcher(cx));
|
||||
++self->replies;
|
||||
BinaryWriter br(AssumeVersion(currentProtocolVersion));
|
||||
save(br, result);
|
||||
|
|
Loading…
Reference in New Issue