fix: addPeerReference only marks a connection as healthy if it is the first peerReference
added additional logging to long LoadBalance calls, and when the failure monitor state changes for an address
This commit is contained in:
parent
3c30215662
commit
9e137d3b49
|
@ -73,9 +73,9 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat
|
||||||
// for an endpoint that is waited on changes, the waiter sees its failure status change
|
// for an endpoint that is waited on changes, the waiter sees its failure status change
|
||||||
auto it = addressStatus.find(address);
|
auto it = addressStatus.find(address);
|
||||||
|
|
||||||
//TraceEvent("NotifyFailureStatus").detail("Address", address).detail("Status", status.failed ? "Failed" : "OK").detail("Present", it == addressStatus.end());
|
|
||||||
if (it == addressStatus.end()) {
|
if (it == addressStatus.end()) {
|
||||||
if (status != FailureStatus()) {
|
if (status != FailureStatus()) {
|
||||||
|
TraceEvent("NotifyAddressHealthy").suppressFor(1.0).detail("Address", address);
|
||||||
addressStatus[address]=status;
|
addressStatus[address]=status;
|
||||||
endpointKnownFailed.triggerRange( Endpoint({address}, UID()), Endpoint({address}, UID(-1,-1)) );
|
endpointKnownFailed.triggerRange( Endpoint({address}, UID()), Endpoint({address}, UID(-1,-1)) );
|
||||||
}
|
}
|
||||||
|
@ -85,8 +85,14 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat
|
||||||
it->second = status;
|
it->second = status;
|
||||||
else
|
else
|
||||||
addressStatus.erase(it);
|
addressStatus.erase(it);
|
||||||
if(triggerEndpoint)
|
if(triggerEndpoint) {
|
||||||
|
if(status.failed) {
|
||||||
|
TraceEvent("NotifyAddressFailed").suppressFor(1.0).detail("Address", address);
|
||||||
|
} else {
|
||||||
|
TraceEvent("NotifyAddressHealthyPresent").suppressFor(1.0).detail("Address", address);
|
||||||
|
}
|
||||||
endpointKnownFailed.triggerRange( Endpoint({address}, UID()), Endpoint({address}, UID(-1,-1)) );
|
endpointKnownFailed.triggerRange( Endpoint({address}, UID()), Endpoint({address}, UID(-1,-1)) );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1094,11 +1094,13 @@ Endpoint FlowTransport::loadedEndpoint( const UID& token ) {
|
||||||
void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) {
|
void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) {
|
||||||
if (!isStream || !endpoint.getPrimaryAddress().isValid())
|
if (!isStream || !endpoint.getPrimaryAddress().isValid())
|
||||||
return;
|
return;
|
||||||
else if (FlowTransport::transport().isClient())
|
|
||||||
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
|
|
||||||
|
|
||||||
Reference<Peer> peer = self->getOrOpenPeer(endpoint.getPrimaryAddress());
|
Reference<Peer> peer = self->getOrOpenPeer(endpoint.getPrimaryAddress());
|
||||||
|
|
||||||
if(peer->peerReferences == -1) {
|
if(peer->peerReferences == -1) {
|
||||||
|
if (FlowTransport::transport().isClient()) {
|
||||||
|
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
|
||||||
|
}
|
||||||
peer->peerReferences = 1;
|
peer->peerReferences = 1;
|
||||||
} else {
|
} else {
|
||||||
peer->peerReferences++;
|
peer->peerReferences++;
|
||||||
|
|
|
@ -185,6 +185,7 @@ Future< REPLY_TYPE(Request) > loadBalance(
|
||||||
state Future<Void> secondDelay = Never();
|
state Future<Void> secondDelay = Never();
|
||||||
|
|
||||||
state Promise<Void> requestFinished;
|
state Promise<Void> requestFinished;
|
||||||
|
state double startTime = now();
|
||||||
|
|
||||||
setReplyPriority(request, taskID);
|
setReplyPriority(request, taskID);
|
||||||
if (!alternatives)
|
if (!alternatives)
|
||||||
|
@ -278,6 +279,21 @@ Future< REPLY_TYPE(Request) > loadBalance(
|
||||||
state double backoff = 0;
|
state double backoff = 0;
|
||||||
state bool triedAllOptions = false;
|
state bool triedAllOptions = false;
|
||||||
loop {
|
loop {
|
||||||
|
if(now() - startTime > g_network->isSimulated() ? 30.0 : 600.0) {
|
||||||
|
TraceEvent ev(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LoadBalanceTooLong");
|
||||||
|
ev.suppressFor(1.0);
|
||||||
|
ev.detail("Duration", now() - startTime);
|
||||||
|
ev.detail("NumAttempts", numAttempts);
|
||||||
|
ev.detail("Backoff", backoff);
|
||||||
|
ev.detail("TriedAllOptions", triedAllOptions);
|
||||||
|
if(ev.isEnabled()) {
|
||||||
|
for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) {
|
||||||
|
RequestStream<Request> const* thisStream = &alternatives->get( alternativeNum, channel );
|
||||||
|
TraceEvent(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LoadBalanceTooLongEndpoint").detail("Addr", thisStream->getEndpoint().getPrimaryAddress()).detail("Token", thisStream->getEndpoint().token).detail("Failed", IFailureMonitor::failureMonitor().getState( thisStream->getEndpoint() ).failed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Find an alternative, if any, that is not failed, starting with nextAlt
|
// Find an alternative, if any, that is not failed, starting with nextAlt
|
||||||
state RequestStream<Request> const* stream = NULL;
|
state RequestStream<Request> const* stream = NULL;
|
||||||
for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) {
|
for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) {
|
||||||
|
|
|
@ -460,6 +460,10 @@ public:
|
||||||
|
|
||||||
TraceEvent& GetLastError();
|
TraceEvent& GetLastError();
|
||||||
|
|
||||||
|
bool isEnabled() const {
|
||||||
|
return enabled;
|
||||||
|
}
|
||||||
|
|
||||||
~TraceEvent(); // Actually logs the event
|
~TraceEvent(); // Actually logs the event
|
||||||
|
|
||||||
// Return the number of invocations of TraceEvent() at the specified logging level.
|
// Return the number of invocations of TraceEvent() at the specified logging level.
|
||||||
|
|
Loading…
Reference in New Issue