From 57832e625d956135644221e571f4a6405b208266 Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Tue, 26 Feb 2019 18:04:03 -0800 Subject: [PATCH] net: Support IPv6 #963 - NetworkAddress now contains IPAddress object which can be either IPv4 or IPv6 address. 128bits are used even for IPv4 addresses, however only 32bits are used when using/serializing IPv4 address. - ConnectPacket is updated to store IPv6 address. Backward compatible with old format since the first 32bits of IP address field is used for serialization of IPv4. - Mainly updates rest of the code to use IPAddress structure instead of plain uint32_t. - IPv6 address/pair ports should be represented as `[ip]:port` as per convention. This applies to both cluster files and command line arguments. --- fdbcli/fdbcli.actor.cpp | 4 +- fdbclient/AutoPublicAddress.cpp | 20 +++- fdbclient/FDBTypes.h | 34 ++---- fdbclient/ManagementAPI.actor.cpp | 6 +- fdbclient/MonitorLeader.actor.cpp | 22 ++++ fdbclient/NativeAPI.actor.cpp | 23 ++-- fdbclient/SystemData.cpp | 2 +- fdbrpc/FlowTransport.actor.cpp | 79 +++++++++---- fdbrpc/TLSConnection.actor.cpp | 2 +- fdbrpc/sim2.actor.cpp | 32 ++--- fdbrpc/simulator.h | 12 +- fdbserver/Status.actor.cpp | 16 ++- fdbserver/fdbserver.actor.cpp | 4 +- fdbserver/workloads/CpuProfiler.actor.cpp | 3 +- .../workloads/RemoveServersSafely.actor.cpp | 2 +- fdbserver/workloads/SaveAndKill.actor.cpp | 6 +- flow/Net2.actor.cpp | 28 +++-- flow/Platform.cpp | 24 ++-- flow/Platform.h | 4 +- flow/SystemMonitor.cpp | 11 +- flow/SystemMonitor.h | 9 +- flow/TDMetric.actor.h | 3 +- flow/Trace.cpp | 9 +- flow/network.cpp | 110 ++++++++++++++++-- flow/network.h | 81 +++++++++++-- 25 files changed, 387 insertions(+), 159 deletions(-) diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 59f964eae8..3cb37d6e06 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -2031,7 +2031,7 @@ ACTOR Future exclude( Database db, std::vector tokens, Referenc wait( makeInterruptable(waitForExcludedServers(db,addresses)) ); std::vector workers = wait( makeInterruptable(getWorkers(db)) ); - std::map> workerPorts; + std::map> workerPorts; for(auto addr : workers) workerPorts[addr.address.ip].insert(addr.address.port); @@ -2050,7 +2050,7 @@ ACTOR Future exclude( Database db, std::vector tokens, Referenc "excluded the correct machines or processes before removing them from the cluster:\n"); for(auto addr : absentExclusions) { if(addr.port == 0) - printf(" %s\n", toIPString(addr.ip).c_str()); + printf(" %s\n", addr.ip.toString().c_str()); else printf(" %s\n", addr.toString().c_str()); } diff --git a/fdbclient/AutoPublicAddress.cpp b/fdbclient/AutoPublicAddress.cpp index 11d536a181..93e5b25f92 100644 --- a/fdbclient/AutoPublicAddress.cpp +++ b/fdbclient/AutoPublicAddress.cpp @@ -28,13 +28,21 @@ #include "fdbclient/CoordinationInterface.h" -uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs ) { +IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs) { try { - boost::asio::io_service ioService; - boost::asio::ip::udp::socket socket(ioService); - boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4(ccs.coordinators()[0].ip), ccs.coordinators()[0].port); + using namespace boost::asio; + + io_service ioService; + ip::udp::socket socket(ioService); + + const auto& coordAddr = ccs.coordinators()[0]; + const auto boostIp = coordAddr.ip.isV6() ? ip::address(ip::address_v6(coordAddr.ip.toV6())) + : ip::address(ip::address_v4(coordAddr.ip.toV4())); + + ip::udp::endpoint endpoint(boostIp, coordAddr.port); socket.connect(endpoint); - auto ip = socket.local_endpoint().address().to_v4().to_ulong(); + IPAddress ip = coordAddr.ip.isV6() ? IPAddress(socket.local_endpoint().address().to_v6().to_bytes()) + : IPAddress(socket.local_endpoint().address().to_v4().to_ulong()); socket.close(); return ip; @@ -43,4 +51,4 @@ uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs ) { fprintf(stderr, "Error determining public address: %s\n", e.what()); throw bind_failed(); } -} \ No newline at end of file +} diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 090802472d..0dc1bccb83 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -633,33 +633,21 @@ struct LogMessageVersion { }; struct AddressExclusion { - uint32_t ip; + IPAddress ip; int port; AddressExclusion() : ip(0), port(0) {} - explicit AddressExclusion( uint32_t ip ) : ip(ip), port(0) {} - explicit AddressExclusion( uint32_t ip, int port ) : ip(ip), port(port) {} + explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {} + explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {} - explicit AddressExclusion (std::string s) { - int a,b,c,d,p,count=-1; - if (sscanf(s.c_str(), "%d.%d.%d.%d:%d%n", &a,&b,&c,&d, &p, &count) == 5 && count == s.size()) { - ip = (a<<24)+(b<<16)+(c<<8)+d; - port = p; - } - else if (sscanf(s.c_str(), "%d.%d.%d.%d%n", &a,&b,&c,&d, &count) == 4 && count == s.size()) { - ip = (a<<24)+(b<<16)+(c<<8)+d; - port = 0; - } - else { - throw connection_string_invalid(); - } + bool operator<(AddressExclusion const& r) const { + if (ip != r.ip) return ip < r.ip; + return port < r.port; } - - bool operator< (AddressExclusion const& r) const { if (ip != r.ip) return ip < r.ip; return port>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff ); - if (!isWholeMachine()) - as += format(":%d", port); + std::string as = format("%s", ip.toString().c_str()); + const char* formatPatt = ip.isV6() ? "[%s]:%d" : "%s:%d"; + if (!isWholeMachine()) return format(formatPatt, as.c_str(), port); return as; } diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index df17d6cc93..fa54b5b391 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -1730,7 +1730,7 @@ TEST_CASE("/ManagementAPI/AutoQuorumChange/checkLocality") { data.locality.set(LiteralStringRef("rack"), StringRef(rack)); data.locality.set(LiteralStringRef("zoneid"), StringRef(rack)); data.locality.set(LiteralStringRef("machineid"), StringRef(machineId)); - data.address.ip = i; + data.address.ip = IPAddress(i); workers.push_back(data); } @@ -1749,8 +1749,8 @@ TEST_CASE("/ManagementAPI/AutoQuorumChange/checkLocality") { LiteralStringRef("machineid") }); for(auto worker = chosen.begin(); worker != chosen.end(); worker++) { - ASSERT(worker->ip < workers.size()); - LocalityData data = workers[worker->ip].locality; + ASSERT(worker->ip.toV4() < workers.size()); + LocalityData data = workers[worker->ip.toV4()].locality; for(auto field = fields.begin(); field != fields.end(); field++) { chosenValues[*field].insert(data.get(*field).get()); } diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 64482da09d..c07ed5ef73 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -214,6 +214,28 @@ TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") { ASSERT( input == cs.toString() ); } + { + input = "0xxdeadbeef:100100100@[::1]:1234,[::1]:1235"; + std::string commented("#start of comment\n"); + commented += input; + commented += "\n"; + commented += "# asdfasdf ##"; + + ClusterConnectionString cs(commented); + ASSERT(input == cs.toString()); + } + + { + input = "0xxdeadbeef:100100100@[abcd:dcba::1]:1234,[abcd:dcba::abcd:1]:1234"; + std::string commented("#start of comment\n"); + commented += input; + commented += "\n"; + commented += "# asdfasdf ##"; + + ClusterConnectionString cs(commented); + ASSERT(input == cs.toString()); + } + return Void(); } diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 61e44640c5..f4c982523d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -749,7 +749,7 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, Loc return Database::createDatabase(rccf, apiVersion, clientLocality); } -extern uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs ); +extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs); Cluster::Cluster( Reference connFile, int apiVersion ) : clusterInterface(new AsyncVar>()) @@ -791,7 +791,7 @@ void Cluster::init( Reference connFile, bool startClientI .detailf("ImageOffset", "%p", platform::getImageOffset()) .trackLatest("ClientStart"); - initializeSystemMonitorMachineState(SystemMonitorMachineState(publicIP)); + initializeSystemMonitorMachineState(SystemMonitorMachineState(IPAddress(publicIP))); systemMonitor(); uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) ); @@ -1066,24 +1066,15 @@ bool GetRangeLimits::hasSatisfiedMinRows() { return hasByteLimit() && minRows == 0; } - AddressExclusion AddressExclusion::parse( StringRef const& key ) { //Must not change: serialized to the database! - std::string s = key.toString(); - int a,b,c,d,port,count=-1; - if (sscanf(s.c_str(), "%d.%d.%d.%d%n", &a,&b,&c,&d, &count)<4) { + try { + auto addr = NetworkAddress::parse(key.toString()); + return AddressExclusion(addr.ip, addr.port); + } catch (Error& e) { TraceEvent(SevWarnAlways, "AddressExclusionParseError").detail("String", printable(key)); return AddressExclusion(); } - s = s.substr(count); - uint32_t ip = (a<<24)+(b<<16)+(c<<8)+d; - if (!s.size()) - return AddressExclusion( ip ); - if (sscanf( s.c_str(), ":%d%n", &port, &count ) < 1 || count != s.size()) { - TraceEvent(SevWarnAlways, "AddressExclusionParseError").detail("String", printable(key)); - return AddressExclusion(); - } - return AddressExclusion( ip, port ); } Future> getRange( @@ -2038,7 +2029,7 @@ ACTOR Future< Standalone< VectorRef< const char*>>> getAddressesForKeyActor( Key Standalone> addresses; for (auto i : ssi) { - std::string ipString = toIPString(i.address().ip); + std::string ipString = i.address().ip.toString(); char* c_string = new (addresses.arena()) char[ipString.length()+1]; strcpy(c_string, ipString.c_str()); addresses.push_back(addresses.arena(), c_string); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index ea4be9c8d8..cb54165469 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -374,7 +374,7 @@ const AddressExclusion decodeExcludedServersKey( KeyRef const& key ) { } std::string encodeExcludedServersKey( AddressExclusion const& addr ) { //FIXME: make sure what's persisted here is not affected by innocent changes elsewhere - std::string as = format( "%d.%d.%d.%d", (addr.ip>>24)&0xff, (addr.ip>>16)&0xff, (addr.ip>>8)&0xff, addr.ip&0xff ); + std::string as = format("%s", addr.ip.toString().c_str()); //ASSERT( StringRef(as).endsWith(LiteralStringRef(":0")) == (addr.port == 0) ); if (!addr.isWholeMachine()) as += format(":%d", addr.port); diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 3e015d2995..930d5113ee 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -194,10 +194,7 @@ public: }; #define CONNECT_PACKET_V0 0x0FDB00A444020001LL -#define CONNECT_PACKET_V1 0x0FDB00A446030001LL #define CONNECT_PACKET_V0_SIZE 14 -#define CONNECT_PACKET_V1_SIZE 22 -#define CONNECT_PACKET_V2_SIZE 26 #pragma pack( push, 1 ) struct ConnectPacket { @@ -205,16 +202,44 @@ struct ConnectPacket { uint64_t protocolVersion; // Expect currentProtocolVersion uint16_t canonicalRemotePort; // Port number to reconnect to the originating process uint64_t connectionId; // Multi-version clients will use the same Id for both connections, other connections will set this to zero. Added at protocol Version 0x0FDB00A444020001. - uint32_t canonicalRemoteIp; // IP Address to reconnect to the originating process + union { + uint32_t v4; + uint8_t v6[16]; + } canonicalRemoteIp46; // IP Address to reconnect to the originating process - size_t minimumSize() { - if (protocolVersion < CONNECT_PACKET_V0) return CONNECT_PACKET_V0_SIZE; - if (protocolVersion < CONNECT_PACKET_V1) return CONNECT_PACKET_V1_SIZE; - return CONNECT_PACKET_V2_SIZE; + IPAddress canonicalRemoteIp() const { + if (isIPv6()) { + IPAddress::IPAddressStore ip; + memcpy(ip.data(), &canonicalRemoteIp46.v6, ip.size()); + return IPAddress(ip); + } else { + return IPAddress(canonicalRemoteIp46.v4); + } + } + + void setCanonicalRemoteIp(const IPAddress& ip) { + if (ip.isV6()) { + memcpy(&canonicalRemoteIp46.v6, ip.toV6().data(), 16); + } else { + canonicalRemoteIp46.v4 = ip.toV4(); + } + } + + bool isIPv6() const { return connectPacketLength == (sizeof(ConnectPacket) - sizeof(connectPacketLength)); } + + uint32_t totalPacketSize() const { return connectPacketLength + sizeof(connectPacketLength); } + + template + void serialize(Ar& ar) { + serializer(ar, connectPacketLength, protocolVersion, canonicalRemotePort, connectionId); + if (isIPv6()) { + ar.serializeBytes(&canonicalRemoteIp46.v6, sizeof(canonicalRemoteIp46.v6)); + } else { + serializer(ar, canonicalRemoteIp46.v4); + } } }; -static_assert( sizeof(ConnectPacket) == CONNECT_PACKET_V2_SIZE, "ConnectPacket packed incorrectly" ); #pragma pack( pop ) ACTOR static Future connectionReader(TransportData* transport, Reference conn, Peer* peer, @@ -256,23 +281,24 @@ struct Peer : NonCopyable { for(auto& addr : transport->localAddresses) { if(addr.isTLS() == destination.isTLS()) { pkt.canonicalRemotePort = addr.port; - pkt.canonicalRemoteIp = addr.ip; + pkt.setCanonicalRemoteIp(addr.ip); + pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength) - (addr.isV6() ? 0 : 12); found = true; break; } } if (!found) { pkt.canonicalRemotePort = 0; // a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it - pkt.canonicalRemoteIp = 0; + pkt.setCanonicalRemoteIp(IPAddress(0)); + pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength); } - pkt.connectPacketLength = sizeof(pkt)-sizeof(pkt.connectPacketLength); pkt.protocolVersion = currentProtocolVersion; pkt.connectionId = transport->transportId; PacketBuffer* pb_first = new PacketBuffer; PacketWriter wr( pb_first, NULL, Unversioned() ); - wr.serializeBinaryItem(pkt); + pkt.serialize(wr); unsent.prependWriteBuffer(pb_first, wr.finish()); } @@ -647,7 +673,7 @@ ACTOR static Future connectionReader( ConnectPacket* p = (ConnectPacket*)unprocessed_begin; uint64_t connectionId = 0; - int32_t connectPacketSize = p->minimumSize(); + int32_t connectPacketSize = p->totalPacketSize(); if ( unprocessed_end-unprocessed_begin >= connectPacketSize ) { if(p->protocolVersion >= 0x0FDB00A444020001) { connectionId = p->connectionId; @@ -655,18 +681,22 @@ ACTOR static Future connectionReader( if( (p->protocolVersion & compatibleProtocolVersionMask) != (currentProtocolVersion & compatibleProtocolVersionMask) ) { incompatibleProtocolVersionNewer = p->protocolVersion > currentProtocolVersion; - NetworkAddress addr = p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress(); + NetworkAddress addr = p->canonicalRemotePort + ? NetworkAddress(p->canonicalRemoteIp(), p->canonicalRemotePort) + : conn->getPeerAddress(); if(connectionId != 1) addr.port = 0; if(!transport->multiVersionConnections.count(connectionId)) { if(now() - transport->lastIncompatibleMessage > FLOW_KNOBS->CONNECTION_REJECTED_MESSAGE_DELAY) { TraceEvent(SevWarn, "ConnectionRejected", conn->getDebugID()) - .detail("Reason", "IncompatibleProtocolVersion") - .detail("LocalVersion", currentProtocolVersion) - .detail("RejectedVersion", p->protocolVersion) - .detail("VersionMask", compatibleProtocolVersionMask) - .detail("Peer", p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress()) - .detail("ConnectionId", connectionId); + .detail("Reason", "IncompatibleProtocolVersion") + .detail("LocalVersion", currentProtocolVersion) + .detail("RejectedVersion", p->protocolVersion) + .detail("VersionMask", compatibleProtocolVersionMask) + .detail("Peer", p->canonicalRemotePort ? NetworkAddress(p->canonicalRemoteIp(), + p->canonicalRemotePort) + : conn->getPeerAddress()) + .detail("ConnectionId", connectionId); transport->lastIncompatibleMessage = now(); } if(!transport->incompatiblePeers.count(addr)) { @@ -699,7 +729,9 @@ ACTOR static Future connectionReader( peerProtocolVersion = p->protocolVersion; if (peer != nullptr) { // Outgoing connection; port information should be what we expect - TraceEvent("ConnectedOutgoing").suppressFor(1.0).detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) ); + TraceEvent("ConnectedOutgoing") + .suppressFor(1.0) + .detail("PeerAddr", NetworkAddress(p->canonicalRemoteIp(), p->canonicalRemotePort)); peer->compatible = compatible; peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer; if (!compatible) { @@ -709,7 +741,8 @@ ACTOR static Future connectionReader( ASSERT( p->canonicalRemotePort == peerAddress.port ); } else { if (p->canonicalRemotePort) { - peerAddress = NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort, true, peerAddress.isTLS() ); + peerAddress = NetworkAddress(p->canonicalRemoteIp(), p->canonicalRemotePort, true, + peerAddress.isTLS()); } peer = transport->getPeer(peerAddress); peer->compatible = compatible; diff --git a/fdbrpc/TLSConnection.actor.cpp b/fdbrpc/TLSConnection.actor.cpp index 0b49e6ea3d..83f0a6cd2e 100644 --- a/fdbrpc/TLSConnection.actor.cpp +++ b/fdbrpc/TLSConnection.actor.cpp @@ -177,7 +177,7 @@ Future> TLSNetworkConnections::connect( NetworkAddress to // addresses against certificates, so we have our own peer verifying logic // to use. For FDB<->external system connections, we can use the standard // hostname-based certificate verification logic. - if (host.empty() || host == toIPString(toAddr.ip)) + if (host.empty() || host == toAddr.ip.toString()) return wrap(options->get_policy(TLSOptions::POLICY_VERIFY_PEERS), true, network->connect(clearAddr), std::string("")); else return wrap( options->get_policy(TLSOptions::POLICY_NO_VERIFY_PEERS), true, network->connect( clearAddr ), host ); diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index 4d63b5496d..c729675769 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -135,28 +135,29 @@ struct SimClogging { return t - tnow; } - void clogPairFor( uint32_t from, uint32_t to, double t ) { + void clogPairFor(const IPAddress& from, const IPAddress& to, double t) { auto& u = clogPairUntil[ std::make_pair( from, to ) ]; u = std::max(u, now() + t); } - void clogSendFor( uint32_t from, double t ) { + void clogSendFor(const IPAddress& from, double t) { auto& u = clogSendUntil[from]; u = std::max(u, now() + t); } - void clogRecvFor( uint32_t from, double t ) { + void clogRecvFor(const IPAddress& from, double t) { auto& u = clogRecvUntil[from]; u = std::max(u, now() + t); } - double setPairLatencyIfNotSet( uint32_t from, uint32_t to, double t ) { + double setPairLatencyIfNotSet(const IPAddress& from, const IPAddress& to, double t) { auto i = clogPairLatency.find( std::make_pair(from,to) ); if (i == clogPairLatency.end()) i = clogPairLatency.insert( std::make_pair( std::make_pair(from,to), t ) ).first; return i->second; } + private: - std::map< uint32_t, double > clogSendUntil, clogRecvUntil; - std::map< std::pair, double > clogPairUntil; - std::map< std::pair, double > clogPairLatency; + std::map clogSendUntil, clogRecvUntil; + std::map, double> clogPairUntil; + std::map, double> clogPairLatency; double halfLatency() { double a = g_random->random01(); const double pFast = 0.999; @@ -789,9 +790,10 @@ public: Reference myc( new Sim2Conn( getCurrentProcess() ) ); Reference peerc( new Sim2Conn( peerp ) ); + // TODO Support IPv6 myc->connect(peerc, toAddr); - peerc->connect(myc, NetworkAddress( getCurrentProcess()->address.ip + g_random->randomInt(0,256), - g_random->randomInt(40000, 60000) )); + IPAddress localIp(getCurrentProcess()->address.ip.toV4() + g_random->randomInt(0, 256)); + peerc->connect(myc, NetworkAddress(localIp, g_random->randomInt(40000, 60000))); ((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*g_random->random01(), Reference(peerc) ); return onConnect( ::delay(0.5*g_random->random01()), myc ); @@ -1499,22 +1501,24 @@ public: return (kt == ktMin); } - virtual void clogInterface( uint32_t ip, double seconds, ClogMode mode = ClogDefault ) { + virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) { if (mode == ClogDefault) { double a = g_random->random01(); if ( a < 0.3 ) mode = ClogSend; else if (a < 0.6 ) mode = ClogReceive; else mode = ClogAll; } - TraceEvent("ClogInterface").detail("IP", toIPString(ip)).detail("Delay", seconds) - .detail("Queue", mode==ClogSend?"Send":mode==ClogReceive?"Receive":"All"); + TraceEvent("ClogInterface") + .detail("IP", ip.toString()) + .detail("Delay", seconds) + .detail("Queue", mode == ClogSend ? "Send" : mode == ClogReceive ? "Receive" : "All"); if (mode == ClogSend || mode==ClogAll) g_clogging.clogSendFor( ip, seconds ); if (mode == ClogReceive || mode==ClogAll) g_clogging.clogRecvFor( ip, seconds ); } - virtual void clogPair( uint32_t from, uint32_t to, double seconds ) { + virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) { g_clogging.clogPairFor( from, to, seconds ); } virtual std::vector getAllProcesses() const { @@ -1653,7 +1657,7 @@ public: INetwork *net2; //Map from machine IP -> machine disk space info - std::map diskSpaceMap; + std::map diskSpaceMap; //Whether or not yield has returned true during the current iteration of the run loop bool yielded; diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 8b76289f0a..829b2c3554 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -114,8 +114,12 @@ public: std::string toString() const { const NetworkAddress& address = addresses[0]; - return format("name: %s address: %d.%d.%d.%d:%d zone: %s datahall: %s class: %s excluded: %d cleared: %d", - name, (address.ip>>24)&0xff, (address.ip>>16)&0xff, (address.ip>>8)&0xff, address.ip&0xff, address.port, (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), startingClass.toString().c_str(), excluded, cleared); + return format( + "name: %s address: %s:%d zone: %s datahall: %s class: %s excluded: %d cleared: %d", name, + address.ip.toString().c_str(), address.port, + (locality.zoneId().present() ? locality.zoneId().get().printable().c_str() : "[unset]"), + (locality.dataHallId().present() ? locality.dataHallId().get().printable().c_str() : "[unset]"), + startingClass.toString().c_str(), excluded, cleared); } // Members not for external use @@ -256,8 +260,8 @@ public: allSwapsDisabled = true; } - virtual void clogInterface( uint32_t ip, double seconds, ClogMode mode = ClogDefault ) = 0; - virtual void clogPair( uint32_t from, uint32_t to, double seconds ) = 0; + virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) = 0; + virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) = 0; virtual std::vector getAllProcesses() const = 0; virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) = 0; virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) = 0; diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index d27358fa33..c14f18f5f0 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -289,7 +289,7 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vectorsecond; try { - std::string address = toIPString(it->first.ip); + std::string address = it->first.ip.toString(); // We will use the "physical" caluculated machine ID here to limit exposure to machineID repurposing std::string machineId = event.getValue("MachineID"); @@ -1254,9 +1254,15 @@ namespace std { size_t operator()(const NetworkAddress& na) const { - return (na.ip << 16) + na.port; - } - }; + int result = 0; + if (na.ip.isV6()) { + result = hashlittle(na.ip.toV6().data(), 16, 0); + } else { + result = na.ip.toV4(); + } + return (result << 16) + na.port; + } + }; } ACTOR template @@ -1667,7 +1673,7 @@ static JsonBuilderArray getClientIssuesAsMessages( ProcessIssuesMap const& _issu std::map> deduplicatedIssues; for(auto i : issues) { - deduplicatedIssues[i.second.first].push_back(format("%s:%d", toIPString(i.first.ip).c_str(), i.first.port)); + deduplicatedIssues[i.second.first].push_back(format("%s:%d", i.first.ip.toString().c_str(), i.first.port)); } for (auto i : deduplicatedIssues) { diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index abb80c817b..0ed260e8bc 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -170,7 +170,7 @@ extern void copyTest(); extern void versionedMapTest(); extern void createTemplateDatabase(); // FIXME: this really belongs in a header somewhere since it is actually used. -extern uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs ); +extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs); extern const char* getHGVersion(); @@ -776,7 +776,7 @@ std::pair buildNetworkAddresses(const Cl if (autoPublicAddress) { try { const NetworkAddress& parsedAddress = NetworkAddress::parse("0.0.0.0:" + publicAddressStr.substr(5)); - uint32_t publicIP = determinePublicIPAutomatically(connectionFile.getConnectionString()); + const IPAddress publicIP = determinePublicIPAutomatically(connectionFile.getConnectionString()); publicNetworkAddresses.emplace_back(publicIP, parsedAddress.port, true, parsedAddress.isTLS()); } catch (Error& e) { fprintf(stderr, "ERROR: could not determine public address automatically from `%s': %s\n", publicAddressStr.c_str(), e.what()); diff --git a/fdbserver/workloads/CpuProfiler.actor.cpp b/fdbserver/workloads/CpuProfiler.actor.cpp index a4d072203d..a8208d61d3 100644 --- a/fdbserver/workloads/CpuProfiler.actor.cpp +++ b/fdbserver/workloads/CpuProfiler.actor.cpp @@ -90,7 +90,8 @@ struct CpuProfilerWorkload : TestWorkload req.duration = 0; //unused //The profiler output name will be the ip.port.prof - req.outputFile = StringRef(toIPString(self->profilingWorkers[i].address().ip) + "." + format("%d", self->profilingWorkers[i].address().port) + ".profile.bin"); + req.outputFile = StringRef(self->profilingWorkers[i].address().ip.toString() + "." + + format("%d", self->profilingWorkers[i].address().port) + ".profile.bin"); replies.push_back(self->profilingWorkers[i].clientInterface.profiler.tryGetReply(req)); } diff --git a/fdbserver/workloads/RemoveServersSafely.actor.cpp b/fdbserver/workloads/RemoveServersSafely.actor.cpp index 6f6cda1f64..d852db53b6 100644 --- a/fdbserver/workloads/RemoveServersSafely.actor.cpp +++ b/fdbserver/workloads/RemoveServersSafely.actor.cpp @@ -65,7 +65,7 @@ struct RemoveServersSafelyWorkload : TestWorkload { std::map>, AddressExclusion> machinesMap; // Locality Zone Id -> ip address std::vector processAddrs; // IF (killProcesses) THEN ip:port ELSE ip addresses unique list of the machines - std::map>> ip_dcid; + std::map>> ip_dcid; auto processes = getServers(); for(auto& it : processes) { AddressExclusion machineIp(it->address.ip); diff --git a/fdbserver/workloads/SaveAndKill.actor.cpp b/fdbserver/workloads/SaveAndKill.actor.cpp index b9ab47063b..3a6a8fdd69 100644 --- a/fdbserver/workloads/SaveAndKill.actor.cpp +++ b/fdbserver/workloads/SaveAndKill.actor.cpp @@ -91,13 +91,15 @@ struct SaveAndKillWorkload : TestWorkload { ini.SetValue(machineIdString, "dcUID", (process->locality.dcId().present()) ? process->locality.dcId().get().printable().c_str() : ""); ini.SetValue(machineIdString, "zoneId", (process->locality.zoneId().present()) ? process->locality.zoneId().get().printable().c_str() : ""); ini.SetValue(machineIdString, "mClass", format("%d", process->startingClass.classType()).c_str()); - ini.SetValue(machineIdString, format("ipAddr%d", process->address.port-1).c_str(), format("%d", process->address.ip).c_str()); + ini.SetValue(machineIdString, format("ipAddr%d", process->address.port - 1).c_str(), + format("%d", process->address.ip.toV4()).c_str()); ini.SetValue(machineIdString, format("%d", process->address.port-1).c_str(), process->dataFolder); ini.SetValue(machineIdString, format("c%d", process->address.port-1).c_str(), process->coordinationFolder); j++; } else { - ini.SetValue(machineIdString, format("ipAddr%d", process->address.port-1).c_str(), format("%d", process->address.ip).c_str()); + ini.SetValue(machineIdString, format("ipAddr%d", process->address.port - 1).c_str(), + format("%d", process->address.ip.toV4()).c_str()); int oldValue = machines.find(machineId)->second; ini.SetValue(machineIdString, format("%d", process->address.port-1).c_str(), process->dataFolder); ini.SetValue(machineIdString, format("c%d", process->address.port-1).c_str(), process->coordinationFolder); diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 1ea35070c4..9afea3140c 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -172,7 +172,7 @@ public: TDMetricCollection tdmetrics; double currentTime; bool stopped; - std::map< uint32_t, bool > addressOnHostCache; + std::map addressOnHostCache; uint64_t numYields; @@ -226,8 +226,16 @@ public: std::vector blobCredentialFiles; }; +static boost::asio::ip::address tcpAddress(IPAddress const& n) { + if (n.isV6()) { + return boost::asio::ip::address_v6(n.toV6()); + } else { + return boost::asio::ip::address_v4(n.toV4()); + } +} + static tcp::endpoint tcpEndpoint( NetworkAddress const& n ) { - return tcp::endpoint( boost::asio::ip::address_v4( n.ip ), n.port ); + return tcp::endpoint(tcpAddress(n.ip), n.port); } class BindPromise { @@ -458,7 +466,9 @@ private: auto f = p.getFuture(); self->acceptor.async_accept( conn->getSocket(), peer_endpoint, std::move(p) ); wait( f ); - conn->accept( NetworkAddress(peer_endpoint.address().to_v4().to_ulong(), peer_endpoint.port()) ); + auto peer_address = peer_endpoint.address().is_v6() ? IPAddress(peer_endpoint.address().to_v6().to_bytes()) + : IPAddress(peer_endpoint.address().to_v4().to_ulong()); + conn->accept(NetworkAddress(peer_address, peer_endpoint.port())); return conn; } catch (...) { @@ -850,13 +860,14 @@ ACTOR static Future> resolveTCPEndpoint_impl( Net2 * } std::vector addrs; - + tcp::resolver::iterator end; while(iter != end) { auto endpoint = iter->endpoint(); - // Currently only ipv4 is supported by NetworkAddress auto addr = endpoint.address(); - if(addr.is_v4()) { + if (addr.is_v6()) { + addrs.push_back(NetworkAddress(IPAddress(addr.to_v6().to_bytes()), endpoint.port())); + } else { addrs.push_back(NetworkAddress(addr.to_v4().to_ulong(), endpoint.port())); } ++iter; @@ -890,9 +901,10 @@ bool Net2::isAddressOnThisHost( NetworkAddress const& addr ) { try { boost::asio::io_service ioService; boost::asio::ip::udp::socket socket(ioService); - boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4(addr.ip), 1); + boost::asio::ip::udp::endpoint endpoint(tcpAddress(addr.ip), 1); socket.connect(endpoint); - bool local = socket.local_endpoint().address().to_v4().to_ulong() == addr.ip; + bool local = addr.ip.isV6() ? socket.local_endpoint().address().to_v6().to_bytes() == addr.ip.toV6() + : socket.local_endpoint().address().to_v4().to_ulong() == addr.ip.toV4(); socket.close(); if (local) TraceEvent(SevInfo, "AddressIsOnHost").detail("Address", addr); return addressOnHostCache[ addr.ip ] = local; diff --git a/flow/Platform.cpp b/flow/Platform.cpp index 51e2e492c5..c0c9aecb44 100644 --- a/flow/Platform.cpp +++ b/flow/Platform.cpp @@ -499,7 +499,7 @@ void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) { } #ifdef __unixish__ -const char* getInterfaceName(uint32_t _ip) { +const char* getInterfaceName(const IPAddress& _ip) { INJECT_FAULT( platform_error, "getInterfaceName" ); static char iname[20]; @@ -514,9 +514,15 @@ const char* getInterfaceName(uint32_t _ip) { for (struct ifaddrs* iter = interfaces; iter; iter = iter->ifa_next) { if(!iter->ifa_addr) continue; - if (iter->ifa_addr->sa_family == AF_INET) { + if (iter->ifa_addr->sa_family == AF_INET && _ip.isV4()) { uint32_t ip = ntohl(((struct sockaddr_in*)iter->ifa_addr)->sin_addr.s_addr); - if (ip == _ip) { + if (ip == _ip.toV4()) { + ifa_name = iter->ifa_name; + break; + } + } else if (iter->ifa_addr->sa_family == AF_INET6 && _ip.isV6()) { + struct sockaddr_in6* ifa_addr = (struct sockaddr_in6*)iter->ifa_addr; + if (memcmp(_ip.toV6().data(), &ifa_addr->sin6_addr, 16) == 0) { ifa_name = iter->ifa_name; break; } @@ -538,8 +544,8 @@ const char* getInterfaceName(uint32_t _ip) { #endif #if defined(__linux__) -void getNetworkTraffic(uint32_t ip, uint64_t& bytesSent, uint64_t& bytesReceived, - uint64_t& outSegs, uint64_t& retransSegs) { +void getNetworkTraffic(const IPAddress& ip, uint64_t& bytesSent, uint64_t& bytesReceived, uint64_t& outSegs, + uint64_t& retransSegs) { INJECT_FAULT( platform_error, "getNetworkTraffic" ); // Even though this function doesn't throw errors, the equivalents for other platforms do, and since all of our simulation testing is on Linux... const char* ifa_name = nullptr; try { @@ -746,8 +752,8 @@ dev_t getDeviceId(std::string path) { #endif #ifdef __APPLE__ -void getNetworkTraffic(uint32_t ip, uint64_t& bytesSent, uint64_t& bytesReceived, - uint64_t& outSegs, uint64_t& retransSegs) { +void getNetworkTraffic(const IPAddress& ip, uint64_t& bytesSent, uint64_t& bytesReceived, uint64_t& outSegs, + uint64_t& retransSegs) { INJECT_FAULT( platform_error, "getNetworkTraffic" ); const char* ifa_name = nullptr; @@ -1095,7 +1101,7 @@ void initPdhStrings(SystemStatisticsState *state, std::string dataFolder) { } #endif -SystemStatistics getSystemStatistics(std::string dataFolder, uint32_t ip, SystemStatisticsState **statState) { +SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState** statState) { if( (*statState) == NULL ) (*statState) = new SystemStatisticsState(); SystemStatistics returnStats; @@ -1189,7 +1195,7 @@ SystemStatistics getSystemStatistics(std::string dataFolder, uint32_t ip, System uint64_t machineOutSegs = (*statState)->machineLastOutSegs; uint64_t machineRetransSegs = (*statState)->machineLastRetransSegs; - getNetworkTraffic(ip, machineNowSent, machineNowReceived, machineOutSegs, machineRetransSegs); + getNetworkTraffic(*ip, machineNowSent, machineNowReceived, machineOutSegs, machineRetransSegs); if( returnStats.initialized ) { returnStats.machineMegabitsSent = ((machineNowSent - (*statState)->machineLastSent) * 8e-6); returnStats.machineMegabitsReceived = ((machineNowReceived - (*statState)->machineLastReceived) * 8e-6); diff --git a/flow/Platform.h b/flow/Platform.h index 56b6fa9cdf..7cfbeab75f 100644 --- a/flow/Platform.h +++ b/flow/Platform.h @@ -245,7 +245,9 @@ struct SystemStatistics { struct SystemStatisticsState; -SystemStatistics getSystemStatistics(std::string dataFolder, uint32_t ip, SystemStatisticsState **statState); +class IPAddress; + +SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState **statState); double getProcessorTimeThread(); diff --git a/flow/SystemMonitor.cpp b/flow/SystemMonitor.cpp index bc7a21081a..207e5c30d7 100644 --- a/flow/SystemMonitor.cpp +++ b/flow/SystemMonitor.cpp @@ -43,19 +43,18 @@ void systemMonitor() { SystemStatistics getSystemStatistics() { static StatisticsState statState = StatisticsState(); + const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress(); return getSystemStatistics( - machineState.folder.present() ? machineState.folder.get() : "", - machineState.ip.present() ? machineState.ip.get() : 0, - &statState.systemState); + machineState.folder.present() ? machineState.folder.get() : "", &ipAddr, &statState.systemState); } #define TRACEALLOCATOR( size ) TraceEvent("MemSample").detail("Count", FastAllocator::getApproximateMemoryUnused()/size).detail("TotalSize", FastAllocator::getApproximateMemoryUnused()).detail("SampleCount", 1).detail("Hash", "FastAllocatedUnused" #size ).detail("Bt", "na") #define DETAILALLOCATORMEMUSAGE( size ) detail("TotalMemory"#size, FastAllocator::getTotalMemory()).detail("ApproximateUnusedMemory"#size, FastAllocator::getApproximateMemoryUnused()).detail("ActiveThreads"#size, FastAllocator::getActiveThreads()) SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *statState, bool machineMetrics) { - SystemStatistics currentStats = getSystemStatistics(machineState.folder.present() ? machineState.folder.get() : "", - machineState.ip.present() ? machineState.ip.get() : 0, - &statState->systemState); + const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress(); + SystemStatistics currentStats = getSystemStatistics(machineState.folder.present() ? machineState.folder.get() : "", + &ipAddr, &statState->systemState); NetworkData netData; netData.init(); if (!DEBUG_DETERMINISM && currentStats.initialized) { diff --git a/flow/SystemMonitor.h b/flow/SystemMonitor.h index a99d00b0cb..8c7d41c5ba 100644 --- a/flow/SystemMonitor.h +++ b/flow/SystemMonitor.h @@ -29,14 +29,15 @@ struct SystemMonitorMachineState { Optional folder; Optional> zoneId; Optional> machineId; - Optional ip; + Optional ip; double monitorStartTime; SystemMonitorMachineState() : monitorStartTime(0) {} - SystemMonitorMachineState(uint32_t ip) : ip(ip), monitorStartTime(0) {} - SystemMonitorMachineState(std::string folder, Optional> zoneId, Optional> machineId, uint32_t ip) - : folder(folder), zoneId(zoneId), machineId(machineId), ip(ip), monitorStartTime(0) {} + explicit SystemMonitorMachineState(const IPAddress& ip) : ip(ip), monitorStartTime(0) {} + SystemMonitorMachineState(std::string folder, Optional> zoneId, + Optional> machineId, const IPAddress& ip) + : folder(folder), zoneId(zoneId), machineId(machineId), ip(ip), monitorStartTime(0) {} }; void initializeSystemMonitorMachineState(SystemMonitorMachineState machineState); diff --git a/flow/TDMetric.actor.h b/flow/TDMetric.actor.h index dfe153393d..84862aefcd 100755 --- a/flow/TDMetric.actor.h +++ b/flow/TDMetric.actor.h @@ -183,8 +183,7 @@ public: // Get and store the local address in the metric collection, but only if it is not 0.0.0.0:0 if( address.size() == 0 ) { NetworkAddress addr = g_network->getLocalAddress(); - if(addr.ip != 0 && addr.port != 0) - address = StringRef(addr.toString()); + if (addr.ip.isValid() && addr.port != 0) address = StringRef(addr.toString()); } return address.size() != 0; } diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 0fb6bcdfcd..ebbb22259a 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -333,7 +333,8 @@ public: void annotateEvent( TraceEventFields &fields ) { if(localAddress.present()) { - fields.addField("Machine", format("%d.%d.%d.%d:%d", (localAddress.get().ip>>24)&0xff, (localAddress.get().ip>>16)&0xff, (localAddress.get().ip>>8)&0xff, localAddress.get().ip&0xff, localAddress.get().port)); + fields.addField("Machine", + format("%s:%d", localAddress.get().ip.toString().c_str(), localAddress.get().port)); } fields.addField("LogGroup", logGroup); @@ -624,7 +625,7 @@ void openTraceFile(const NetworkAddress& na, uint64_t rollsize, uint64_t maxLogs if (baseOfBase.empty()) baseOfBase = "trace"; - std::string baseName = format("%s.%03d.%03d.%03d.%03d.%d", baseOfBase.c_str(), (na.ip>>24)&0xff, (na.ip>>16)&0xff, (na.ip>>8)&0xff, na.ip&0xff, na.port); + std::string baseName = format("%s.%s.%d", baseOfBase.c_str(), na.ip.toString().c_str(), na.port); g_traceLog.open( directory, baseName, logGroup, format("%lld", time(NULL)), rollsize, maxLogsSize, !g_network->isSimulated() ? na : Optional()); uncancellable(recurring(&flushTraceFile, FLOW_KNOBS->TRACE_FLUSH_INTERVAL, TaskFlushTrace)); @@ -716,7 +717,7 @@ bool TraceEvent::init() { detail("Type", type); if(g_network && g_network->isSimulated()) { NetworkAddress local = g_network->getLocalAddress(); - detailf("Machine", "%d.%d.%d.%d:%d", (local.ip>>24)&0xff, (local.ip>>16)&0xff, (local.ip>>8)&0xff, local.ip&0xff, local.port); + detailf("Machine", "%s:%d", local.ip.toString().c_str(), local.port); } detail("ID", id); if(err.isValid()) { @@ -1015,7 +1016,7 @@ void TraceBatch::dump() { std::string machine; if(g_network->isSimulated()) { NetworkAddress local = g_network->getLocalAddress(); - machine = format("%d.%d.%d.%d:%d", (local.ip>>24)&0xff,(local.ip>>16)&0xff,(local.ip>>8)&0xff,local.ip&0xff,local.port); + machine = format("%s:%d", local.ip.toString().c_str(), local.port); } for(int i = 0; i < attachBatch.size(); i++) { diff --git a/flow/network.cpp b/flow/network.cpp index 496537eacc..9e484d1dce 100644 --- a/flow/network.cpp +++ b/flow/network.cpp @@ -18,8 +18,54 @@ * limitations under the License. */ +#include "boost/asio.hpp" + #include "flow/network.h" #include "flow/flow.h" +#include "flow/UnitTest.h" + +IPAddress::IPAddress() : store({}), isV6addr(false) {} + +IPAddress::IPAddress(const IPAddressStore& v6addr) : store(v6addr), isV6addr(true) {} + +IPAddress::IPAddress(uint32_t v4addr) : store({}), isV6addr(false) { + uint32_t* parts = (uint32_t*)store.data(); + parts[0] = v4addr; +} + +uint32_t IPAddress::toV4() const { + const uint32_t* parts = (uint32_t*)store.data(); + return parts[0]; +} + +bool IPAddress::operator==(const IPAddress& addr) const { + return isV6addr == addr.isV6addr && store == addr.store; +} + +bool IPAddress::operator!=(const IPAddress& addr) const { + return !(*this == addr); +} + +bool IPAddress::operator<(const IPAddress& addr) const { + return isV6() == addr.isV6() ? store < addr.store : isV6() < addr.isV6(); +} + +std::string IPAddress::toString() const { + if (isV6()) { + return boost::asio::ip::address_v6(store).to_string(); + } else { + const uint32_t ip = toV4(); + return format("%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff); + } +} + +bool IPAddress::isValid() const { + if (!isV6()) { + return toV4() != 0; + } + + return std::any_of(store.begin(), store.end(), [](uint8_t part) { return part > 0; }); +} NetworkAddress NetworkAddress::parse( std::string const& s ) { bool isTLS = false; @@ -27,12 +73,31 @@ NetworkAddress NetworkAddress::parse( std::string const& s ) { if( s.size() > 4 && strcmp(s.c_str() + s.size() - 4, ":tls") == 0 ) { isTLS = true; f = s.substr(0, s.size() - 4); - } else + } else { f = s; - int a,b,c,d,port,count=-1; - if (sscanf(f.c_str(), "%d.%d.%d.%d:%d%n", &a,&b,&c,&d, &port, &count)<5 || count != f.size()) - throw connection_string_invalid(); - return NetworkAddress( (a<<24)+(b<<16)+(c<<8)+d, port, true, isTLS ); + } + + if (f[0] == '[') { + // IPv6 address/port pair is represented as "[ip]:port" + auto addrEnd = f.find_first_of(']'); + if (addrEnd == std::string::npos || f[addrEnd + 1] != ':') { + throw connection_string_invalid(); + } + + try { + auto port = std::stoi(f.substr(addrEnd + 2)); + auto addr = boost::asio::ip::address::from_string(f.substr(1, addrEnd - 1)); + ASSERT(addr.is_v6()); + return NetworkAddress(IPAddress(addr.to_v6().to_bytes()), port, true, isTLS); + } catch (...) { + throw connection_string_invalid(); + } + } else { + int a, b, c, d, port, count = -1; + if (sscanf(f.c_str(), "%d.%d.%d.%d:%d%n", &a, &b, &c, &d, &port, &count) < 5 || count != f.size()) + throw connection_string_invalid(); + return NetworkAddress((a << 24) + (b << 16) + (c << 8) + d, port, true, isTLS); + } } std::vector NetworkAddress::parseList( std::string const& addrs ) { @@ -49,11 +114,13 @@ std::vector NetworkAddress::parseList( std::string const& addrs } std::string NetworkAddress::toString() const { - return format( "%d.%d.%d.%d:%d%s", (ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff, port, isTLS() ? ":tls" : "" ); -} - -std::string toIPString(uint32_t ip) { - return format( "%d.%d.%d.%d", (ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff ); + const char* patt; + if (isV6()) { + patt = "[%s]:%d%s"; + } else { + patt = "%s:%d%s"; + } + return format(patt, ip.toString().c_str(), port, isTLS() ? ":tls" : ""); } std::string toIPVectorString(std::vector ips) { @@ -82,3 +149,26 @@ Future> INetworkConnections::connect( std::string host, s return connect(addr, host); }); } + +TEST_CASE("/flow/network/ipaddress") { + ASSERT(NetworkAddress::parse("[::1]:4800").toString() == "[::1]:4800"); + + { + auto addr = "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:4800"; + auto addrParsed = NetworkAddress::parse(addr); + auto addrCompressed = "[2001:db8:85a3::8a2e:370:7334]:4800"; + ASSERT(addrParsed.isV6()); + ASSERT(!addrParsed.isTLS()); + ASSERT(addrParsed.toString() == addrCompressed); + } + + { + auto addr = "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:4800:tls"; + auto addrParsed = NetworkAddress::parse(addr); + auto addrCompressed = "[2001:db8:85a3::8a2e:370:7334]:4800:tls"; + ASSERT(addrParsed.isV6()); + ASSERT(addrParsed.isTLS()); + ASSERT(addrParsed.toString() == addrCompressed); + return Void(); + } +} diff --git a/flow/network.h b/flow/network.h index 4c3a2404df..cf4689fb84 100644 --- a/flow/network.h +++ b/flow/network.h @@ -22,6 +22,7 @@ #define FLOW_OPENNETWORK_H #pragma once +#include #include #include #include "flow/serialize.h" @@ -75,25 +76,78 @@ enum { class Void; +struct IPAddress { + // Represents both IPv4 and IPv6 address. For IPv4 addresses, + // only the first 32bits are relevant and rest are initialized to + // 0. + typedef std::array IPAddressStore; + + IPAddress(); + explicit IPAddress(const IPAddressStore& v6addr); + explicit IPAddress(uint32_t v4addr); + + bool isV6() const { return isV6addr; } + bool isV4() const { return !isV6addr; } + bool isValid() const; + + // Returns raw v4/v6 representation of address. Caller is responsible + // to call these functions safely. + uint32_t toV4() const; + const IPAddressStore& toV6() const { return store; } + + std::string toString() const; + + bool operator==(const IPAddress& addr) const; + bool operator!=(const IPAddress& addr) const; + bool operator<(const IPAddress& addr) const; + + template + void serialize(Ar& ar) { + serializer(ar, isV6addr); + if (isV6addr) { + serializer(ar, store); + } else { + uint32_t* parts = (uint32_t*)store.data(); + serializer(ar, parts[0]); + } + } + +private: + bool isV6addr; + IPAddressStore store; +}; + struct NetworkAddress { // A NetworkAddress identifies a particular running server (i.e. a TCP endpoint). - uint32_t ip; + IPAddress ip; uint16_t port; uint16_t flags; enum { FLAG_PRIVATE = 1, FLAG_TLS = 2 }; - NetworkAddress() : ip(0), port(0), flags(FLAG_PRIVATE) {} - NetworkAddress( uint32_t ip, uint16_t port ) : ip(ip), port(port), flags(FLAG_PRIVATE) {} - NetworkAddress( uint32_t ip, uint16_t port, bool isPublic, bool isTLS ) : ip(ip), port(port), - flags( (isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0 ) ) {} + NetworkAddress() : ip(IPAddress(0)), port(0), flags(FLAG_PRIVATE) {} + NetworkAddress(const IPAddress& address, uint16_t port, bool isPublic, bool isTLS) + : ip(address), port(port), flags((isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0)) {} + NetworkAddress(uint32_t ip, uint16_t port, bool isPublic, bool isTLS) + : NetworkAddress(IPAddress(ip), port, isPublic, isTLS) {} - bool operator == (NetworkAddress const& r) const { return ip==r.ip && port==r.port && flags==r.flags; } - bool operator != (NetworkAddress const& r) const { return ip!=r.ip || port!=r.port || flags!=r.flags; } - bool operator< (NetworkAddress const& r) const { if (flags != r.flags) return flags < r.flags; if (ip != r.ip) return ip < r.ip; return port parseList( std::string const& ); @@ -101,13 +155,18 @@ struct NetworkAddress { template void serialize(Ar& ar) { - ar.serializeBinaryItem(*this); + if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { + uint32_t ipV4; + serializer(ar, ipV4, port, flags); + ip = IPAddress(ipV4); + } else { + serializer(ar, ip, port, flags); + } } }; typedef std::vector NetworkAddressList; -std::string toIPString(uint32_t ip); std::string toIPVectorString(std::vector ips); template class Future;