diff --git a/fdbclient/ClientWorkerInterface.h b/fdbclient/ClientWorkerInterface.h index 15c16465ef..5e80ccb83f 100644 --- a/fdbclient/ClientWorkerInterface.h +++ b/fdbclient/ClientWorkerInterface.h @@ -36,7 +36,7 @@ struct ClientWorkerInterface { bool operator == (ClientWorkerInterface const& r) const { return id() == r.id(); } bool operator != (ClientWorkerInterface const& r) const { return id() != r.id(); } UID id() const { return reboot.getEndpoint().token; } - NetworkAddress address() const { return reboot.getEndpoint().address[0]; } + NetworkAddress address() const { return reboot.getEndpoint().getPrimaryAddress(); } template void serialize( Ar& ar ) { diff --git a/fdbclient/ClusterInterface.h b/fdbclient/ClusterInterface.h index b9c2b463f8..1c779501fc 100755 --- a/fdbclient/ClusterInterface.h +++ b/fdbclient/ClusterInterface.h @@ -39,7 +39,7 @@ struct ClusterInterface { bool operator == (ClusterInterface const& r) const { return id() == r.id(); } bool operator != (ClusterInterface const& r) const { return id() != r.id(); } UID id() const { return openDatabase.getEndpoint().token; } - NetworkAddress address() const { return openDatabase.getEndpoint().address[0]; } + NetworkAddress address() const { return openDatabase.getEndpoint().getPrimaryAddress(); } void initEndpoints() { openDatabase.getEndpoint( TaskClusterController ); diff --git a/fdbclient/FailureMonitorClient.actor.cpp b/fdbclient/FailureMonitorClient.actor.cpp index bb16866a76..9532ff4072 100644 --- a/fdbclient/FailureMonitorClient.actor.cpp +++ b/fdbclient/FailureMonitorClient.actor.cpp @@ -44,10 +44,10 @@ ACTOR Future failureMonitorClientLoop( state double before = now(); state double waitfor = 0; - monitor->setStatus(controller.failureMonitoring.getEndpoint().address[0], FailureStatus(false)); - fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().address[0] ); + monitor->setStatus(controller.failureMonitoring.getEndpoint().getPrimaryAddress(), FailureStatus(false)); + fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().getPrimaryAddress() ); - //The cluster controller's address (controller.failureMonitoring.getEndpoint().address[0]) is treated specially because we can declare that it is down independently + //The cluster controller's address (controller.failureMonitoring.getEndpoint().getPrimaryAddress()) is treated specially because we can declare that it is down independently //of the response from the cluster controller. It still needs to be in knownAddrs in case the cluster controller changes, so the next cluster controller resets its state try { @@ -59,7 +59,7 @@ ACTOR Future failureMonitorClientLoop( requestTimeout = Never(); if (reply.allOthersFailed) { // Reset all systems *not* mentioned in the reply to the default (failed) state - fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().address[0] ); + fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().getPrimaryAddress() ); std::set changedAddresses; for(int c=0; c failureMonitorClientLoop( if( monitor->getState( controller.failureMonitoring.getEndpoint() ).isFailed() ) TraceEvent("FailureMonitoringServerUp").detail("OldServer",controller.id()); - monitor->setStatus( controller.failureMonitoring.getEndpoint().address[0], FailureStatus(false) ); - fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().address[0] ); + monitor->setStatus( controller.failureMonitoring.getEndpoint().getPrimaryAddress(), FailureStatus(false) ); + fmState->knownAddrs.insert( controller.failureMonitoring.getEndpoint().getPrimaryAddress() ); //if (version != reply.failureInformationVersion) // printf("Client '%s': update from %lld to %lld (%d changes, aof=%d)\n", g_network->getLocalAddress().toString().c_str(), version, reply.failureInformationVersion, reply.changes.size(), reply.allOthersFailed); @@ -88,7 +88,7 @@ ACTOR Future failureMonitorClientLoop( fmState->knownAddrs.insert( reply.changes[c].address ); else fmState->knownAddrs.erase( reply.changes[c].address ); - ASSERT( reply.changes[c].address != controller.failureMonitoring.getEndpoint().address[0] || !reply.changes[c].status.failed ); + ASSERT( reply.changes[c].address != controller.failureMonitoring.getEndpoint().getPrimaryAddress() || !reply.changes[c].status.failed ); } before = now(); waitfor = reply.clientRequestIntervalMS * .001; @@ -98,8 +98,8 @@ ACTOR Future failureMonitorClientLoop( g_network->setCurrentTask(TaskDefaultDelay); requestTimeout = Never(); TraceEvent(SevWarn, "FailureMonitoringServerDown").detail("OldServerID",controller.id()); - monitor->setStatus( controller.failureMonitoring.getEndpoint().address[0], FailureStatus(true) ); - fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().address[0] ); + monitor->setStatus( controller.failureMonitoring.getEndpoint().getPrimaryAddress(), FailureStatus(true) ); + fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().getPrimaryAddress() ); } when( wait( nextRequest ) ) { g_network->setCurrentTask(TaskDefaultDelay); diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index cf5d21b4de..52a56d7136 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -46,7 +46,7 @@ struct MasterProxyInterface { std::string toString() const { return id().shortString(); } bool operator == (MasterProxyInterface const& r) const { return id() == r.id(); } bool operator != (MasterProxyInterface const& r) const { return id() != r.id(); } - NetworkAddress address() const { return commit.getEndpoint().address[0]; } + NetworkAddress address() const { return commit.getEndpoint().getPrimaryAddress(); } template void serialize(Archive& ar) { diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index a3ac3cea2d..b10cd189cd 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -312,7 +312,7 @@ ACTOR Future monitorNominee( Key key, ClientLeaderRegInterface coord, Asyn state Optional li = wait( retryBrokenPromise( coord.getLeader, GetLeaderRequest( key, info->present() ? info->get().changeID : UID() ), TaskCoordinationReply ) ); wait( Future(Void()) ); // Make sure we weren't cancelled - TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().address[0]).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation); + TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().getPrimaryAddress()).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation); if (li != *info) { *info = li; diff --git a/fdbclient/StatusClient.actor.cpp b/fdbclient/StatusClient.actor.cpp index c4aeea3e1c..89ee605230 100644 --- a/fdbclient/StatusClient.actor.cpp +++ b/fdbclient/StatusClient.actor.cpp @@ -300,7 +300,7 @@ ACTOR Future> clientCoordinatorsStatusFetcher(ReferencerandomUniqueID() ) {} - NetworkAddress address() const { return getVersion.getEndpoint().address[0]; } + NetworkAddress address() const { return getVersion.getEndpoint().getPrimaryAddress(); } UID id() const { return uniqueID; } std::string toString() const { return id().shortString(); } template diff --git a/fdbrpc/FailureMonitor.actor.cpp b/fdbrpc/FailureMonitor.actor.cpp index ea2e9ffdf6..6274a03422 100644 --- a/fdbrpc/FailureMonitor.actor.cpp +++ b/fdbrpc/FailureMonitor.actor.cpp @@ -92,7 +92,7 @@ void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStat void SimpleFailureMonitor::endpointNotFound( Endpoint const& endpoint ) { // SOMEDAY: Expiration (this "leaks" memory) - TraceEvent("EndpointNotFound").suppressFor(1.0).detail("Address", endpoint.address[0]).detail("Token", endpoint.token); + TraceEvent("EndpointNotFound").suppressFor(1.0).detail("Address", endpoint.getPrimaryAddress()).detail("Token", endpoint.token); endpointKnownFailed.set( endpoint, true ); } @@ -103,9 +103,9 @@ void SimpleFailureMonitor::notifyDisconnect( NetworkAddress const& address ) { Future SimpleFailureMonitor::onDisconnectOrFailure( Endpoint const& endpoint ) { // If the endpoint or address is already failed, return right away - auto i = addressStatus.find(endpoint.address[0]); + auto i = addressStatus.find(endpoint.getPrimaryAddress()); if (i == addressStatus.end() || i->value.isFailed() || endpointKnownFailed.get(endpoint)) { - TraceEvent("AlreadyDisconnected").detail("Addr", endpoint.address[0]).detail("Tok", endpoint.token); + TraceEvent("AlreadyDisconnected").detail("Addr", endpoint.getPrimaryAddress()).detail("Tok", endpoint.token); return Void(); } @@ -131,17 +131,17 @@ FailureStatus SimpleFailureMonitor::getState( Endpoint const& endpoint ) { if (endpointKnownFailed.get(endpoint)) return FailureStatus(true); else { - auto a = addressStatus.find(endpoint.address[0]); + auto a = addressStatus.find(endpoint.getPrimaryAddress()); if (a == addressStatus.end()) return FailureStatus(); else return a->value; - //printf("%s.getState(%s) = %s %p\n", g_network->getLocalAddress().toString(), endpoint.address[0].toString(), a.failed ? "FAILED" : "OK", this); + //printf("%s.getState(%s) = %s %p\n", g_network->getLocalAddress().toString(), endpoint.getPrimaryAddress().toString(), a.failed ? "FAILED" : "OK", this); } } bool SimpleFailureMonitor::onlyEndpointFailed( Endpoint const& endpoint ) { if(!endpointKnownFailed.get(endpoint)) return false; - auto a = addressStatus.find(endpoint.address[0]); + auto a = addressStatus.find(endpoint.getPrimaryAddress()); if (a == addressStatus.end()) return true; else return !a->value.failed; } diff --git a/fdbrpc/FailureMonitor.h b/fdbrpc/FailureMonitor.h index 0681efb78b..1f5688ce35 100644 --- a/fdbrpc/FailureMonitor.h +++ b/fdbrpc/FailureMonitor.h @@ -89,7 +89,7 @@ public: // The next time the known status for the endpoint changes, returns the new status. virtual Future onStateChanged( Endpoint const& endpoint ) = 0; - // Returns when onFailed(endpoint) || transport().onDisconnect( endpoint.address[0] ), but more efficiently + // Returns when onFailed(endpoint) || transport().onDisconnect( endpoint.getPrimaryAddress() ), but more efficiently virtual Future onDisconnectOrFailure( Endpoint const& endpoint ) = 0; // Returns true if the endpoint is failed but the address of the endpoint is not failed. diff --git a/fdbrpc/FlowTests.actor.cpp b/fdbrpc/FlowTests.actor.cpp index 67a5e03aac..14f445cc11 100644 --- a/fdbrpc/FlowTests.actor.cpp +++ b/fdbrpc/FlowTests.actor.cpp @@ -309,7 +309,7 @@ TEST_CASE("/flow/flow/networked futures") BinaryWriter wr(IncludeVersion()); wr << locInt; - ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal() && locInt.getEndpoint().address[0] == FlowTransport::transport().getLocalAddress()); + ASSERT(locInt.getEndpoint().isValid() && locInt.getEndpoint().isLocal() && locInt.getEndpoint().getPrimaryAddress() == FlowTransport::transport().getLocalAddress()); BinaryReader rd(wr.toStringRef(), IncludeVersion()); RequestStream remoteInt; diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 692012f6a8..0bb7def651 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -162,10 +162,20 @@ public: } struct Peer* getPeer( NetworkAddress const& address, bool openConnection = true ); - - NetworkAddress localAddress; + + // TODO (Vishesh) : Now make it make sense! + const NetworkAddress& getFirstLocalAddress() const { + return localListeners.begin()->first; + } + + // Returns a vector of NetworkAddresses we are listening to. + NetworkAddressList getLocalAddresses() const; + + // Returns true if given network address 'address' is one of the address we are listening on. + bool isLocalAddress(const NetworkAddress& address) const; + + std::map> localListeners; std::map peers; - Future listen; bool warnAlwaysForLargePacket; // These declarations must be in exactly this order @@ -248,13 +258,14 @@ struct Peer : NonCopyable { void prependConnectPacket() { // Send the ConnectPacket expected at the beginning of a new connection ConnectPacket pkt; - if (transport->localAddress.isTLS() != destination.isTLS()) { + const NetworkAddress& localAddress = transport->getFirstLocalAddress(); + if (localAddress.isTLS() != destination.isTLS()) { pkt.canonicalRemotePort = 0; // a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it pkt.canonicalRemoteIp = 0; } else { - pkt.canonicalRemotePort = transport->localAddress.port; - pkt.canonicalRemoteIp = transport->localAddress.ip; + pkt.canonicalRemotePort = localAddress.port; + pkt.canonicalRemoteIp = localAddress.ip; } pkt.connectPacketLength = sizeof(pkt)-sizeof(pkt.connectPacketLength); pkt.protocolVersion = currentProtocolVersion; @@ -282,7 +293,7 @@ struct Peer : NonCopyable { // In case two processes are trying to connect to each other simultaneously, the process with the larger canonical NetworkAddress // gets to keep its outgoing connection. if ( !destination.isPublic() && !outgoingConnectionIdle ) throw address_in_use(); - if ( !destination.isPublic() || outgoingConnectionIdle || destination > transport->localAddress ) { + if ( !destination.isPublic() || outgoingConnectionIdle || destination > transport->getFirstLocalAddress() ) { //TODO (Vishesh): Last condition. // Keep the new connection TraceEvent("IncomingConnection", conn->getDebugID()) .suppressFor(1.0) @@ -463,20 +474,20 @@ ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReade auto receiver = self->endpoints.get(destination.token); if (receiver) { try { - g_currentDeliveryPeerAddress = destination.address[0]; + g_currentDeliveryPeerAddress = destination.getPrimaryAddress(); receiver->receive( reader ); g_currentDeliveryPeerAddress = NetworkAddress(); } catch (Error& e) { g_currentDeliveryPeerAddress = NetworkAddress(); - TraceEvent(SevError, "ReceiverError").error(e).detail("Token", destination.token.toString()).detail("Peer", destination.address[0]); + TraceEvent(SevError, "ReceiverError").error(e).detail("Token", destination.token.toString()).detail("Peer", destination.getPrimaryAddress()); throw; } } else if (destination.token.first() & TOKEN_STREAM_FLAG) { // We don't have the (stream) endpoint 'token', notify the remote machine if (destination.token.first() != -1) - sendPacket( self, - SerializeSource( Endpoint( {self->localAddress}, destination.token ) ), - Endpoint( {destination.address[0]}, WLTOKEN_ENDPOINT_NOT_FOUND), + sendPacket( self, + SerializeSource( Endpoint( self->getLocalAddresses(), destination.token ) ), + Endpoint( destination.addresses, WLTOKEN_ENDPOINT_NOT_FOUND), false, true ); } @@ -491,7 +502,7 @@ static void scanPackets( TransportData* transport, uint8_t*& unprocessed_begin, uint8_t* p = unprocessed_begin; bool checksumEnabled = true; - if (transport->localAddress.isTLS() || peerAddress.isTLS()) { + if (transport->getFirstLocalAddress().isTLS() || peerAddress.isTLS()) { checksumEnabled = false; } @@ -599,7 +610,7 @@ ACTOR static Future connectionReader( state uint64_t peerProtocolVersion = 0; peerAddress = conn->getPeerAddress(); - if (peer == nullptr) { + if (peer == nullptr) { ASSERT( !peerAddress.isPublic() ); } try { @@ -623,23 +634,23 @@ ACTOR static Future connectionReader( if (!readBytes) break; state bool readWillBlock = readBytes != readAllBytes; unprocessed_end += readBytes; - + if (expectConnectPacket && unprocessed_end-unprocessed_begin>=CONNECT_PACKET_V0_SIZE) { // At the beginning of a connection, we expect to receive a packet containing the protocol version and the listening port of the remote process ConnectPacket* p = (ConnectPacket*)unprocessed_begin; - + uint64_t connectionId = 0; int32_t connectPacketSize = p->minimumSize(); if ( unprocessed_end-unprocessed_begin >= connectPacketSize ) { if(p->protocolVersion >= 0x0FDB00A444020001) { connectionId = p->connectionId; } - + if( (p->protocolVersion & compatibleProtocolVersionMask) != (currentProtocolVersion & compatibleProtocolVersionMask) ) { incompatibleProtocolVersionNewer = p->protocolVersion > currentProtocolVersion; NetworkAddress addr = p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress(); if(connectionId != 1) addr.port = 0; - + if(!transport->multiVersionConnections.count(connectionId)) { if(now() - transport->lastIncompatibleMessage > FLOW_KNOBS->CONNECTION_REJECTED_MESSAGE_DELAY) { TraceEvent(SevWarn, "ConnectionRejected", conn->getDebugID()) @@ -688,7 +699,7 @@ ACTOR static Future connectionReader( peer->transport->numIncompatibleConnections++; incompatiblePeerCounted = true; } - ASSERT( p->canonicalRemotePort == peerAddress.port ); + // ASSERT( p->canonicalRemotePort == peerAddress.port ); } else { if (p->canonicalRemotePort) { peerAddress = NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort, true, peerAddress.isTLS() ); @@ -783,6 +794,18 @@ Peer* TransportData::getPeer( NetworkAddress const& address, bool openConnection return newPeer; } +NetworkAddressList TransportData::getLocalAddresses() const { + NetworkAddressList addresses; + for (const auto& addr_listener : localListeners) { + addresses.push_back(addr_listener.first); + } + return addresses; +} + +bool TransportData::isLocalAddress(const NetworkAddress& address) const { + return localListeners.find(address) != localListeners.end(); +} + ACTOR static Future multiVersionCleanupWorker( TransportData* self ) { loop { wait(delay(FLOW_KNOBS->CONNECTION_CLEANUP_DELAY)); @@ -812,9 +835,16 @@ FlowTransport::~FlowTransport() { delete self; } void FlowTransport::initMetrics() { self->initMetrics(); } -NetworkAddress FlowTransport::getLocalAddress() { return self->localAddress; } +NetworkAddress FlowTransport::getLocalAddress() const { + //TODO (Vishesh) : Consider all addresses. + const NetworkAddressList& addresses = self->getLocalAddresses(); + if (addresses.empty()) { + return NetworkAddress(); + } + return self->getLocalAddresses()[0]; +} -std::map>* FlowTransport::getIncompatiblePeers() { +std::map>* FlowTransport::getIncompatiblePeers() { for(auto it = self->incompatiblePeers.begin(); it != self->incompatiblePeers.end();) { if( self->multiVersionConnections.count(it->second.first) ) { it = self->incompatiblePeers.erase(it); @@ -822,44 +852,53 @@ std::map>* FlowTransport::getIncompa it++; } } - return &self->incompatiblePeers; + return &self->incompatiblePeers; } Future FlowTransport::bind( NetworkAddress publicAddress, NetworkAddress listenAddress ) { ASSERT( publicAddress.isPublic() ); - self->localAddress = publicAddress; + ASSERT( self->localListeners.find(publicAddress) == self->localListeners.end() ); TraceEvent("Binding").detail("PublicAddress", publicAddress).detail("ListenAddress", listenAddress); - self->listen = listen( self, listenAddress ); - return self->listen; + + Future listenF = listen( self, listenAddress ); + self->localListeners[publicAddress] = listenF; + return listenF; } void FlowTransport::loadedEndpoint( Endpoint& endpoint ) { - if (endpoint.address.size() > 0 && endpoint.address[0].isValid()) return; + if (endpoint.getPrimaryAddress().isValid()) return; ASSERT( !(endpoint.token.first() & TOKEN_STREAM_FLAG) ); // Only reply promises are supposed to be unaddressed ASSERT( g_currentDeliveryPeerAddress.isValid() ); - endpoint.address = {g_currentDeliveryPeerAddress}; + endpoint.addresses = {g_currentDeliveryPeerAddress}; } void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) { - if (!receiver->isStream() || !endpoint.address[0].isValid()) return; - Peer* peer = self->getPeer(endpoint.address[0]); - if(peer->peerReferences == -1) { - peer->peerReferences = 1; - } else { - peer->peerReferences++; + if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return; + for (const NetworkAddress& address : endpoint.addresses) { + Peer* peer = self->getPeer(address); + if(peer->peerReferences == -1) { + peer->peerReferences = 1; + } else { + peer->peerReferences++; + } } } void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) { - if (!receiver->isStream() || !endpoint.address[0].isValid()) return; - Peer* peer = self->getPeer(endpoint.address[0], false); - if(peer) { - peer->peerReferences--; - if(peer->peerReferences < 0) { - TraceEvent(SevError, "InvalidPeerReferences").detail("References", peer->peerReferences).detail("Address", endpoint.address[0]).detail("Token", endpoint.token); - } - if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { - peer->incompatibleDataRead.trigger(); + if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return; + for (const NetworkAddress& address : endpoint.addresses) { + Peer* peer = self->getPeer(address, false); + if(peer) { + peer->peerReferences--; + if(peer->peerReferences < 0) { + TraceEvent(SevError, "InvalidPeerReferences") + .detail("References", peer->peerReferences) + .detail("Address", address) + .detail("Token", endpoint.token); + } + if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { + peer->incompatibleDataRead.trigger(); + } } } } @@ -867,10 +906,10 @@ void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessag void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, uint32_t taskID ) { endpoint.token = g_random->randomUniqueID(); if (receiver->isStream()) { - endpoint.address = {getLocalAddress()}; + endpoint.addresses = self->getLocalAddresses(); endpoint.token = UID( endpoint.token.first() | TOKEN_STREAM_FLAG, endpoint.token.second() ); } else { - endpoint.address = {NetworkAddress()}; + endpoint.addresses = {NetworkAddress()}; endpoint.token = UID( endpoint.token.first() & ~TOKEN_STREAM_FLAG, endpoint.token.second() ); } self->endpoints.insert( receiver, endpoint.token, taskID ); @@ -881,7 +920,7 @@ void FlowTransport::removeEndpoint( const Endpoint& endpoint, NetworkMessageRece } void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, uint32_t taskID ) { - endpoint.address = {getLocalAddress()}; + endpoint.addresses = self->getLocalAddresses(); ASSERT( ((endpoint.token.first() & TOKEN_STREAM_FLAG)!=0) == receiver->isStream() ); Endpoint::Token otoken = endpoint.token; self->endpoints.insert( receiver, endpoint.token, taskID ); @@ -889,7 +928,7 @@ void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageRece } static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection ) { - if (destination.address[0] == self->localAddress) { + if (self->isLocalAddress(destination.getPrimaryAddress())) { TEST(true); // "Loopback" delivery // SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow? @@ -905,16 +944,16 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c return (PacketID)NULL; } else { bool checksumEnabled = true; - if (self->localAddress.isTLS() || destination.address[0].isTLS()) { + if (self->getFirstLocalAddress().isTLS() || destination.getPrimaryAddress().isTLS()) { checksumEnabled = false; } ++self->countPacketsGenerated; - Peer* peer = self->getPeer(destination.address[0], openConnection); + Peer* peer = self->getPeer(destination.getRandomAddress(), openConnection); // If there isn't an open connection, a public address, or the peer isn't compatible, we can't send - if (!peer || (peer->outgoingConnectionIdle && !destination.address[0].isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) { + if (!peer || (peer->outgoingConnectionIdle && !destination.getPrimaryAddress().isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) { TEST(true); // Can't send to private address without a compatible open connection return (PacketID)NULL; } @@ -970,13 +1009,13 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c } if (len > FLOW_KNOBS->PACKET_LIMIT) { - TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.address[0]).detail("Length", (int)len); + TraceEvent(SevError, "Net2_PacketLimitExceeded").detail("ToPeer", destination.getPrimaryAddress()).detail("Length", (int)len); // throw platform_error(); // FIXME: How to recover from this situation? } else if (len > FLOW_KNOBS->PACKET_WARNING) { TraceEvent(self->warnAlwaysForLargePacket ? SevWarnAlways : SevWarn, "Net2_LargePacket") .suppressFor(1.0) - .detail("ToPeer", destination.address[0]) + .detail("ToPeer", destination.getPrimaryAddress()) .detail("Length", (int)len) .detail("Token", destination.token) .backtrace(); diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 43ecaefda9..02226a48c3 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -31,17 +31,29 @@ public: // Endpoint represents a particular service (e.g. a serialized Promise or PromiseStream) // An endpoint is either "local" (used for receiving data) or "remote" (used for sending data) typedef UID Token; - NetworkAddressList address; + NetworkAddressList addresses; Token token; - Endpoint() {} - Endpoint(const NetworkAddressList& addresses, Token token) : address(addresses), token(token) {} + Endpoint() : addresses({NetworkAddress()}) {} + Endpoint(const NetworkAddressList& addresses, Token token) : addresses(addresses), token(token) { + ASSERT(addresses.size() > 0); + } bool isValid() const { return token.isValid(); } bool isLocal() const; + // Return the primary network address, which is the first network address among + // all addresses this endpoint listens to. + const NetworkAddress& getPrimaryAddress() const { + return addresses[0]; + } + + const NetworkAddress& getRandomAddress() const { + return addresses[g_random->randomChoice(addresses)]; + } + bool operator == (Endpoint const& r) const { - return address == r.address && token == r.token; + return addresses == r.addresses && token == r.token; } bool operator != (Endpoint const& r) const { return !(*this == r); @@ -49,8 +61,8 @@ public: //TODO: (Vishesh) Figure out what to do for vector of addresses this. bool operator < (Endpoint const& r) const { - const NetworkAddress& left = address.empty() ? NetworkAddress() : address[0]; - const NetworkAddress& right = r.address.empty() ? NetworkAddress() : r.address[0]; + const NetworkAddress& left = addresses[0]; + const NetworkAddress& right = r.addresses[0]; if (left != right) return left < right; else @@ -60,10 +72,9 @@ public: template void serialize(Ar& ar) { if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061020002LL) { - address.emplace_back(); - ar & address[0] & token; + ar & addresses[0] & token; } else { - ar & address & token; + ar & addresses & token; } } }; @@ -97,8 +108,8 @@ public: // Starts a server listening on the given listenAddress, and sets publicAddress to be the public // address of this server. Returns only errors. - NetworkAddress getLocalAddress(); - // Returns the NetworkAddress that would be assigned by addEndpoint (the public address) + NetworkAddress getLocalAddress() const; + // Returns first local NetworkAddress. std::map>* getIncompatiblePeers(); // Returns the same of all peers that have attempted to connect, but have incompatible protocol versions @@ -151,7 +162,7 @@ private: }; inline bool Endpoint::isLocal() const { - return address[0] == FlowTransport::transport().getLocalAddress(); + return addresses[0] == FlowTransport::transport().getLocalAddress(); } #endif diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index 5b3438db88..27a9d7f3e8 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -30,9 +30,6 @@ struct FlowReceiver : private NetworkMessageReceiver { // Common endpoint code for NetSAV<> and NetNotifiedQueue<> - Endpoint endpoint; - bool m_isLocalEndpoint; - FlowReceiver() : m_isLocalEndpoint(false) {} FlowReceiver(Endpoint const& remoteEndpoint) : endpoint(remoteEndpoint), m_isLocalEndpoint(false) { FlowTransport::transport().addPeerReference(endpoint, this); @@ -64,6 +61,10 @@ struct FlowReceiver : private NetworkMessageReceiver { endpoint.token = token; FlowTransport::transport().addWellKnownEndpoint(endpoint, this, taskID); } + +private: + Endpoint endpoint; + bool m_isLocalEndpoint; }; template @@ -151,7 +152,7 @@ template void save(Ar& ar, const ReplyPromise& value) { auto const& ep = value.getEndpoint(); ar << ep; - ASSERT(!ep.address[0].isValid() || ep.address[0].isPublic()); // No re-serializing non-public addresses (the reply connection won't be available to any other process) + ASSERT(!ep.getPrimaryAddress().isValid() || ep.getPrimaryAddress().isPublic()); // No re-serializing non-public addresses (the reply connection won't be available to any other process) } template @@ -357,7 +358,7 @@ template void save(Ar& ar, const RequestStream& value) { auto const& ep = value.getEndpoint(); ar << ep; - UNSTOPPABLE_ASSERT(ep.address[0].isValid()); // No serializing PromiseStreams on a client with no public address + UNSTOPPABLE_ASSERT(ep.getPrimaryAddress().isValid()); // No serializing PromiseStreams on a client with no public address } template diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 91cc335082..be8a0b310e 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -139,7 +139,7 @@ public: return m; } - ProcessInfo* getProcess( Endpoint const& endpoint ) { return getProcessByAddress(endpoint.address[0]); } + ProcessInfo* getProcess( Endpoint const& endpoint ) { return getProcessByAddress(endpoint.getPrimaryAddress()); } ProcessInfo* getCurrentProcess() { return currentProcess; } virtual Future onProcess( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0; virtual Future onMachine( ISimulator::ProcessInfo *process, int taskID = -1 ) = 0; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index fa4932a706..63c002051a 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1156,7 +1156,7 @@ ACTOR Future clusterGetServerInfo( ReplyPromise reply) { state UID issueID; - addIssue( db->workersWithIssues, reply.getEndpoint().address[0], issues, issueID ); + addIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); for(auto it : incompatiblePeers) { db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL; } @@ -1168,7 +1168,7 @@ ACTOR Future clusterGetServerInfo( } } - removeIssue( db->workersWithIssues, reply.getEndpoint().address[0], issues, issueID ); + removeIssue( db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); reply.send( db->serverInfo->get() ); return Void(); @@ -1184,13 +1184,13 @@ ACTOR Future clusterOpenDatabase( { // NOTE: The client no longer expects this function to return errors state UID issueID; - addIssue( db->clientsWithIssues, reply.getEndpoint().address[0], issues, issueID ); + addIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); if(supportedVersions.size() > 0) { - db->clientVersionMap[reply.getEndpoint().address[0]] = supportedVersions; + db->clientVersionMap[reply.getEndpoint().getPrimaryAddress()] = supportedVersions; } - db->traceLogGroupMap[reply.getEndpoint().address[0]] = traceLogGroup.toString(); + db->traceLogGroupMap[reply.getEndpoint().getPrimaryAddress()] = traceLogGroup.toString(); while (db->clientInfo->get().id == knownClientInfoID) { choose { @@ -1199,9 +1199,9 @@ ACTOR Future clusterOpenDatabase( } } - removeIssue( db->clientsWithIssues, reply.getEndpoint().address[0], issues, issueID ); - db->clientVersionMap.erase(reply.getEndpoint().address[0]); - db->traceLogGroupMap.erase(reply.getEndpoint().address[0]); + removeIssue( db->clientsWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID ); + db->clientVersionMap.erase(reply.getEndpoint().getPrimaryAddress()); + db->traceLogGroupMap.erase(reply.getEndpoint().getPrimaryAddress()); reply.send( db->clientInfo->get() ); return Void(); @@ -1371,7 +1371,7 @@ ACTOR Future failureDetectionServer( UID uniqueID, ClusterControllerData:: when ( FailureMonitoringRequest req = waitNext( requests ) ) { if ( req.senderStatus.present() ) { // Update the status of requester, if necessary - auto& address = req.reply.getEndpoint().address[0]; + auto& address = req.reply.getEndpoint().getPrimaryAddress(); auto& stat = currentStatus[ address ]; auto& newStat = req.senderStatus.get(); diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 851eae177f..6076ab0c31 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -128,7 +128,7 @@ ACTOR Future localGenerationReg( GenerationRegInterface interf, OnDemandSt // SOMEDAY: concurrent access to different keys? loop choose { when ( GenerationRegReadRequest _req = waitNext( interf.read.getFuture() ) ) { - TraceEvent("GenerationRegReadRequest").detail("From", _req.reply.getEndpoint().address[0]).detail("K", printable(_req.key)); + TraceEvent("GenerationRegReadRequest").detail("From", _req.reply.getEndpoint().getPrimaryAddress()).detail("K", printable(_req.key)); state GenerationRegReadRequest req = _req; Optional rawV = wait( store->readValue( req.key ) ); v = rawV.present() ? BinaryReader::fromStringRef( rawV.get(), IncludeVersion() ) : GenerationRegVal(); @@ -149,11 +149,11 @@ ACTOR Future localGenerationReg( GenerationRegInterface interf, OnDemandSt v.val = wrq.kv.value; store->set( KeyValueRef( wrq.kv.key, BinaryWriter::toValue(v, IncludeVersion()) ) ); wait(store->commit()); - TraceEvent("GenerationRegWrote").detail("From", wrq.reply.getEndpoint().address[0]).detail("Key", printable(wrq.kv.key)) + TraceEvent("GenerationRegWrote").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", printable(wrq.kv.key)) .detail("ReqGen", wrq.gen.generation).detail("Returning", v.writeGen.generation); wrq.reply.send( v.writeGen ); } else { - TraceEvent("GenerationRegWriteFail").detail("From", wrq.reply.getEndpoint().address[0]).detail("Key", printable(wrq.kv.key)) + TraceEvent("GenerationRegWriteFail").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", printable(wrq.kv.key)) .detail("ReqGen", wrq.gen.generation).detail("ReadGen", v.readGen.generation).detail("WriteGen", v.writeGen.generation); wrq.reply.send( std::max( v.readGen, v.writeGen ) ); } @@ -449,7 +449,7 @@ ACTOR Future coordinationServer(std::string dataFolder) { state GenerationRegInterface myInterface( g_network ); state OnDemandStore store( dataFolder, myID ); - TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().address[0]).detail("Folder", dataFolder); + TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()).detail("Folder", dataFolder); try { wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store) || store.getError() ); diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 3f8f2ee693..6408b82801 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -1814,7 +1814,7 @@ struct DDTeamCollection : ReferenceCounted { } allServers.push_back( newServer.id() ); - TraceEvent("AddedStorageServer", masterId).detail("ServerID", newServer.id()).detail("ProcessClass", processClass.toString()).detail("WaitFailureToken", newServer.waitFailure.getEndpoint().token).detail("Address", newServer.waitFailure.getEndpoint().address[0]); + TraceEvent("AddedStorageServer", masterId).detail("ServerID", newServer.id()).detail("ProcessClass", processClass.toString()).detail("WaitFailureToken", newServer.waitFailure.getEndpoint().token).detail("Address", newServer.waitFailure.getEndpoint().getPrimaryAddress()); auto &r = server_info[newServer.id()] = Reference( new TCServerInfo( newServer, processClass, includedDCs.empty() || std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end(), storageServerSet ) ); // Establish the relation between server and machine @@ -2606,7 +2606,7 @@ ACTOR Future storageServerTracker( return Void(); } when( std::pair newInterface = wait( interfaceChanged ) ) { - bool restartRecruiting = newInterface.first.waitFailure.getEndpoint().address[0] != server->lastKnownInterface.waitFailure.getEndpoint().address[0]; + bool restartRecruiting = newInterface.first.waitFailure.getEndpoint().getPrimaryAddress() != server->lastKnownInterface.waitFailure.getEndpoint().getPrimaryAddress(); bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality; bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() != newInterface.first.locality.zoneId().get(); diff --git a/fdbserver/LeaderElection.actor.cpp b/fdbserver/LeaderElection.actor.cpp index 18cf7deab3..473498a445 100644 --- a/fdbserver/LeaderElection.actor.cpp +++ b/fdbserver/LeaderElection.actor.cpp @@ -199,11 +199,11 @@ ACTOR Future tryBecomeLeaderInternal( ServerCoordinators coordinators, Val when ( wait( delay(SERVER_KNOBS->POLLING_FREQUENCY) ) ) { for(int i = 0; i < coordinators.leaderElectionServers.size(); ++i) { if(true_heartbeats[i].isReady()) - TraceEvent("LeaderTrueHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().address[0]); + TraceEvent("LeaderTrueHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress()); else if(false_heartbeats[i].isReady()) - TraceEvent("LeaderFalseHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().address[0]); + TraceEvent("LeaderFalseHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress()); else - TraceEvent("LeaderNoHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().address[0]); + TraceEvent("LeaderNoHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress()); } TraceEvent("ReleasingLeadership", myInfo.changeID); break; diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 7813da240f..67d31fcc09 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -284,7 +284,7 @@ Version poppedVersion( LogRouterData* self, Tag tag) { ACTOR Future logRouterPeekMessages( LogRouterData* self, TLogPeekRequest req ) { state BinaryWriter messages(Unversioned()); - //TraceEvent("LogRouterPeek1", self->dbgid).detail("From", req.reply.getEndpoint().address[0]).detail("Ver", self->version.get()).detail("Begin", req.begin); + //TraceEvent("LogRouterPeek1", self->dbgid).detail("From", req.reply.getEndpoint().getPrimaryAddress()).detail("Ver", self->version.get()).detail("Begin", req.begin); if( req.returnIfBlocked && self->version.get() < req.begin ) { //TraceEvent("LogRouterPeek2", self->dbgid); req.reply.sendError(end_of_stream()); diff --git a/fdbserver/MasterInterface.h b/fdbserver/MasterInterface.h index e761f94d6c..54a55d9ede 100644 --- a/fdbserver/MasterInterface.h +++ b/fdbserver/MasterInterface.h @@ -37,7 +37,7 @@ struct MasterInterface { RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators; RequestStream< struct GetCommitVersionRequest > getCommitVersion; - NetworkAddress address() const { return changeCoordinators.getEndpoint().address[0]; } + NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); } UID id() const { return changeCoordinators.getEndpoint().token; } template diff --git a/fdbserver/OldTLogServer.actor.cpp b/fdbserver/OldTLogServer.actor.cpp index 675a1e063d..d687cd0983 100644 --- a/fdbserver/OldTLogServer.actor.cpp +++ b/fdbserver/OldTLogServer.actor.cpp @@ -921,7 +921,7 @@ namespace oldTLog { persistTagMessagesKey(logData->logId, oldTag, req.begin), persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto &kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); @@ -945,7 +945,7 @@ namespace oldTLog { messages.serializeBytes( messages2.toStringRef() ); } else { peekMessagesFromMemory( logData, req, messages, endVersion ); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } Version poppedVer = poppedVersion(logData, oldTag); @@ -960,7 +960,7 @@ namespace oldTLog { reply.messages = messages.toStringRef(); reply.end = endVersion; } - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address[0]); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); if(req.sequence.present()) { auto& trackerData = self->peekTracker[peekId]; diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index fd5a2b212f..8dc0b5f7b7 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -75,7 +75,7 @@ ACTOR Future resolveBatch( state Optional debugID; // The first request (prevVersion < 0) comes from the master - state NetworkAddress proxyAddress = req.prevVersion >= 0 ? req.reply.getEndpoint().address[0] : NetworkAddress(); + state NetworkAddress proxyAddress = req.prevVersion >= 0 ? req.reply.getEndpoint().getPrimaryAddress() : NetworkAddress(); state ProxyRequestsInfo &proxyInfo = self->proxyInfoMap[proxyAddress]; if(req.debugID.present()) { diff --git a/fdbserver/ResolverInterface.h b/fdbserver/ResolverInterface.h index 89ee4cb556..63ba30a2d2 100644 --- a/fdbserver/ResolverInterface.h +++ b/fdbserver/ResolverInterface.h @@ -41,7 +41,7 @@ struct ResolverInterface { std::string toString() const { return id().shortString(); } bool operator == ( ResolverInterface const& r ) const { return id() == r.id(); } bool operator != ( ResolverInterface const& r ) const { return id() != r.id(); } - NetworkAddress address() const { return resolve.getEndpoint().address[0]; } + NetworkAddress address() const { return resolve.getEndpoint().getPrimaryAddress(); } void initEndpoints() { metrics.getEndpoint( TaskResolutionMetrics ); split.getEndpoint( TaskResolutionMetrics ); diff --git a/fdbserver/RestoreInterface.h b/fdbserver/RestoreInterface.h index 97fa637663..080135015c 100644 --- a/fdbserver/RestoreInterface.h +++ b/fdbserver/RestoreInterface.h @@ -33,7 +33,7 @@ struct RestoreInterface { bool operator == (RestoreInterface const& r) const { return id() == r.id(); } bool operator != (RestoreInterface const& r) const { return id() != r.id(); } UID id() const { return test.getEndpoint().token; } - NetworkAddress address() const { return test.getEndpoint().address[0]; } + NetworkAddress address() const { return test.getEndpoint().getPrimaryAddress(); } void initEndpoints() { test.getEndpoint( TaskClusterController ); diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index bff1697a29..99b7744a8c 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -53,7 +53,7 @@ struct TLogInterface { UID getSharedTLogID() const { return sharedTLogID; } std::string toString() const { return id().shortString(); } bool operator == ( TLogInterface const& r ) const { return id() == r.id(); } - NetworkAddress address() const { return peekMessages.getEndpoint().address[0]; } + NetworkAddress address() const { return peekMessages.getEndpoint().getPrimaryAddress(); } void initEndpoints() { getQueuingMetrics.getEndpoint( TaskTLogQueuingMetrics ); popMessages.getEndpoint( TaskTLogPop ); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index cd97e7121f..daa04983bf 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1036,7 +1036,7 @@ ACTOR Future tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere persistTagMessagesKey(logData->logId, req.tag, req.begin), persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? printable(kv1[0].key) : "").detail("Tag2ResultsLast", kv2.size() ? printable(kv2[0].key) : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto &kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); @@ -1050,7 +1050,7 @@ ACTOR Future tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere messages.serializeBytes( messages2.toStringRef() ); } else { peekMessagesFromMemory( logData, req, messages, endVersion ); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address[0]).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; @@ -1059,7 +1059,7 @@ ACTOR Future tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere reply.messages = messages.toStringRef(); reply.end = endVersion; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address[0]); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); if(req.sequence.present()) { auto& trackerData = self->peekTracker[peekId]; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 36dade8a19..a47504778f 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -2075,14 +2075,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedget().present() || req.myInterface.commit.getEndpoint() != logServers[pos]->get().interf().commit.getEndpoint()) logServers[pos]->setUnconditional( OptionalInterface(req.myInterface) ); lastReply[req.myInterface.id()].send(false); lastReply[req.myInterface.id()] = req.reply; } else { - TraceEvent("TLogJoinedMeUnknown", dbgid).detail("TLog", req.myInterface.id()).detail("Address", req.myInterface.commit.getEndpoint().address[0].toString()); + TraceEvent("TLogJoinedMeUnknown", dbgid).detail("TLog", req.myInterface.id()).detail("Address", req.myInterface.commit.getEndpoint().getPrimaryAddress().toString()); req.reply.send(true); } } diff --git a/fdbserver/WorkerInterface.h b/fdbserver/WorkerInterface.h index eca2d05658..b7eb777774 100644 --- a/fdbserver/WorkerInterface.h +++ b/fdbserver/WorkerInterface.h @@ -55,7 +55,7 @@ struct WorkerInterface { TesterInterface testerInterface; UID id() const { return tLog.getEndpoint().token; } - NetworkAddress address() const { return tLog.getEndpoint().address[0]; } + NetworkAddress address() const { return tLog.getEndpoint().getPrimaryAddress(); } WorkerInterface() {} WorkerInterface( LocalityData locality ) : locality( locality ) {} diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index f5253ae4f2..2e84668b4c 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -1456,15 +1456,21 @@ int main(int argc, char* argv[]) { tlsOptions->register_network(); #endif - if (role == FDBD || role == NetworkTestServer || role == Restore) { - try { - listenErrors.push_back(FlowTransport::transport().bind(publicAddresses[0], listenAddresses[0])); - if (listenErrors[0].isReady()) listenErrors[0].get(); - } catch (Error& e) { - TraceEvent("BindError").error(e); - fprintf(stderr, "Error initializing networking with public address %s and listen address %s (%s)\n", publicAddresses[0].toString().c_str(), listenAddresses[0].toString().c_str(), e.what()); - printHelpTeaser(argv[0]); - flushAndExit(FDB_EXIT_ERROR); + if (role == FDBD || role == NetworkTestServer) { + for (int ii = 0; ii < publicAddresses.size(); ++ii) { + const NetworkAddress& publicAddress = publicAddresses[ii]; + const NetworkAddress& listenAddress = listenAddresses[ii]; + try { + const Future& errorF = FlowTransport::transport().bind(publicAddress, listenAddress); + listenErrors.push_back(errorF); + if (errorF.isReady()) errorF.get(); + } catch (Error& e) { + TraceEvent("BindError").error(e); + fprintf(stderr, "Error initializing networking with public address %s and listen address %s (%s)\n", + publicAddress.toString().c_str(), listenAddress.toString().c_str(), e.what()); + printHelpTeaser(argv[0]); + flushAndExit(FDB_EXIT_ERROR); + } } } diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 7d413dbcbe..aa57a63ffd 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -704,7 +704,7 @@ ACTOR Future runWorkload( Database cx, std::vector< Test TraceEvent(SevError, "TestFailure") .error(metricTasks[i].getError()) .detail("Reason", "Metrics not retrieved") - .detail("From", workloads[i].metrics.getEndpoint().address[0]); + .detail("From", workloads[i].metrics.getEndpoint().getPrimaryAddress()); } } diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 6ab2b1e8dc..71936ecc8c 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -511,7 +511,7 @@ ACTOR Future monitorServerDBInfo( ReferenceonChange() ) ) { if(ccInterface->get().present()) - TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().address[0]); + TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().getPrimaryAddress()); } } } diff --git a/fdbserver/workloads/Ping.actor.cpp b/fdbserver/workloads/Ping.actor.cpp index 1f0d6e533e..149970df29 100644 --- a/fdbserver/workloads/Ping.actor.cpp +++ b/fdbserver/workloads/Ping.actor.cpp @@ -150,7 +150,7 @@ struct PingWorkload : TestWorkload { loop { wait( poisson( &lastTime, self->actorCount / self->operationsPerSecond ) ); auto& peer = g_random->randomChoice(peers); - state NetworkAddress addr = peer.getEndpoint().address[0]; + state NetworkAddress addr = peer.getEndpoint().getPrimaryAddress(); state double before = now(); LoadedPingRequest req; @@ -251,7 +251,7 @@ struct PingWorkload : TestWorkload { req.payload = self->payloadOut; req.loadReply = true; replies.push_back( success( peers[i].getReply( req ) ) ); - // replies.push_back( self->receptionLogger( self, peers[i].payloadPing.getReply( req ), peers[i].payloadPing.getEndpoint().address[0], pingId ) ); + // replies.push_back( self->receptionLogger( self, peers[i].payloadPing.getReply( req ), peers[i].payloadPing.getEndpoint().getPrimaryAddress(), pingId ) ); // peers[i].payloadPing.send( req ); // replies.push_back( self->payloadDelayer( req, peers[i].payloadPing ) ); } diff --git a/fdbserver/workloads/TargetedKill.actor.cpp b/fdbserver/workloads/TargetedKill.actor.cpp index 87ae7d3e50..5fe3b60cdd 100644 --- a/fdbserver/workloads/TargetedKill.actor.cpp +++ b/fdbserver/workloads/TargetedKill.actor.cpp @@ -65,8 +65,8 @@ struct TargetedKillWorkload : TestWorkload { int killed = 0; for( int i = 0; i < workers.size(); i++ ) { - if( workers[i].first.master.getEndpoint().address[0] == address || - ( self->killAllMachineProcesses && workers[i].first.master.getEndpoint().address[0].ip == address.ip && workers[i].second != ProcessClass::TesterClass ) ) { + if( workers[i].first.master.getEndpoint().getPrimaryAddress() == address || + ( self->killAllMachineProcesses && workers[i].first.master.getEndpoint().getPrimaryAddress().ip == address.ip && workers[i].second != ProcessClass::TesterClass ) ) { TraceEvent("WorkerKill").detail("TargetedMachine", address).detail("Worker", workers[i].first.id()); workers[i].first.clientInterface.reboot.send( RebootRequest() ); } @@ -94,7 +94,7 @@ struct TargetedKillWorkload : TestWorkload { for( int i = 0; i < proxies->size(); i++) { MasterProxyInterface mpi = proxies->getInterface(o); machine = mpi.address(); - if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0]) + if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress()) break; o = ++o%proxies->size(); } @@ -105,7 +105,7 @@ struct TargetedKillWorkload : TestWorkload { for( int i = 0; i < tlogs.size(); i++) { TLogInterface tli = tlogs[o]; machine = tli.address(); - if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0]) + if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress()) break; o = ++o%tlogs.size(); } @@ -115,13 +115,13 @@ struct TargetedKillWorkload : TestWorkload { for( int i = 0; i < storageServers.size(); i++) { StorageServerInterface ssi = storageServers[o]; machine = ssi.address(); - if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0]) + if(machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress()) break; o = ++o%storageServers.size(); } } else if( self->machineToKill == "clustercontroller" || self->machineToKill == "cc" ) { - machine = self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().address[0]; + machine = self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress(); } TraceEvent("IsolatedMark").detail("TargetedMachine", machine).detail("Role", self->machineToKill);