// Patch overview: adds periodic per-peer ping latency logging. It touches
// fdbrpc/FlowTransport.actor.cpp, fdbrpc/FlowTransport.h, flow/Knobs.cpp and flow/Knobs.h.
//
// fdbrpc/FlowTransport.actor.cpp
//   * TransportData's constructor is now only declared inside the class and is defined after
//     pingLatencyLogger. The out-of-class definition keeps the same initializer list and
//     `degraded` setup, and additionally assigns pingLogger = pingLatencyLogger(this).
//   * New TransportData members: `orderedAddresses` (a std::set) and `pingLogger` (a Future).
//     getOrOpenPeer inserts the address into orderedAddresses right after `peers[address] = peer`;
//     connectionKeeper erases it right after `peers.erase(self->destination)`.
//   * pingLatencyLogger loops forever. Each iteration waits
//     delay(FLOW_KNOBS->PING_LOGGING_INTERVAL) and, if orderedAddresses is non-empty, handles
//     exactly one address: the first one greater than `lastAddress` (upper_bound), wrapping to
//     begin() when the end of the set is reached. The peer is looked up with getPeer(); when it
//     is non-null and has totalPingCount > 0, a "PingLatency" TraceEvent is emitted with
//     PeerAddr, MinLatency, MaxLatency, AvgLatency (totalPingLatency / totalPingCount) and Count,
//     and the peer's counters are then reset to min = 1000, max = 0, total = 0, count = 0.
//   * connectionMonitor captures `startTime = now()` once, before its wait loop. When
//     reply.getFuture() fires it computes `now() - startTime` and folds that value into the
//     peer's min / max / total / count before breaking out of the loop.
//
// fdbrpc/FlowTransport.h
//   * Peer gains minPingLatency, maxPingLatency, totalPingLatency (double) and totalPingCount
//     (int), initialised in the constructor to 1000, 0, 0 and 0. These are the same values the
//     logger writes when it resets the counters, so the two places must be kept in sync.
//
// flow/Knobs.h / flow/Knobs.cpp
//   * New knob `double PING_LOGGING_INTERVAL`, initialised with init( PING_LOGGING_INTERVAL, 1.0 ).
//
// NOTE(review): this copy of the patch has its hunks run together on three long lines, and
// template argument lists look like they were dropped (e.g. `Reference>( new AsyncVar(false) )`,
// `std::set orderedAddresses`, `ACTOR Future pingLatencyLogger`) -- presumably an extraction
// artifact; confirm against the original commit before trying to apply it.
// NOTE(review): because startTime is taken once before the loop, a reply that arrives after one
// or more CONNECTION_MONITOR_TIMEOUT iterations is recorded with the full elapsed time -- confirm
// that this is the intended definition of ping latency.
// NOTE(review): 1000 serves as the "no sample yet" sentinel for minPingLatency; its unit is
// presumably seconds, matching now() -- confirm.
diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index d3bd5d2001..fd51114363 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -157,16 +157,7 @@ struct PingReceiver : NetworkMessageReceiver { class TransportData { public: - TransportData(uint64_t transportId) - : endpointNotFoundReceiver(endpoints), - pingReceiver(endpoints), - warnAlwaysForLargePacket(true), - lastIncompatibleMessage(0), - transportId(transportId), - numIncompatibleConnections(0) - { - degraded = Reference>( new AsyncVar(false) ); - } + TransportData(uint64_t transportId); ~TransportData(); @@ -189,6 +180,7 @@ public: std::vector> listeners; std::unordered_map> peers; std::unordered_map> closedPeers; + std::set orderedAddresses; Reference> degraded; bool warnAlwaysForLargePacket; @@ -212,8 +204,43 @@ public: uint64_t transportId; Future multiVersionCleanup; + Future pingLogger; }; +ACTOR Future pingLatencyLogger(TransportData* self) { + state NetworkAddress lastAddress = NetworkAddress(); + loop { + wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL)); + if(self->orderedAddresses.size()) { + auto it = self->orderedAddresses.upper_bound(lastAddress); + if(it == self->orderedAddresses.end()) { + it = self->orderedAddresses.begin(); + } + lastAddress = *it; + auto peer = self->getPeer(lastAddress); + if(peer && peer->totalPingCount > 0) { + TraceEvent("PingLatency").detail("PeerAddr", lastAddress).detail("MinLatency", peer->minPingLatency).detail("MaxLatency", peer->maxPingLatency).detail("AvgLatency", peer->totalPingLatency/peer->totalPingCount).detail("Count", peer->totalPingCount); + peer->minPingLatency = 1000; + peer->maxPingLatency = 0; + peer->totalPingLatency = 0; + peer->totalPingCount = 0; + } + } + } +} + +TransportData::TransportData(uint64_t transportId) + : endpointNotFoundReceiver(endpoints), + pingReceiver(endpoints), + warnAlwaysForLargePacket(true), + lastIncompatibleMessage(0), + transportId(transportId), + 
numIncompatibleConnections(0) +{ + degraded = Reference>( new AsyncVar(false) ); + pingLogger = pingLatencyLogger(this); +} + #define CONNECT_PACKET_V0 0x0FDB00A444020001LL #define CONNECT_PACKET_V0_SIZE 14 @@ -339,6 +366,7 @@ ACTOR Future connectionMonitor( Reference peer ) { FlowTransport::transport().sendUnreliable( SerializeSource>(reply), remotePingEndpoint, true ); state int64_t startingBytes = peer->bytesReceived; state int timeouts = 0; + state double startTime = now(); loop { choose { when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) { @@ -356,6 +384,11 @@ ACTOR Future connectionMonitor( Reference peer ) { timeouts++; } when (wait( reply.getFuture() )) { + double pingLatency = now() - startTime; + peer->minPingLatency = std::min(peer->minPingLatency, pingLatency); + peer->maxPingLatency = std::max(peer->maxPingLatency, pingLatency); + peer->totalPingLatency += pingLatency; + peer->totalPingCount++; break; } when (wait( peer->resetPing.onTrigger())) { @@ -542,6 +575,7 @@ ACTOR Future connectionKeeper( Reference self, TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination); self->connect.cancel(); self->transport->peers.erase(self->destination); + self->transport->orderedAddresses.erase(self->destination); return Void(); } } @@ -1029,6 +1063,7 @@ Reference TransportData::getOrOpenPeer( NetworkAddress const& address, boo peer->connect = connectionKeeper(peer); } peers[address] = peer; + orderedAddresses.insert(address); } return peer; diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 7ff5109a59..27378ff27c 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -124,11 +124,16 @@ struct Peer : public ReferenceCounted { int64_t bytesReceived; double lastDataPacketSentTime; int outstandingReplies; + double minPingLatency; + double maxPingLatency; + double totalPingLatency; + int totalPingCount; explicit Peer(TransportData* transport, NetworkAddress const& destination) : 
transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0), - incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {} + incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()), + minPingLatency(1000), maxPingLatency(0), totalPingLatency(0), totalPingCount(0) {} void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent); diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 0a789bbe3d..f4a8fdebfe 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -72,6 +72,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) { init( USE_OBJECT_SERIALIZER, 1 ); init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 ); init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 ); + init( PING_LOGGING_INTERVAL, 1.0 ); init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 ); init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 ); diff --git a/flow/Knobs.h b/flow/Knobs.h index 58afd86699..bcf14707fe 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -90,6 +90,7 @@ public: double ALWAYS_ACCEPT_DELAY; int ACCEPT_BATCH_SIZE; int USE_OBJECT_SERIALIZER; + double PING_LOGGING_INTERVAL; int TLS_CERT_REFRESH_DELAY_SECONDS; double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;