fdbrpc: Reduced connection monitoring from clients
This patch does two changes to connection monitoring: 1. Connection monitoring at client side will check if the connection has been stayed idle for some time. If connection is unused for a while, we close the connection. There is some weirdness involved here as ping messages are by themselves are connection traffic. We get over this by making it two-phase process, first being checking idle reliable traffic, followed by disabling pings and then checking for idle unreliable traffic. 2. Connection monitoring of clients from server will no longer send pings to clients. Instead, it keep monitor the received bytes and close after certain period of inactivity.
This commit is contained in:
parent
7647d3e3c0
commit
867986cdea
|
@ -305,15 +305,18 @@ struct Peer : NonCopyable {
|
|||
int peerReferences;
|
||||
bool incompatibleProtocolVersionNewer;
|
||||
int64_t bytesReceived;
|
||||
double lastSentTime;
|
||||
|
||||
explicit Peer( TransportData* transport, NetworkAddress const& destination )
|
||||
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME),
|
||||
compatible(true), incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0)
|
||||
compatible(true), incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastSentTime(now())
|
||||
{
|
||||
connect = connectionKeeper(this);
|
||||
}
|
||||
|
||||
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) {
|
||||
if (rp)
|
||||
lastSentTime = now();
|
||||
unsent.setWriteBuffer(pb);
|
||||
if (rp) reliable.insert(rp);
|
||||
if (firstUnsent) dataToSend.trigger();
|
||||
|
@ -396,17 +399,47 @@ struct Peer : NonCopyable {
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> connectionMonitor( Peer *peer ) {
|
||||
state Endpoint remotePingEndpoint({ peer->destination }, WLTOKEN_PING_PACKET);
|
||||
if (!peer->destination.isPublic()) {
|
||||
// Don't send ping messages to clients. Instead monitor incoming client pings.
|
||||
state double lastRefreshed = now();
|
||||
state int64_t lastBytesReceived = peer->bytesReceived;
|
||||
loop {
|
||||
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME));
|
||||
if (lastBytesReceived < peer->bytesReceived) {
|
||||
lastRefreshed = now();
|
||||
lastBytesReceived = peer->bytesReceived;
|
||||
} else if (lastRefreshed < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT*2.5) {
|
||||
throw connection_idle();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state Endpoint remotePingEndpoint({ peer->destination }, WLTOKEN_PING_PACKET);
|
||||
loop {
|
||||
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
|
||||
const bool pendingPacketsEmpty = peer->reliable.empty() && peer->unsent.empty();
|
||||
|
||||
if (peer->peerReferences == 0 && pendingPacketsEmpty) {
|
||||
throw connection_unreferenced();
|
||||
}
|
||||
|
||||
wait( delayJittered( FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME ) );
|
||||
// TODO: Investigate connection idling at server-side peer too.
|
||||
const bool monitorStateActive = peer->destination.isPublic() &&
|
||||
(peer->lastSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT) &&
|
||||
(peer->lastConnectTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT);
|
||||
if (!monitorStateActive) {
|
||||
choose {
|
||||
when(wait(peer->dataToSend.onTrigger())){
|
||||
peer->lastSentTime = now();
|
||||
}
|
||||
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT))) {
|
||||
throw connection_idle();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SOMEDAY: Stop monitoring and close the connection after a long period of inactivity with no reliable or onDisconnect requests outstanding
|
||||
wait (delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME));
|
||||
|
||||
// TODO: Stop monitoring and close the connection with no onDisconnect requests outstanding
|
||||
state ReplyPromise<Void> reply;
|
||||
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePingEndpoint );
|
||||
state int64_t startingBytes = peer->bytesReceived;
|
||||
|
@ -414,12 +447,17 @@ struct Peer : NonCopyable {
|
|||
loop {
|
||||
choose {
|
||||
when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) {
|
||||
// TODO: Since server will not ping clients (but will respond to incoming pings), is this
|
||||
// a safe metric, or instead we should fail after multiple timeouts?
|
||||
if(startingBytes == peer->bytesReceived) {
|
||||
TraceEvent("ConnectionTimeout").suppressFor(1.0).detail("WithAddr", peer->destination);
|
||||
throw connection_failed();
|
||||
}
|
||||
if(timeouts > 1) {
|
||||
TraceEvent(SevWarnAlways, "ConnectionSlowPing").suppressFor(1.0).detail("WithAddr", peer->destination).detail("Timeouts", timeouts);
|
||||
TraceEvent(SevWarnAlways, "ConnectionSlowPing")
|
||||
.suppressFor(1.0)
|
||||
.detail("WithAddr", peer->destination)
|
||||
.detail("Timeouts", timeouts);
|
||||
}
|
||||
startingBytes = peer->bytesReceived;
|
||||
timeouts++;
|
||||
|
@ -550,14 +588,21 @@ struct Peer : NonCopyable {
|
|||
self->discardUnreliablePackets();
|
||||
reader = Future<Void>();
|
||||
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
|
||||
e.code() == error_code_connection_unreferenced ||
|
||||
e.code() == error_code_connection_unreferenced || e.code() == error_code_connection_idle ||
|
||||
(g_network->isSimulated() && e.code() == error_code_checksum_failed);
|
||||
|
||||
if(self->compatible) {
|
||||
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
|
||||
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID())
|
||||
.error(e, true)
|
||||
.suppressFor(1.0)
|
||||
.detail("PeerAddr", self->destination);
|
||||
}
|
||||
else {
|
||||
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
|
||||
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed",
|
||||
conn ? conn->getDebugID() : UID())
|
||||
.error(e, true)
|
||||
.suppressFor(1.0)
|
||||
.detail("PeerAddr", self->destination);
|
||||
}
|
||||
|
||||
if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) {
|
||||
|
@ -565,20 +610,25 @@ struct Peer : NonCopyable {
|
|||
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
|
||||
it.first = now();
|
||||
} else if(now() - it.first > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT) {
|
||||
TraceEvent(SevWarnAlways, "TooManyConnectionsClosed", conn ? conn->getDebugID() : UID()).suppressFor(5.0).detail("PeerAddr", self->destination);
|
||||
TraceEvent(SevWarnAlways, "TooManyConnectionsClosed", conn ? conn->getDebugID() : UID())
|
||||
.suppressFor(5.0)
|
||||
.detail("PeerAddr", self->destination);
|
||||
self->transport->degraded->set(true);
|
||||
}
|
||||
it.second = now();
|
||||
}
|
||||
|
||||
if (conn) {
|
||||
if (FlowTransport::transport().isClient()) {
|
||||
if (FlowTransport::transport().isClient() && e.code() != error_code_connection_idle) {
|
||||
clientReconnectDelay = true;
|
||||
}
|
||||
conn->close();
|
||||
conn = Reference<IConnection>();
|
||||
}
|
||||
IFailureMonitor::failureMonitor().notifyDisconnect( self->destination ); //< Clients might send more packets in response, which needs to go out on the next connection
|
||||
|
||||
// Clients might send more packets in response, which needs to go out on the next connection
|
||||
IFailureMonitor::failureMonitor().notifyDisconnect( self->destination );
|
||||
|
||||
if (e.code() == error_code_actor_cancelled) throw;
|
||||
// Try to recover, even from serious errors, by retrying
|
||||
|
||||
|
|
|
@ -381,8 +381,13 @@ private:
|
|||
ACTOR static Future<Void> trackLeakedConnection( Sim2Conn* self ) {
|
||||
wait( g_simulator.onProcess( self->process ) );
|
||||
// SOMEDAY: Make this value variable? Dependent on buggification status?
|
||||
wait( delay( 20.0 ) );
|
||||
TraceEvent(SevError, "LeakedConnection", self->dbgid).error(connection_leaked()).detail("MyAddr", self->process->address).detail("PeerAddr", self->peerEndpoint).detail("PeerId", self->peerId).detail("Opened", self->opened);
|
||||
wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT * 4 ) );
|
||||
TraceEvent(SevError, "LeakedConnection", self->dbgid)
|
||||
.error(connection_leaked())
|
||||
.detail("MyAddr", self->process->address)
|
||||
.detail("PeerAddr", self->peerEndpoint)
|
||||
.detail("PeerId", self->peerId)
|
||||
.detail("Opened", self->opened);
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -55,6 +55,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
//connectionMonitor
|
||||
init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0;
|
||||
init( CONNECTION_MONITOR_TIMEOUT, isSimulated ? 1.50 : 2.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_TIMEOUT = 6.0;
|
||||
init( CONNECTION_MONITOR_IDLE_TIMEOUT, 10.0 );
|
||||
|
||||
//FlowTransport
|
||||
init( CONNECTION_REJECTED_MESSAGE_DELAY, 1.0 );
|
||||
|
|
|
@ -73,6 +73,7 @@ public:
|
|||
//connectionMonitor
|
||||
double CONNECTION_MONITOR_LOOP_TIME;
|
||||
double CONNECTION_MONITOR_TIMEOUT;
|
||||
double CONNECTION_MONITOR_IDLE_TIMEOUT;
|
||||
|
||||
//FlowTransport
|
||||
double CONNECTION_REJECTED_MESSAGE_DELAY;
|
||||
|
|
|
@ -69,6 +69,7 @@ ERROR( transaction_not_permitted, 1045, "Operation not permitted")
|
|||
ERROR( cluster_not_fully_recovered, 1046, "Cluster not fully recovered")
|
||||
ERROR( txn_exec_log_anti_quorum, 1047, "Execute Transaction not supported when log anti quorum is configured")
|
||||
ERROR( connection_unreferenced, 1048, "No peer references for connection" )
|
||||
ERROR( connection_idle, 1049, "Connection closed after idle timeout" )
|
||||
|
||||
ERROR( broken_promise, 1100, "Broken promise" )
|
||||
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
|
||||
|
|
Loading…
Reference in New Issue