2017-05-26 04:48:44 +08:00
/*
* FlowTransport . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2018-10-20 01:30:13 +08:00
# include "fdbrpc/FlowTransport.h"
2017-05-26 04:48:44 +08:00
2019-06-25 05:40:43 +08:00
# include <unordered_map>
2017-10-28 07:54:44 +08:00
# if VALGRIND
# include <memcheck.h>
# endif
2019-06-25 05:40:43 +08:00
2020-01-14 10:19:30 +08:00
# include "flow/crc32c.h"
2019-06-25 05:40:43 +08:00
# include "fdbrpc/fdbrpc.h"
# include "fdbrpc/FailureMonitor.h"
2020-02-18 13:54:05 +08:00
# include "fdbrpc/HealthMonitor.h"
2019-06-25 05:40:43 +08:00
# include "fdbrpc/genericactors.actor.h"
# include "fdbrpc/simulator.h"
# include "flow/ActorCollection.h"
# include "flow/Error.h"
# include "flow/flow.h"
# include "flow/Net2Packet.h"
# include "flow/TDMetric.actor.h"
# include "flow/ObjectSerializer.h"
# include "flow/ProtocolVersion.h"
2018-08-11 07:05:40 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-10-28 07:54:44 +08:00
2019-03-24 08:54:46 +08:00
// NOTE(review): file-local and initially an empty NetworkAddressList; presumably tracks the address of the
// peer whose packet is currently being delivered -- its users are outside this section, confirm there.
static NetworkAddressList g_currentDeliveryPeerAddress = NetworkAddressList ( ) ;
2017-05-26 04:48:44 +08:00
// Fixed token under which EndpointNotFoundReceiver registers itself in the EndpointMap.
const UID WLTOKEN_ENDPOINT_NOT_FOUND ( - 1 , 0 ) ;
// Fixed token under which PingReceiver registers itself; connectionMonitor addresses its pings to it.
const UID WLTOKEN_PING_PACKET ( - 1 , 1 ) ;
// NOTE(review): not referenced in this section; judging by the name, packets addressed to this token are
// meant to be dropped -- confirm against the code that uses it.
const UID TOKEN_IGNORE_PACKET ( 0 , 2 ) ;
// OR'd into the first word of every token generated by the multi-stream EndpointMap::insert.
const uint64_t TOKEN_STREAM_FLAG = 1 ;
class EndpointMap : NonCopyable {
public :
EndpointMap ( ) ;
2019-07-04 12:03:58 +08:00
void insert ( NetworkMessageReceiver * r , Endpoint : : Token & token , TaskPriority priority ) ;
2020-04-13 13:18:51 +08:00
const Endpoint & insert ( NetworkAddressList localAddresses , std : : vector < std : : pair < FlowReceiver * , TaskPriority > > const & streams ) ;
2017-05-26 04:48:44 +08:00
NetworkMessageReceiver * get ( Endpoint : : Token const & token ) ;
2019-06-25 17:47:35 +08:00
TaskPriority getPriority ( Endpoint : : Token const & token ) ;
2017-05-26 04:48:44 +08:00
void remove ( Endpoint : : Token const & token , NetworkMessageReceiver * r ) ;
private :
void realloc ( ) ;
struct Entry {
union {
uint64_t uid [ 2 ] ; // priority packed into lower 32 bits; actual lower 32 bits of token are the index in data[]
uint32_t nextFree ;
} ;
NetworkMessageReceiver * receiver ;
Endpoint : : Token & token ( ) { return * ( Endpoint : : Token * ) uid ; }
} ;
std : : vector < Entry > data ;
uint32_t firstFree ;
} ;
2019-01-25 09:28:26 +08:00
// The map starts with no slots; uint32_t(-1) in firstFree is the "free list is empty" marker that
// insert() checks before growing the table.
EndpointMap::EndpointMap() : firstFree( static_cast<uint32_t>( -1 ) ) {}
void EndpointMap : : realloc ( ) {
int oldSize = data . size ( ) ;
data . resize ( std : : max ( 128 , oldSize * 2 ) ) ;
for ( int i = oldSize ; i < data . size ( ) ; i + + ) {
data [ i ] . receiver = 0 ;
data [ i ] . nextFree = i + 1 ;
}
data [ data . size ( ) - 1 ] . nextFree = firstFree ;
firstFree = oldSize ;
}
2019-07-04 12:03:58 +08:00
void EndpointMap : : insert ( NetworkMessageReceiver * r , Endpoint : : Token & token , TaskPriority priority ) {
2017-05-26 04:48:44 +08:00
if ( firstFree = = uint32_t ( - 1 ) ) realloc ( ) ;
int index = firstFree ;
firstFree = data [ index ] . nextFree ;
token = Endpoint : : Token ( token . first ( ) , ( token . second ( ) & 0xffffffff00000000LL ) | index ) ;
2019-07-04 12:03:58 +08:00
data [ index ] . token ( ) = Endpoint : : Token ( token . first ( ) , ( token . second ( ) & 0xffffffff00000000LL ) | static_cast < uint32_t > ( priority ) ) ;
2017-05-26 04:48:44 +08:00
data [ index ] . receiver = r ;
}
2020-04-13 13:18:51 +08:00
// Registers every receiver in `streams` at once, in one run of adjacent slots, and assigns each stream
// its endpoint.
//
// All tokens of the batch are derived from a single random UID: stream i gets
// (base.first() + (i << 32)) | TOKEN_STREAM_FLAG as its first word, and the upper 32 bits of
// base.second() combined with its slot index as its second word.
//
// localAddresses: addresses placed into every generated Endpoint.
// streams:        receivers and their priorities; must not be empty.
// Returns the endpoint of streams[0].
const Endpoint& EndpointMap::insert( NetworkAddressList localAddresses, std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams ) {
	// The result is the endpoint of streams[0], and with an empty batch the resize below could shrink
	// `data` and index past its end. An empty batch is therefore a caller error: fail loudly instead
	// of running into undefined behaviour.
	ASSERT( !streams.empty() );

	int adjacentFree = 0;   // length of the run of free slots that ends at the slot being scanned
	int adjacentStart = -1; // first slot of the run chosen for this batch; -1 until one is found

	// The free list is rebuilt from scratch while scanning, so that the chosen run can be left out of it.
	firstFree = -1;
	for (int i = 0; i < data.size(); i++) {
		if (data[i].receiver) {
			adjacentFree = 0;
		} else {
			data[i].nextFree = firstFree;
			firstFree = i;
			if (adjacentStart == -1 && ++adjacentFree == streams.size()) {
				// Found a long enough run: unlink it by resuming the list at the free slot that
				// preceded the run.
				adjacentStart = i + 1 - adjacentFree;
				firstFree = data[adjacentStart].nextFree;
			}
		}
	}
	if (adjacentStart == -1) {
		// No run was long enough. Extend the table so that the free slots at its very end (if any)
		// together with the new slots form the run.
		data.resize( data.size() + streams.size() - adjacentFree );
		adjacentStart = data.size() - streams.size();
		if (adjacentFree > 0) {
			// The trailing free slots were pushed onto the list during the scan; take them off again.
			firstFree = data[adjacentStart].nextFree;
		}
	}

	UID base = deterministicRandom()->randomUniqueID();
	for (uint64_t i = 0; i < streams.size(); i++) {
		int index = adjacentStart + i;
		uint64_t first = (base.first() + (i << 32)) | TOKEN_STREAM_FLAG;
		// The endpoint handed to the stream carries the slot index in its low 32 bits ...
		streams[i].first->setEndpoint( Endpoint( localAddresses, UID( first, (base.second() & 0xffffffff00000000LL) | index ) ) );
		// ... while the copy stored in the table carries the priority there.
		data[index].token() = Endpoint::Token( first, (base.second() & 0xffffffff00000000LL) | static_cast<uint32_t>( streams[i].second ) );
		data[index].receiver = (NetworkMessageReceiver*) streams[i].first;
	}

	return streams[0].first->getEndpoint( TaskPriority::DefaultEndpoint );
}
2017-05-26 04:48:44 +08:00
// Looks up the receiver registered under `token`.
// Returns null when the slot index is out of range or the token does not match the one stored in
// that slot; otherwise returns the slot's receiver pointer.
NetworkMessageReceiver* EndpointMap::get( Endpoint::Token const& token ) {
	const uint32_t slot = token.second(); // low 32 bits of the second word are the slot index
	if (slot >= data.size()) {
		return nullptr;
	}

	Endpoint::Token& stored = data[slot].token();
	// The stored token keeps the priority in its low 32 bits, so those are swapped for the slot
	// index before comparing.
	const bool sameFirst = stored.first() == token.first();
	const bool sameSecond = ((stored.second() & 0xffffffff00000000LL) | slot) == token.second();
	return (sameFirst && sameSecond) ? data[slot].receiver : nullptr;
}
2019-06-25 17:47:35 +08:00
// Returns the priority that was stored alongside `token` when it was inserted, or
// TaskPriority::UnknownEndpoint when the slot index is out of range or the token does not match the
// one stored in its slot.
TaskPriority EndpointMap::getPriority( Endpoint::Token const& token ) {
	const uint32_t slot = token.second(); // low 32 bits of the second word are the slot index
	if (slot >= data.size()) {
		return TaskPriority::UnknownEndpoint;
	}

	Endpoint::Token& stored = data[slot].token();
	if (stored.first() != token.first()) {
		return TaskPriority::UnknownEndpoint;
	}
	if (((stored.second() & 0xffffffff00000000LL) | slot) != token.second()) {
		return TaskPriority::UnknownEndpoint;
	}

	// The priority was packed into the second word of the stored token by insert().
	return static_cast<TaskPriority>( stored.second() );
}
void EndpointMap : : remove ( Endpoint : : Token const & token , NetworkMessageReceiver * r ) {
uint32_t index = token . second ( ) ;
if ( index < data . size ( ) & & data [ index ] . token ( ) . first ( ) = = token . first ( ) & & ( ( data [ index ] . token ( ) . second ( ) & 0xffffffff00000000LL ) | index ) = = token . second ( ) & & data [ index ] . receiver = = r ) {
data [ index ] . receiver = 0 ;
data [ index ] . nextFree = firstFree ;
firstFree = index ;
}
}
struct EndpointNotFoundReceiver : NetworkMessageReceiver {
EndpointNotFoundReceiver ( EndpointMap & endpoints ) {
//endpoints[WLTOKEN_ENDPOINT_NOT_FOUND] = this;
Endpoint : : Token e = WLTOKEN_ENDPOINT_NOT_FOUND ;
2019-07-04 12:03:58 +08:00
endpoints . insert ( this , e , TaskPriority : : DefaultEndpoint ) ;
2017-05-26 04:48:44 +08:00
ASSERT ( e = = WLTOKEN_ENDPOINT_NOT_FOUND ) ;
}
2020-06-14 14:17:22 +08:00
virtual void receive ( ArenaObjectReader & reader ) override {
2017-05-26 04:48:44 +08:00
// Remote machine tells us it doesn't have endpoint e
2019-01-29 11:38:13 +08:00
Endpoint e ;
reader . deserialize ( e ) ;
IFailureMonitor : : failureMonitor ( ) . endpointNotFound ( e ) ;
}
2017-05-26 04:48:44 +08:00
} ;
struct PingReceiver : NetworkMessageReceiver {
PingReceiver ( EndpointMap & endpoints ) {
Endpoint : : Token e = WLTOKEN_PING_PACKET ;
2019-07-04 12:03:58 +08:00
endpoints . insert ( this , e , TaskPriority : : ReadSocket ) ;
2017-05-26 04:48:44 +08:00
ASSERT ( e = = WLTOKEN_PING_PACKET ) ;
}
2020-06-14 14:17:22 +08:00
virtual void receive ( ArenaObjectReader & reader ) override {
2019-01-29 11:38:13 +08:00
ReplyPromise < Void > reply ;
reader . deserialize ( reply ) ;
reply . send ( Void ( ) ) ;
}
2017-05-26 04:48:44 +08:00
} ;
// State shared by the transport code in this file: the local addresses and listeners, the table of
// peers, the endpoint map with its two well-known receivers, and connection/traffic metrics.
class TransportData {
public :
2019-01-25 09:28:26 +08:00
// Members are initialized in declaration order, not in the order listed here: `endpoints` (which has no
// explicit initializer) is constructed before the two receivers that register themselves in it.
TransportData ( uint64_t transportId )
2017-05-26 04:48:44 +08:00
: endpointNotFoundReceiver ( endpoints ) ,
2017-07-18 01:40:36 +08:00
pingReceiver ( endpoints ) ,
2017-05-26 04:48:44 +08:00
warnAlwaysForLargePacket ( true ) ,
lastIncompatibleMessage ( 0 ) ,
2018-04-12 02:15:12 +08:00
transportId ( transportId ) ,
numIncompatibleConnections ( 0 )
2019-04-05 05:11:12 +08:00
{
// Starts out as "not degraded"; connectionKeeper sets it to true when a peer keeps closing connections.
degraded = Reference < AsyncVar < bool > > ( new AsyncVar < bool > ( false ) ) ;
}
2017-05-26 04:48:44 +08:00
2018-07-28 11:46:54 +08:00
~ TransportData ( ) ;
2017-05-26 04:48:44 +08:00
// Binds each metric handle to its metric name.
void initMetrics ( ) {
bytesSent . init ( LiteralStringRef ( " Net2.BytesSent " ) ) ;
countPacketsReceived . init ( LiteralStringRef ( " Net2.CountPacketsReceived " ) ) ;
countPacketsGenerated . init ( LiteralStringRef ( " Net2.CountPacketsGenerated " ) ) ;
countConnEstablished . init ( LiteralStringRef ( " Net2.CountConnEstablished " ) ) ;
countConnClosedWithError . init ( LiteralStringRef ( " Net2.CountConnClosedWithError " ) ) ;
countConnClosedWithoutError . init ( LiteralStringRef ( " Net2.CountConnClosedWithoutError " ) ) ;
}
2019-10-11 01:34:44 +08:00
// NOTE(review): both are defined outside this section; by their names, getPeer only looks a peer up while
// getOrOpenPeer creates it when missing -- confirm against the definitions.
Reference < struct Peer > getPeer ( NetworkAddress const & address ) ;
Reference < struct Peer > getOrOpenPeer ( NetworkAddress const & address , bool startConnectionKeeper = true ) ;
2018-10-31 04:44:37 +08:00
// Returns true if the given network address 'address' is one of the addresses we are listening on.
bool isLocalAddress ( const NetworkAddress & address ) const ;
2018-12-18 03:25:44 +08:00
NetworkAddressList localAddresses ;
2019-02-01 10:20:14 +08:00
std : : vector < Future < Void > > listeners ;
2019-08-10 02:52:12 +08:00
// Peers by destination address; connectionKeeper erases an entry when it destroys its peer.
std : : unordered_map < NetworkAddress , Reference < struct Peer > > peers ;
2019-04-05 05:11:12 +08:00
// Per destination, two timestamps maintained by connectionKeeper each time a connection closes; they feed
// the "TooManyConnectionsClosed" detection there.
std : : unordered_map < NetworkAddress , std : : pair < double , double > > closedPeers ;
2020-02-18 13:54:05 +08:00
HealthMonitor healthMonitor ;
2019-04-05 05:11:12 +08:00
Reference < AsyncVar < bool > > degraded ;
2017-05-26 04:48:44 +08:00
bool warnAlwaysForLargePacket ;
// These declarations must be in exactly this order
// (the receivers are constructed with a reference to `endpoints` and insert themselves into it, and each
// asserts that it was given the slot encoded in its well-known token)
EndpointMap endpoints ;
EndpointNotFoundReceiver endpointNotFoundReceiver ;
PingReceiver pingReceiver ;
// End ordered declarations
Int64MetricHandle bytesSent ;
Int64MetricHandle countPacketsReceived ;
Int64MetricHandle countPacketsGenerated ;
Int64MetricHandle countConnEstablished ;
Int64MetricHandle countConnClosedWithError ;
Int64MetricHandle countConnClosedWithoutError ;
std : : map < NetworkAddress , std : : pair < uint64_t , double > > incompatiblePeers ;
2020-04-06 14:09:36 +08:00
AsyncTrigger incompatiblePeersChanged ;
2017-11-15 10:37:29 +08:00
uint32_t numIncompatibleConnections ;
2017-05-26 04:48:44 +08:00
std : : map < uint64_t , double > multiVersionConnections ;
double lastIncompatibleMessage ;
uint64_t transportId ;
Future < Void > multiVersionCleanup ;
} ;
# define CONNECT_PACKET_V0 0x0FDB00A444020001LL
# define CONNECT_PACKET_V0_SIZE 14
# pragma pack( push, 1 )
struct ConnectPacket {
2019-03-01 08:07:49 +08:00
// The value does not inclueds the size of `connectPacketLength` itself,
2019-03-01 04:24:56 +08:00
// but only the other fields of this structure.
uint32_t connectPacketLength ;
2019-06-19 05:49:04 +08:00
ProtocolVersion protocolVersion ; // Expect currentProtocolVersion
2019-03-01 08:07:49 +08:00
2017-05-26 04:48:44 +08:00
uint16_t canonicalRemotePort ; // Port number to reconnect to the originating process
uint64_t connectionId ; // Multi-version clients will use the same Id for both connections, other connections will set this to zero. Added at protocol Version 0x0FDB00A444020001.
2019-03-01 08:07:49 +08:00
// IP Address to reconnect to the originating process. Only one of these must be populated.
uint32_t canonicalRemoteIp4 ;
enum ConnectPacketFlags {
FLAG_IPV6 = 1
} ;
uint16_t flags ;
uint8_t canonicalRemoteIp6 [ 16 ] ;
2019-02-27 10:04:03 +08:00
2019-03-09 03:40:32 +08:00
ConnectPacket ( ) {
memset ( this , 0 , sizeof ( * this ) ) ;
}
2019-02-27 10:04:03 +08:00
IPAddress canonicalRemoteIp ( ) const {
if ( isIPv6 ( ) ) {
2019-03-01 08:07:49 +08:00
IPAddress : : IPAddressStore store ;
memcpy ( store . data ( ) , canonicalRemoteIp6 , sizeof ( canonicalRemoteIp6 ) ) ;
return IPAddress ( store ) ;
2019-02-27 10:04:03 +08:00
} else {
2019-03-01 08:07:49 +08:00
return IPAddress ( canonicalRemoteIp4 ) ;
2019-02-27 10:04:03 +08:00
}
}
2017-05-26 04:48:44 +08:00
2019-02-27 10:04:03 +08:00
void setCanonicalRemoteIp ( const IPAddress & ip ) {
if ( ip . isV6 ( ) ) {
2019-03-01 08:07:49 +08:00
flags = flags | FLAG_IPV6 ;
memcpy ( & canonicalRemoteIp6 , ip . toV6 ( ) . data ( ) , 16 ) ;
2019-02-27 10:04:03 +08:00
} else {
2019-03-01 08:07:49 +08:00
flags = flags & ~ FLAG_IPV6 ;
canonicalRemoteIp4 = ip . toV4 ( ) ;
2019-02-27 10:04:03 +08:00
}
}
2019-03-01 08:07:49 +08:00
bool isIPv6 ( ) const { return flags & FLAG_IPV6 ; }
2019-02-27 10:04:03 +08:00
uint32_t totalPacketSize ( ) const { return connectPacketLength + sizeof ( connectPacketLength ) ; }
template < class Ar >
void serialize ( Ar & ar ) {
2019-03-01 08:07:49 +08:00
serializer ( ar , connectPacketLength ) ;
2019-04-11 08:41:02 +08:00
if ( connectPacketLength > sizeof ( ConnectPacket ) - sizeof ( connectPacketLength ) ) {
ASSERT ( ! g_network - > isSimulated ( ) ) ;
throw serialization_failed ( ) ;
}
2019-03-01 08:07:49 +08:00
serializer ( ar , protocolVersion , canonicalRemotePort , connectionId , canonicalRemoteIp4 ) ;
2019-06-19 08:55:27 +08:00
if ( ar . isDeserializing & & ! ar . protocolVersion ( ) . hasIPv6 ( ) ) {
2019-03-01 08:07:49 +08:00
flags = 0 ;
2019-02-27 10:04:03 +08:00
} else {
2019-03-01 08:07:49 +08:00
// We can send everything in serialized packet, since the current version of ConnectPacket
// is backward compatible with CONNECT_PACKET_V0.
serializer ( ar , flags ) ;
ar . serializeBytes ( & canonicalRemoteIp6 , sizeof ( canonicalRemoteIp6 ) ) ;
2019-02-27 10:04:03 +08:00
}
2017-05-26 04:48:44 +08:00
}
} ;
# pragma pack( pop )
2019-08-10 02:52:12 +08:00
// Forward declarations of helpers defined outside this section.
// connectionReader is started by connectionKeeper for every outgoing connection it establishes.
ACTOR static Future < Void > connectionReader ( TransportData * transport , Reference < IConnection > conn , Reference < struct Peer > peer ,
Promise < Reference < struct Peer > > onConnected ) ;
2017-07-18 01:40:36 +08:00
2019-08-30 07:49:57 +08:00
// NOTE(review): by their signatures, sendLocal delivers `what` to an endpoint of this process and sendPacket
// queues it on `peer` (returning the ReliablePacket when `reliable`) -- confirm against the definitions.
static void sendLocal ( TransportData * self , ISerializeSource const & what , const Endpoint & destination ) ;
static ReliablePacket * sendPacket ( TransportData * self , Reference < Peer > peer , ISerializeSource const & what , const Endpoint & destination , bool reliable ) ;
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
// Watches the health of the connection to `peer`. It never returns normally: it runs until it throws
// connection_unreferenced (nobody uses the peer any more), connection_idle (a client-side connection that
// has been quiet for too long) or connection_failed (a ping went unanswered and no bytes arrived).
ACTOR Future < Void > connectionMonitor ( Reference < Peer > peer ) {
// Pings are addressed to the well-known ping token at the peer's address.
state Endpoint remotePingEndpoint ( { peer - > destination } , WLTOKEN_PING_PACKET ) ;
loop {
2020-04-23 14:36:40 +08:00
if ( ! FlowTransport : : isClient ( ) & & ! peer - > destination . isPublic ( ) & & peer - > compatible ) {
2019-08-30 07:49:57 +08:00
// Don't send ping messages to clients unless necessary. Instead monitor incoming client pings.
2019-08-31 02:17:22 +08:00
// We ignore this block for incompatible clients because pings from server would trigger the
// peer->resetPing and prevent 'connection_failed' due to ping timeout.
2019-08-30 07:49:57 +08:00
state double lastRefreshed = now ( ) ;
state int64_t lastBytesReceived = peer - > bytesReceived ;
// Poll the received-byte counter; any growth counts as a sign of life.
loop {
2020-02-20 10:50:21 +08:00
wait ( delay ( FLOW_KNOBS - > CONNECTION_MONITOR_LOOP_TIME , TaskPriority : : ReadSocket ) ) ;
2019-08-30 07:49:57 +08:00
if ( lastBytesReceived < peer - > bytesReceived ) {
lastRefreshed = now ( ) ;
lastBytesReceived = peer - > bytesReceived ;
} else if ( lastRefreshed < now ( ) - FLOW_KNOBS - > CONNECTION_MONITOR_IDLE_TIMEOUT *
FLOW_KNOBS - > CONNECTION_MONITOR_INCOMING_IDLE_MULTIPLIER ) {
// If we have not received anything in this period, client must have closed
// connection by now. Break loop to check if it is still alive by sending a ping.
break ;
}
}
2017-05-26 04:48:44 +08:00
}
2019-02-01 10:20:14 +08:00
2019-08-30 07:49:57 +08:00
//We cannot let an error be thrown from connectionMonitor while still on the stack from scanPackets in connectionReader
//because then it would not call the destructor of connectionReader when connectionReader is cancelled.
2020-02-20 10:50:21 +08:00
wait ( delay ( 0 , TaskPriority : : ReadSocket ) ) ;
2019-08-30 07:49:57 +08:00
// The connection is only given up while nothing is queued on it and no replies are outstanding.
if ( peer - > reliable . empty ( ) & & peer - > unsent . empty ( ) & & peer - > outstandingReplies = = 0 ) {
if ( peer - > peerReferences = = 0 & &
( peer - > lastDataPacketSentTime < now ( ) - FLOW_KNOBS - > CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY ) ) {
// TODO: What about when peerReference == -1?
throw connection_unreferenced ( ) ;
2020-04-23 14:36:40 +08:00
} else if ( FlowTransport : : isClient ( ) & & peer - > compatible & & peer - > destination . isPublic ( ) & &
2020-01-07 14:13:49 +08:00
( peer - > lastConnectTime < now ( ) - FLOW_KNOBS - > CONNECTION_MONITOR_IDLE_TIMEOUT ) & &
( peer - > lastDataPacketSentTime < now ( ) - FLOW_KNOBS - > CONNECTION_MONITOR_IDLE_TIMEOUT ) ) {
2019-08-30 07:49:57 +08:00
// First condition is necessary because we may get here if we are server.
throw connection_idle ( ) ;
}
2019-06-19 05:49:04 +08:00
}
2017-05-26 04:48:44 +08:00
2020-02-20 10:50:21 +08:00
wait ( delayJittered ( FLOW_KNOBS - > CONNECTION_MONITOR_LOOP_TIME , TaskPriority : : ReadSocket ) ) ;
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
// TODO: Stop monitoring and close the connection with no onDisconnect requests outstanding
// Send one ping and wait for the answer, for a resetPing trigger, or for the timeout logic below.
state ReplyPromise < Void > reply ;
FlowTransport : : transport ( ) . sendUnreliable ( SerializeSource < ReplyPromise < Void > > ( reply ) , remotePingEndpoint , true ) ;
state int64_t startingBytes = peer - > bytesReceived ;
state int timeouts = 0 ;
loop {
choose {
when ( wait ( delay ( FLOW_KNOBS - > CONNECTION_MONITOR_TIMEOUT ) ) ) {
// No bytes at all since the previous check: treat the connection as dead.
if ( startingBytes = = peer - > bytesReceived ) {
TraceEvent ( " ConnectionTimeout " ) . suppressFor ( 1.0 ) . detail ( " WithAddr " , peer - > destination ) ;
throw connection_failed ( ) ;
}
// Bytes keep arriving but the ping itself is still unanswered after several timeouts.
if ( timeouts > 1 ) {
TraceEvent ( SevWarnAlways , " ConnectionSlowPing " )
. suppressFor ( 1.0 )
. detail ( " WithAddr " , peer - > destination )
. detail ( " Timeouts " , timeouts ) ;
}
startingBytes = peer - > bytesReceived ;
timeouts + + ;
}
when ( wait ( reply . getFuture ( ) ) ) {
break ;
}
when ( wait ( peer - > resetPing . onTrigger ( ) ) ) {
break ;
}
}
2018-07-07 10:44:30 +08:00
}
2017-05-26 04:48:44 +08:00
}
2019-08-30 07:49:57 +08:00
}
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
// Writes everything queued in self->unsent to `conn`, for as long as the actor is alive.
// Each round first waits a jittered coalescing delay (at least MIN_COALESCE_DELAY, and long enough that
// about MAX_COALESCE_DELAY has passed since the previous write), then writes until the queue is empty,
// then sleeps until new data is queued. Never returns normally.
ACTOR Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection> conn ) {
	state double lastWriteTime = now();
	loop {
		wait( delayJittered( std::max<double>( FLOW_KNOBS->MIN_COALESCE_DELAY,
		                                       FLOW_KNOBS->MAX_COALESCE_DELAY - (now() - lastWriteTime) ),
		                     TaskPriority::WriteSocket ) );

		// Send until there is nothing left to send
		loop {
			lastWriteTime = now();

			int bytesWritten = conn->write( self->unsent.getUnsent(), FLOW_KNOBS->MAX_PACKET_SEND_BYTES );
			if (bytesWritten != 0) {
				self->transport->bytesSent += bytesWritten;
				self->unsent.sent( bytesWritten );
			}

			if (self->unsent.empty()) {
				break;
			}

			TEST(true); // We didn't write everything, so apparently the write buffer is full. Wait for it to be nonfull.
			wait( conn->onWritable() );
			wait( yield( TaskPriority::WriteSocket ) );
		}

		// Wait until there is something to send
		loop {
			if (!self->unsent.empty()) {
				break;
			}
			wait( self->dataToSend.onTrigger() );
		}
	}
}
2017-05-26 04:48:44 +08:00
2020-03-20 14:21:41 +08:00
// Marks `address` as available in the failure monitor, but holds that back for as long as the health
// monitor reports too many closed connections to it.
//
// While HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS is enabled, the address is public and the health
// monitor reports too many closed connections, the address is marked failed (once) and the condition is
// re-checked after a jittered delay of 2 * MAX_RECONNECTION_TIME. As soon as the condition does not hold,
// the address is marked available and the actor returns.
ACTOR Future < Void > delayedHealthUpdate ( NetworkAddress address ) {
2020-04-09 13:31:34 +08:00
state double start = now ( ) ;
2020-04-15 02:30:40 +08:00
// True once the address has been marked failed by this actor at least once.
state bool delayed = false ;
2020-04-09 13:31:34 +08:00
loop {
if ( FLOW_KNOBS - > HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS & &
FlowTransport : : transport ( ) . healthMonitor ( ) - > tooManyConnectionsClosed ( address ) & & address . isPublic ( ) ) {
2020-04-15 02:30:40 +08:00
// Trace and mark failed only on the first pass through this branch.
if ( ! delayed ) {
2020-04-09 13:31:34 +08:00
TraceEvent ( " TooManyConnectionsClosedMarkFailed " )
. detail ( " Dest " , address )
. detail ( " StartTime " , start )
. detail ( " ClosedCount " , FlowTransport : : transport ( ) . healthMonitor ( ) - > closedConnectionsCount ( address ) ) ;
IFailureMonitor : : failureMonitor ( ) . setStatus ( address , FailureStatus ( true ) ) ;
2020-03-20 14:21:41 +08:00
}
2020-04-15 02:30:40 +08:00
delayed = true ;
2020-04-09 13:31:34 +08:00
wait ( delayJittered ( FLOW_KNOBS - > MAX_RECONNECTION_TIME * 2.0 ) ) ;
} else {
2020-04-16 10:39:56 +08:00
// Only trace the recovery if the address had been marked failed above.
if ( delayed ) {
TraceEvent ( " TooManyConnectionsClosedMarkAvailable " )
2020-04-09 13:31:34 +08:00
. detail ( " Dest " , address )
. detail ( " StartTime " , start )
. detail ( " TimeElapsed " , now ( ) - start )
. detail ( " ClosedCount " , FlowTransport : : transport ( ) . healthMonitor ( ) - > closedConnectionsCount ( address ) ) ;
2020-04-16 10:39:56 +08:00
}
2020-04-09 13:31:34 +08:00
IFailureMonitor : : failureMonitor ( ) . setStatus ( address , FailureStatus ( false ) ) ;
break ;
2020-03-20 14:21:41 +08:00
}
}
2020-04-09 13:31:34 +08:00
return Void ( ) ;
2020-03-20 14:21:41 +08:00
}
2019-08-30 07:49:57 +08:00
// Keeps the connection to one peer alive: connects (with back-off) when there is something to send, runs
// connectionWriter, the reader and connectionMonitor on the live connection, and on any failure updates
// failure monitoring and statistics, closes the connection and loops to try again.
//
// self:   the peer this actor serves.
// conn:   an already established connection to start with, or an empty reference to connect outwards.
// reader: the reader actor belonging to `conn`; replaced whenever this actor opens a new connection.
//
// Returns only after erasing the peer from the transport's peer table, which happens after a failure when
// the peer has no references, no queued packets and no outstanding replies. actor_cancelled is rethrown.
ACTOR Future < Void > connectionKeeper ( Reference < Peer > self ,
Reference < IConnection > conn = Reference < IConnection > ( ) ,
Future < Void > reader = Void ( ) ) {
TraceEvent ( SevDebug , " ConnectionKeeper " , conn ? conn - > getDebugID ( ) : UID ( ) )
. detail ( " PeerAddr " , self - > destination )
. detail ( " ConnSet " , ( bool ) conn ) ;
2020-04-16 10:39:56 +08:00
ASSERT_WE_THINK ( FlowTransport : : transport ( ) . getLocalAddress ( ) ! = self - > destination ) ;
2019-08-30 07:49:57 +08:00
2020-05-02 08:07:56 +08:00
state Future < Void > delayedHealthUpdateF ;
2020-01-09 05:31:58 +08:00
// Time of the first failure in the current streak of failures; reset whenever a connection is established.
state Optional < double > firstConnFailedTime = Optional < double > ( ) ;
2020-04-23 10:38:01 +08:00
// NOTE(review): declared as int but only ever assigned and tested as a boolean; `state bool` would say so.
state int retryConnect = false ;
2019-08-30 07:49:57 +08:00
loop {
try {
2020-05-02 08:07:56 +08:00
// Start every attempt without a pending health update.
delayedHealthUpdateF = Future < Void > ( ) ;
2020-04-22 03:16:39 +08:00
2019-08-30 07:49:57 +08:00
if ( ! conn ) { // Always, except for the first loop with an incoming connection
2019-10-11 03:48:35 +08:00
self - > outgoingConnectionIdle = true ;
2019-08-30 07:49:57 +08:00
// Wait until there is something to send.
while ( self - > unsent . empty ( ) ) {
2020-04-23 10:38:01 +08:00
// Override waiting, if we are in failed state to update failure monitoring status.
2020-04-24 00:44:30 +08:00
// After a failure, reconnect on a timer even with nothing to send; which knob sets the delay depends
// on whether the failure monitor still considers the destination available.
Future < Void > retryConnectF = Never ( ) ;
if ( retryConnect ) {
retryConnectF = IFailureMonitor : : failureMonitor ( ) . getState ( self - > destination ) . isAvailable ( )
? delay ( FLOW_KNOBS - > FAILURE_DETECTION_DELAY )
: delay ( FLOW_KNOBS - > SERVER_REQUEST_INTERVAL ) ;
2019-07-06 07:27:17 +08:00
}
2020-01-07 14:13:49 +08:00
2020-04-23 10:38:01 +08:00
choose {
when ( wait ( self - > dataToSend . onTrigger ( ) ) ) { }
when ( wait ( retryConnectF ) ) { break ; }
}
2019-06-28 15:39:51 +08:00
}
2017-05-26 04:48:44 +08:00
2020-04-17 05:04:32 +08:00
ASSERT ( self - > destination . isPublic ( ) ) ;
2019-08-30 07:49:57 +08:00
self - > outgoingConnectionIdle = false ;
wait ( delayJittered (
std : : max ( 0.0 , self - > lastConnectTime + self - > reconnectionDelay -
now ( ) ) ) ) ; // Don't connect() to the same peer more than once per 2 sec
self - > lastConnectTime = now ( ) ;
2020-03-21 11:50:59 +08:00
TraceEvent ( " ConnectingTo " , conn ? conn - > getDebugID ( ) : UID ( ) )
. suppressFor ( 1.0 )
. detail ( " PeerAddr " , self - > destination )
2020-04-22 03:16:39 +08:00
. detail ( " PeerReferences " , self - > peerReferences )
. detail ( " FailureStatus " , IFailureMonitor : : failureMonitor ( ) . getState ( self - > destination ) . isAvailable ( )
? " OK "
: " FAILED " ) ;
2019-11-22 05:08:59 +08:00
try {
// Connect and handshake, racing against CONNECTION_MONITOR_TIMEOUT.
choose {
2020-03-20 14:21:41 +08:00
when ( Reference < IConnection > _conn =
wait ( INetworkConnections : : net ( ) - > connect ( self - > destination ) ) ) {
2020-02-07 08:45:54 +08:00
conn = _conn ;
wait ( conn - > connectHandshake ( ) ) ;
2019-11-22 08:08:32 +08:00
// Connected with nothing to send (the timer-driven retry): either the health update completes, in
// which case the connection is closed again and the loop restarts, or data shows up and the
// connection is put to use.
if ( self - > unsent . empty ( ) ) {
2020-04-15 02:30:40 +08:00
delayedHealthUpdateF = delayedHealthUpdate ( self - > destination ) ;
2020-04-07 14:05:29 +08:00
choose {
2020-04-15 02:30:40 +08:00
when ( wait ( delayedHealthUpdateF ) ) {
2020-04-07 14:05:29 +08:00
conn - > close ( ) ;
conn = Reference < IConnection > ( ) ;
2020-04-23 10:38:01 +08:00
retryConnect = false ;
2020-04-07 14:05:29 +08:00
continue ;
}
when ( wait ( self - > dataToSend . onTrigger ( ) ) ) { }
}
2019-11-22 08:08:32 +08:00
}
2020-04-07 14:05:29 +08:00
TraceEvent ( " ConnectionExchangingConnectPacket " , conn - > getDebugID ( ) )
. suppressFor ( 1.0 )
. detail ( " PeerAddr " , self - > destination ) ;
self - > prependConnectPacket ( ) ;
reader = connectionReader ( self - > transport , conn , self , Promise < Reference < Peer > > ( ) ) ;
2019-11-22 08:08:32 +08:00
}
when ( wait ( delay ( FLOW_KNOBS - > CONNECTION_MONITOR_TIMEOUT ) ) ) {
throw connection_failed ( ) ;
}
2019-11-22 05:08:59 +08:00
}
2020-03-20 14:21:41 +08:00
} catch ( Error & e ) {
// Every error is rethrown; connection_failed additionally gets a "ConnectionTimedOut" trace event.
if ( e . code ( ) ! = error_code_connection_failed ) {
2019-11-22 05:08:59 +08:00
throw ;
}
2020-01-07 14:13:49 +08:00
TraceEvent ( " ConnectionTimedOut " , conn ? conn - > getDebugID ( ) : UID ( ) )
. suppressFor ( 1.0 )
2020-01-08 07:02:41 +08:00
. detail ( " PeerAddr " , self - > destination ) ;
2020-01-07 14:13:49 +08:00
2019-11-22 08:08:32 +08:00
throw ;
2019-05-17 08:26:48 +08:00
}
2019-08-30 07:49:57 +08:00
} else {
// A connection was passed in by the caller; use it as is.
self - > outgoingConnectionIdle = false ;
2020-09-01 00:10:30 +08:00
self - > lastConnectTime = now ( ) ;
2017-05-26 04:48:44 +08:00
}
2020-01-09 05:31:58 +08:00
// A connection exists, so the streak of failures is over.
firstConnFailedTime . reset ( ) ;
2019-08-30 07:49:57 +08:00
try {
self - > transport - > countConnEstablished + + ;
2020-04-15 02:30:40 +08:00
if ( ! delayedHealthUpdateF . isValid ( ) )
delayedHealthUpdateF = delayedHealthUpdate ( self - > destination ) ;
2020-07-11 06:06:34 +08:00
// Run the connection until any of its actors finishes or a reset is requested. Getting past this wait
// without an error is reported as "ConnectionReset" and handled like a failed connection.
wait ( connectionWriter ( self , conn ) | | reader | | connectionMonitor ( self ) | | self - > resetConnection . onTrigger ( ) ) ;
2020-07-10 13:50:47 +08:00
TraceEvent ( " ConnectionReset " , conn ? conn - > getDebugID ( ) : UID ( ) ) . suppressFor ( 1.0 ) . detail ( " PeerAddr " , self - > destination ) ;
throw connection_failed ( ) ;
2019-08-30 07:49:57 +08:00
} catch ( Error & e ) {
// Count the closure, then let the outer handler deal with it.
if ( e . code ( ) = = error_code_connection_failed | | e . code ( ) = = error_code_actor_cancelled | |
e . code ( ) = = error_code_connection_unreferenced | |
( g_network - > isSimulated ( ) & & e . code ( ) = = error_code_checksum_failed ) )
self - > transport - > countConnClosedWithoutError + + ;
else
self - > transport - > countConnClosedWithError + + ;
2020-01-07 14:13:49 +08:00
2019-08-30 07:49:57 +08:00
throw e ;
}
} catch ( Error & e ) {
2020-03-20 14:21:41 +08:00
delayedHealthUpdateF . cancel ( ) ;
2019-08-30 07:49:57 +08:00
// Back-off: start over from the initial delay if the last connect is older than RECONNECTION_RESET_TIME,
// otherwise grow the delay up to MAX_RECONNECTION_TIME.
if ( now ( ) - self - > lastConnectTime > FLOW_KNOBS - > RECONNECTION_RESET_TIME ) {
self - > reconnectionDelay = FLOW_KNOBS - > INITIAL_RECONNECTION_TIME ;
} else {
self - > reconnectionDelay = std : : min ( FLOW_KNOBS - > MAX_RECONNECTION_TIME , self - > reconnectionDelay * FLOW_KNOBS - > RECONNECTION_TIME_GROWTH_RATE ) ;
}
2020-01-09 05:31:58 +08:00
// Track how long the peer has been failing, warning once the streak exceeds the timeout. After a
// warning the start time is moved forward so that the next warning comes after half the timeout.
if ( firstConnFailedTime . present ( ) ) {
if ( now ( ) - firstConnFailedTime . get ( ) > FLOW_KNOBS - > PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT ) {
TraceEvent ( SevWarnAlways , " PeerUnavailableForLongTime " , conn ? conn - > getDebugID ( ) : UID ( ) )
2020-05-12 03:53:19 +08:00
. suppressFor ( 1.0 )
2020-01-09 05:31:58 +08:00
. detail ( " PeerAddr " , self - > destination ) ;
firstConnFailedTime = now ( ) - FLOW_KNOBS - > PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT / 2.0 ;
}
} else {
firstConnFailedTime = now ( ) ;
}
2020-04-23 10:38:01 +08:00
// Don't immediately mark connection as failed. To stay close to earlier behaviour of centralized
// failure monitoring, wait until connection stays failed for FLOW_KNOBS->FAILURE_DETECTION_DELAY timeout.
2020-05-07 14:24:58 +08:00
retryConnect = true ;
2020-04-23 10:38:01 +08:00
if ( e . code ( ) = = error_code_connection_failed ) {
if ( ! self - > destination . isPublic ( ) ) {
// Can't connect back to non-public addresses.
IFailureMonitor : : failureMonitor ( ) . setStatus ( self - > destination , FailureStatus ( true ) ) ;
} else if ( now ( ) - firstConnFailedTime . get ( ) > FLOW_KNOBS - > FAILURE_DETECTION_DELAY ) {
IFailureMonitor : : failureMonitor ( ) . setStatus ( self - > destination , FailureStatus ( true ) ) ;
}
}
2019-08-30 07:49:57 +08:00
self - > discardUnreliablePackets ( ) ;
reader = Future < Void > ( ) ;
// `ok` only selects the severity of the trace event below.
bool ok = e . code ( ) = = error_code_connection_failed | | e . code ( ) = = error_code_actor_cancelled | |
e . code ( ) = = error_code_connection_unreferenced | | e . code ( ) = = error_code_connection_idle | |
( g_network - > isSimulated ( ) & & e . code ( ) = = error_code_checksum_failed ) ;
if ( self - > compatible ) {
TraceEvent ( ok ? SevInfo : SevWarnAlways , " ConnectionClosed " , conn ? conn - > getDebugID ( ) : UID ( ) )
. error ( e , true )
. suppressFor ( 1.0 )
. detail ( " PeerAddr " , self - > destination ) ;
}
else {
TraceEvent ( ok ? SevInfo : SevWarnAlways , " IncompatibleConnectionClosed " ,
conn ? conn - > getDebugID ( ) : UID ( ) )
. error ( e , true )
. suppressFor ( 1.0 )
. detail ( " PeerAddr " , self - > destination ) ;
}
2017-05-26 04:48:44 +08:00
2019-11-14 05:00:43 +08:00
// Repeated closures on a server process: it.second is the time of the previous closure and it.first the
// start of the current series of closures. A series that goes on for longer than
// TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT marks this process as degraded.
if ( self - > destination . isPublic ( )
& & IFailureMonitor : : failureMonitor ( ) . getState ( self - > destination ) . isAvailable ( )
2020-04-23 14:36:40 +08:00
& & ! FlowTransport : : isClient ( ) )
2019-11-14 05:00:43 +08:00
{
2019-08-30 07:49:57 +08:00
auto & it = self - > transport - > closedPeers [ self - > destination ] ;
if ( now ( ) - it . second > FLOW_KNOBS - > TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY ) {
it . first = now ( ) ;
} else if ( now ( ) - it . first > FLOW_KNOBS - > TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT ) {
TraceEvent ( SevWarnAlways , " TooManyConnectionsClosed " , conn ? conn - > getDebugID ( ) : UID ( ) )
. suppressFor ( 5.0 )
. detail ( " PeerAddr " , self - > destination ) ;
self - > transport - > degraded - > set ( true ) ;
2017-05-26 04:48:44 +08:00
}
2019-08-30 07:49:57 +08:00
it . second = now ( ) ;
}
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
// Report the closure to the health monitor (public peers, connection_failed only) and drop the connection.
if ( conn ) {
2020-03-20 14:21:41 +08:00
if ( self - > destination . isPublic ( ) & & e . code ( ) = = error_code_connection_failed ) {
FlowTransport : : transport ( ) . healthMonitor ( ) - > reportPeerClosed ( self - > destination ) ;
}
2019-08-30 07:49:57 +08:00
conn - > close ( ) ;
conn = Reference < IConnection > ( ) ;
2017-05-26 04:48:44 +08:00
}
2019-08-30 07:49:57 +08:00
// Clients might send more packets in response, which needs to go out on the next connection
IFailureMonitor : : failureMonitor ( ) . notifyDisconnect ( self - > destination ) ;
if ( e . code ( ) = = error_code_actor_cancelled ) throw ;
// Try to recover, even from serious errors, by retrying
// ... unless nobody needs this peer any more, in which case it is removed and the actor ends.
if ( self - > peerReferences < = 0 & & self - > reliable . empty ( ) & & self - > unsent . empty ( ) & & self - > outstandingReplies = = 0 ) {
TraceEvent ( " PeerDestroy " ) . error ( e ) . suppressFor ( 1.0 ) . detail ( " PeerAddr " , self - > destination ) ;
self - > connect . cancel ( ) ;
self - > transport - > peers . erase ( self - > destination ) ;
return Void ( ) ;
}
2017-05-26 04:48:44 +08:00
}
}
2019-08-30 07:49:57 +08:00
}
2017-05-26 04:48:44 +08:00
2020-04-23 10:38:01 +08:00
// Builds the bookkeeping object for the remote process at `destination`, owned by `transport`.
// A new peer starts with an idle outgoing connection, the initial reconnection delay, no outstanding
// replies, and is treated as protocol-compatible until a ConnectPacket says otherwise.
// peerReferences starts at -1, the value FlowTransport::addPeerReference() checks for before setting the
// count to 1.
Peer::Peer(TransportData* transport, NetworkAddress const& destination)
  : transport(transport),
    destination(destination),
    outgoingConnectionIdle(true),
    lastConnectTime(0.0),
    reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME),
    compatible(true),
    outstandingReplies(0),
    incompatibleProtocolVersionNewer(false),
    peerReferences(-1),
    bytesReceived(0),
    lastDataPacketSentTime(now())
{
	// Publish an initial status for this address with the failure monitor.
	// NOTE(review): FailureStatus(false) presumably means "not failed" -- confirm against FailureStatus.
	IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
}
2019-08-30 07:49:57 +08:00
void Peer : : send ( PacketBuffer * pb , ReliablePacket * rp , bool firstUnsent ) {
unsent . setWriteBuffer ( pb ) ;
if ( rp ) reliable . insert ( rp ) ;
if ( firstUnsent ) dataToSend . trigger ( ) ;
}
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
void Peer : : prependConnectPacket ( ) {
// Send the ConnectPacket expected at the beginning of a new connection
ConnectPacket pkt ;
if ( transport - > localAddresses . address . isTLS ( ) = = destination . isTLS ( ) ) {
pkt . canonicalRemotePort = transport - > localAddresses . address . port ;
pkt . setCanonicalRemoteIp ( transport - > localAddresses . address . ip ) ;
} else if ( transport - > localAddresses . secondaryAddress . present ( ) ) {
pkt . canonicalRemotePort = transport - > localAddresses . secondaryAddress . get ( ) . port ;
pkt . setCanonicalRemoteIp ( transport - > localAddresses . secondaryAddress . get ( ) . ip ) ;
} else {
// a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it
pkt . canonicalRemotePort = 0 ;
pkt . setCanonicalRemoteIp ( IPAddress ( 0 ) ) ;
}
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
pkt . connectPacketLength = sizeof ( pkt ) - sizeof ( pkt . connectPacketLength ) ;
pkt . protocolVersion = currentProtocolVersion ;
2020-02-13 02:41:52 +08:00
pkt . protocolVersion . addObjectSerializerFlag ( ) ;
2019-08-30 07:49:57 +08:00
pkt . connectionId = transport - > transportId ;
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
PacketBuffer * pb_first = PacketBuffer : : create ( ) ;
PacketWriter wr ( pb_first , nullptr , Unversioned ( ) ) ;
pkt . serialize ( wr ) ;
unsent . prependWriteBuffer ( pb_first , wr . finish ( ) ) ;
}
2017-05-26 04:48:44 +08:00
2019-08-30 07:49:57 +08:00
void Peer : : discardUnreliablePackets ( ) {
// Throw away the current unsent list, dropping the reference count on each PacketBuffer that accounts for presence in the unsent list
unsent . discardAll ( ) ;
2019-04-05 05:11:12 +08:00
2019-08-30 07:49:57 +08:00
// If there are reliable packets, compact reliable packets into a new unsent range
if ( ! reliable . empty ( ) ) {
PacketBuffer * pb = unsent . getWriteBuffer ( ) ;
pb = reliable . compact ( pb , nullptr ) ;
unsent . setWriteBuffer ( pb ) ;
}
}
2019-06-28 15:39:51 +08:00
2019-08-30 07:49:57 +08:00
void Peer : : onIncomingConnection ( Reference < Peer > self , Reference < IConnection > conn , Future < Void > reader ) {
// In case two processes are trying to connect to each other simultaneously, the process with the larger canonical NetworkAddress
// gets to keep its outgoing connection.
if ( ! destination . isPublic ( ) & & ! outgoingConnectionIdle ) throw address_in_use ( ) ;
NetworkAddress compatibleAddr = transport - > localAddresses . address ;
if ( transport - > localAddresses . secondaryAddress . present ( ) & & transport - > localAddresses . secondaryAddress . get ( ) . isTLS ( ) = = destination . isTLS ( ) ) {
compatibleAddr = transport - > localAddresses . secondaryAddress . get ( ) ;
}
2019-06-28 15:39:51 +08:00
2020-08-31 09:49:49 +08:00
if ( ! destination . isPublic ( ) | | outgoingConnectionIdle | | destination > compatibleAddr | | ( lastConnectTime > 1.0 & & now ( ) - lastConnectTime > FLOW_KNOBS - > ALWAYS_ACCEPT_DELAY ) ) {
2019-08-30 07:49:57 +08:00
// Keep the new connection
TraceEvent ( " IncomingConnection " , conn - > getDebugID ( ) )
. suppressFor ( 1.0 )
. detail ( " FromAddr " , conn - > getPeerAddress ( ) )
. detail ( " CanonicalAddr " , destination )
. detail ( " IsPublic " , destination . isPublic ( ) ) ;
connect . cancel ( ) ;
prependConnectPacket ( ) ;
connect = connectionKeeper ( self , conn , reader ) ;
} else {
TraceEvent ( " RedundantConnection " , conn - > getDebugID ( ) )
. suppressFor ( 1.0 )
. detail ( " FromAddr " , conn - > getPeerAddress ( ) . toString ( ) )
. detail ( " CanonicalAddr " , destination )
. detail ( " LocalAddr " , compatibleAddr ) ;
// Keep our prior connection
reader . cancel ( ) ;
conn - > close ( ) ;
2018-07-09 01:26:41 +08:00
2019-08-30 07:49:57 +08:00
// Send an (ignored) packet to make sure that, if our outgoing connection died before the peer made this connection attempt,
// we eventually find out that our connection is dead, close it, and then respond to the next connection reattempt from peer.
2017-05-26 04:48:44 +08:00
}
2019-08-30 07:49:57 +08:00
}
2017-05-26 04:48:44 +08:00
2018-07-28 11:46:54 +08:00
// Cancels the connection actor of every known peer.
TransportData::~TransportData() {
	for (auto it = peers.begin(); it != peers.end(); ++it) {
		it->second->connect.cancel();
	}
}
2019-04-12 04:24:00 +08:00
// Delivers one received packet to the local endpoint identified by destination.token.
//
// self           - transport whose endpoint table is consulted
// destination    - sender's addresses plus the token of the target endpoint
// reader         - positioned at the packet payload (the token has already been consumed by the caller)
// inReadSocket   - true when called from the socket-reading path; in that case the current task priority is
//                  restored to TaskPriority::ReadSocket before returning
//
// If the endpoint's priority is below ReadSocket, or we are not in the read-socket path, delivery is deferred
// with delay(0, priority); otherwise the current task priority is raised in place.
// If the receiver throws, the error is traced at SevError, a non-client process exits through
// flushAndExit(FDB_EXIT_ERROR), and the error is rethrown.
// If no receiver exists and the token is a stream token (other than one whose first word is -1), the sender
// is told via WLTOKEN_ENDPOINT_NOT_FOUND.
ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket) {
	TaskPriority priority = self->endpoints.getPriority(destination.token);
	if (inReadSocket && priority >= TaskPriority::ReadSocket) {
		g_network->setCurrentTask(priority);
	} else {
		wait(delay(0, priority));
	}

	auto receiver = self->endpoints.get(destination.token);
	if (receiver) {
		try {
			// Make the sender's addresses visible to FlowTransport::loadedEndpoint() during deserialization.
			g_currentDeliveryPeerAddress = destination.addresses;
			StringRef data = reader.arenaReadAll();
			ASSERT(data.size() > 8);
			ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll(), AssumeVersion(reader.protocolVersion()));
			receiver->receive(objReader);
			g_currentDeliveryPeerAddress = { NetworkAddress() };
		} catch (Error& e) {
			g_currentDeliveryPeerAddress = { NetworkAddress() };
			TraceEvent(SevError, "ReceiverError")
			    .error(e)
			    .detail("Token", destination.token.toString())
			    .detail("Peer", destination.getPrimaryAddress());
			if (!FlowTransport::isClient()) {
				flushAndExit(FDB_EXIT_ERROR);
			}
			throw;
		}
	} else if ((destination.token.first() & TOKEN_STREAM_FLAG) && destination.token.first() != -1) {
		// We don't have the (stream) endpoint 'token', notify the remote machine
		if (self->isLocalAddress(destination.getPrimaryAddress())) {
			sendLocal(self, SerializeSource<Endpoint>(Endpoint(self->localAddresses, destination.token)), Endpoint(destination.addresses, WLTOKEN_ENDPOINT_NOT_FOUND));
		} else {
			Reference<Peer> peer = self->getOrOpenPeer(destination.getPrimaryAddress());
			sendPacket(self, peer, SerializeSource<Endpoint>(Endpoint(self->localAddresses, destination.token)), Endpoint(destination.addresses, WLTOKEN_ENDPOINT_NOT_FOUND), false);
		}
	}

	if (inReadSocket) {
		g_network->setCurrentTask(TaskPriority::ReadSocket);
	}
}
2019-06-12 07:44:00 +08:00
static void scanPackets ( TransportData * transport , uint8_t * & unprocessed_begin , const uint8_t * e , Arena & arena ,
2019-06-19 08:55:27 +08:00
NetworkAddress const & peerAddress , ProtocolVersion peerProtocolVersion ) {
2017-05-26 04:48:44 +08:00
// Find each complete packet in the given byte range and queue a ready task to deliver it.
// Remove the complete packets from the range by increasing unprocessed_begin.
// There won't be more than 64K of data plus one packet, so this shouldn't take a long time.
uint8_t * p = unprocessed_begin ;
2017-05-27 06:28:52 +08:00
2019-06-12 07:44:00 +08:00
const bool checksumEnabled = ! peerAddress . isTLS ( ) ;
2017-05-26 04:48:44 +08:00
loop {
2017-05-27 06:28:52 +08:00
uint32_t packetLen , packetChecksum ;
2017-05-26 04:48:44 +08:00
//Retrieve packet length and checksum
2017-05-27 06:28:52 +08:00
if ( checksumEnabled ) {
if ( e - p < sizeof ( uint32_t ) * 2 ) break ;
packetLen = * ( uint32_t * ) p ; p + = sizeof ( uint32_t ) ;
packetChecksum = * ( uint32_t * ) p ; p + = sizeof ( uint32_t ) ;
} else {
if ( e - p < sizeof ( uint32_t ) ) break ;
packetLen = * ( uint32_t * ) p ; p + = sizeof ( uint32_t ) ;
}
2017-05-26 04:48:44 +08:00
if ( packetLen > FLOW_KNOBS - > PACKET_LIMIT ) {
2019-11-13 01:23:46 +08:00
TraceEvent ( SevError , " PacketLimitExceeded " ) . detail ( " FromPeer " , peerAddress . toString ( ) ) . detail ( " Length " , ( int ) packetLen ) ;
2017-05-26 04:48:44 +08:00
throw platform_error ( ) ;
}
if ( e - p < packetLen ) break ;
ASSERT ( packetLen > = sizeof ( UID ) ) ;
2017-05-27 06:28:52 +08:00
if ( checksumEnabled ) {
bool isBuggifyEnabled = false ;
2017-09-19 07:54:49 +08:00
if ( g_network - > isSimulated ( ) & & g_network - > now ( ) - g_simulator . lastConnectionFailure > g_simulator . connectionFailuresDisableDuration & & BUGGIFY_WITH_PROB ( 0.0001 ) ) {
2017-09-19 03:46:29 +08:00
g_simulator . lastConnectionFailure = g_network - > now ( ) ;
2017-05-27 06:28:52 +08:00
isBuggifyEnabled = true ;
TraceEvent ( SevInfo , " BitsFlip " ) ;
2019-05-11 05:01:52 +08:00
int flipBits = 32 - ( int ) floor ( log2 ( deterministicRandom ( ) - > randomUInt32 ( ) ) ) ;
2017-05-27 06:28:52 +08:00
2019-05-11 05:01:52 +08:00
uint32_t firstFlipByteLocation = deterministicRandom ( ) - > randomUInt32 ( ) % packetLen ;
int firstFlipBitLocation = deterministicRandom ( ) - > randomInt ( 0 , 8 ) ;
2017-05-27 06:28:52 +08:00
* ( p + firstFlipByteLocation ) ^ = 1 < < firstFlipBitLocation ;
flipBits - - ;
for ( int i = 0 ; i < flipBits ; i + + ) {
2019-05-11 05:01:52 +08:00
uint32_t byteLocation = deterministicRandom ( ) - > randomUInt32 ( ) % packetLen ;
int bitLocation = deterministicRandom ( ) - > randomInt ( 0 , 8 ) ;
2017-05-27 06:28:52 +08:00
if ( byteLocation ! = firstFlipByteLocation | | bitLocation ! = firstFlipBitLocation ) {
* ( p + byteLocation ) ^ = 1 < < bitLocation ;
}
2017-05-26 04:48:44 +08:00
}
}
2017-05-27 06:28:52 +08:00
uint32_t calculatedChecksum = crc32c_append ( 0 , p , packetLen ) ;
if ( calculatedChecksum ! = packetChecksum ) {
if ( isBuggifyEnabled ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( SevInfo , " ChecksumMismatchExp " ) . detail ( " PacketChecksum " , ( int ) packetChecksum ) . detail ( " CalculatedChecksum " , ( int ) calculatedChecksum ) ;
2017-05-27 06:28:52 +08:00
} else {
2018-06-09 02:11:08 +08:00
TraceEvent ( SevWarnAlways , " ChecksumMismatchUnexp " ) . detail ( " PacketChecksum " , ( int ) packetChecksum ) . detail ( " CalculatedChecksum " , ( int ) calculatedChecksum ) ;
2017-05-27 06:28:52 +08:00
}
throw checksum_failed ( ) ;
2017-05-26 04:48:44 +08:00
} else {
2017-05-27 06:28:52 +08:00
if ( isBuggifyEnabled ) {
2018-06-09 02:11:08 +08:00
TraceEvent ( SevError , " ChecksumMatchUnexp " ) . detail ( " PacketChecksum " , ( int ) packetChecksum ) . detail ( " CalculatedChecksum " , ( int ) calculatedChecksum ) ;
2017-05-27 06:28:52 +08:00
}
2017-05-26 04:48:44 +08:00
}
}
2017-10-28 07:54:44 +08:00
# if VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED ( p , packetLen ) ;
# endif
2019-04-12 04:24:00 +08:00
ArenaReader reader ( arena , StringRef ( p , packetLen ) , AssumeVersion ( currentProtocolVersion ) ) ;
2019-01-26 07:10:20 +08:00
UID token ;
reader > > token ;
2017-05-26 04:48:44 +08:00
+ + transport - > countPacketsReceived ;
2018-02-16 06:54:35 +08:00
if ( packetLen > FLOW_KNOBS - > PACKET_WARNING ) {
2019-11-13 01:23:46 +08:00
TraceEvent ( transport - > warnAlwaysForLargePacket ? SevWarnAlways : SevWarn , " LargePacketReceived " )
2018-08-02 05:30:57 +08:00
. suppressFor ( 1.0 )
2018-02-16 06:54:35 +08:00
. detail ( " FromPeer " , peerAddress . toString ( ) )
. detail ( " Length " , ( int ) packetLen )
2018-08-02 05:30:57 +08:00
. detail ( " Token " , token ) ;
2018-02-16 06:54:35 +08:00
if ( g_network - > isSimulated ( ) )
transport - > warnAlwaysForLargePacket = false ;
}
2019-04-12 04:24:00 +08:00
ASSERT ( ! reader . empty ( ) ) ;
deliver ( transport , Endpoint ( { peerAddress } , token ) , std : : move ( reader ) , true ) ;
2017-05-26 04:48:44 +08:00
unprocessed_begin = p = p + packetLen ;
}
}
2019-06-12 07:44:00 +08:00
// Given unprocessed buffer [begin, end), check if next packet size is known and return
2019-06-26 00:52:05 +08:00
// enough size for the next packet, whose format is: {size, optional_checksum, data} +
// next_packet_size.
2019-06-14 02:56:06 +08:00
static int getNewBufferSize ( const uint8_t * begin , const uint8_t * end , const NetworkAddress & peerAddress ) {
2019-06-12 07:44:00 +08:00
const int len = end - begin ;
if ( len < sizeof ( uint32_t ) ) {
2019-06-22 01:51:20 +08:00
return FLOW_KNOBS - > MIN_PACKET_BUFFER_BYTES ;
2019-06-12 07:44:00 +08:00
}
const uint32_t packetLen = * ( uint32_t * ) begin ;
if ( packetLen > FLOW_KNOBS - > PACKET_LIMIT ) {
2019-11-13 01:23:46 +08:00
TraceEvent ( SevError , " PacketLimitExceeded " ) . detail ( " FromPeer " , peerAddress . toString ( ) ) . detail ( " Length " , ( int ) packetLen ) ;
2019-06-12 07:44:00 +08:00
throw platform_error ( ) ;
}
2019-06-12 08:52:25 +08:00
return std : : max < uint32_t > ( FLOW_KNOBS - > MIN_PACKET_BUFFER_BYTES ,
2019-06-26 00:52:05 +08:00
packetLen + sizeof ( uint32_t ) * ( peerAddress . isTLS ( ) ? 2 : 3 ) ) ;
2019-06-12 07:44:00 +08:00
}
2017-05-26 04:48:44 +08:00
ACTOR static Future < Void > connectionReader (
TransportData * transport ,
2019-01-25 09:28:26 +08:00
Reference < IConnection > conn ,
2019-08-10 02:52:12 +08:00
Reference < Peer > peer ,
Promise < Reference < Peer > > onConnected )
2017-07-18 01:40:36 +08:00
{
2017-05-26 04:48:44 +08:00
// This actor exists whenever there is an open or opening connection, whether incoming or outgoing
2019-06-12 07:44:00 +08:00
// For incoming connections conn is set and peer is initially nullptr; for outgoing connections it is the reverse
2017-05-26 04:48:44 +08:00
state Arena arena ;
2019-06-12 07:44:00 +08:00
state uint8_t * unprocessed_begin = nullptr ;
state uint8_t * unprocessed_end = nullptr ;
state uint8_t * buffer_end = nullptr ;
2017-05-26 04:48:44 +08:00
state bool expectConnectPacket = true ;
2017-07-18 01:40:36 +08:00
state bool compatible = false ;
2018-07-18 02:36:05 +08:00
state bool incompatiblePeerCounted = false ;
2018-07-28 11:42:06 +08:00
state bool incompatibleProtocolVersionNewer = false ;
2017-05-26 04:48:44 +08:00
state NetworkAddress peerAddress ;
2019-06-19 08:55:27 +08:00
state ProtocolVersion peerProtocolVersion ;
2017-05-26 04:48:44 +08:00
peerAddress = conn - > getPeerAddress ( ) ;
2019-08-10 02:52:12 +08:00
if ( ! peer ) {
2017-05-26 04:48:44 +08:00
ASSERT ( ! peerAddress . isPublic ( ) ) ;
2017-07-18 01:40:36 +08:00
}
2017-11-14 07:07:39 +08:00
try {
2017-05-26 04:48:44 +08:00
loop {
2017-11-14 07:07:39 +08:00
loop {
2019-06-15 06:07:57 +08:00
state int readAllBytes = buffer_end - unprocessed_end ;
2019-06-26 00:52:05 +08:00
if ( readAllBytes < FLOW_KNOBS - > MIN_PACKET_BUFFER_FREE_BYTES ) {
2017-11-14 07:07:39 +08:00
Arena newArena ;
2019-06-12 07:44:00 +08:00
const int unproc_len = unprocessed_end - unprocessed_begin ;
const int len = getNewBufferSize ( unprocessed_begin , unprocessed_end , peerAddress ) ;
uint8_t * const newBuffer = new ( newArena ) uint8_t [ len ] ;
2019-12-04 05:09:29 +08:00
if ( unproc_len > 0 ) {
memcpy ( newBuffer , unprocessed_begin , unproc_len ) ;
}
2017-11-14 07:07:39 +08:00
arena = newArena ;
unprocessed_begin = newBuffer ;
unprocessed_end = newBuffer + unproc_len ;
buffer_end = newBuffer + len ;
readAllBytes = buffer_end - unprocessed_end ;
}
2017-05-26 04:48:44 +08:00
2019-06-15 06:07:57 +08:00
state int totalReadBytes = 0 ;
while ( true ) {
const int len = std : : min < int > ( buffer_end - unprocessed_end , FLOW_KNOBS - > MAX_PACKET_SEND_BYTES ) ;
if ( len = = 0 ) break ;
state int readBytes = conn - > read ( unprocessed_end , unprocessed_end + len ) ;
if ( readBytes = = 0 ) break ;
2019-06-29 08:32:54 +08:00
wait ( yield ( TaskPriority : : ReadSocket ) ) ;
2019-06-15 06:07:57 +08:00
totalReadBytes + = readBytes ;
unprocessed_end + = readBytes ;
2019-05-17 08:26:48 +08:00
}
2019-06-15 06:07:57 +08:00
if ( peer ) {
peer - > bytesReceived + = totalReadBytes ;
2019-05-17 08:26:48 +08:00
}
2019-06-15 06:07:57 +08:00
if ( totalReadBytes = = 0 ) break ;
state bool readWillBlock = totalReadBytes ! = readAllBytes ;
2018-10-31 04:44:37 +08:00
2017-11-14 07:07:39 +08:00
if ( expectConnectPacket & & unprocessed_end - unprocessed_begin > = CONNECT_PACKET_V0_SIZE ) {
// At the beginning of a connection, we expect to receive a packet containing the protocol version and the listening port of the remote process
2019-03-01 08:07:49 +08:00
int32_t connectPacketSize = ( ( ConnectPacket * ) unprocessed_begin ) - > totalPacketSize ( ) ;
2017-11-14 07:07:39 +08:00
if ( unprocessed_end - unprocessed_begin > = connectPacketSize ) {
2019-06-19 05:49:04 +08:00
auto protocolVersion = ( ( ConnectPacket * ) unprocessed_begin ) - > protocolVersion ;
2019-03-01 08:07:49 +08:00
BinaryReader pktReader ( unprocessed_begin , connectPacketSize , AssumeVersion ( protocolVersion ) ) ;
ConnectPacket pkt ;
serializer ( pktReader , pkt ) ;
uint64_t connectionId = pkt . connectionId ;
2020-02-13 02:41:52 +08:00
if ( ! pkt . protocolVersion . hasObjectSerializerFlag ( ) | |
! pkt . protocolVersion . isCompatible ( currentProtocolVersion ) ) {
2019-03-01 08:07:49 +08:00
incompatibleProtocolVersionNewer = pkt . protocolVersion > currentProtocolVersion ;
NetworkAddress addr = pkt . canonicalRemotePort
? NetworkAddress ( pkt . canonicalRemoteIp ( ) , pkt . canonicalRemotePort )
2019-02-27 10:04:03 +08:00
: conn - > getPeerAddress ( ) ;
2017-11-14 07:07:39 +08:00
if ( connectionId ! = 1 ) addr . port = 0 ;
2018-10-31 04:44:37 +08:00
2017-11-14 07:07:39 +08:00
if ( ! transport - > multiVersionConnections . count ( connectionId ) ) {
if ( now ( ) - transport - > lastIncompatibleMessage > FLOW_KNOBS - > CONNECTION_REJECTED_MESSAGE_DELAY ) {
TraceEvent ( SevWarn , " ConnectionRejected " , conn - > getDebugID ( ) )
2019-02-27 10:04:03 +08:00
. detail ( " Reason " , " IncompatibleProtocolVersion " )
2019-06-19 05:49:04 +08:00
. detail ( " LocalVersion " , currentProtocolVersion . version ( ) )
. detail ( " RejectedVersion " , pkt . protocolVersion . version ( ) )
. detail ( " VersionMask " , ProtocolVersion : : compatibleProtocolVersionMask )
2019-03-01 08:07:49 +08:00
. detail ( " Peer " , pkt . canonicalRemotePort ? NetworkAddress ( pkt . canonicalRemoteIp ( ) , pkt . canonicalRemotePort )
: conn - > getPeerAddress ( ) )
2019-02-27 10:04:03 +08:00
. detail ( " ConnectionId " , connectionId ) ;
2017-11-14 07:07:39 +08:00
transport - > lastIncompatibleMessage = now ( ) ;
}
if ( ! transport - > incompatiblePeers . count ( addr ) ) {
transport - > incompatiblePeers [ addr ] = std : : make_pair ( connectionId , now ( ) ) ;
}
} else if ( connectionId > 1 ) {
transport - > multiVersionConnections [ connectionId ] = now ( ) + FLOW_KNOBS - > CONNECTION_ID_TIMEOUT ;
2017-05-26 04:48:44 +08:00
}
2017-11-14 07:07:39 +08:00
compatible = false ;
2019-06-19 05:49:04 +08:00
if ( ! protocolVersion . hasMultiVersionClient ( ) ) {
2017-11-14 07:07:39 +08:00
// Older versions expected us to hang up. It may work even if we don't hang up here, but it's safer to keep the old behavior.
throw incompatible_protocol_version ( ) ;
2017-05-26 04:48:44 +08:00
}
2020-02-13 02:41:52 +08:00
} else {
2017-11-14 07:07:39 +08:00
compatible = true ;
TraceEvent ( " ConnectionEstablished " , conn - > getDebugID ( ) )
2018-08-02 05:30:57 +08:00
. suppressFor ( 1.0 )
2017-11-14 07:07:39 +08:00
. detail ( " Peer " , conn - > getPeerAddress ( ) )
2019-08-07 00:25:40 +08:00
. detail ( " ConnectionId " , connectionId ) ;
2017-07-18 01:40:36 +08:00
}
2017-05-26 04:48:44 +08:00
2017-11-14 07:07:39 +08:00
if ( connectionId > 1 ) {
transport - > multiVersionConnections [ connectionId ] = now ( ) + FLOW_KNOBS - > CONNECTION_ID_TIMEOUT ;
}
unprocessed_begin + = connectPacketSize ;
expectConnectPacket = false ;
2019-08-10 02:52:12 +08:00
if ( peer ) {
2019-04-12 04:24:00 +08:00
peerProtocolVersion = protocolVersion ;
2017-11-14 07:07:39 +08:00
// Outgoing connection; port information should be what we expect
2019-02-27 10:04:03 +08:00
TraceEvent ( " ConnectedOutgoing " )
. suppressFor ( 1.0 )
2019-03-01 08:07:49 +08:00
. detail ( " PeerAddr " , NetworkAddress ( pkt . canonicalRemoteIp ( ) , pkt . canonicalRemotePort ) ) ;
2017-11-14 07:07:39 +08:00
peer - > compatible = compatible ;
2018-07-28 11:42:06 +08:00
peer - > incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer ;
2018-07-18 02:36:05 +08:00
if ( ! compatible ) {
2017-11-15 10:37:29 +08:00
peer - > transport - > numIncompatibleConnections + + ;
2018-07-18 02:36:05 +08:00
incompatiblePeerCounted = true ;
}
2019-03-01 08:07:49 +08:00
ASSERT ( pkt . canonicalRemotePort = = peerAddress . port ) ;
2019-04-12 04:24:00 +08:00
onConnected . send ( peer ) ;
2017-11-14 07:07:39 +08:00
} else {
2019-04-12 04:24:00 +08:00
peerProtocolVersion = protocolVersion ;
2019-03-01 08:07:49 +08:00
if ( pkt . canonicalRemotePort ) {
peerAddress = NetworkAddress ( pkt . canonicalRemoteIp ( ) , pkt . canonicalRemotePort , true ,
2019-02-27 10:04:03 +08:00
peerAddress . isTLS ( ) ) ;
2017-11-14 07:07:39 +08:00
}
2019-10-11 01:34:44 +08:00
peer = transport - > getOrOpenPeer ( peerAddress , false ) ;
2017-11-14 07:07:39 +08:00
peer - > compatible = compatible ;
2018-07-28 11:42:06 +08:00
peer - > incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer ;
2018-07-18 02:36:05 +08:00
if ( ! compatible ) {
2017-11-15 10:37:29 +08:00
peer - > transport - > numIncompatibleConnections + + ;
2018-07-18 02:36:05 +08:00
incompatiblePeerCounted = true ;
}
2017-11-14 07:07:39 +08:00
onConnected . send ( peer ) ;
2018-08-11 04:57:10 +08:00
wait ( delay ( 0 ) ) ; // Check for cancellation
2017-05-26 04:48:44 +08:00
}
}
}
2017-11-14 07:07:39 +08:00
if ( compatible ) {
scanPackets ( transport , unprocessed_begin , unprocessed_end , arena , peerAddress , peerProtocolVersion ) ;
}
else if ( ! expectConnectPacket ) {
unprocessed_begin = unprocessed_end ;
2019-05-17 08:26:48 +08:00
peer - > resetPing . trigger ( ) ;
2017-11-14 07:07:39 +08:00
}
2017-05-26 04:48:44 +08:00
2017-11-14 07:07:39 +08:00
if ( readWillBlock )
break ;
2017-05-26 04:48:44 +08:00
2019-06-25 17:47:35 +08:00
wait ( yield ( TaskPriority : : ReadSocket ) ) ;
2017-11-14 07:07:39 +08:00
}
2017-05-26 04:48:44 +08:00
2018-08-11 04:57:10 +08:00
wait ( conn - > onReadable ( ) ) ;
2019-06-25 17:47:35 +08:00
wait ( delay ( 0 , TaskPriority : : ReadSocket ) ) ; // We don't want to call conn->read directly from the reactor - we could get stuck in the reactor reading 1 packet at a time
2017-11-14 07:07:39 +08:00
}
}
catch ( Error & e ) {
2018-07-18 02:36:05 +08:00
if ( incompatiblePeerCounted ) {
ASSERT ( peer & & peer - > transport - > numIncompatibleConnections > 0 ) ;
2017-11-15 10:37:29 +08:00
peer - > transport - > numIncompatibleConnections - - ;
2017-11-14 07:07:39 +08:00
}
throw ;
2017-05-26 04:48:44 +08:00
}
}
// Handles one freshly accepted connection until it has been associated with a Peer.
// After the handshake, a connectionReader is started with no peer; once it reports the peer (via
// onConnected), the connection is handed to Peer::onIncomingConnection(). If that does not happen within
// CONNECTION_MONITOR_TIMEOUT (jittered), the attempt fails with timed_out().
// Errors are traced and swallowed: the connection is closed and the actor completes normally.
ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<IConnection> conn) {
	try {
		wait(conn->acceptHandshake());

		state Promise<Reference<Peer>> onConnected;
		state Future<Void> reader = connectionReader(self, conn, Reference<Peer>(), onConnected);

		choose {
			when(wait(reader)) {
				// The reader never completes normally.
				ASSERT(false);
				return Void();
			}
			when(Reference<Peer> connectedPeer = wait(onConnected.getFuture())) {
				connectedPeer->onIncomingConnection(connectedPeer, conn, reader);
			}
			when(wait(delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
				TEST(true); // Incoming connection timed out
				throw timed_out();
			}
		}
	} catch (Error& e) {
		TraceEvent("IncomingConnectionError", conn->getDebugID())
		    .error(e)
		    .suppressFor(1.0)
		    .detail("FromAddress", conn->getPeerAddress());
		conn->close();
	}
	return Void();
}
// Accepts connections on `listenAddr` forever, starting a connectionIncoming actor for each one.
// After every ACCEPT_BATCH_SIZE accepts the actor yields with delay(0, TaskPriority::AcceptSocket).
// Any error is traced as "ListenError" at SevError and rethrown.
ACTOR static Future<Void> listen(TransportData* self, NetworkAddress listenAddr) {
	// Actors monitoring incoming connections that haven't yet been associated with a peer
	state ActorCollectionNoErrors incoming;
	state Reference<IListener> listener = INetworkConnections::net()->listen(listenAddr);
	state uint64_t acceptedCount = 0;

	try {
		loop {
			Reference<IConnection> accepted = wait(listener->accept());
			if (accepted) {
				TraceEvent("ConnectionFrom", accepted->getDebugID())
				    .suppressFor(1.0)
				    .detail("FromAddress", accepted->getPeerAddress())
				    .detail("ListenAddress", listenAddr.toString());
				incoming.add(connectionIncoming(self, accepted));
			}

			// Counted even when accept() yields a null connection.
			++acceptedCount;
			if (acceptedCount % (FLOW_KNOBS->ACCEPT_BATCH_SIZE) == 0) {
				wait(delay(0, TaskPriority::AcceptSocket));
			}
		}
	} catch (Error& e) {
		TraceEvent(SevError, "ListenError").error(e);
		throw;
	}
}
2019-10-11 01:34:44 +08:00
// Returns the Peer registered for `address`, or a null Reference if there is none.
Reference<Peer> TransportData::getPeer(NetworkAddress const& address) {
	auto found = peers.find(address);
	if (found == peers.end()) {
		return Reference<Peer>();
	}
	return found->second;
}
// Returns the Peer for `address`, creating and registering it if it does not exist yet.
// A connectionKeeper is started for a newly created peer only when `startConnectionKeeper` is set and
// `address` is not one of our own addresses.
Reference<Peer> TransportData::getOrOpenPeer(NetworkAddress const& address, bool startConnectionKeeper) {
	Reference<Peer> existing = getPeer(address);
	if (existing) {
		return existing;
	}

	Reference<Peer> created = Reference<Peer>(new Peer(this, address));
	const bool wantKeeper = startConnectionKeeper && !isLocalAddress(address);
	if (wantKeeper) {
		created->connect = connectionKeeper(created);
	}
	peers[address] = created;
	return created;
}
2018-10-31 04:44:37 +08:00
bool TransportData : : isLocalAddress ( const NetworkAddress & address ) const {
2019-03-24 08:54:46 +08:00
return address = = localAddresses . address | | ( localAddresses . secondaryAddress . present ( ) & & address = = localAddresses . secondaryAddress . get ( ) ) ;
2018-10-31 04:44:37 +08:00
}
2017-05-26 04:48:44 +08:00
// Periodic housekeeping for the multi-version bookkeeping, run every CONNECTION_CLEANUP_DELAY:
//  - forget incompatible peers whose connection id is now a known multi-version connection;
//  - expire multi-version connection ids whose timeout has passed;
//  - trigger incompatiblePeersChanged if any remaining incompatible peer has been recorded for longer than
//    INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING.
ACTOR static Future<Void> multiVersionCleanupWorker(TransportData* self) {
	loop {
		wait(delay(FLOW_KNOBS->CONNECTION_CLEANUP_DELAY));

		bool foundIncompatible = false;
		auto peerIt = self->incompatiblePeers.begin();
		while (peerIt != self->incompatiblePeers.end()) {
			if (self->multiVersionConnections.count(peerIt->second.first)) {
				peerIt = self->incompatiblePeers.erase(peerIt);
				continue;
			}
			if (now() - peerIt->second.second > FLOW_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING) {
				foundIncompatible = true;
			}
			++peerIt;
		}

		auto connIt = self->multiVersionConnections.begin();
		while (connIt != self->multiVersionConnections.end()) {
			if (connIt->second < now()) {
				connIt = self->multiVersionConnections.erase(connIt);
			} else {
				++connIt;
			}
		}

		if (foundIncompatible) {
			self->incompatiblePeersChanged.trigger();
		}
	}
}
// Allocates the TransportData for `transportId` and starts its multi-version cleanup actor.
FlowTransport::FlowTransport(uint64_t transportId)
  : self(new TransportData(transportId))
{
	self->multiVersionCleanup = multiVersionCleanupWorker(self);
}
// Destroys the TransportData allocated by the constructor.
FlowTransport::~FlowTransport() {
	delete self;
}
void FlowTransport : : initMetrics ( ) { self - > initMetrics ( ) ; }
2019-02-01 10:20:14 +08:00
// Returns a copy of the transport's local address list (primary and, if set, secondary address).
NetworkAddressList FlowTransport::getLocalAddresses() const {
	const auto& addresses = self->localAddresses;
	return addresses;
}
2018-10-31 04:44:37 +08:00
// Returns the transport's primary local address.
NetworkAddress FlowTransport::getLocalAddress() const {
	const auto& addresses = self->localAddresses;
	return addresses.address;
}
2017-05-26 04:48:44 +08:00
2018-10-31 04:44:37 +08:00
std : : map < NetworkAddress , std : : pair < uint64_t , double > > * FlowTransport : : getIncompatiblePeers ( ) {
2017-05-26 04:48:44 +08:00
for ( auto it = self - > incompatiblePeers . begin ( ) ; it ! = self - > incompatiblePeers . end ( ) ; ) {
if ( self - > multiVersionConnections . count ( it - > second . first ) ) {
it = self - > incompatiblePeers . erase ( it ) ;
} else {
it + + ;
}
}
2018-10-31 04:44:37 +08:00
return & self - > incompatiblePeers ;
2017-05-26 04:48:44 +08:00
}
2020-04-06 14:09:36 +08:00
// Returns a future for the next trigger of incompatiblePeersChanged.
Future<Void> FlowTransport::onIncompatibleChanged() {
	Future<Void> nextChange = self->incompatiblePeersChanged.onTrigger();
	return nextChange;
}
2017-05-26 04:48:44 +08:00
// Registers `publicAddress` as a local address and starts listening on `listenAddress`.
// The first bound address becomes the primary local address; any later call stores its address as the
// secondary one. The listener's future is remembered in `listeners` and also returned.
// `publicAddress` must be public (asserted).
Future<Void> FlowTransport::bind(NetworkAddress publicAddress, NetworkAddress listenAddress) {
	ASSERT(publicAddress.isPublic());

	auto& local = self->localAddresses;
	const bool primaryUnset = (local.address == NetworkAddress());
	if (primaryUnset) {
		local.address = publicAddress;
	} else {
		local.secondaryAddress = publicAddress;
	}

	TraceEvent("Binding")
	    .detail("PublicAddress", publicAddress)
	    .detail("ListenAddress", listenAddress);

	Future<Void> listening = listen(self, listenAddress);
	self->listeners.push_back(listening);
	return listening;
}
2019-03-27 03:05:43 +08:00
// Builds an Endpoint for `token` using the addresses currently stored in g_currentDeliveryPeerAddress,
// which deliver() sets to the sender's addresses while a packet is being received.
Endpoint FlowTransport::loadedEndpoint(const UID& token) {
	Endpoint loaded(g_currentDeliveryPeerAddress, token);
	return loaded;
}
2019-06-25 08:37:57 +08:00
void FlowTransport : : addPeerReference ( const Endpoint & endpoint , bool isStream ) {
2019-07-10 05:57:38 +08:00
if ( ! isStream | | ! endpoint . getPrimaryAddress ( ) . isValid ( ) )
return ;
2019-10-11 01:34:44 +08:00
Reference < Peer > peer = self - > getOrOpenPeer ( endpoint . getPrimaryAddress ( ) ) ;
2020-04-23 10:38:01 +08:00
if ( peer - > peerReferences = = - 1 ) {
2019-01-09 23:41:02 +08:00
peer - > peerReferences = 1 ;
} else {
peer - > peerReferences + + ;
2018-07-11 04:10:29 +08:00
}
}
2019-06-25 08:37:57 +08:00
void FlowTransport : : removePeerReference ( const Endpoint & endpoint , bool isStream ) {
if ( ! isStream | | ! endpoint . getPrimaryAddress ( ) . isValid ( ) ) return ;
2019-10-11 01:34:44 +08:00
Reference < Peer > peer = self - > getPeer ( endpoint . getPrimaryAddress ( ) ) ;
2019-01-09 23:41:02 +08:00
if ( peer ) {
peer - > peerReferences - - ;
if ( peer - > peerReferences < 0 ) {
TraceEvent ( SevError , " InvalidPeerReferences " )
. detail ( " References " , peer - > peerReferences )
. detail ( " Address " , endpoint . getPrimaryAddress ( ) )
. detail ( " Token " , endpoint . token ) ;
}
2020-02-20 07:17:50 +08:00
if ( peer - > peerReferences = = 0 & & peer - > reliable . empty ( ) & & peer - > unsent . empty ( ) & & peer - > outstandingReplies = = 0 & & peer - > lastDataPacketSentTime < now ( ) - FLOW_KNOBS - > CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY ) {
2019-05-17 08:26:48 +08:00
peer - > resetPing . trigger ( ) ;
2018-07-11 04:10:29 +08:00
}
}
}
2020-04-13 13:18:51 +08:00
void FlowTransport : : addEndpoint ( Endpoint & endpoint , NetworkMessageReceiver * receiver , TaskPriority taskID ) {
endpoint . token = deterministicRandom ( ) - > randomUniqueID ( ) ;
2017-05-26 04:48:44 +08:00
if ( receiver - > isStream ( ) ) {
2019-03-24 08:54:46 +08:00
endpoint . addresses = self - > localAddresses ;
2017-05-26 04:48:44 +08:00
endpoint . token = UID ( endpoint . token . first ( ) | TOKEN_STREAM_FLAG , endpoint . token . second ( ) ) ;
} else {
2019-03-24 08:54:46 +08:00
endpoint . addresses = NetworkAddressList ( ) ;
2017-05-26 04:48:44 +08:00
endpoint . token = UID ( endpoint . token . first ( ) & ~ TOKEN_STREAM_FLAG , endpoint . token . second ( ) ) ;
}
2019-07-04 12:03:58 +08:00
self - > endpoints . insert ( receiver , endpoint . token , taskID ) ;
2017-05-26 04:48:44 +08:00
}
2020-05-21 04:52:22 +08:00
void FlowTransport : : addEndpoints ( std : : vector < std : : pair < FlowReceiver * , TaskPriority > > const & streams ) {
self - > endpoints . insert ( self - > localAddresses , streams ) ;
2020-04-13 13:18:51 +08:00
}
2017-05-26 04:48:44 +08:00
void FlowTransport : : removeEndpoint ( const Endpoint & endpoint , NetworkMessageReceiver * receiver ) {
self - > endpoints . remove ( endpoint . token , receiver ) ;
}
2019-06-25 17:47:35 +08:00
void FlowTransport : : addWellKnownEndpoint ( Endpoint & endpoint , NetworkMessageReceiver * receiver , TaskPriority taskID ) {
2019-03-24 08:54:46 +08:00
endpoint . addresses = self - > localAddresses ;
2017-05-26 04:48:44 +08:00
ASSERT ( ( ( endpoint . token . first ( ) & TOKEN_STREAM_FLAG ) ! = 0 ) = = receiver - > isStream ( ) ) ;
Endpoint : : Token otoken = endpoint . token ;
2019-07-04 12:03:58 +08:00
self - > endpoints . insert ( receiver , endpoint . token , taskID ) ;
2017-05-26 04:48:44 +08:00
ASSERT ( endpoint . token = = otoken ) ;
}
2019-08-30 07:49:57 +08:00
// Delivers `what` to an endpoint on this same transport: the message is serialized
// with ObjectWriter at currentProtocolVersion, copied into its own Standalone buffer,
// and passed to deliver() through an ArenaReader over that buffer.
static void sendLocal(TransportData* self, ISerializeSource const& what, const Endpoint& destination) {
	TEST(true); // "Loopback" delivery
	// SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
	ObjectWriter wr(AssumeVersion(currentProtocolVersion));
	what.serializeObjectWriter(wr);

	Standalone<StringRef> copy;
	copy = wr.toStringRef();
#if VALGRIND
	VALGRIND_CHECK_MEM_IS_DEFINED(copy.begin(), copy.size());
#endif

	ASSERT(copy.size() > 0);
	deliver(self,
	        destination,
	        ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)),
	        false);
}
2017-05-26 04:48:44 +08:00
2020-03-20 14:21:41 +08:00
// Serializes `what` as one packet addressed to destination.token, writes it into
// peer->unsent and hands the result to peer->send().
//
// Bytes written, in order:
//   [uint32 length][uint32 crc32c - omitted when the destination is TLS][token][payload]
// `length` counts only the token and payload bytes, and the checksum covers exactly
// those same bytes.
//
// The packet is dropped (nullptr returned) when there is no peer, when the peer's
// outgoing connection is idle and the destination is not public, or when the peer
// runs a newer incompatible protocol and this is not a ping packet.
// countPacketsGenerated is incremented even for dropped packets.
//
// Returns the ReliablePacket allocated for this send when `reliable` is true, and
// nullptr when `reliable` is false or the packet was dropped.
static ReliablePacket* sendPacket(TransportData* self,
                                  Reference<Peer> peer,
                                  ISerializeSource const& what,
                                  const Endpoint& destination,
                                  bool reliable) {
	const bool checksumEnabled = !destination.getPrimaryAddress().isTLS();
	++self->countPacketsGenerated;

	// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
	if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) ||
	    (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {
		TEST(true); // Can't send to private address without a compatible open connection
		return nullptr;
	}

	// Captured before anything is appended; forwarded to peer->send() below.
	const bool firstUnsent = peer->unsent.empty();

	PacketBuffer* pb = peer->unsent.getWriteBuffer();
	ReliablePacket* rp = reliable ? new ReliablePacket : nullptr;

	// Remember where this packet starts so the checksum pass can find it again after
	// the writer has possibly moved on to further buffers.
	int checksumOffset = pb->bytes_written;
	PacketBuffer* checksumPb = pb;

	PacketWriter wr(pb, rp, AssumeVersion(currentProtocolVersion)); // SOMEDAY: Can we downgrade to talk to older peers?

	// Reserve some space for packet length and checksum, write them after serializing data
	SplitBuffer packetInfoBuffer;
	uint32_t len, checksum = 0;
	int packetInfoSize = sizeof(len);
	if (checksumEnabled) {
		packetInfoSize += sizeof(checksum);
	}

	wr.writeAhead(packetInfoSize, &packetInfoBuffer);
	wr << destination.token;
	what.serializePacketWriter(wr);
	pb = wr.finish();
	len = wr.size() - packetInfoSize;

	if (checksumEnabled) {
		// Find the correct place to start calculating checksum: skip the reserved
		// header, moving to the next buffer when the header ends at or beyond the end
		// of the first one.
		uint32_t bytesLeft = len;
		checksumOffset += packetInfoSize;
		if (checksumOffset >= checksumPb->bytes_written) {
			checksumOffset -= checksumPb->bytes_written;
			checksumPb = checksumPb->nextPacketBuffer();
		}

		// Checksum calculation, one buffer at a time. Only the first buffer can start
		// at a non-zero offset.
		while (bytesLeft > 0) {
			const uint32_t chunk =
			    std::min(bytesLeft, static_cast<uint32_t>(checksumPb->bytes_written - checksumOffset));
			checksum = crc32c_append(checksum, checksumPb->data() + checksumOffset, chunk);
			bytesLeft -= chunk;
			checksumPb = checksumPb->nextPacketBuffer();
			checksumOffset = 0;
		}
	}

	// Write packet length and checksum into packet buffer
	packetInfoBuffer.write(&len, sizeof(len));
	if (checksumEnabled) {
		packetInfoBuffer.write(&checksum, sizeof(checksum), sizeof(len));
	}

	// Oversized packets are only reported here; the send below still happens.
	if (len > FLOW_KNOBS->PACKET_LIMIT) {
		TraceEvent(SevError, " PacketLimitExceeded ")
		    .detail(" ToPeer ", destination.getPrimaryAddress())
		    .detail(" Length ", static_cast<int>(len));
		// throw platform_error(); // FIXME: How to recover from this situation?
	} else if (len > FLOW_KNOBS->PACKET_WARNING) {
		TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, " LargePacketSent ")
		    .suppressFor(1.0)
		    .detail(" ToPeer ", destination.getPrimaryAddress())
		    .detail(" Length ", static_cast<int>(len))
		    .detail(" Token ", destination.token)
		    .backtrace();

		// In simulation, every large packet after this one is logged at SevWarn.
		if (g_network->isSimulated()) {
			self->warnAlwaysForLargePacket = false;
		}
	}

#if VALGRIND
	SendBuffer* checkbuf = pb;
	while (checkbuf) {
		int size = checkbuf->bytes_written;
		const uint8_t* data = checkbuf->data();
		VALGRIND_CHECK_MEM_IS_DEFINED(data, size);
		checkbuf = checkbuf->next;
	}
#endif

	peer->send(pb, rp, firstUnsent);

	// Ping packets do not update the "last data packet" timestamp.
	if (destination.token != WLTOKEN_PING_PACKET) {
		peer->lastDataPacketSentTime = now();
	}
	return rp;
}
2019-08-30 07:49:57 +08:00
// Sends `what` to `destination` as a reliable packet.
//
// A destination on a local address is delivered in-process through sendLocal() and
// yields nullptr. Otherwise the peer is fetched or opened and the result of
// sendPacket() (which may itself be nullptr) is returned.
ReliablePacket* FlowTransport::sendReliable(ISerializeSource const& what, const Endpoint& destination) {
	if (!self->isLocalAddress(destination.getPrimaryAddress())) {
		Reference<Peer> peer = self->getOrOpenPeer(destination.getPrimaryAddress());
		return sendPacket(self, peer, what, destination, true);
	}

	sendLocal(self, what, destination);
	return nullptr;
}
2019-08-30 07:49:57 +08:00
void FlowTransport : : cancelReliable ( ReliablePacket * p ) {
2017-05-26 04:48:44 +08:00
if ( p ) p - > remove ( ) ;
// SOMEDAY: Call reliable.compact() if a lot of memory is wasted in PacketBuffers by formerly reliable packets mixed with a few reliable ones. Don't forget to delref the new PacketBuffers since they are unsent.
}
2019-08-30 07:49:57 +08:00
// Sends `what` to `destination` as an unreliable packet.
//
// A destination on a local address is delivered in-process through sendLocal() and an
// empty Reference is returned. Otherwise the peer is looked up - and opened if missing
// when openConnection is true - and the packet is passed to sendPacket(). The peer
// reference that was used is returned; it can be empty when openConnection is false
// and no peer exists.
Reference<Peer> FlowTransport::sendUnreliable(ISerializeSource const& what,
                                              const Endpoint& destination,
                                              bool openConnection) {
	if (self->isLocalAddress(destination.getPrimaryAddress())) {
		sendLocal(self, what, destination);
		return Reference<Peer>();
	}

	Reference<Peer> peer = openConnection ? self->getOrOpenPeer(destination.getPrimaryAddress())
	                                      : self->getPeer(destination.getPrimaryAddress());

	sendPacket(self, peer, what, destination, false);
	return peer;
}
2019-04-05 05:11:12 +08:00
// Returns the transport's shared `degraded` AsyncVar.
Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
	Reference<AsyncVar<bool>> degradedVar = self->degraded;
	return degradedVar;
}
2020-07-10 13:50:47 +08:00
void FlowTransport : : resetConnection ( NetworkAddress address ) {
auto peer = self - > getPeer ( address ) ;
if ( peer ) {
peer - > resetConnection . trigger ( ) ;
}
}
2017-11-10 03:20:35 +08:00
bool FlowTransport : : incompatibleOutgoingConnectionsPresent ( ) {
2019-03-16 01:34:57 +08:00
return self - > numIncompatibleConnections > 0 ;
2017-11-10 03:20:35 +08:00
}
2019-05-30 04:43:21 +08:00
void FlowTransport : : createInstance ( bool isClient , uint64_t transportId ) {
2017-05-26 04:48:44 +08:00
g_network - > setGlobal ( INetwork : : enFlowTransport , ( flowGlobalType ) new FlowTransport ( transportId ) ) ;
g_network - > setGlobal ( INetwork : : enNetworkAddressFunc , ( flowGlobalType ) & FlowTransport : : getGlobalLocalAddress ) ;
2019-02-01 10:20:14 +08:00
g_network - > setGlobal ( INetwork : : enNetworkAddressesFunc , ( flowGlobalType ) & FlowTransport : : getGlobalLocalAddresses ) ;
2020-05-12 02:24:19 +08:00
g_network - > setGlobal ( INetwork : : enFailureMonitor , ( flowGlobalType ) new SimpleFailureMonitor ( ) ) ;
g_network - > setGlobal ( INetwork : : enClientFailureMonitor , isClient ? ( flowGlobalType ) 1 : nullptr ) ;
2017-05-26 04:48:44 +08:00
}
2020-02-18 13:54:05 +08:00
// Returns a pointer to the healthMonitor member held inside the transport's data.
HealthMonitor* FlowTransport::healthMonitor() {
	HealthMonitor* monitor = &self->healthMonitor;
	return monitor;
}