Merge pull request #2488 from etschannen/feature-addpeer-fix

Deserializing a request stream could unnecessarily mark a peer as healthy
This commit is contained in:
A.J. Beamon 2020-01-06 15:52:47 -08:00 committed by GitHub
commit bdf8c8a7bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 81 additions and 34 deletions

View File

@ -73,9 +73,9 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat
// for an endpoint that is waited on changes, the waiter sees its failure status change
auto it = addressStatus.find(address);
//TraceEvent("NotifyFailureStatus").detail("Address", address).detail("Status", status.failed ? "Failed" : "OK").detail("Present", it == addressStatus.end());
if (it == addressStatus.end()) {
if (status != FailureStatus()) {
TraceEvent("NotifyAddressHealthy").suppressFor(1.0).detail("Address", address);
addressStatus[address]=status;
endpointKnownFailed.triggerRange( Endpoint({address}, UID()), Endpoint({address}, UID(-1,-1)) );
}
@ -85,8 +85,14 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat
it->second = status;
else
addressStatus.erase(it);
if(triggerEndpoint)
if(triggerEndpoint) {
if(status.failed) {
TraceEvent("NotifyAddressFailed").suppressFor(1.0).detail("Address", address);
} else {
TraceEvent("NotifyAddressHealthyPresent").suppressFor(1.0).detail("Address", address);
}
endpointKnownFailed.triggerRange( Endpoint({address}, UID()), Endpoint({address}, UID(-1,-1)) );
}
}
}

View File

@ -1094,11 +1094,13 @@ Endpoint FlowTransport::loadedEndpoint( const UID& token ) {
void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) {
if (!isStream || !endpoint.getPrimaryAddress().isValid())
return;
else if (FlowTransport::transport().isClient())
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
Reference<Peer> peer = self->getOrOpenPeer(endpoint.getPrimaryAddress());
if(peer->peerReferences == -1) {
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
}
peer->peerReferences = 1;
} else {
peer->peerReferences++;

View File

@ -185,6 +185,7 @@ Future< REPLY_TYPE(Request) > loadBalance(
state Future<Void> secondDelay = Never();
state Promise<Void> requestFinished;
state double startTime = now();
setReplyPriority(request, taskID);
if (!alternatives)
@ -278,6 +279,22 @@ Future< REPLY_TYPE(Request) > loadBalance(
state double backoff = 0;
state bool triedAllOptions = false;
loop {
if(now() - startTime > (g_network->isSimulated() ? 30.0 : 600.0)) {
TraceEvent ev(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LoadBalanceTooLong");
ev.suppressFor(1.0);
ev.detail("Duration", now() - startTime);
ev.detail("NumAttempts", numAttempts);
ev.detail("Backoff", backoff);
ev.detail("TriedAllOptions", triedAllOptions);
if(ev.isEnabled()) {
ev.log();
for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) {
RequestStream<Request> const* thisStream = &alternatives->get( alternativeNum, channel );
TraceEvent(SevWarn, "LoadBalanceTooLongEndpoint").detail("Addr", thisStream->getEndpoint().getPrimaryAddress()).detail("Token", thisStream->getEndpoint().token).detail("Failed", IFailureMonitor::failureMonitor().getState( thisStream->getEndpoint() ).failed);
}
}
}
// Find an alternative, if any, that is not failed, starting with nextAlt
state RequestStream<Request> const* stream = NULL;
for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) {

View File

@ -653,13 +653,13 @@ void removeTraceRole(std::string role) {
g_traceLog.removeRole(role);
}
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true) {
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) {
g_trace_depth++;
setMaxFieldLength(0);
setMaxEventLength(0);
}
TraceEvent::TraceEvent( Severity severity, const char* type, UID id )
: id(id), type(type), severity(severity), initialized(false),
: id(id), type(type), severity(severity), initialized(false), logged(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
g_trace_depth++;
setMaxFieldLength(0);
@ -668,7 +668,7 @@ TraceEvent::TraceEvent( Severity severity, const char* type, UID id )
TraceEvent::TraceEvent( TraceInterval& interval, UID id )
: id(id), type(interval.type),
severity(interval.severity),
initialized(false),
initialized(false), logged(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= interval.severity) {
g_trace_depth++;
@ -680,7 +680,7 @@ TraceEvent::TraceEvent( TraceInterval& interval, UID id )
TraceEvent::TraceEvent( Severity severity, TraceInterval& interval, UID id )
: id(id), type(interval.type),
severity(severity),
initialized(false),
initialized(false), logged(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
g_trace_depth++;
@ -701,6 +701,7 @@ bool TraceEvent::init( TraceInterval& interval ) {
}
bool TraceEvent::init() {
ASSERT(!logged);
if(initialized) {
return enabled;
}
@ -765,6 +766,7 @@ bool TraceEvent::init() {
}
TraceEvent& TraceEvent::errorImpl(class Error const& error, bool includeCancelled) {
ASSERT(!logged);
if (error.code() != error_code_actor_cancelled || includeCancelled) {
err = error;
if (initialized) {
@ -847,12 +849,14 @@ TraceEvent& TraceEvent::detailfNoMetric( std::string&& key, const char* valueFor
}
// Records `trackingKey` as this event's tracking key and returns *this so calls can be chained.
// Must be called before the event has been logged (asserted below).
// The key must be non-empty and must not start with '/' or '\\' (asserted after the assignment).
TraceEvent& TraceEvent::trackLatest( const char *trackingKey ){
	ASSERT(!logged);

	// Store the key first; the validity check below inspects the stored member.
	this->trackingKey.assign(trackingKey);
	ASSERT( this->trackingKey.size() != 0 && this->trackingKey[0] != '/' && this->trackingKey[0] != '\\');

	return *this;
}
TraceEvent& TraceEvent::sample( double sampleRate, bool logSampleRate ) {
ASSERT(!logged);
if(enabled) {
if(initialized) {
TraceEvent(g_network && g_network->isSimulated() ? SevError : SevWarnAlways, std::string(TRACE_EVENT_INVALID_SUPPRESSION).append(type).c_str()).suppressFor(5);
@ -870,6 +874,7 @@ TraceEvent& TraceEvent::sample( double sampleRate, bool logSampleRate ) {
}
TraceEvent& TraceEvent::suppressFor( double duration, bool logSuppressedEventCount ) {
ASSERT(!logged);
if(enabled) {
if(initialized) {
TraceEvent(g_network && g_network->isSimulated() ? SevError : SevWarnAlways, std::string(TRACE_EVENT_INVALID_SUPPRESSION).append(type).c_str()).suppressFor(5);
@ -896,6 +901,7 @@ TraceEvent& TraceEvent::suppressFor( double duration, bool logSuppressedEventCou
}
TraceEvent& TraceEvent::setMaxFieldLength(int maxFieldLength) {
ASSERT(!logged);
if(maxFieldLength == 0) {
this->maxFieldLength = FLOW_KNOBS ? FLOW_KNOBS->MAX_TRACE_FIELD_LENGTH : 495;
}
@ -907,6 +913,7 @@ TraceEvent& TraceEvent::setMaxFieldLength(int maxFieldLength) {
}
TraceEvent& TraceEvent::setMaxEventLength(int maxEventLength) {
ASSERT(!logged);
if(maxEventLength == 0) {
this->maxEventLength = FLOW_KNOBS ? FLOW_KNOBS->MAX_TRACE_EVENT_LENGTH : 4000;
}
@ -934,44 +941,52 @@ unsigned long TraceEvent::CountEventsLoggedAt(Severity sev) {
}
// Attaches a "<prefix>Backtrace" detail holding the current backtrace and returns *this.
// Must be called before the event has been logged (asserted below).
// Nothing is attached when the event is disabled, or when its severity is SevError:
// SevError events get their backtrace added at logging time instead.
TraceEvent& TraceEvent::backtrace(const std::string& prefix) {
	ASSERT(!logged);

	const bool deferredToLogging = (this->severity == SevError);
	if (deferredToLogging || !enabled) {
		return *this;
	}

	return detail(prefix + "Backtrace", platform::get_backtrace());
}
TraceEvent::~TraceEvent() {
init();
try {
if (enabled) {
if (this->severity == SevError) {
severity = SevInfo;
backtrace();
severity = SevError;
}
void TraceEvent::log() {
if(!logged) {
init();
try {
if (enabled) {
if (this->severity == SevError) {
severity = SevInfo;
backtrace();
severity = SevError;
}
if(isNetworkThread()) {
TraceEvent::eventCounts[severity/10]++;
}
if(isNetworkThread()) {
TraceEvent::eventCounts[severity/10]++;
}
g_traceLog.writeEvent( fields, trackingKey, severity > SevWarnAlways );
g_traceLog.writeEvent( fields, trackingKey, severity > SevWarnAlways );
if (g_traceLog.isOpen()) {
// Log Metrics
if(g_traceLog.logTraceEventMetrics && isNetworkThread()) {
// Get the persistent Event Metric representing this trace event and push the fields (details) accumulated in *this to it and then log() it.
// Note that if the event metric is disabled it won't actually be logged BUT any new fields added to it will be registered.
// If the event IS logged, a timestamp will be returned, if not then 0. Either way, pass it through to be used if possible
// in the Sev* event metrics.
if (g_traceLog.isOpen()) {
// Log Metrics
if(g_traceLog.logTraceEventMetrics && isNetworkThread()) {
// Get the persistent Event Metric representing this trace event and push the fields (details) accumulated in *this to it and then log() it.
// Note that if the event metric is disabled it won't actually be logged BUT any new fields added to it will be registered.
// If the event IS logged, a timestamp will be returned, if not then 0. Either way, pass it through to be used if possible
// in the Sev* event metrics.
uint64_t event_ts = DynamicEventMetric::getOrCreateInstance(format("TraceEvent.%s", type), StringRef(), true)->setFieldsAndLogFrom(tmpEventMetric);
g_traceLog.log(severity, type, id, event_ts);
uint64_t event_ts = DynamicEventMetric::getOrCreateInstance(format("TraceEvent.%s", type), StringRef(), true)->setFieldsAndLogFrom(tmpEventMetric);
g_traceLog.log(severity, type, id, event_ts);
}
}
}
} catch( Error &e ) {
TraceEvent(SevError, "TraceEventLoggingError").error(e,true);
}
} catch( Error &e ) {
TraceEvent(SevError, "TraceEventDestructorError").error(e,true);
delete tmpEventMetric;
g_trace_depth--;
logged = true;
}
delete tmpEventMetric;
g_trace_depth--;
}
TraceEvent::~TraceEvent() {
log();
}
thread_local bool TraceEvent::networkThread = false;

View File

@ -460,6 +460,12 @@ public:
TraceEvent& GetLastError();
// Returns the current value of this event's `enabled` flag.
bool isEnabled() const { return enabled; }
void log();
~TraceEvent(); // Actually logs the event
// Return the number of invocations of TraceEvent() at the specified logging level.
@ -470,6 +476,7 @@ public:
private:
bool initialized;
bool enabled;
bool logged;
std::string trackingKey;
TraceEventFields fields;
Severity severity;