Added logging of ping latencies for all network connections
This commit is contained in:
parent
d927e33ff6
commit
9efda1fec5
|
@ -157,16 +157,7 @@ struct PingReceiver : NetworkMessageReceiver {
|
|||
|
||||
class TransportData {
|
||||
public:
|
||||
TransportData(uint64_t transportId)
|
||||
: endpointNotFoundReceiver(endpoints),
|
||||
pingReceiver(endpoints),
|
||||
warnAlwaysForLargePacket(true),
|
||||
lastIncompatibleMessage(0),
|
||||
transportId(transportId),
|
||||
numIncompatibleConnections(0)
|
||||
{
|
||||
degraded = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
|
||||
}
|
||||
TransportData(uint64_t transportId);
|
||||
|
||||
~TransportData();
|
||||
|
||||
|
@ -189,6 +180,7 @@ public:
|
|||
std::vector<Future<Void>> listeners;
|
||||
std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
|
||||
std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
|
||||
std::set<NetworkAddress> orderedAddresses;
|
||||
Reference<AsyncVar<bool>> degraded;
|
||||
bool warnAlwaysForLargePacket;
|
||||
|
||||
|
@ -212,8 +204,43 @@ public:
|
|||
uint64_t transportId;
|
||||
|
||||
Future<Void> multiVersionCleanup;
|
||||
Future<Void> pingLogger;
|
||||
};
|
||||
|
||||
// Background actor that emits one "PingLatency" trace event per wake-up.
//
// Every FLOW_KNOBS->PING_LOGGING_INTERVAL it advances a cursor through
// self->orderedAddresses (wrapping to the beginning once the end is reached),
// looks up the peer for that address and, if the peer has recorded at least one
// ping since it was last reported, logs the min / max / average latency and the
// sample count. The peer's accumulators are then reset so that the next report
// only covers new samples.
//
// The loop has no exit condition of its own.
ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
	// Address handled on the previous iteration; a default-constructed address
	// is the starting point of the cursor.
	state NetworkAddress cursor = NetworkAddress();
	loop {
		wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));

		// Nothing to report on while no addresses are registered.
		if (self->orderedAddresses.empty()) {
			continue;
		}

		// Step to the first address strictly greater than the previous one,
		// wrapping around when the previous one was the last in the set.
		auto next = self->orderedAddresses.upper_bound(cursor);
		if (next == self->orderedAddresses.end()) {
			next = self->orderedAddresses.begin();
		}
		cursor = *next;

		// Skip addresses without a peer object or without any new samples.
		auto target = self->getPeer(cursor);
		if (!target || target->totalPingCount <= 0) {
			continue;
		}

		TraceEvent("PingLatency")
		    .detail("PeerAddr", cursor)
		    .detail("MinLatency", target->minPingLatency)
		    .detail("MaxLatency", target->maxPingLatency)
		    .detail("AvgLatency", target->totalPingLatency / target->totalPingCount)
		    .detail("Count", target->totalPingCount);

		// Start a fresh measurement window. 1000 is the same "no sample yet"
		// starting value for the minimum that the Peer constructor uses.
		target->totalPingCount = 0;
		target->totalPingLatency = 0;
		target->maxPingLatency = 0;
		target->minPingLatency = 1000;
	}
}
|
||||
|
||||
// Builds the transport state for the given transport id.
//
// Besides initializing the receivers and counters, the constructor allocates
// the `degraded` flag (initially false) and starts the pingLatencyLogger actor,
// keeping its future in `pingLogger`.
TransportData::TransportData(uint64_t transportId)
  : endpointNotFoundReceiver(endpoints),
    pingReceiver(endpoints),
    warnAlwaysForLargePacket(true),
    lastIncompatibleMessage(0),
    transportId(transportId),
    numIncompatibleConnections(0)
{
	AsyncVar<bool>* degradedFlag = new AsyncVar<bool>(false);
	degraded = Reference<AsyncVar<bool>>(degradedFlag);

	// Started from the body, after the members listed above are initialized,
	// because the actor receives `this`.
	pingLogger = pingLatencyLogger(this);
}
|
||||
|
||||
#define CONNECT_PACKET_V0 0x0FDB00A444020001LL
|
||||
#define CONNECT_PACKET_V0_SIZE 14
|
||||
|
||||
|
@ -339,6 +366,7 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
|
|||
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePingEndpoint, true );
|
||||
state int64_t startingBytes = peer->bytesReceived;
|
||||
state int timeouts = 0;
|
||||
state double startTime = now();
|
||||
loop {
|
||||
choose {
|
||||
when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) {
|
||||
|
@ -356,6 +384,11 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
|
|||
timeouts++;
|
||||
}
|
||||
when (wait( reply.getFuture() )) {
|
||||
double pingLatency = now() - startTime;
|
||||
peer->minPingLatency = std::min(peer->minPingLatency, pingLatency);
|
||||
peer->maxPingLatency = std::max(peer->maxPingLatency, pingLatency);
|
||||
peer->totalPingLatency += pingLatency;
|
||||
peer->totalPingCount++;
|
||||
break;
|
||||
}
|
||||
when (wait( peer->resetPing.onTrigger())) {
|
||||
|
@ -542,6 +575,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
|
|||
TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
|
||||
self->connect.cancel();
|
||||
self->transport->peers.erase(self->destination);
|
||||
self->transport->orderedAddresses.erase(self->destination);
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
@ -1029,6 +1063,7 @@ Reference<Peer> TransportData::getOrOpenPeer( NetworkAddress const& address, boo
|
|||
peer->connect = connectionKeeper(peer);
|
||||
}
|
||||
peers[address] = peer;
|
||||
orderedAddresses.insert(address);
|
||||
}
|
||||
|
||||
return peer;
|
||||
|
|
|
@ -124,11 +124,16 @@ struct Peer : public ReferenceCounted<Peer> {
|
|||
int64_t bytesReceived;
|
||||
double lastDataPacketSentTime;
|
||||
int outstandingReplies;
|
||||
double minPingLatency;
|
||||
double maxPingLatency;
|
||||
double totalPingLatency;
|
||||
int totalPingCount;
|
||||
|
||||
explicit Peer(TransportData* transport, NetworkAddress const& destination)
|
||||
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
|
||||
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
|
||||
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {}
|
||||
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
|
||||
minPingLatency(1000), maxPingLatency(0), totalPingLatency(0), totalPingCount(0) {}
|
||||
|
||||
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent);
|
||||
|
||||
|
|
|
@ -72,6 +72,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
init( USE_OBJECT_SERIALIZER, 1 );
|
||||
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
|
||||
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
|
||||
init( PING_LOGGING_INTERVAL, 1.0 );
|
||||
|
||||
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
|
||||
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
|
||||
|
|
|
@ -90,6 +90,7 @@ public:
|
|||
double ALWAYS_ACCEPT_DELAY;
|
||||
int ACCEPT_BATCH_SIZE;
|
||||
int USE_OBJECT_SERIALIZER;
|
||||
double PING_LOGGING_INTERVAL;
|
||||
|
||||
int TLS_CERT_REFRESH_DELAY_SECONDS;
|
||||
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
|
||||
|
|
Loading…
Reference in New Issue