Merge pull request #2048 from etschannen/feature-fix-connections

Fixed two different ways useful connections were being closed
This commit is contained in:
A.J. Beamon 2019-08-30 11:05:02 -07:00 committed by GitHub
commit 1fdabe62c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 487 additions and 452 deletions

View File

@ -288,353 +288,331 @@ struct ConnectPacket {
ACTOR static Future<Void> connectionReader(TransportData* transport, Reference<IConnection> conn, Reference<struct Peer> peer, ACTOR static Future<Void> connectionReader(TransportData* transport, Reference<IConnection> conn, Reference<struct Peer> peer,
Promise<Reference<struct Peer>> onConnected); Promise<Reference<struct Peer>> onConnected);
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection ); static void sendLocal( TransportData* self, ISerializeSource const& what, const Endpoint& destination );
static ReliablePacket* sendPacket( TransportData* self, Reference<Peer> peer, ISerializeSource const& what, const Endpoint& destination, bool reliable );
struct Peer : public ReferenceCounted<Peer> { ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
TransportData* transport; state Endpoint remotePingEndpoint({ peer->destination }, WLTOKEN_PING_PACKET);
NetworkAddress destination; loop {
UnsentPacketQueue unsent; if (!FlowTransport::transport().isClient() && !peer->destination.isPublic() && peer->compatible) {
ReliablePacketList reliable; // Don't send ping messages to clients unless necessary. Instead monitor incoming client pings.
AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false state double lastRefreshed = now();
Future<Void> connect; state int64_t lastBytesReceived = peer->bytesReceived;
AsyncTrigger resetPing;
bool compatible;
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
double reconnectionDelay;
int peerReferences;
bool incompatibleProtocolVersionNewer;
int64_t bytesReceived;
double lastDataPacketSentTime;
explicit Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {}
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) {
unsent.setWriteBuffer(pb);
if (rp) reliable.insert(rp);
if (firstUnsent) dataToSend.trigger();
}
void prependConnectPacket() {
// Send the ConnectPacket expected at the beginning of a new connection
ConnectPacket pkt;
if(transport->localAddresses.address.isTLS() == destination.isTLS()) {
pkt.canonicalRemotePort = transport->localAddresses.address.port;
pkt.setCanonicalRemoteIp(transport->localAddresses.address.ip);
} else if(transport->localAddresses.secondaryAddress.present()) {
pkt.canonicalRemotePort = transport->localAddresses.secondaryAddress.get().port;
pkt.setCanonicalRemoteIp(transport->localAddresses.secondaryAddress.get().ip);
} else {
// a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it
pkt.canonicalRemotePort = 0;
pkt.setCanonicalRemoteIp(IPAddress(0));
}
pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength);
pkt.protocolVersion = currentProtocolVersion;
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
pkt.protocolVersion.addObjectSerializerFlag();
}
pkt.connectionId = transport->transportId;
PacketBuffer* pb_first = PacketBuffer::create();
PacketWriter wr( pb_first, nullptr, Unversioned() );
pkt.serialize(wr);
unsent.prependWriteBuffer(pb_first, wr.finish());
}
void discardUnreliablePackets() {
// Throw away the current unsent list, dropping the reference count on each PacketBuffer that accounts for presence in the unsent list
unsent.discardAll();
// If there are reliable packets, compact reliable packets into a new unsent range
if(!reliable.empty()) {
PacketBuffer* pb = unsent.getWriteBuffer();
pb = reliable.compact(pb, nullptr);
unsent.setWriteBuffer(pb);
}
}
void onIncomingConnection( Reference<Peer> self, Reference<IConnection> conn, Future<Void> reader ) {
// In case two processes are trying to connect to each other simultaneously, the process with the larger canonical NetworkAddress
// gets to keep its outgoing connection.
if ( !destination.isPublic() && !outgoingConnectionIdle ) throw address_in_use();
NetworkAddress compatibleAddr = transport->localAddresses.address;
if(transport->localAddresses.secondaryAddress.present() && transport->localAddresses.secondaryAddress.get().isTLS() == destination.isTLS()) {
compatibleAddr = transport->localAddresses.secondaryAddress.get();
}
if ( !destination.isPublic() || outgoingConnectionIdle || destination > compatibleAddr ) {
// Keep the new connection
TraceEvent("IncomingConnection", conn->getDebugID())
.suppressFor(1.0)
.detail("FromAddr", conn->getPeerAddress())
.detail("CanonicalAddr", destination)
.detail("IsPublic", destination.isPublic());
connect.cancel();
prependConnectPacket();
connect = connectionKeeper( self, conn, reader );
} else {
TraceEvent("RedundantConnection", conn->getDebugID())
.suppressFor(1.0)
.detail("FromAddr", conn->getPeerAddress().toString())
.detail("CanonicalAddr", destination)
.detail("LocalAddr", compatibleAddr);
// Keep our prior connection
reader.cancel();
conn->close();
// Send an (ignored) packet to make sure that, if our outgoing connection died before the peer made this connection attempt,
// we eventually find out that our connection is dead, close it, and then respond to the next connection reattempt from peer.
}
}
ACTOR static Future<Void> connectionMonitor( Reference<Peer> peer ) {
state Endpoint remotePingEndpoint({ peer->destination }, WLTOKEN_PING_PACKET);
loop {
if (!FlowTransport::transport().isClient() && !peer->destination.isPublic()) {
// Don't send ping messages to clients unless necessary. Instead monitor incoming client pings.
state double lastRefreshed = now();
state int64_t lastBytesReceived = peer->bytesReceived;
loop {
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME));
if (lastBytesReceived < peer->bytesReceived) {
lastRefreshed = now();
lastBytesReceived = peer->bytesReceived;
} else if (lastRefreshed < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT *
FLOW_KNOBS->CONNECTION_MONITOR_INCOMING_IDLE_MULTIPLIER) {
// If we have not received anything in this period, client must have closed
// connection by now. Break loop to check if it is still alive by sending a ping.
break;
}
}
}
//We cannot let an error be thrown from connectionMonitor while still on the stack from scanPackets in connectionReader
//because then it would not call the destructor of connectionReader when connectionReader is cancelled.
wait(delay(0));
if (peer->reliable.empty() && peer->unsent.empty()) {
if (peer->peerReferences == 0 &&
(peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY)) {
// TODO: What about when peerReference == -1?
throw connection_unreferenced();
} else if (FlowTransport::transport().isClient() && peer->compatible && peer->destination.isPublic() &&
(peer->lastConnectTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT) &&
(peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT)) {
// First condition is necessary because we may get here if we are server.
throw connection_idle();
}
}
wait (delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME));
// TODO: Stop monitoring and close the connection with no onDisconnect requests outstanding
state ReplyPromise<Void> reply;
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePingEndpoint );
state int64_t startingBytes = peer->bytesReceived;
state int timeouts = 0;
loop { loop {
choose { wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME));
when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) { if (lastBytesReceived < peer->bytesReceived) {
if(startingBytes == peer->bytesReceived) { lastRefreshed = now();
TraceEvent("ConnectionTimeout").suppressFor(1.0).detail("WithAddr", peer->destination); lastBytesReceived = peer->bytesReceived;
throw connection_failed(); } else if (lastRefreshed < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT *
} FLOW_KNOBS->CONNECTION_MONITOR_INCOMING_IDLE_MULTIPLIER) {
if(timeouts > 1) { // If we have not received anything in this period, client must have closed
TraceEvent(SevWarnAlways, "ConnectionSlowPing") // connection by now. Break loop to check if it is still alive by sending a ping.
.suppressFor(1.0) break;
.detail("WithAddr", peer->destination)
.detail("Timeouts", timeouts);
}
startingBytes = peer->bytesReceived;
timeouts++;
}
when (wait( reply.getFuture() )) {
break;
}
when (wait( peer->resetPing.onTrigger())) {
break;
}
} }
} }
} }
}
ACTOR static Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection> conn ) { //We cannot let an error be thrown from connectionMonitor while still on the stack from scanPackets in connectionReader
state double lastWriteTime = now(); //because then it would not call the destructor of connectionReader when connectionReader is cancelled.
loop { wait(delay(0));
//wait( delay(0, TaskPriority::WriteSocket) );
wait( delayJittered(std::max<double>(FLOW_KNOBS->MIN_COALESCE_DELAY, FLOW_KNOBS->MAX_COALESCE_DELAY - (now() - lastWriteTime)), TaskPriority::WriteSocket) );
//wait( delay(500e-6, TaskPriority::WriteSocket) );
//wait( yield(TaskPriority::WriteSocket) );
// Send until there is nothing left to send if (peer->reliable.empty() && peer->unsent.empty() && peer->outstandingReplies==0) {
loop { if (peer->peerReferences == 0 &&
lastWriteTime = now(); (peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY)) {
// TODO: What about when peerReference == -1?
int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES); throw connection_unreferenced();
if (sent) { } else if (FlowTransport::transport().isClient() && peer->compatible && peer->destination.isPublic() &&
self->transport->bytesSent += sent; (peer->lastConnectTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT) &&
self->unsent.sent(sent); (peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT)) {
} // First condition is necessary because we may get here if we are server.
if (self->unsent.empty()) break; throw connection_idle();
TEST(true); // We didn't write everything, so apparently the write buffer is full. Wait for it to be nonfull.
wait( conn->onWritable() );
wait( yield(TaskPriority::WriteSocket) );
} }
// Wait until there is something to send
while ( self->unsent.empty() )
wait( self->dataToSend.onTrigger() );
} }
}
ACTOR static Future<Void> connectionKeeper( Reference<Peer> self, wait (delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME));
Reference<IConnection> conn = Reference<IConnection>(),
Future<Void> reader = Void()) {
TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID())
.detail("PeerAddr", self->destination)
.detail("ConnSet", (bool)conn);
// This is used only at client side and is used to override waiting for unsent data to update failure monitoring // TODO: Stop monitoring and close the connection with no onDisconnect requests outstanding
// status. At client, if an existing connection fails, we retry making a connection and if that fails, then only state ReplyPromise<Void> reply;
// we report that address as failed. FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePingEndpoint, true );
state bool clientReconnectDelay = false; state int64_t startingBytes = peer->bytesReceived;
state int timeouts = 0;
loop { loop {
try { choose {
if (!conn) { // Always, except for the first loop with an incoming connection when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) {
self->outgoingConnectionIdle = true; if(startingBytes == peer->bytesReceived) {
TraceEvent("ConnectionTimeout").suppressFor(1.0).detail("WithAddr", peer->destination);
// Wait until there is something to send.
while (self->unsent.empty()) {
if (FlowTransport::transport().isClient() && self->destination.isPublic() &&
clientReconnectDelay) {
break;
}
wait(self->dataToSend.onTrigger());
}
ASSERT( self->destination.isPublic() );
self->outgoingConnectionIdle = false;
wait(delayJittered(
std::max(0.0, self->lastConnectTime + self->reconnectionDelay -
now()))); // Don't connect() to the same peer more than once per 2 sec
self->lastConnectTime = now();
TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
Reference<IConnection> _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference<IConnection>() ) );
if (_conn) {
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
}
if (self->unsent.empty()) {
_conn->close();
clientReconnectDelay = false;
continue;
} else {
conn = _conn;
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
self->prependConnectPacket();
}
} else {
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
throw connection_failed(); throw connection_failed();
} }
if(timeouts > 1) {
reader = connectionReader( self->transport, conn, self, Promise<Reference<Peer>>()); TraceEvent(SevWarnAlways, "ConnectionSlowPing")
} else { .suppressFor(1.0)
self->outgoingConnectionIdle = false; .detail("WithAddr", peer->destination)
} .detail("Timeouts", timeouts);
try {
self->transport->countConnEstablished++;
wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) );
} catch (Error& e) {
if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced ||
(g_network->isSimulated() && e.code() == error_code_checksum_failed))
self->transport->countConnClosedWithoutError++;
else
self->transport->countConnClosedWithError++;
throw e;
}
ASSERT( false );
} catch (Error& e) {
if(now() - self->lastConnectTime > FLOW_KNOBS->RECONNECTION_RESET_TIME) {
self->reconnectionDelay = FLOW_KNOBS->INITIAL_RECONNECTION_TIME;
} else {
self->reconnectionDelay = std::min(FLOW_KNOBS->MAX_RECONNECTION_TIME, self->reconnectionDelay * FLOW_KNOBS->RECONNECTION_TIME_GROWTH_RATE);
}
self->discardUnreliablePackets();
reader = Future<Void>();
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced || e.code() == error_code_connection_idle ||
(g_network->isSimulated() && e.code() == error_code_checksum_failed);
if(self->compatible) {
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID())
.error(e, true)
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
}
else {
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed",
conn ? conn->getDebugID() : UID())
.error(e, true)
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
}
if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) {
auto& it = self->transport->closedPeers[self->destination];
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
it.first = now();
} else if(now() - it.first > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT) {
TraceEvent(SevWarnAlways, "TooManyConnectionsClosed", conn ? conn->getDebugID() : UID())
.suppressFor(5.0)
.detail("PeerAddr", self->destination);
self->transport->degraded->set(true);
} }
it.second = now(); startingBytes = peer->bytesReceived;
timeouts++;
} }
when (wait( reply.getFuture() )) {
if (conn) { break;
if (FlowTransport::transport().isClient() && e.code() != error_code_connection_idle) {
clientReconnectDelay = true;
}
conn->close();
conn = Reference<IConnection>();
} }
when (wait( peer->resetPing.onTrigger())) {
// Clients might send more packets in response, which needs to go out on the next connection break;
IFailureMonitor::failureMonitor().notifyDisconnect( self->destination );
if (e.code() == error_code_actor_cancelled) throw;
// Try to recover, even from serious errors, by retrying
if(self->peerReferences <= 0 && self->reliable.empty() && self->unsent.empty()) {
TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
self->connect.cancel();
self->transport->peers.erase(self->destination);
return Void();
} }
} }
} }
} }
}; }
ACTOR Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection> conn ) {
state double lastWriteTime = now();
loop {
//wait( delay(0, TaskPriority::WriteSocket) );
wait( delayJittered(std::max<double>(FLOW_KNOBS->MIN_COALESCE_DELAY, FLOW_KNOBS->MAX_COALESCE_DELAY - (now() - lastWriteTime)), TaskPriority::WriteSocket) );
//wait( delay(500e-6, TaskPriority::WriteSocket) );
//wait( yield(TaskPriority::WriteSocket) );
// Send until there is nothing left to send
loop {
lastWriteTime = now();
int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
if (sent) {
self->transport->bytesSent += sent;
self->unsent.sent(sent);
}
if (self->unsent.empty()) break;
TEST(true); // We didn't write everything, so apparently the write buffer is full. Wait for it to be nonfull.
wait( conn->onWritable() );
wait( yield(TaskPriority::WriteSocket) );
}
// Wait until there is something to send
while ( self->unsent.empty() )
wait( self->dataToSend.onTrigger() );
}
}
ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
Reference<IConnection> conn = Reference<IConnection>(),
Future<Void> reader = Void()) {
TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID())
.detail("PeerAddr", self->destination)
.detail("ConnSet", (bool)conn);
// This is used only at client side and is used to override waiting for unsent data to update failure monitoring
// status. At client, if an existing connection fails, we retry making a connection and if that fails, then only
// we report that address as failed.
state bool clientReconnectDelay = false;
loop {
try {
if (!conn) { // Always, except for the first loop with an incoming connection
self->outgoingConnectionIdle = true;
// Wait until there is something to send.
while (self->unsent.empty()) {
if (FlowTransport::transport().isClient() && self->destination.isPublic() &&
clientReconnectDelay) {
break;
}
wait(self->dataToSend.onTrigger());
}
ASSERT( self->destination.isPublic() );
self->outgoingConnectionIdle = false;
wait(delayJittered(
std::max(0.0, self->lastConnectTime + self->reconnectionDelay -
now()))); // Don't connect() to the same peer more than once per 2 sec
self->lastConnectTime = now();
TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
Reference<IConnection> _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference<IConnection>() ) );
if (_conn) {
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
}
if (self->unsent.empty()) {
_conn->close();
clientReconnectDelay = false;
continue;
} else {
conn = _conn;
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
self->prependConnectPacket();
}
} else {
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
throw connection_failed();
}
reader = connectionReader( self->transport, conn, self, Promise<Reference<Peer>>());
} else {
self->outgoingConnectionIdle = false;
}
try {
self->transport->countConnEstablished++;
wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) );
} catch (Error& e) {
if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced ||
(g_network->isSimulated() && e.code() == error_code_checksum_failed))
self->transport->countConnClosedWithoutError++;
else
self->transport->countConnClosedWithError++;
throw e;
}
ASSERT( false );
} catch (Error& e) {
if(now() - self->lastConnectTime > FLOW_KNOBS->RECONNECTION_RESET_TIME) {
self->reconnectionDelay = FLOW_KNOBS->INITIAL_RECONNECTION_TIME;
} else {
self->reconnectionDelay = std::min(FLOW_KNOBS->MAX_RECONNECTION_TIME, self->reconnectionDelay * FLOW_KNOBS->RECONNECTION_TIME_GROWTH_RATE);
}
self->discardUnreliablePackets();
reader = Future<Void>();
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced || e.code() == error_code_connection_idle ||
(g_network->isSimulated() && e.code() == error_code_checksum_failed);
if(self->compatible) {
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID())
.error(e, true)
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
}
else {
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed",
conn ? conn->getDebugID() : UID())
.error(e, true)
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
}
if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) {
auto& it = self->transport->closedPeers[self->destination];
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
it.first = now();
} else if(now() - it.first > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT) {
TraceEvent(SevWarnAlways, "TooManyConnectionsClosed", conn ? conn->getDebugID() : UID())
.suppressFor(5.0)
.detail("PeerAddr", self->destination);
self->transport->degraded->set(true);
}
it.second = now();
}
if (conn) {
if (FlowTransport::transport().isClient() && e.code() != error_code_connection_idle) {
clientReconnectDelay = true;
}
conn->close();
conn = Reference<IConnection>();
}
// Clients might send more packets in response, which needs to go out on the next connection
IFailureMonitor::failureMonitor().notifyDisconnect( self->destination );
if (e.code() == error_code_actor_cancelled) throw;
// Try to recover, even from serious errors, by retrying
if(self->peerReferences <= 0 && self->reliable.empty() && self->unsent.empty() && self->outstandingReplies==0) {
TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
self->connect.cancel();
self->transport->peers.erase(self->destination);
return Void();
}
}
}
}
void Peer::send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) {
unsent.setWriteBuffer(pb);
if (rp) reliable.insert(rp);
if (firstUnsent) dataToSend.trigger();
}
void Peer::prependConnectPacket() {
// Send the ConnectPacket expected at the beginning of a new connection
ConnectPacket pkt;
if(transport->localAddresses.address.isTLS() == destination.isTLS()) {
pkt.canonicalRemotePort = transport->localAddresses.address.port;
pkt.setCanonicalRemoteIp(transport->localAddresses.address.ip);
} else if(transport->localAddresses.secondaryAddress.present()) {
pkt.canonicalRemotePort = transport->localAddresses.secondaryAddress.get().port;
pkt.setCanonicalRemoteIp(transport->localAddresses.secondaryAddress.get().ip);
} else {
// a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it
pkt.canonicalRemotePort = 0;
pkt.setCanonicalRemoteIp(IPAddress(0));
}
pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength);
pkt.protocolVersion = currentProtocolVersion;
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
pkt.protocolVersion.addObjectSerializerFlag();
}
pkt.connectionId = transport->transportId;
PacketBuffer* pb_first = PacketBuffer::create();
PacketWriter wr( pb_first, nullptr, Unversioned() );
pkt.serialize(wr);
unsent.prependWriteBuffer(pb_first, wr.finish());
}
void Peer::discardUnreliablePackets() {
// Throw away the current unsent list, dropping the reference count on each PacketBuffer that accounts for presence in the unsent list
unsent.discardAll();
// If there are reliable packets, compact reliable packets into a new unsent range
if(!reliable.empty()) {
PacketBuffer* pb = unsent.getWriteBuffer();
pb = reliable.compact(pb, nullptr);
unsent.setWriteBuffer(pb);
}
}
void Peer::onIncomingConnection( Reference<Peer> self, Reference<IConnection> conn, Future<Void> reader ) {
// In case two processes are trying to connect to each other simultaneously, the process with the larger canonical NetworkAddress
// gets to keep its outgoing connection.
if ( !destination.isPublic() && !outgoingConnectionIdle ) throw address_in_use();
NetworkAddress compatibleAddr = transport->localAddresses.address;
if(transport->localAddresses.secondaryAddress.present() && transport->localAddresses.secondaryAddress.get().isTLS() == destination.isTLS()) {
compatibleAddr = transport->localAddresses.secondaryAddress.get();
}
if ( !destination.isPublic() || outgoingConnectionIdle || destination > compatibleAddr ) {
// Keep the new connection
TraceEvent("IncomingConnection", conn->getDebugID())
.suppressFor(1.0)
.detail("FromAddr", conn->getPeerAddress())
.detail("CanonicalAddr", destination)
.detail("IsPublic", destination.isPublic());
connect.cancel();
prependConnectPacket();
connect = connectionKeeper( self, conn, reader );
} else {
TraceEvent("RedundantConnection", conn->getDebugID())
.suppressFor(1.0)
.detail("FromAddr", conn->getPeerAddress().toString())
.detail("CanonicalAddr", destination)
.detail("LocalAddr", compatibleAddr);
// Keep our prior connection
reader.cancel();
conn->close();
// Send an (ignored) packet to make sure that, if our outgoing connection died before the peer made this connection attempt,
// we eventually find out that our connection is dead, close it, and then respond to the next connection reattempt from peer.
}
}
TransportData::~TransportData() { TransportData::~TransportData() {
for(auto &p : peers) { for(auto &p : peers) {
@ -671,9 +649,12 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
} else if (destination.token.first() & TOKEN_STREAM_FLAG) { } else if (destination.token.first() & TOKEN_STREAM_FLAG) {
// We don't have the (stream) endpoint 'token', notify the remote machine // We don't have the (stream) endpoint 'token', notify the remote machine
if (destination.token.first() != -1) { if (destination.token.first() != -1) {
sendPacket(self, if (self->isLocalAddress(destination.getPrimaryAddress())) {
SerializeSource<Endpoint>(Endpoint(self->localAddresses, destination.token)), sendLocal(self, SerializeSource<Endpoint>(Endpoint(self->localAddresses, destination.token)), Endpoint(destination.addresses, WLTOKEN_ENDPOINT_NOT_FOUND));
Endpoint(destination.addresses, WLTOKEN_ENDPOINT_NOT_FOUND), false, true); } else {
Reference<Peer> peer = self->getPeer(destination.getPrimaryAddress());
sendPacket(self, peer, SerializeSource<Endpoint>(Endpoint(self->localAddresses, destination.token)), Endpoint(destination.addresses, WLTOKEN_ENDPOINT_NOT_FOUND), false);
}
} }
} }
@ -1013,7 +994,7 @@ Reference<Peer> TransportData::getPeer( NetworkAddress const& address, bool open
return Reference<Peer>(); return Reference<Peer>();
} }
Reference<Peer> newPeer = Reference<Peer>( new Peer(this, address) ); Reference<Peer> newPeer = Reference<Peer>( new Peer(this, address) );
newPeer->connect = Peer::connectionKeeper(newPeer); newPeer->connect = connectionKeeper(newPeer);
peers[address] = newPeer; peers[address] = newPeer;
return newPeer; return newPeer;
} }
@ -1113,7 +1094,7 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
.detail("Address", endpoint.getPrimaryAddress()) .detail("Address", endpoint.getPrimaryAddress())
.detail("Token", endpoint.token); .detail("Token", endpoint.token);
} }
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty() && peer->outstandingReplies==0) {
peer->resetPing.trigger(); peer->resetPing.trigger();
} }
} }
@ -1143,137 +1124,143 @@ void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageRece
ASSERT( endpoint.token == otoken ); ASSERT( endpoint.token == otoken );
} }
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection ) { static void sendLocal( TransportData* self, ISerializeSource const& what, const Endpoint& destination ) {
if (self->isLocalAddress(destination.getPrimaryAddress())) { TEST(true); // "Loopback" delivery
TEST(true); // "Loopback" delivery // SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
// SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
Standalone<StringRef> copy; Standalone<StringRef> copy;
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) { if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
ObjectWriter wr(AssumeVersion(currentProtocolVersion)); ObjectWriter wr(AssumeVersion(currentProtocolVersion));
what.serializeObjectWriter(wr); what.serializeObjectWriter(wr);
copy = wr.toStringRef(); copy = wr.toStringRef();
} else { } else {
BinaryWriter wr( AssumeVersion(currentProtocolVersion) ); BinaryWriter wr( AssumeVersion(currentProtocolVersion) );
what.serializeBinaryWriter(wr); what.serializeBinaryWriter(wr);
copy = wr.toValue(); copy = wr.toValue();
} }
#if VALGRIND #if VALGRIND
VALGRIND_CHECK_MEM_IS_DEFINED(copy.begin(), copy.size()); VALGRIND_CHECK_MEM_IS_DEFINED(copy.begin(), copy.size());
#endif #endif
ASSERT(copy.size() > 0); ASSERT(copy.size() > 0);
deliver(self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false); deliver(self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false);
}
return (PacketID)nullptr; static ReliablePacket* sendPacket( TransportData* self, Reference<Peer> peer, ISerializeSource const& what, const Endpoint& destination, bool reliable ) {
} else { const bool checksumEnabled = !destination.getPrimaryAddress().isTLS();
const bool checksumEnabled = !destination.getPrimaryAddress().isTLS(); ++self->countPacketsGenerated;
++self->countPacketsGenerated;
Reference<Peer> peer = self->getPeer(destination.getPrimaryAddress(), openConnection); // If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {
TEST(true); // Can't send to private address without a compatible open connection
return nullptr;
}
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send bool firstUnsent = peer->unsent.empty();
if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {
TEST(true); // Can't send to private address without a compatible open connection PacketBuffer* pb = peer->unsent.getWriteBuffer();
return (PacketID)nullptr; ReliablePacket* rp = reliable ? new ReliablePacket : 0;
int prevBytesWritten = pb->bytes_written;
PacketBuffer* checksumPb = pb;
PacketWriter wr(pb,rp,AssumeVersion(currentProtocolVersion)); // SOMEDAY: Can we downgrade to talk to older peers?
// Reserve some space for packet length and checksum, write them after serializing data
SplitBuffer packetInfoBuffer;
uint32_t len, checksum = 0;
int packetInfoSize = sizeof(len);
if (checksumEnabled) {
packetInfoSize += sizeof(checksum);
}
wr.writeAhead(packetInfoSize , &packetInfoBuffer);
wr << destination.token;
what.serializePacketWriter(wr, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
pb = wr.finish();
len = wr.size() - packetInfoSize;
if (checksumEnabled) {
// Find the correct place to start calculating checksum
uint32_t checksumUnprocessedLength = len;
prevBytesWritten += packetInfoSize;
if (prevBytesWritten >= checksumPb->bytes_written) {
prevBytesWritten -= checksumPb->bytes_written;
checksumPb = checksumPb->nextPacketBuffer();
} }
bool firstUnsent = peer->unsent.empty(); // Checksum calculation
while (checksumUnprocessedLength > 0) {
PacketBuffer* pb = peer->unsent.getWriteBuffer(); uint32_t processLength =
ReliablePacket* rp = reliable ? new ReliablePacket : 0; std::min(checksumUnprocessedLength, (uint32_t)(checksumPb->bytes_written - prevBytesWritten));
checksum = crc32c_append(checksum, checksumPb->data() + prevBytesWritten, processLength);
int prevBytesWritten = pb->bytes_written; checksumUnprocessedLength -= processLength;
PacketBuffer* checksumPb = pb; checksumPb = checksumPb->nextPacketBuffer();
prevBytesWritten = 0;
PacketWriter wr(pb,rp,AssumeVersion(currentProtocolVersion)); // SOMEDAY: Can we downgrade to talk to older peers?
// Reserve some space for packet length and checksum, write them after serializing data
SplitBuffer packetInfoBuffer;
uint32_t len, checksum = 0;
int packetInfoSize = sizeof(len);
if (checksumEnabled) {
packetInfoSize += sizeof(checksum);
} }
}
wr.writeAhead(packetInfoSize , &packetInfoBuffer); // Write packet length and checksum into packet buffer
wr << destination.token; packetInfoBuffer.write(&len, sizeof(len));
what.serializePacketWriter(wr, FLOW_KNOBS->USE_OBJECT_SERIALIZER); if (checksumEnabled) {
pb = wr.finish(); packetInfoBuffer.write(&checksum, sizeof(checksum), sizeof(len));
len = wr.size() - packetInfoSize; }
if (checksumEnabled) { if (len > FLOW_KNOBS->PACKET_LIMIT) {
// Find the correct place to start calculating checksum TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len);
uint32_t checksumUnprocessedLength = len; // throw platform_error(); // FIXME: How to recover from this situation?
prevBytesWritten += packetInfoSize; }
if (prevBytesWritten >= checksumPb->bytes_written) { else if (len > FLOW_KNOBS->PACKET_WARNING) {
prevBytesWritten -= checksumPb->bytes_written; TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket")
checksumPb = checksumPb->nextPacketBuffer(); .suppressFor(1.0)
} .detail("ToPeer", destination.getPrimaryAddress())
.detail("Length", (int)len)
.detail("Token", destination.token)
.backtrace();
// Checksum calculation if(g_network->isSimulated())
while (checksumUnprocessedLength > 0) { self->warnAlwaysForLargePacket = false;
uint32_t processLength = }
std::min(checksumUnprocessedLength, (uint32_t)(checksumPb->bytes_written - prevBytesWritten));
checksum = crc32c_append(checksum, checksumPb->data() + prevBytesWritten, processLength);
checksumUnprocessedLength -= processLength;
checksumPb = checksumPb->nextPacketBuffer();
prevBytesWritten = 0;
}
}
// Write packet length and checksum into packet buffer
packetInfoBuffer.write(&len, sizeof(len));
if (checksumEnabled) {
packetInfoBuffer.write(&checksum, sizeof(checksum), sizeof(len));
}
if (len > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len);
// throw platform_error(); // FIXME: How to recover from this situation?
}
else if (len > FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket")
.suppressFor(1.0)
.detail("ToPeer", destination.getPrimaryAddress())
.detail("Length", (int)len)
.detail("Token", destination.token)
.backtrace();
if(g_network->isSimulated())
self->warnAlwaysForLargePacket = false;
}
#if VALGRIND #if VALGRIND
SendBuffer *checkbuf = pb; SendBuffer *checkbuf = pb;
while (checkbuf) { while (checkbuf) {
int size = checkbuf->bytes_written; int size = checkbuf->bytes_written;
const uint8_t* data = checkbuf->data; const uint8_t* data = checkbuf->data;
VALGRIND_CHECK_MEM_IS_DEFINED(data, size); VALGRIND_CHECK_MEM_IS_DEFINED(data, size);
checkbuf = checkbuf -> next; checkbuf = checkbuf -> next;
} }
#endif #endif
peer->send(pb, rp, firstUnsent); peer->send(pb, rp, firstUnsent);
if (destination.token != WLTOKEN_PING_PACKET) { if (destination.token != WLTOKEN_PING_PACKET) {
peer->lastDataPacketSentTime = now(); peer->lastDataPacketSentTime = now();
}
return (PacketID)rp;
} }
return rp;
} }
PacketID FlowTransport::sendReliable( ISerializeSource const& what, const Endpoint& destination ) { ReliablePacket* FlowTransport::sendReliable( ISerializeSource const& what, const Endpoint& destination ) {
return sendPacket( self, what, destination, true, true ); if (self->isLocalAddress(destination.getPrimaryAddress())) {
sendLocal( self, what, destination );
return nullptr;
}
Reference<Peer> peer = self->getPeer(destination.getPrimaryAddress());
return sendPacket( self, peer, what, destination, true );
} }
void FlowTransport::cancelReliable( PacketID pid ) { void FlowTransport::cancelReliable( ReliablePacket* p ) {
ReliablePacket* p = (ReliablePacket*)pid;
if (p) p->remove(); if (p) p->remove();
// SOMEDAY: Call reliable.compact() if a lot of memory is wasted in PacketBuffers by formerly reliable packets mixed with a few reliable ones. Don't forget to delref the new PacketBuffers since they are unsent. // SOMEDAY: Call reliable.compact() if a lot of memory is wasted in PacketBuffers by formerly reliable packets mixed with a few reliable ones. Don't forget to delref the new PacketBuffers since they are unsent.
} }
void FlowTransport::sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection ) { Reference<Peer> FlowTransport::sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection ) {
sendPacket( self, what, destination, false, openConnection ); if (self->isLocalAddress(destination.getPrimaryAddress())) {
sendLocal( self, what, destination );
return Reference<Peer>();
}
Reference<Peer> peer = self->getPeer(destination.getPrimaryAddress(), openConnection);
sendPacket( self, peer, what, destination, false );
return peer;
} }
int FlowTransport::getEndpointCount() { int FlowTransport::getEndpointCount() {

View File

@ -26,6 +26,7 @@
#include "flow/genericactors.actor.h" #include "flow/genericactors.actor.h"
#include "flow/network.h" #include "flow/network.h"
#include "flow/FileIdentifier.h" #include "flow/FileIdentifier.h"
#include "flow/Net2Packet.h"
#pragma pack(push, 4) #pragma pack(push, 4)
class Endpoint { class Endpoint {
@ -103,7 +104,39 @@ public:
virtual bool isStream() const { return false; } virtual bool isStream() const { return false; }
}; };
typedef struct NetworkPacket* PacketID; struct TransportData;
struct Peer : public ReferenceCounted<Peer> {
TransportData* transport;
NetworkAddress destination;
UnsentPacketQueue unsent;
ReliablePacketList reliable;
AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false
Future<Void> connect;
AsyncTrigger resetPing;
bool compatible;
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
double reconnectionDelay;
int peerReferences;
bool incompatibleProtocolVersionNewer;
int64_t bytesReceived;
double lastDataPacketSentTime;
int outstandingReplies;
explicit Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {}
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent);
void prependConnectPacket();
void discardUnreliablePackets();
void onIncomingConnection( Reference<Peer> self, Reference<IConnection> conn, Future<Void> reader );
};
class FlowTransport { class FlowTransport {
public: public:
@ -148,19 +181,19 @@ public:
// Sets endpoint to a new local endpoint (without changing its token) which delivers messages to the given receiver // Sets endpoint to a new local endpoint (without changing its token) which delivers messages to the given receiver
// Implementations may have limitations on when this function is called and what endpoint.token may be! // Implementations may have limitations on when this function is called and what endpoint.token may be!
PacketID sendReliable( ISerializeSource const& what, const Endpoint& destination ); ReliablePacket* sendReliable( ISerializeSource const& what, const Endpoint& destination );
// sendReliable will keep trying to deliver the data to the destination until cancelReliable is // sendReliable will keep trying to deliver the data to the destination until cancelReliable is
// called. It will retry sending if the connection is closed or the failure manager reports // called. It will retry sending if the connection is closed or the failure manager reports
// the destination become available (edge triggered). // the destination become available (edge triggered).
void cancelReliable( PacketID ); void cancelReliable( ReliablePacket* );
// Makes PacketID "unreliable" (either the data or a connection close event will be delivered // Makes Packet "unreliable" (either the data or a connection close event will be delivered
// eventually). It can still be used safely to send a reply to a "reliable" request. // eventually). It can still be used safely to send a reply to a "reliable" request.
Reference<AsyncVar<bool>> getDegraded(); Reference<AsyncVar<bool>> getDegraded();
// This async var will be set to true when the process cannot connect to a public network address that the failure monitor thinks is healthy. // This async var will be set to true when the process cannot connect to a public network address that the failure monitor thinks is healthy.
void sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection = true );// { cancelReliable(sendReliable(what,destination)); } Reference<Peer> sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection );// { cancelReliable(sendReliable(what,destination)); }
int getEndpointCount(); int getEndpointCount();
// for tracing only // for tracing only

View File

@ -265,7 +265,7 @@ public:
void send(const T& value) const { void send(const T& value) const {
if (queue->isRemoteEndpoint()) { if (queue->isRemoteEndpoint()) {
FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint()); FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(), true);
} }
else else
queue->send(value); queue->send(value);
@ -317,9 +317,9 @@ public:
if (disc.isReady()) { if (disc.isReady()) {
return ErrorOr<REPLY_TYPE(X)>(request_maybe_delivered()); return ErrorOr<REPLY_TYPE(X)>(request_maybe_delivered());
} }
FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(taskID)); Reference<Peer> peer = FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(taskID), true);
auto& p = getReplyPromise(value); auto& p = getReplyPromise(value);
return waitValueOrSignal(p.getFuture(), disc, getEndpoint(taskID), p); return waitValueOrSignal(p.getFuture(), disc, getEndpoint(taskID), p, peer);
} }
send(value); send(value);
auto& p = getReplyPromise(value); auto& p = getReplyPromise(value);
@ -333,9 +333,9 @@ public:
if (disc.isReady()) { if (disc.isReady()) {
return ErrorOr<REPLY_TYPE(X)>(request_maybe_delivered()); return ErrorOr<REPLY_TYPE(X)>(request_maybe_delivered());
} }
FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint()); Reference<Peer> peer = FlowTransport::transport().sendUnreliable(SerializeSource<T>(value), getEndpoint(), true);
auto& p = getReplyPromise(value); auto& p = getReplyPromise(value);
return waitValueOrSignal(p.getFuture(), disc, getEndpoint(), p); return waitValueOrSignal(p.getFuture(), disc, getEndpoint(), p, peer);
} }
else { else {
send(value); send(value);

View File

@ -152,9 +152,24 @@ ACTOR template <class T> Future<Void> incrementalBroadcast( Future<T> input, std
// Needed for the call to endpointNotFound() // Needed for the call to endpointNotFound()
#include "fdbrpc/FailureMonitor.h" #include "fdbrpc/FailureMonitor.h"
struct PeerHolder {
Reference<Peer> peer;
explicit PeerHolder(Reference<Peer> peer) : peer(peer) {
if(peer) {
peer->outstandingReplies++;
}
}
~PeerHolder() {
if(peer) {
peer->outstandingReplies--;
}
}
};
// Implements tryGetReply, getReplyUnlessFailedFor // Implements tryGetReply, getReplyUnlessFailedFor
ACTOR template <class X> ACTOR template <class X>
Future<ErrorOr<X>> waitValueOrSignal( Future<X> value, Future<Void> signal, Endpoint endpoint, ReplyPromise<X> holdme = ReplyPromise<X>() ) { Future<ErrorOr<X>> waitValueOrSignal( Future<X> value, Future<Void> signal, Endpoint endpoint, ReplyPromise<X> holdme = ReplyPromise<X>(), Reference<Peer> peer = Reference<Peer>() ) {
state PeerHolder holder = PeerHolder(peer);
loop { loop {
try { try {
choose { choose {
@ -185,7 +200,7 @@ Future<ErrorOr<X>> waitValueOrSignal( Future<X> value, Future<Void> signal, Endp
} }
ACTOR template <class T> ACTOR template <class T>
Future<T> sendCanceler( ReplyPromise<T> reply, PacketID send, Endpoint endpoint ) { Future<T> sendCanceler( ReplyPromise<T> reply, ReliablePacket* send, Endpoint endpoint ) {
try { try {
T t = wait( reply.getFuture() ); T t = wait( reply.getFuture() );
FlowTransport::transport().cancelReliable(send); FlowTransport::transport().cancelReliable(send);

View File

@ -35,7 +35,7 @@ void networkSender(Future<T> input, Endpoint endpoint) {
try { try {
T value = wait(input); T value = wait(input);
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) { if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint); FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint, false);
} else { } else {
FlowTransport::transport().sendUnreliable(SerializeBoolAnd<T>(true, value), endpoint, false); FlowTransport::transport().sendUnreliable(SerializeBoolAnd<T>(true, value), endpoint, false);
} }
@ -43,7 +43,7 @@ void networkSender(Future<T> input, Endpoint endpoint) {
// if (err.code() == error_code_broken_promise) return; // if (err.code() == error_code_broken_promise) return;
ASSERT(err.code() != error_code_actor_cancelled); ASSERT(err.code() != error_code_actor_cancelled);
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) { if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint); FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint, false);
} else { } else {
FlowTransport::transport().sendUnreliable(SerializeBoolAnd<Error>(false, err), endpoint, false); FlowTransport::transport().sendUnreliable(SerializeBoolAnd<Error>(false, err), endpoint, false);
} }

View File

@ -70,7 +70,7 @@ struct WorkloadRequest {
VectorRef< VectorRef<KeyValueRef> > options; VectorRef< VectorRef<KeyValueRef> > options;
int clientId; // the "id" of the client recieving the request (0 indexed) int clientId; // the "id" of the client receiving the request (0 indexed)
int clientCount; // the total number of test clients participating in the workload int clientCount; // the total number of test clients participating in the workload
ReplyPromise< struct WorkloadInterface > reply; ReplyPromise< struct WorkloadInterface > reply;

View File

@ -3322,7 +3322,7 @@ ACTOR Future<Void> waitMetrics( StorageServerMetrics* self, WaitMetricsRequest r
when( StorageMetrics c = waitNext( change.getFuture() ) ) { when( StorageMetrics c = waitNext( change.getFuture() ) ) {
metrics += c; metrics += c;
// SOMEDAY: validation! The changes here are possibly partial changes (we recieve multiple messages per // SOMEDAY: validation! The changes here are possibly partial changes (we receive multiple messages per
// update to our requested range). This means that the validation would have to occur after all // update to our requested range). This means that the validation would have to occur after all
// the messages for one clear or set have been dispatched. // the messages for one clear or set have been dispatched.
@ -3510,7 +3510,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) { when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work // Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
if( req.debugID.present() ) if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.recieved"); //.detail("TaskID", g_network->getCurrentTask()); g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key)) if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
req.reply.send(GetValueReply()); req.reply.send(GetValueReply());