Format FailureMonitor* files

This commit is contained in:
Vishesh Yadav 2020-01-09 14:26:05 -08:00
parent 760dc68b7f
commit 04f925f770
2 changed files with 94 additions and 79 deletions

View File

@ -19,55 +19,56 @@
*/
#include "fdbrpc/FailureMonitor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<Void> waitForStateEqual( IFailureMonitor* monitor, Endpoint endpoint, FailureStatus status ) {
ACTOR Future<Void> waitForStateEqual(IFailureMonitor* monitor, Endpoint endpoint, FailureStatus status) {
loop {
Future<Void> change = monitor->onStateChanged(endpoint);
if (monitor->getState(endpoint) == status)
return Void();
wait( change );
if (monitor->getState(endpoint) == status) return Void();
wait(change);
}
}
ACTOR Future<Void> waitForContinuousFailure( IFailureMonitor* monitor, Endpoint endpoint, double sustainedFailureDuration, double slope ) {
ACTOR Future<Void> waitForContinuousFailure(IFailureMonitor* monitor, Endpoint endpoint,
double sustainedFailureDuration, double slope) {
state double startT = now();
loop {
wait( monitor->onFailed( endpoint ) );
if(monitor->permanentlyFailed(endpoint))
return Void();
wait(monitor->onFailed(endpoint));
if (monitor->permanentlyFailed(endpoint)) return Void();
// X == sustainedFailureDuration + slope * (now()-startT+X)
double waitDelay = (sustainedFailureDuration + slope * (now()-startT)) / (1-slope);
double waitDelay = (sustainedFailureDuration + slope * (now() - startT)) / (1 - slope);
//SOMEDAY: if we know that this process is a server or client we can tune this optimization better
if(waitDelay < std::min(FLOW_KNOBS->CLIENT_REQUEST_INTERVAL, FLOW_KNOBS->SERVER_REQUEST_INTERVAL)) //We will not get a failure monitoring update in this amount of time, so there is no point in waiting for changes
// SOMEDAY: if we know that this process is a server or client we can tune this optimization better
if (waitDelay <
std::min(FLOW_KNOBS->CLIENT_REQUEST_INTERVAL,
FLOW_KNOBS->SERVER_REQUEST_INTERVAL)) // We will not get a failure monitoring update in this amount
// of time, so there is no point in waiting for changes
waitDelay = 0;
choose {
when (wait( monitor->onStateEqual( endpoint, FailureStatus(false) ) )) {} // SOMEDAY: Use onStateChanged() for efficiency
when (wait( delay(waitDelay) )) {
return Void();
}
when(wait(monitor->onStateEqual(endpoint, FailureStatus(false)))) {
} // SOMEDAY: Use onStateChanged() for efficiency
when(wait(delay(waitDelay))) { return Void(); }
}
}
}
Future<Void> IFailureMonitor::onStateEqual( Endpoint const& endpoint, FailureStatus status ) {
if ( status == getState(endpoint) ) return Void();
Future<Void> IFailureMonitor::onStateEqual(Endpoint const& endpoint, FailureStatus status) {
if (status == getState(endpoint)) return Void();
return waitForStateEqual(this, endpoint, status);
}
Future<Void> IFailureMonitor::onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double slope ) {
ASSERT( slope < 1.0 );
return waitForContinuousFailure( this, endpoint, sustainedFailureDuration, slope );
Future<Void> IFailureMonitor::onFailedFor(Endpoint const& endpoint, double sustainedFailureDuration, double slope) {
ASSERT(slope < 1.0);
return waitForContinuousFailure(this, endpoint, sustainedFailureDuration, slope);
}
void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStatus const& status ) {
void SimpleFailureMonitor::setStatus(NetworkAddress const& address, FailureStatus const& status) {
//if (status.failed)
// printf("On machine '%s': Machine '%s' is failed\n", g_network->getLocalAddress().toString().c_str(), address.toString().c_str());
//printf("%s.setState(%s, %s) %p\n", g_network->getLocalAddress().toString(), address.toString(), status.failed ? "FAILED" : "OK", this);
//addressStatus.set( address, status );
// if (status.failed)
// printf("On machine '%s': Machine '%s' is failed\n", g_network->getLocalAddress().toString().c_str(),
// address.toString().c_str()); printf("%s.setState(%s, %s) %p\n", g_network->getLocalAddress().toString(),
// address.toString(), status.failed ? "FAILED" : "OK", this); addressStatus.set( address, status );
// onStateChanged() will be waiting on endpointKnownFailed only where it is false, so if the address status
// for an endpoint that is waited on changes, the waiter sees its failure status change
@ -96,22 +97,29 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat
}
}
void SimpleFailureMonitor::endpointNotFound( Endpoint const& endpoint ) {
void SimpleFailureMonitor::endpointNotFound(Endpoint const& endpoint) {
// SOMEDAY: Expiration (this "leaks" memory)
if(endpoint.token.first() == -1) {
TraceEvent("WellKnownEndpointNotFound").suppressFor(1.0).detail("Address", endpoint.getPrimaryAddress()).detail("TokenFirst", endpoint.token.first()).detail("TokenSecond", endpoint.token.second());
if (endpoint.token.first() == -1) {
TraceEvent("WellKnownEndpointNotFound")
.suppressFor(1.0)
.detail("Address", endpoint.getPrimaryAddress())
.detail("TokenFirst", endpoint.token.first())
.detail("TokenSecond", endpoint.token.second());
return;
}
TraceEvent("EndpointNotFound").suppressFor(1.0).detail("Address", endpoint.getPrimaryAddress()).detail("Token", endpoint.token);
endpointKnownFailed.set( endpoint, true );
TraceEvent("EndpointNotFound")
.suppressFor(1.0)
.detail("Address", endpoint.getPrimaryAddress())
.detail("Token", endpoint.token);
endpointKnownFailed.set(endpoint, true);
}
void SimpleFailureMonitor::notifyDisconnect( NetworkAddress const& address ) {
void SimpleFailureMonitor::notifyDisconnect(NetworkAddress const& address) {
//TraceEvent("NotifyDisconnect").detail("Address", address);
endpointKnownFailed.triggerRange( Endpoint({address}, UID()), Endpoint({address}, UID(-1,-1)) );
endpointKnownFailed.triggerRange(Endpoint({ address }, UID()), Endpoint({ address }, UID(-1, -1)));
}
Future<Void> SimpleFailureMonitor::onDisconnectOrFailure( Endpoint const& endpoint ) {
Future<Void> SimpleFailureMonitor::onDisconnectOrFailure(Endpoint const& endpoint) {
// If the endpoint or address is already failed, return right away
auto i = addressStatus.find(endpoint.getPrimaryAddress());
if (i == addressStatus.end() || i->second.isFailed() || endpointKnownFailed.get(endpoint)) {
@ -120,12 +128,12 @@ Future<Void> SimpleFailureMonitor::onDisconnectOrFailure( Endpoint const& endpoi
}
// Return when the endpoint is triggered, which means that either the endpoint has become known failed, or the
// address has changed state (and since it was previously not failed, it must now be failed), or notifyDisconnect()
// has been called.
// address has changed state (and since it was previously not failed, it must now be failed), or
// notifyDisconnect() has been called.
return endpointKnownFailed.onChange(endpoint);
}
Future<Void> SimpleFailureMonitor::onStateChanged( Endpoint const& endpoint ) {
Future<Void> SimpleFailureMonitor::onStateChanged(Endpoint const& endpoint) {
// Wait on endpointKnownFailed if it is false, to pick up both endpointNotFound errors (which set it to true)
// and changes to addressStatus (which trigger a range). Don't wait on endpointKnownFailed if it is true, because
// failure status for that endpoint can never change (and we could be spuriously triggered by setStatus)
@ -137,36 +145,42 @@ Future<Void> SimpleFailureMonitor::onStateChanged( Endpoint const& endpoint ) {
return endpointKnownFailed.onChange(endpoint);
}
FailureStatus SimpleFailureMonitor::getState( Endpoint const& endpoint ) {
FailureStatus SimpleFailureMonitor::getState(Endpoint const& endpoint) {
if (endpointKnownFailed.get(endpoint))
return FailureStatus(true);
else {
auto a = addressStatus.find(endpoint.getPrimaryAddress());
if (a == addressStatus.end()) return FailureStatus();
else return a->second;
//printf("%s.getState(%s) = %s %p\n", g_network->getLocalAddress().toString(), endpoint.address.toString(), a.failed ? "FAILED" : "OK", this);
if (a == addressStatus.end())
return FailureStatus();
else
return a->second;
// printf("%s.getState(%s) = %s %p\n", g_network->getLocalAddress().toString(), endpoint.address.toString(),
// a.failed ? "FAILED" : "OK", this);
}
}
FailureStatus SimpleFailureMonitor::getState( NetworkAddress const& address ) {
FailureStatus SimpleFailureMonitor::getState(NetworkAddress const& address) {
auto a = addressStatus.find(address);
if (a == addressStatus.end()) return FailureStatus();
else return a->second;
if (a == addressStatus.end())
return FailureStatus();
else
return a->second;
}
bool SimpleFailureMonitor::onlyEndpointFailed( Endpoint const& endpoint ) {
if(!endpointKnownFailed.get(endpoint))
return false;
bool SimpleFailureMonitor::onlyEndpointFailed(Endpoint const& endpoint) {
if (!endpointKnownFailed.get(endpoint)) return false;
auto a = addressStatus.find(endpoint.getPrimaryAddress());
if (a == addressStatus.end()) return true;
else return !a->second.failed;
if (a == addressStatus.end())
return true;
else
return !a->second.failed;
}
bool SimpleFailureMonitor::permanentlyFailed( Endpoint const& endpoint ) {
bool SimpleFailureMonitor::permanentlyFailed(Endpoint const& endpoint) {
return endpointKnownFailed.get(endpoint);
}
void SimpleFailureMonitor::reset() {
addressStatus = std::unordered_map< NetworkAddress, FailureStatus >();
addressStatus = std::unordered_map<NetworkAddress, FailureStatus>();
endpointKnownFailed.resetNoWaiting();
}

View File

@ -76,8 +76,8 @@ struct FailureStatus {
bool isFailed() const { return failed; }
bool isAvailable() const { return !failed; }
bool operator == (FailureStatus const& r) const { return failed == r.failed; }
bool operator != (FailureStatus const& r) const { return failed != r.failed; }
bool operator==(FailureStatus const& r) const { return failed == r.failed; }
bool operator!=(FailureStatus const& r) const { return failed != r.failed; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, failed);
@ -87,43 +87,43 @@ struct FailureStatus {
class IFailureMonitor {
public:
// Returns the currently known status for the endpoint
virtual FailureStatus getState( Endpoint const& endpoint ) = 0;
virtual FailureStatus getState(Endpoint const& endpoint) = 0;
// Returns the currently known status for the address
virtual FailureStatus getState( NetworkAddress const& address ) = 0;
virtual FailureStatus getState(NetworkAddress const& address) = 0;
// Only use this function when the endpoint is known to be failed
virtual void endpointNotFound( Endpoint const& ) = 0;
virtual void endpointNotFound(Endpoint const&) = 0;
// The next time the known status for the endpoint changes, returns the new status.
virtual Future<Void> onStateChanged( Endpoint const& endpoint ) = 0;
virtual Future<Void> onStateChanged(Endpoint const& endpoint) = 0;
// Returns when onFailed(endpoint) || transport().onDisconnect( endpoint.getPrimaryAddress() ), but more efficiently
virtual Future<Void> onDisconnectOrFailure( Endpoint const& endpoint ) = 0;
virtual Future<Void> onDisconnectOrFailure(Endpoint const& endpoint) = 0;
// Returns true if the endpoint is failed but the address of the endpoint is not failed.
virtual bool onlyEndpointFailed( Endpoint const& endpoint ) = 0;
virtual bool onlyEndpointFailed(Endpoint const& endpoint) = 0;
// Returns true if the endpoint will never become available.
virtual bool permanentlyFailed( Endpoint const& endpoint ) = 0;
virtual bool permanentlyFailed(Endpoint const& endpoint) = 0;
// Called by FlowTransport when a connection closes and a prior request or reply might be lost
virtual void notifyDisconnect( NetworkAddress const& ) = 0;
virtual void notifyDisconnect(NetworkAddress const&) = 0;
// Called to update the failure status of network address directly when running client.
virtual void setStatus(NetworkAddress const& address, FailureStatus const& status) = 0;
// Returns when the known status of endpoint is next equal to status. Returns immediately
// if appropriate.
Future<Void> onStateEqual( Endpoint const& endpoint, FailureStatus status );
Future<Void> onStateEqual(Endpoint const& endpoint, FailureStatus status);
// Returns when the status of the given endpoint is next considered "failed"
Future<Void> onFailed( Endpoint const& endpoint ) {
return onStateEqual( endpoint, FailureStatus() );
}
Future<Void> onFailed(Endpoint const& endpoint) { return onStateEqual(endpoint, FailureStatus()); }
// Returns when the status of the given endpoint has continuously been "failed" for sustainedFailureDuration + (elapsedTime*sustainedFailureSlope)
Future<Void> onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double sustainedFailureSlope = 0.0 );
// Returns when the status of the given endpoint has continuously been "failed" for sustainedFailureDuration +
// (elapsedTime*sustainedFailureSlope)
Future<Void> onFailedFor(Endpoint const& endpoint, double sustainedFailureDuration,
double sustainedFailureSlope = 0.0);
// Returns the failure monitor that the calling machine should use
static IFailureMonitor& failureMonitor() {
@ -137,22 +137,23 @@ public:
class SimpleFailureMonitor : public IFailureMonitor {
public:
SimpleFailureMonitor() : endpointKnownFailed() { }
void setStatus( NetworkAddress const& address, FailureStatus const& status );
void endpointNotFound( Endpoint const& );
virtual void notifyDisconnect( NetworkAddress const& );
SimpleFailureMonitor() : endpointKnownFailed() {}
void setStatus(NetworkAddress const& address, FailureStatus const& status);
void endpointNotFound(Endpoint const&);
virtual void notifyDisconnect(NetworkAddress const&);
virtual Future<Void> onStateChanged( Endpoint const& endpoint );
virtual FailureStatus getState( Endpoint const& endpoint );
virtual FailureStatus getState( NetworkAddress const& address );
virtual Future<Void> onDisconnectOrFailure( Endpoint const& endpoint );
virtual bool onlyEndpointFailed( Endpoint const& endpoint );
virtual bool permanentlyFailed( Endpoint const& endpoint );
virtual Future<Void> onStateChanged(Endpoint const& endpoint);
virtual FailureStatus getState(Endpoint const& endpoint);
virtual FailureStatus getState(NetworkAddress const& address);
virtual Future<Void> onDisconnectOrFailure(Endpoint const& endpoint);
virtual bool onlyEndpointFailed(Endpoint const& endpoint);
virtual bool permanentlyFailed(Endpoint const& endpoint);
void reset();
private:
std::unordered_map< NetworkAddress, FailureStatus > addressStatus;
YieldedAsyncMap< Endpoint, bool > endpointKnownFailed;
std::unordered_map<NetworkAddress, FailureStatus> addressStatus;
YieldedAsyncMap<Endpoint, bool> endpointKnownFailed;
friend class OnStateChangedActorActor;
};